252 lines
4.7 KiB
Go
252 lines
4.7 KiB
Go
package wal
|
|
|
|
import (
|
|
"bufio"
|
|
"io"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.crumpington.com/public/jldb/lib/atomicheader"
|
|
"git.crumpington.com/public/jldb/lib/errs"
|
|
)
|
|
|
|
type segment struct {
|
|
ID int64
|
|
|
|
lock sync.Mutex
|
|
|
|
closed bool
|
|
header segmentHeader
|
|
headWriter *atomicheader.Handler
|
|
f *os.File
|
|
notifyMux *notifyMux
|
|
|
|
// For non-archived segments.
|
|
w *bufio.Writer
|
|
}
|
|
|
|
func createSegment(path string, id, firstSeqNum, timestampMS int64) (*segment, error) {
|
|
f, err := os.Create(path)
|
|
if err != nil {
|
|
return nil, errs.IO.WithErr(err)
|
|
}
|
|
defer f.Close()
|
|
|
|
if err := atomicheader.Init(f); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
handler, err := atomicheader.Open(f)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
header := segmentHeader{
|
|
CreatedAt: time.Now().Unix(),
|
|
FirstSeqNum: firstSeqNum,
|
|
LastSeqNum: firstSeqNum - 1,
|
|
LastTimestampMS: timestampMS,
|
|
InsertAt: atomicheader.ReservedBytes,
|
|
}
|
|
|
|
err = handler.Write(func(page []byte) error {
|
|
header.WriteTo(page)
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return openSegment(path, id)
|
|
}
|
|
|
|
func openSegment(path string, id int64) (*segment, error) {
|
|
f, err := os.OpenFile(path, os.O_RDWR, 0600)
|
|
if err != nil {
|
|
return nil, errs.IO.WithErr(err)
|
|
}
|
|
|
|
handler, err := atomicheader.Open(f)
|
|
if err != nil {
|
|
f.Close()
|
|
return nil, err
|
|
}
|
|
|
|
var header segmentHeader
|
|
err = handler.Read(func(page []byte) error {
|
|
header.ReadFrom(page)
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
f.Close()
|
|
return nil, err
|
|
}
|
|
|
|
if _, err := f.Seek(header.InsertAt, io.SeekStart); err != nil {
|
|
f.Close()
|
|
return nil, errs.IO.WithErr(err)
|
|
}
|
|
|
|
seg := &segment{
|
|
ID: id,
|
|
header: header,
|
|
headWriter: handler,
|
|
f: f,
|
|
notifyMux: newNotifyMux(),
|
|
}
|
|
|
|
if header.ArchivedAt == 0 {
|
|
seg.w = bufio.NewWriterSize(f, 1024*1024)
|
|
}
|
|
|
|
return seg, nil
|
|
}
|
|
|
|
// Append appends the data from r to the log atomically. If an error is
|
|
// returned, the caller should check for errs.Fatal. If a fatal error occurs,
|
|
// the segment should no longer be used.
|
|
func (seg *segment) Append(dataSize int64, r io.Reader) (int64, int64, error) {
|
|
return seg.appendRecord(Record{
|
|
SeqNum: -1,
|
|
TimestampMS: time.Now().UnixMilli(),
|
|
DataSize: dataSize,
|
|
Reader: r,
|
|
})
|
|
}
|
|
|
|
func (seg *segment) Header() segmentHeader {
|
|
seg.lock.Lock()
|
|
defer seg.lock.Unlock()
|
|
return seg.header
|
|
}
|
|
|
|
// appendRecord appends a record in an atomic fashion. Do not use the segment
|
|
// after a fatal error.
|
|
func (seg *segment) appendRecord(rec Record) (int64, int64, error) {
|
|
seg.lock.Lock()
|
|
defer seg.lock.Unlock()
|
|
|
|
header := seg.header // Copy.
|
|
|
|
if seg.closed {
|
|
return 0, 0, errs.Closed
|
|
}
|
|
|
|
if header.ArchivedAt != 0 {
|
|
return 0, 0, errs.Archived
|
|
}
|
|
|
|
if rec.SeqNum == -1 {
|
|
rec.SeqNum = header.LastSeqNum + 1
|
|
} else if rec.SeqNum != header.LastSeqNum+1 {
|
|
return 0, 0, errs.Unexpected.WithMsg(
|
|
"expected sequence number %d but got %d",
|
|
header.LastSeqNum+1,
|
|
rec.SeqNum)
|
|
}
|
|
|
|
seg.w.Reset(writerAtToWriter(seg.f, header.InsertAt))
|
|
|
|
n, err := rec.writeTo(seg.w)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
if err := seg.w.Flush(); err != nil {
|
|
return 0, 0, ioErrOrEOF(err)
|
|
}
|
|
|
|
// Write new header to sync.
|
|
header.LastSeqNum = rec.SeqNum
|
|
header.LastTimestampMS = rec.TimestampMS
|
|
header.InsertAt += n
|
|
|
|
err = seg.headWriter.Write(func(page []byte) error {
|
|
header.WriteTo(page)
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
|
|
seg.header = header
|
|
seg.notifyMux.Notify(false, header)
|
|
|
|
return rec.SeqNum, rec.TimestampMS, nil
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
|
|
func (seg *segment) Archive() error {
|
|
seg.lock.Lock()
|
|
defer seg.lock.Unlock()
|
|
|
|
header := seg.header // Copy
|
|
if header.ArchivedAt != 0 {
|
|
return nil
|
|
}
|
|
|
|
header.ArchivedAt = time.Now().Unix()
|
|
err := seg.headWriter.Write(func(page []byte) error {
|
|
header.WriteTo(page)
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
seg.w = nil // We won't be writing any more.
|
|
|
|
seg.header = header
|
|
seg.notifyMux.Notify(false, header)
|
|
return nil
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
|
|
func (seg *segment) Iterator(fromSeqNum int64) (Iterator, error) {
|
|
seg.lock.Lock()
|
|
defer seg.lock.Unlock()
|
|
|
|
if seg.closed {
|
|
return nil, errs.Closed
|
|
}
|
|
|
|
f, err := os.Open(seg.f.Name())
|
|
if err != nil {
|
|
return nil, errs.IO.WithErr(err)
|
|
}
|
|
|
|
header := seg.header
|
|
if fromSeqNum == -1 {
|
|
fromSeqNum = header.FirstSeqNum
|
|
}
|
|
|
|
return newSegmentIterator(
|
|
f,
|
|
fromSeqNum,
|
|
seg.notifyMux.NewRecvr(header))
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
|
|
func (seg *segment) Close() error {
|
|
seg.lock.Lock()
|
|
defer seg.lock.Unlock()
|
|
|
|
if seg.closed {
|
|
return nil
|
|
}
|
|
|
|
seg.closed = true
|
|
|
|
header := seg.header
|
|
seg.notifyMux.Notify(true, header)
|
|
seg.f.Close()
|
|
|
|
return nil
|
|
}
|