jldb/lib/wal/segment-iterator.go

167 lines
2.9 KiB
Go
Raw Permalink Normal View History

2023-10-13 09:43:27 +00:00
package wal
import (
"os"
"time"
2023-10-16 08:50:19 +00:00
"git.crumpington.com/public/jldb/lib/atomicheader"
"git.crumpington.com/public/jldb/lib/errs"
2023-10-13 09:43:27 +00:00
)
// segmentIterator reads records sequentially from a single segment file,
// picking up segment state changes from a stateRecvr as it goes.
type segmentIterator struct {
// f is the segment file that records are read from; closed by Close.
f *os.File
// recvr delivers segmentState values on recvr.C; closed by Close.
recvr stateRecvr
// state is the most recent segmentState received from recvr.C.
state segmentState
// offset is the position in f at which the next record is read. It starts
// at atomicheader.ReservedBytes and grows by each record's serialized size.
offset int64
// err is the first error encountered. Once set, Next always returns false.
err error
// rec is the most recently read record, as returned by Record.
rec Record
ticker *time.Ticker // Created by newSegmentIterator; Next resets it to the caller's timeout before waiting.
tickerC <-chan time.Time // ticker.C, used by Next to detect that the wait timed out.
}
// newSegmentIterator builds an iterator over the segment stored in f,
// positioned just after the record with sequence number fromSeqNum-1.
//
// It blocks until an initial segment state has been received from recvr.C.
// If fromSeqNum cannot be located in the segment, the iterator is closed
// (which closes both f and recvr) and the seek error is returned.
func newSegmentIterator(
	f *os.File,
	fromSeqNum int64,
	recvr stateRecvr,
) (Iterator, error) {
	iter := &segmentIterator{f: f, recvr: recvr}

	// The initial state is needed to validate fromSeqNum against the segment.
	iter.state = <-recvr.C

	err := iter.seekToSeqNum(fromSeqNum)
	if err != nil {
		iter.Close()
		return nil, err
	}

	// Record the previous sequence number as the last one seen, so that Next
	// compares against it when deciding whether there is more to read.
	iter.rec.SeqNum = fromSeqNum - 1

	// The initial period is a placeholder: Next resets the ticker to the
	// caller's timeout before each wait.
	tick := time.NewTicker(time.Second)
	iter.ticker, iter.tickerC = tick, tick.C

	return iter, nil
}
// seekToSeqNum moves it.offset to just past the record whose sequence number
// is fromSeqNum-1, reading records from the start of the segment as needed.
//
// It returns errs.NotFound if fromSeqNum is below the segment's first
// sequence number or more than one past its last, and otherwise any error
// returned while reading records.
func (it *segmentIterator) seekToSeqNum(fromSeqNum int64) error {
	first, last := it.state.FirstSeqNum, it.state.LastSeqNum

	// Reject sequence numbers that fall outside of this segment.
	if fromSeqNum < first || fromSeqNum > last+1 {
		return errs.NotFound.WithMsg("sequence number not in segment")
	}

	// Records begin immediately after the reserved header bytes.
	it.offset = atomicheader.ReservedBytes

	// The first record needs no scanning.
	if fromSeqNum == first {
		return nil
	}

	// Walk forward record by record until the one preceding fromSeqNum has
	// been consumed; the offset then points at the requested position.
	target := fromSeqNum - 1
	for {
		err := it.readRecord()
		if err != nil {
			return err
		}
		it.offset += it.rec.serializedSize()
		if it.rec.SeqNum == target {
			break
		}
	}
	return nil
}
// Close releases the iterator's resources: it stops the timeout ticker,
// closes the segment file, and closes the state receiver.
//
// The error from closing the file is discarded because Close has no return
// value.
func (it *segmentIterator) Close() {
	// Stop the ticker so the runtime can release it. It is nil when Close is
	// called from newSegmentIterator's error path, which runs before the
	// ticker has been created.
	if it.ticker != nil {
		it.ticker.Stop()
	}
	it.f.Close()
	it.recvr.Close()
}
// Next returns true if there's a record available to read via it.Record().
//
// If no record is immediately available and timeout is positive, Next waits
// up to timeout for a new segment state before giving up. A non-positive
// timeout never blocks.
//
// If Next returns false, the caller should check the error value with
// it.Error(). A false return with a nil error means that no record was
// available (for example, the wait timed out). Once an error has been
// recorded, every subsequent call returns false.
func (it *segmentIterator) Next(timeout time.Duration) bool {
	if it.err != nil {
		return false
	}

	// Get new state if available.
	select {
	case it.state = <-it.recvr.C:
	default:
	}

	if ok, done := it.readAvailable(); done {
		return ok
	}

	if timeout <= 0 {
		return false // Nothing to return.
	}

	// Arm the ticker for this wait. The ticker runs continuously between
	// calls, so a tick from an earlier period may already be buffered in its
	// channel, and Reset does not remove it on older Go runtimes. Drain it so
	// that the wait below cannot report a timeout before any time has passed.
	it.ticker.Reset(timeout)
	select {
	case <-it.tickerC:
	default:
	}

	// Wait for new state, or timeout.
	select {
	case it.state = <-it.recvr.C:
		// OK
	case <-it.tickerC:
		return false // Timeout, no error.
	}

	ok, _ := it.readAvailable()
	return ok
}

// readAvailable acts on the current segment state. The first result reports
// whether a record was read into it.rec. The second reports whether the
// outcome is final for this call to Next: a record was read, or it.err was
// set because the segment is closed, the read failed, or the segment is
// archived with nothing left to read. It returns (false, false) when there is
// simply nothing new to read yet.
func (it *segmentIterator) readAvailable() (bool, bool) {
	if it.state.Closed {
		it.err = errs.Closed
		return false, true
	}

	if it.rec.SeqNum < it.state.LastSeqNum {
		if it.err = it.readRecord(); it.err != nil {
			return false, true
		}
		it.offset += it.rec.serializedSize()
		return true, true
	}

	// An archived segment with nothing left to read is reported as
	// errs.EOFArchived.
	if it.state.Archived {
		it.err = errs.EOFArchived
		return false, true
	}

	return false, false
}
// Record returns a copy of the record most recently read by the iterator.
func (it *segmentIterator) Record() Record {
	current := it.rec
	return current
}
// Error returns the error recorded by the iterator, or nil if none has
// occurred.
func (it *segmentIterator) Error() error {
	recorded := it.err
	return recorded
}
// readRecord reads one record from the segment file at the current offset
// into it.rec. It does not advance it.offset; callers do that themselves
// using the record's serialized size.
func (it *segmentIterator) readRecord() error {
	src := readerAtToReader(it.f, it.offset)
	return it.rec.readFrom(src)
}