jldb/lib/wal/segment.go

252 lines
4.7 KiB
Go

package wal
import (
"bufio"
"io"
"os"
"sync"
"time"
"git.crumpington.com/public/jldb/lib/atomicheader"
"git.crumpington.com/public/jldb/lib/errs"
)
// segment is one write-ahead-log segment file plus the in-memory copy of its
// header. Header, appendRecord, Archive, Iterator and Close all take lock
// before touching the other fields.
type segment struct {
// ID is the identifier handed to openSegment (and createSegment).
ID int64
// lock serializes the methods of the segment.
lock sync.Mutex
// closed is set by Close; appendRecord and Iterator then return errs.Closed.
closed bool
// header is the most recent header read from, or successfully written to,
// the file.
header segmentHeader
// headWriter reads and writes the header through the atomicheader package.
headWriter *atomicheader.Handler
// f is the read-write handle on the segment file opened by openSegment.
f *os.File
// notifyMux is handed the current header after every append and archive,
// and once more on Close.
notifyMux *notifyMux
// For non-archived segments.
// openSegment leaves w nil when the header is already archived, and
// Archive resets it to nil.
w *bufio.Writer
}
// createSegment creates (or truncates, per os.Create) the file at path,
// initializes its atomic header area, stores an initial segmentHeader and then
// returns the segment obtained by re-opening the file with openSegment.
//
// The initial header records the creation time in Unix seconds, firstSeqNum
// as the first sequence number, firstSeqNum-1 as the last sequence number
// (i.e. no records yet), timestampMS as the last timestamp, and an insert
// offset of atomicheader.ReservedBytes.
//
// Failures from os.Create are wrapped with errs.IO; errors from the
// atomicheader package and from openSegment are returned unchanged.
func createSegment(path string, id, firstSeqNum, timestampMS int64) (*segment, error) {
	file, err := os.Create(path)
	if err != nil {
		return nil, errs.IO.WithErr(err)
	}
	// This handle is only used for initialization; openSegment opens its own.
	defer file.Close()

	if err := atomicheader.Init(file); err != nil {
		return nil, err
	}

	hw, err := atomicheader.Open(file)
	if err != nil {
		return nil, err
	}

	initial := segmentHeader{
		CreatedAt:       time.Now().Unix(),
		FirstSeqNum:     firstSeqNum,
		LastSeqNum:      firstSeqNum - 1,
		LastTimestampMS: timestampMS,
		InsertAt:        atomicheader.ReservedBytes,
	}
	storeInitial := func(page []byte) error {
		initial.WriteTo(page)
		return nil
	}
	if err = hw.Write(storeInitial); err != nil {
		return nil, err
	}

	return openSegment(path, id)
}
// openSegment opens the existing segment file at path for reading and
// writing, loads its header, positions the file offset at the header's
// InsertAt and returns a segment carrying the given id.
//
// A 1 MiB buffered writer is attached only when the header is not archived
// (ArchivedAt == 0). On any failure after the file has been opened, the file
// is closed before returning. os-level failures are wrapped with errs.IO;
// atomicheader errors are returned unchanged.
func openSegment(path string, id int64) (*segment, error) {
	file, err := os.OpenFile(path, os.O_RDWR, 0600)
	if err != nil {
		return nil, errs.IO.WithErr(err)
	}

	// Release the handle on every failure path below.
	handedOff := false
	defer func() {
		if !handedOff {
			file.Close()
		}
	}()

	hw, err := atomicheader.Open(file)
	if err != nil {
		return nil, err
	}

	var hdr segmentHeader
	loadHeader := func(page []byte) error {
		hdr.ReadFrom(page)
		return nil
	}
	if err = hw.Read(loadHeader); err != nil {
		return nil, err
	}

	if _, err := file.Seek(hdr.InsertAt, io.SeekStart); err != nil {
		return nil, errs.IO.WithErr(err)
	}

	opened := &segment{
		ID:         id,
		header:     hdr,
		headWriter: hw,
		f:          file,
		notifyMux:  newNotifyMux(),
	}
	if hdr.ArchivedAt == 0 {
		opened.w = bufio.NewWriterSize(file, 1024*1024)
	}

	handedOff = true
	return opened, nil
}
// Append atomically adds a new record whose payload is the dataSize bytes read
// from r. The record is stamped with the current wall-clock time in
// milliseconds and is assigned the next sequence number of the segment.
//
// It returns the sequence number and timestamp given to the record. When an
// error is returned the caller should check it for errs.Fatal: after a fatal
// error the segment must not be used any further.
func (seg *segment) Append(dataSize int64, r io.Reader) (int64, int64, error) {
	rec := Record{
		SeqNum:      -1, // Ask appendRecord to pick the next sequence number.
		TimestampMS: time.Now().UnixMilli(),
		DataSize:    dataSize,
		Reader:      r,
	}
	return seg.appendRecord(rec)
}
// Header returns a copy of the segment's current in-memory header, taken
// while holding the segment lock.
func (seg *segment) Header() segmentHeader {
	seg.lock.Lock()
	snapshot := seg.header
	seg.lock.Unlock()
	return snapshot
}
// appendRecord writes rec at the segment's current insert offset and then
// commits it by persisting an updated header. The in-memory header is replaced
// only after the header write succeeds, so a failed append leaves seg.header
// as it was. Do not use the segment after a fatal error.
//
// A rec.SeqNum of -1 means "assign the next sequence number"; any other value
// must equal the last sequence number plus one, otherwise errs.Unexpected is
// returned. errs.Closed is returned for a closed segment and errs.Archived for
// an archived one.
//
// On success it returns the record's sequence number and timestamp and passes
// the new header to notifyMux.
func (seg *segment) appendRecord(rec Record) (int64, int64, error) {
	seg.lock.Lock()
	defer seg.lock.Unlock()

	if seg.closed {
		return 0, 0, errs.Closed
	}

	next := seg.header // Working copy; installed only once committed.
	if next.ArchivedAt != 0 {
		return 0, 0, errs.Archived
	}

	switch {
	case rec.SeqNum == -1:
		rec.SeqNum = next.LastSeqNum + 1
	case rec.SeqNum != next.LastSeqNum+1:
		return 0, 0, errs.Unexpected.WithMsg(
			"expected sequence number %d but got %d",
			next.LastSeqNum+1,
			rec.SeqNum)
	}

	// Point the buffered writer at the insert offset; Reset also drops
	// anything left over from an earlier failed append.
	seg.w.Reset(writerAtToWriter(seg.f, next.InsertAt))

	n, err := rec.writeTo(seg.w)
	if err != nil {
		return 0, 0, err
	}
	if err := seg.w.Flush(); err != nil {
		return 0, 0, ioErrOrEOF(err)
	}

	// The record data is in the file; writing the header is what commits it.
	next.LastSeqNum = rec.SeqNum
	next.LastTimestampMS = rec.TimestampMS
	next.InsertAt += n

	commit := func(page []byte) error {
		next.WriteTo(page)
		return nil
	}
	err = seg.headWriter.Write(commit)
	if err != nil {
		return 0, 0, err
	}

	seg.header = next
	seg.notifyMux.Notify(false, next)

	return rec.SeqNum, rec.TimestampMS, nil
}
// ----------------------------------------------------------------------------
// Archive marks the segment as archived by persisting a header whose
// ArchivedAt is the current Unix time in seconds. Calling it on a segment that
// is already archived is a no-op that returns nil.
//
// After the header has been written, the buffered writer is dropped, the
// in-memory header is replaced and the new header is passed to notifyMux. If
// the header write fails, its error is returned and the segment is left
// unchanged.
//
// NOTE(review): unlike appendRecord and Iterator, this method does not check
// seg.closed before touching the file — confirm that is intended.
func (seg *segment) Archive() error {
	seg.lock.Lock()
	defer seg.lock.Unlock()

	if seg.header.ArchivedAt != 0 {
		return nil
	}

	archived := seg.header // Working copy; installed only once persisted.
	archived.ArchivedAt = time.Now().Unix()

	persist := func(page []byte) error {
		archived.WriteTo(page)
		return nil
	}
	if err := seg.headWriter.Write(persist); err != nil {
		return err
	}

	seg.w = nil // No further appends are accepted, so the writer is not needed.
	seg.header = archived
	seg.notifyMux.Notify(false, archived)

	return nil
}
// ----------------------------------------------------------------------------
// Iterator returns an iterator over the segment starting at fromSeqNum. A
// fromSeqNum of -1 means "start at the segment's first sequence number".
//
// The iterator gets its own read-only handle on the segment file (opened by
// name) and a notifyMux receiver seeded with the current header. errs.Closed
// is returned if the segment has been closed, and a failure to open the file
// is wrapped with errs.IO.
func (seg *segment) Iterator(fromSeqNum int64) (Iterator, error) {
	seg.lock.Lock()
	defer seg.lock.Unlock()

	if seg.closed {
		return nil, errs.Closed
	}

	reader, err := os.Open(seg.f.Name())
	if err != nil {
		return nil, errs.IO.WithErr(err)
	}

	current := seg.header
	start := fromSeqNum
	if start == -1 {
		start = current.FirstSeqNum
	}

	recvr := seg.notifyMux.NewRecvr(current)
	return newSegmentIterator(reader, start, recvr)
}
// ----------------------------------------------------------------------------
// Close marks the segment as closed, passes the current header to notifyMux
// with the first argument set to true (the only place that is done;
// presumably it tells receivers that no further updates follow — confirm in
// notifyMux) and closes the segment file. Further calls are no-ops. It always
// returns nil.
func (seg *segment) Close() error {
	seg.lock.Lock()
	defer seg.lock.Unlock()

	if !seg.closed {
		seg.closed = true
		seg.notifyMux.Notify(true, seg.header)
		// NOTE(review): the error from closing the file is discarded; confirm
		// that is intended rather than returning it wrapped in errs.IO.
		seg.f.Close()
	}

	return nil
}