package wal import ( "os" "time" "git.crumpington.com/public/jldb/lib/atomicheader" "git.crumpington.com/public/jldb/lib/errs" ) type segmentIterator struct { f *os.File recvr stateRecvr state segmentState offset int64 err error rec Record ticker *time.Ticker // Ticker if timeout has been set. tickerC <-chan time.Time // Ticker channel if timeout has been set. } func newSegmentIterator( f *os.File, fromSeqNum int64, recvr stateRecvr, ) ( Iterator, error, ) { it := &segmentIterator{ f: f, recvr: recvr, state: <-recvr.C, } if err := it.seekToSeqNum(fromSeqNum); err != nil { it.Close() return nil, err } it.rec.SeqNum = fromSeqNum - 1 it.ticker = time.NewTicker(time.Second) it.tickerC = it.ticker.C return it, nil } func (it *segmentIterator) seekToSeqNum(fromSeqNum int64) error { state := it.state // Is the requested sequence number out-of-range? if fromSeqNum < state.FirstSeqNum || fromSeqNum > state.LastSeqNum+1 { return errs.NotFound.WithMsg("sequence number not in segment") } // Seek to start. it.offset = atomicheader.ReservedBytes // Seek to first seq num - we're already there. if fromSeqNum == it.state.FirstSeqNum { return nil } for { if err := it.readRecord(); err != nil { return err } it.offset += it.rec.serializedSize() if it.rec.SeqNum == fromSeqNum-1 { return nil } } } func (it *segmentIterator) Close() { it.f.Close() it.recvr.Close() } // Next returns true if there's a record available to read via it.Record(). // // If Next returns false, the caller should check the error value with // it.Error(). func (it *segmentIterator) Next(timeout time.Duration) bool { if it.err != nil { return false } // Get new state if available. select { case it.state = <-it.recvr.C: default: } if it.state.Closed { it.err = errs.Closed return false } if it.rec.SeqNum < it.state.LastSeqNum { if it.err = it.readRecord(); it.err != nil { return false } it.offset += it.rec.serializedSize() return true } if it.state.Archived { it.err = errs.EOFArchived return false } if timeout <= 0 { return false // Nothing to return. } // Wait for new record, or timeout. it.ticker.Reset(timeout) // Get new state if available. select { case it.state = <-it.recvr.C: // OK case <-it.tickerC: return false // Timeout, no error. } if it.state.Closed { it.err = errs.Closed return false } if it.rec.SeqNum < it.state.LastSeqNum { if it.err = it.readRecord(); it.err != nil { return false } it.offset += it.rec.serializedSize() return true } if it.state.Archived { it.err = errs.EOFArchived return false } return false } func (it *segmentIterator) Record() Record { return it.rec } func (it *segmentIterator) Error() error { return it.err } func (it *segmentIterator) readRecord() error { return it.rec.readFrom(readerAtToReader(it.f, it.offset)) }