package wal import ( "bufio" "io" "os" "sync" "time" "git.crumpington.com/public/jldb/lib/atomicheader" "git.crumpington.com/public/jldb/lib/errs" ) type segment struct { ID int64 lock sync.Mutex closed bool header segmentHeader headWriter *atomicheader.Handler f *os.File notifyMux *notifyMux // For non-archived segments. w *bufio.Writer } func createSegment(path string, id, firstSeqNum, timestampMS int64) (*segment, error) { f, err := os.Create(path) if err != nil { return nil, errs.IO.WithErr(err) } defer f.Close() if err := atomicheader.Init(f); err != nil { return nil, err } handler, err := atomicheader.Open(f) if err != nil { return nil, err } header := segmentHeader{ CreatedAt: time.Now().Unix(), FirstSeqNum: firstSeqNum, LastSeqNum: firstSeqNum - 1, LastTimestampMS: timestampMS, InsertAt: atomicheader.ReservedBytes, } err = handler.Write(func(page []byte) error { header.WriteTo(page) return nil }) if err != nil { return nil, err } return openSegment(path, id) } func openSegment(path string, id int64) (*segment, error) { f, err := os.OpenFile(path, os.O_RDWR, 0600) if err != nil { return nil, errs.IO.WithErr(err) } handler, err := atomicheader.Open(f) if err != nil { f.Close() return nil, err } var header segmentHeader err = handler.Read(func(page []byte) error { header.ReadFrom(page) return nil }) if err != nil { f.Close() return nil, err } if _, err := f.Seek(header.InsertAt, io.SeekStart); err != nil { f.Close() return nil, errs.IO.WithErr(err) } seg := &segment{ ID: id, header: header, headWriter: handler, f: f, notifyMux: newNotifyMux(), } if header.ArchivedAt == 0 { seg.w = bufio.NewWriterSize(f, 1024*1024) } return seg, nil } // Append appends the data from r to the log atomically. If an error is // returned, the caller should check for errs.Fatal. If a fatal error occurs, // the segment should no longer be used. func (seg *segment) Append(dataSize int64, r io.Reader) (int64, int64, error) { return seg.appendRecord(Record{ SeqNum: -1, TimestampMS: time.Now().UnixMilli(), DataSize: dataSize, Reader: r, }) } func (seg *segment) Header() segmentHeader { seg.lock.Lock() defer seg.lock.Unlock() return seg.header } // appendRecord appends a record in an atomic fashion. Do not use the segment // after a fatal error. func (seg *segment) appendRecord(rec Record) (int64, int64, error) { seg.lock.Lock() defer seg.lock.Unlock() header := seg.header // Copy. if seg.closed { return 0, 0, errs.Closed } if header.ArchivedAt != 0 { return 0, 0, errs.Archived } if rec.SeqNum == -1 { rec.SeqNum = header.LastSeqNum + 1 } else if rec.SeqNum != header.LastSeqNum+1 { return 0, 0, errs.Unexpected.WithMsg( "expected sequence number %d but got %d", header.LastSeqNum+1, rec.SeqNum) } seg.w.Reset(writerAtToWriter(seg.f, header.InsertAt)) n, err := rec.writeTo(seg.w) if err != nil { return 0, 0, err } if err := seg.w.Flush(); err != nil { return 0, 0, ioErrOrEOF(err) } // Write new header to sync. header.LastSeqNum = rec.SeqNum header.LastTimestampMS = rec.TimestampMS header.InsertAt += n err = seg.headWriter.Write(func(page []byte) error { header.WriteTo(page) return nil }) if err != nil { return 0, 0, err } seg.header = header seg.notifyMux.Notify(false, header) return rec.SeqNum, rec.TimestampMS, nil } // ---------------------------------------------------------------------------- func (seg *segment) Archive() error { seg.lock.Lock() defer seg.lock.Unlock() header := seg.header // Copy if header.ArchivedAt != 0 { return nil } header.ArchivedAt = time.Now().Unix() err := seg.headWriter.Write(func(page []byte) error { header.WriteTo(page) return nil }) if err != nil { return err } seg.w = nil // We won't be writing any more. seg.header = header seg.notifyMux.Notify(false, header) return nil } // ---------------------------------------------------------------------------- func (seg *segment) Iterator(fromSeqNum int64) (Iterator, error) { seg.lock.Lock() defer seg.lock.Unlock() if seg.closed { return nil, errs.Closed } f, err := os.Open(seg.f.Name()) if err != nil { return nil, errs.IO.WithErr(err) } header := seg.header if fromSeqNum == -1 { fromSeqNum = header.FirstSeqNum } return newSegmentIterator( f, fromSeqNum, seg.notifyMux.NewRecvr(header)) } // ---------------------------------------------------------------------------- func (seg *segment) Close() error { seg.lock.Lock() defer seg.lock.Unlock() if seg.closed { return nil } seg.closed = true header := seg.header seg.notifyMux.Notify(true, header) seg.f.Close() return nil }