jldb/lib/rep/replicator.go

243 lines
5.1 KiB
Go

package rep
import (
"io"
"log"
"net"
"os"
"sync"
"sync/atomic"
"time"
"git.crumpington.com/public/jldb/lib/atomicheader"
"git.crumpington.com/public/jldb/lib/errs"
"git.crumpington.com/public/jldb/lib/wal"
)
type Config struct {
RootDir string
Primary bool
ReplicationPSK string
NetTimeout time.Duration // Default is 1 minute.
// WAL settings.
WALSegMinCount int64 // Minimum Change sets in a segment. Default is 1024.
WALSegMaxAgeSec int64 // Maximum age of a segment. Default is 1 hour.
WALSegGCAgeSec int64 // Segment age for garbage collection. Default is 7 days.
// If true, Append won't return until a successful App.Apply.
SynchronousAppend bool
// Necessary for secondary.
PrimaryEndpoint string
}
type App struct {
// SendState: The primary may need to send storage state to a secondary node.
SendState func(conn net.Conn) error
// (1) RecvState: Secondary nodes may need to load state from the primary if
// the WAL is too far behind.
RecvState func(conn net.Conn) error
// (2) InitStorage: Prepare application storage for possible calls to
// Replay.
InitStorage func() error
// (3) Replay: write the change to storage. Replay must be idempotent.
Replay func(rec wal.Record) error
// (4) LoadFromStorage: load the application's state from it's persistent
// storage.
LoadFromStorage func() error
// (5) Apply: write the change to persistent storage. Apply must be
// idempotent. In normal operation each change is applied exactly once.
Apply func(rec wal.Record) error
}
type Replicator struct {
app App
conf Config
lockFile *os.File
pskBytes [64]byte // 64 ascii characters. See pskToBytes.
wal *wal.WAL
appendNotify chan struct{}
// lock protects state. The lock is held when replaying (R), following (R),
// and sending state (W).
stateFile *os.File
state *atomic.Pointer[localState]
stateHandler *atomicheader.Handler
stop chan struct{}
done *sync.WaitGroup
client *client // For secondary connection to primary.
}
func Open(app App, conf Config) (*Replicator, error) {
rep := &Replicator{
app: app,
conf: conf,
state: &atomic.Pointer[localState]{},
stop: make(chan struct{}),
done: &sync.WaitGroup{},
appendNotify: make(chan struct{}, 1),
}
rep.loadConfigDefaults()
rep.state.Store(&localState{})
rep.client = newClient(rep.conf.PrimaryEndpoint, rep.conf.ReplicationPSK, rep.conf.NetTimeout)
if err := rep.initDirectories(); err != nil {
log.Printf("Failed to init directories: %v", err)
return nil, err
}
if err := rep.acquireLock(); err != nil {
rep.Close()
log.Printf("Failed to acquire lock: %v", err)
return nil, err
}
if err := rep.loadLocalState(); err != nil {
rep.Close()
log.Printf("Failed to load local state: %v", err)
return nil, err
}
if err := rep.openWAL(); err != nil {
rep.Close()
log.Printf("Failed to open WAL: %v", err)
return nil, err
}
if err := rep.recvStateIfNecessary(); err != nil {
rep.Close()
log.Printf("Failed to recv state: %v", err)
return nil, err
}
if err := rep.app.InitStorage(); err != nil {
rep.Close()
log.Printf("Failed to init storage: %v", err)
return nil, err
}
if err := rep.replay(); err != nil {
rep.Close()
log.Printf("Failed to replay: %v", err)
return nil, err
}
if err := rep.app.LoadFromStorage(); err != nil {
rep.Close()
log.Printf("Failed to load from storage: %v", err)
return nil, err
}
rep.startWALGC()
rep.startWALFollower()
if !rep.conf.Primary {
rep.startWALRecvr()
}
return rep, nil
}
func (rep *Replicator) Append(size int64, r io.Reader) (int64, int64, error) {
if !rep.conf.Primary {
return 0, 0, errs.NotAllowed.WithMsg("cannot write to secondary")
}
seqNum, timestampMS, err := rep.wal.Append(size, r)
if err != nil {
return 0, 0, err
}
if !rep.conf.SynchronousAppend {
return seqNum, timestampMS, nil
}
<-rep.appendNotify
return seqNum, timestampMS, nil
}
func (rep *Replicator) Primary() bool {
return rep.conf.Primary
}
func (rep *Replicator) ack(seqNum, timestampMS int64) error {
state := rep.getState()
state.SeqNum = seqNum
state.TimestampMS = timestampMS
return rep.setState(state)
}
func (rep *Replicator) getState() localState {
return *rep.state.Load()
}
func (rep *Replicator) setState(state localState) error {
err := rep.stateHandler.Write(func(page []byte) error {
state.writeTo(page)
return nil
})
if err != nil {
return err
}
rep.state.Store(&state)
return nil
}
func (rep *Replicator) Info() Info {
state := rep.getState()
walInfo := rep.wal.Info()
return Info{
AppSeqNum: state.SeqNum,
AppTimestampMS: state.TimestampMS,
WALFirstSeqNum: walInfo.FirstSeqNum,
WALLastSeqNum: walInfo.LastSeqNum,
WALLastTimestampMS: walInfo.LastTimestampMS,
}
}
func (rep *Replicator) Close() error {
if rep.stopped() {
return nil
}
close(rep.stop)
rep.done.Wait()
if rep.lockFile != nil {
rep.lockFile.Close()
}
if rep.wal != nil {
rep.wal.Close()
}
if rep.client != nil {
rep.client.Close()
}
return nil
}
func (rep *Replicator) stopped() bool {
select {
case <-rep.stop:
return true
default:
return false
}
}