jldb/mdb/db.go

190 lines
4.0 KiB
Go

package mdb
import (
"fmt"
"net/http"
"os"
"sync"
"sync/atomic"
"time"
"git.crumpington.com/public/jldb/lib/errs"
"git.crumpington.com/public/jldb/lib/rep"
"git.crumpington.com/public/jldb/mdb/change"
"git.crumpington.com/public/jldb/mdb/pfile"
)
type Config struct {
RootDir string
Primary bool
ReplicationPSK string
NetTimeout time.Duration // Default is 1 minute.
// WAL settings.
WALSegMinCount int64 // Minimum Change sets in a segment. Default is 1024.
WALSegMaxAgeSec int64 // Maximum age of a segment. Default is 1 hour.
WALSegGCAgeSec int64 // Segment age for garbage collection. Default is 7 days.
// Necessary for secondary.
PrimaryEndpoint string
// MaxConcurrentUpdates restricts the number of concurently running updates,
// and also limits the maximum number of changes that may be aggregated in
// the WAL.
//
// Default is 32.
MaxConcurrentUpdates int
}
func (c Config) repConfig() rep.Config {
return rep.Config{
RootDir: repDirPath(c.RootDir),
Primary: c.Primary,
ReplicationPSK: c.ReplicationPSK,
NetTimeout: c.NetTimeout,
WALSegMinCount: c.WALSegMinCount,
WALSegMaxAgeSec: c.WALSegMaxAgeSec,
WALSegGCAgeSec: c.WALSegGCAgeSec,
PrimaryEndpoint: c.PrimaryEndpoint,
}
}
type Database struct {
rep *rep.Replicator
rootDir string
conf Config
pf *pfile.File
idx *pfile.Index
changes []change.Change
// The Snapshot stored here is read-only. It will be replaced as needed by
// the txAggregator (primary), or the follower (secondary).
snapshot *atomic.Pointer[Snapshot]
collections map[uint64]collection
stop chan struct{}
done *sync.WaitGroup
txModPool chan txMod
modChan chan txMod
}
func New(conf Config) *Database {
if conf.NetTimeout <= 0 {
conf.NetTimeout = time.Minute
}
if conf.MaxConcurrentUpdates <= 0 {
conf.MaxConcurrentUpdates = 32
}
db := &Database{
rootDir: conf.RootDir,
conf: conf,
snapshot: &atomic.Pointer[Snapshot]{},
collections: map[uint64]collection{},
stop: make(chan struct{}),
done: &sync.WaitGroup{},
txModPool: make(chan txMod, conf.MaxConcurrentUpdates),
modChan: make(chan txMod),
}
db.snapshot.Store(newSnapshot())
for i := 0; i < conf.MaxConcurrentUpdates; i++ {
db.txModPool <- txMod{Resp: make(chan error, 1)}
}
return db
}
func (db *Database) Open() (err error) {
if err := os.MkdirAll(db.rootDir, 0700); err != nil {
return errs.IO.WithErr(err)
}
db.rep, err = rep.Open(
rep.App{
SendState: db.repSendState,
RecvState: db.repRecvState,
InitStorage: db.repInitStorage,
Replay: db.repReplay,
LoadFromStorage: db.repLoadFromStorage,
Apply: db.repApply,
},
db.conf.repConfig())
if err != nil {
return err
}
if db.conf.Primary {
db.done.Add(1)
go db.runTXAggreagtor()
}
return nil
}
func (db *Database) Close() error {
select {
case <-db.stop:
return nil
default:
}
close(db.stop)
db.rep.Close()
db.done.Wait()
db.snapshot = nil
db.collections = nil
return nil
}
func (db *Database) Snapshot() *Snapshot {
return db.snapshot.Load()
}
func (db *Database) Update(update func(tx *Snapshot) error) error {
if !db.conf.Primary {
return errs.ReadOnly.WithMsg("cannot update secondary directly")
}
mod := <-db.txModPool
mod.Update = update
db.modChan <- mod
err := <-mod.Resp
db.txModPool <- mod
return err
}
func (db *Database) Info() Info {
tx := db.Snapshot()
repInfo := db.rep.Info()
return Info{
SeqNum: tx.seqNum,
TimestampMS: tx.timestampMS,
WALFirstSeqNum: repInfo.WALFirstSeqNum,
WALLastSeqNum: repInfo.WALLastSeqNum,
WALLastTimestampMS: repInfo.WALLastTimestampMS,
}
}
func (db *Database) addCollection(id uint64, c collection, collectionState any) {
if _, ok := db.collections[id]; ok {
panic(fmt.Sprintf("Collection %s uses duplicate ID %d.", c.Name(), id))
}
db.collections[id] = c
db.snapshot.Load().addCollection(id, collectionState)
}
func (db *Database) Handle(w http.ResponseWriter, r *http.Request) {
db.rep.Handle(w, r)
}