package wal import ( "io" "os" "path/filepath" "strconv" "sync" "time" "git.crumpington.com/public/jldb/lib/atomicheader" "git.crumpington.com/public/jldb/lib/errs" ) type Config struct { SegMinCount int64 SegMaxAgeSec int64 } type WAL struct { rootDir string conf Config lock sync.Mutex // Protects the fields below. closed bool header walHeader headerWriter *atomicheader.Handler f *os.File // WAL header. segments map[int64]*segment // Used by the iterator. seg *segment // Current segment. } func Create(rootDir string, firstSeqNum int64, conf Config) (*WAL, error) { w := &WAL{rootDir: rootDir, conf: conf} seg, err := createSegment(w.segmentPath(1), 1, firstSeqNum, 0) if err != nil { return nil, err } defer seg.Close() f, err := os.Create(w.headerPath()) if err != nil { return nil, errs.IO.WithErr(err) } defer f.Close() if err := atomicheader.Init(f); err != nil { return nil, err } handler, err := atomicheader.Open(f) if err != nil { return nil, err } header := walHeader{ FirstSegmentID: 1, LastSegmentID: 1, } err = handler.Write(func(page []byte) error { header.WriteTo(page) return nil }) if err != nil { return nil, err } return Open(rootDir, conf) } func Open(rootDir string, conf Config) (*WAL, error) { w := &WAL{rootDir: rootDir, conf: conf} f, err := os.OpenFile(w.headerPath(), os.O_RDWR, 0600) if err != nil { return nil, errs.IO.WithErr(err) } handler, err := atomicheader.Open(f) if err != nil { f.Close() return nil, err } var header walHeader err = handler.Read(func(page []byte) error { header.ReadFrom(page) return nil }) if err != nil { f.Close() return nil, err } w.header = header w.headerWriter = handler w.f = f w.segments = map[int64]*segment{} for segID := header.FirstSegmentID; segID < header.LastSegmentID+1; segID++ { segID := segID seg, err := openSegment(w.segmentPath(segID), segID) if err != nil { w.Close() return nil, err } w.segments[segID] = seg } w.seg = w.segments[header.LastSegmentID] if err := w.grow(); err != nil { w.Close() return nil, err } return w, nil } func (w *WAL) Close() error { w.lock.Lock() defer w.lock.Unlock() if w.closed { return nil } w.closed = true for _, seg := range w.segments { seg.Close() delete(w.segments, seg.ID) } w.f.Close() return nil } func (w *WAL) Info() (info Info) { w.lock.Lock() defer w.lock.Unlock() h := w.header info.FirstSeqNum = w.segments[h.FirstSegmentID].Header().FirstSeqNum lastHeader := w.segments[h.LastSegmentID].Header() info.LastSeqNum = lastHeader.LastSeqNum info.LastTimestampMS = lastHeader.LastTimestampMS return } func (w *WAL) Append(dataSize int64, r io.Reader) (int64, int64, error) { return w.appendRecord(Record{ SeqNum: -1, TimestampMS: time.Now().UnixMilli(), DataSize: dataSize, Reader: r, }) } func (w *WAL) appendRecord(rec Record) (int64, int64, error) { w.lock.Lock() defer w.lock.Unlock() if w.closed { return 0, 0, errs.Closed } if err := w.grow(); err != nil { return 0, 0, err } return w.seg.appendRecord(rec) } func (w *WAL) Iterator(fromSeqNum int64) (Iterator, error) { w.lock.Lock() defer w.lock.Unlock() if w.closed { return nil, errs.Closed } header := w.header var seg *segment getSeg := func(id int64) (*segment, error) { w.lock.Lock() defer w.lock.Unlock() if w.closed { return nil, errs.Closed } return w.segments[id], nil } if fromSeqNum == -1 { seg = w.segments[header.FirstSegmentID] return newWALIterator(getSeg, seg, fromSeqNum) } // Seek to the appropriate segment. seg = w.segments[header.FirstSegmentID] for seg != nil { h := seg.Header() if fromSeqNum >= h.FirstSeqNum && fromSeqNum <= h.LastSeqNum+1 { return newWALIterator(getSeg, seg, fromSeqNum) } seg = w.segments[seg.ID+1] } return nil, errs.NotFound } func (w *WAL) DeleteBefore(timestamp, keepSeqNum int64) error { for { seg, err := w.removeSeg(timestamp, keepSeqNum) if err != nil || seg == nil { return err } id := seg.ID os.RemoveAll(w.segmentPath(id)) seg.Close() } } func (w *WAL) removeSeg(timestamp, keepSeqNum int64) (*segment, error) { w.lock.Lock() defer w.lock.Unlock() header := w.header if header.FirstSegmentID == header.LastSegmentID { return nil, nil // Nothing to delete now. } id := header.FirstSegmentID seg := w.segments[id] if seg == nil { return nil, errs.Unexpected.WithMsg("segment %d not found", id) } segHeader := seg.Header() if seg == w.seg || segHeader.ArchivedAt > timestamp { return nil, nil // Nothing to delete now. } if segHeader.LastSeqNum >= keepSeqNum { return nil, nil } header.FirstSegmentID = id + 1 err := w.headerWriter.Write(func(page []byte) error { header.WriteTo(page) return nil }) if err != nil { return nil, err } w.header = header delete(w.segments, id) return seg, nil } func (w *WAL) grow() error { segHeader := w.seg.Header() if segHeader.ArchivedAt == 0 { if (segHeader.LastSeqNum - segHeader.FirstSeqNum) < w.conf.SegMinCount { return nil } if time.Now().Unix()-segHeader.CreatedAt < w.conf.SegMaxAgeSec { return nil } } newSegID := w.seg.ID + 1 firstSeqNum := segHeader.LastSeqNum + 1 timestampMS := segHeader.LastTimestampMS newSeg, err := createSegment(w.segmentPath(newSegID), newSegID, firstSeqNum, timestampMS) if err != nil { return err } walHeader := w.header walHeader.LastSegmentID = newSegID err = w.headerWriter.Write(func(page []byte) error { walHeader.WriteTo(page) return nil }) if err != nil { newSeg.Close() return err } if err := w.seg.Archive(); err != nil { newSeg.Close() return err } w.seg = newSeg w.segments[newSeg.ID] = newSeg w.header = walHeader return nil } func (w *WAL) headerPath() string { return filepath.Join(w.rootDir, "header") } func (w *WAL) segmentPath(segID int64) string { return filepath.Join(w.rootDir, "seg."+strconv.FormatInt(segID, 10)) }