package mdb

import (
	"fmt"
	"net/http"
	"os"
	"sync"
	"sync/atomic"
	"time"

	"git.crumpington.com/public/jldb/lib/errs"
	"git.crumpington.com/public/jldb/lib/rep"
	"git.crumpington.com/public/jldb/mdb/change"
	"git.crumpington.com/public/jldb/mdb/pfile"
)

// Config holds the settings used to construct and open a Database.
type Config struct {
	RootDir        string
	Primary        bool
	ReplicationPSK string
	NetTimeout     time.Duration // Default is 1 minute.

	// WAL settings.
	WALSegMinCount  int64 // Minimum Change sets in a segment. Default is 1024.
	WALSegMaxAgeSec int64 // Maximum age of a segment. Default is 1 hour.
	WALSegGCAgeSec  int64 // Segment age for garbage collection. Default is 7 days.

	// Necessary for secondary.
	PrimaryEndpoint string

	// MaxConcurrentUpdates restricts the number of concurrently running updates,
	// and also limits the maximum number of changes that may be aggregated in
	// the WAL.
	//
	// Default is 32.
	MaxConcurrentUpdates int
}

// repConfig translates c into the configuration passed to rep.Open. The
// replicator's root directory is derived from c.RootDir via repDirPath; every
// other field is copied through unchanged.
func (c Config) repConfig() rep.Config {
	return rep.Config{
		RootDir:         repDirPath(c.RootDir),
		Primary:         c.Primary,
		ReplicationPSK:  c.ReplicationPSK,
		NetTimeout:      c.NetTimeout,
		WALSegMinCount:  c.WALSegMinCount,
		WALSegMaxAgeSec: c.WALSegMaxAgeSec,
		WALSegGCAgeSec:  c.WALSegGCAgeSec,
		PrimaryEndpoint: c.PrimaryEndpoint,
	}
}

// Database bundles a replicator, the current Snapshot, and the registered
// collections. Construct one with New, then call Open.
type Database struct {
	// rep is nil until Open assigns it.
	rep *rep.Replicator

	rootDir string
	conf    Config

	pf      *pfile.File
	idx     *pfile.Index
	changes []change.Change

	// The Snapshot stored here is read-only. It will be replaced as needed by
	// the txAggregator (primary), or the follower (secondary).
	snapshot    *atomic.Pointer[Snapshot]
	collections map[uint64]collection

	// stop is closed by Close; done is waited on by Close.
	stop chan struct{}
	done *sync.WaitGroup

	// txModPool is pre-filled by New with MaxConcurrentUpdates reusable txMod
	// values; Update borrows one per call and sends it on modChan.
	txModPool chan txMod
	modChan   chan txMod
}

// New builds a Database from conf without opening it. A non-positive
// NetTimeout is replaced by one minute and a non-positive
// MaxConcurrentUpdates by 32. The returned Database starts with a fresh
// Snapshot from newSnapshot.
func New(conf Config) *Database {
	if conf.NetTimeout <= 0 {
		conf.NetTimeout = time.Minute
	}

	if conf.MaxConcurrentUpdates <= 0 {
		conf.MaxConcurrentUpdates = 32
	}

	db := &Database{
		rootDir:     conf.RootDir,
		conf:        conf,
		snapshot:    &atomic.Pointer[Snapshot]{},
		collections: map[uint64]collection{},
		stop:        make(chan struct{}),
		done:        &sync.WaitGroup{},
		txModPool:   make(chan txMod, conf.MaxConcurrentUpdates),
		modChan:     make(chan txMod),
	}

	db.snapshot.Store(newSnapshot())

	// Fill the pool so that at most MaxConcurrentUpdates calls to Update can
	// hold a txMod at once. Each response channel has a buffer of one.
	for i := 0; i < conf.MaxConcurrentUpdates; i++ {
		db.txModPool <- txMod{Resp: make(chan error, 1)}
	}

	return db
}

// Open creates the root directory (mode 0700) if needed and opens the
// replicator with this database's callbacks. On a primary it also starts the
// transaction aggregator goroutine.
//
// A directory creation failure is returned wrapped in errs.IO; an error from
// rep.Open is returned as-is.
func (db *Database) Open() (err error) {
	if err := os.MkdirAll(db.rootDir, 0700); err != nil {
		return errs.IO.WithErr(err)
	}

	db.rep, err = rep.Open(
		rep.App{
			SendState:       db.repSendState,
			RecvState:       db.repRecvState,
			InitStorage:     db.repInitStorage,
			Replay:          db.repReplay,
			LoadFromStorage: db.repLoadFromStorage,
			Apply:           db.repApply,
		},
		db.conf.repConfig())
	if err != nil {
		return err
	}

	if db.conf.Primary {
		// Registered with done so that Close can wait on it; presumably the
		// aggregator calls done.Done when it exits (defined elsewhere).
		db.done.Add(1)
		go db.runTXAggreagtor()
	}

	return nil
}

// Close signals shutdown by closing the stop channel, closes the replicator
// if one was opened, waits on the done WaitGroup, and then drops the snapshot
// and collections. It always returns nil. A second sequential call is a
// no-op.
//
// Because the snapshot pointer is set to nil, Snapshot and Info must not be
// called after Close.
func (db *Database) Close() error {
	select {
	case <-db.stop:
		return nil
	default:
	}

	close(db.stop)

	// db.rep is only assigned by Open, so it is still nil when Close is
	// called on a Database that was never opened.
	if db.rep != nil {
		db.rep.Close()
	}

	db.done.Wait()
	db.snapshot = nil
	db.collections = nil
	return nil
}

// Snapshot returns the currently stored Snapshot.
func (db *Database) Snapshot() *Snapshot {
	return db.snapshot.Load()
}

// Update submits update for execution and returns the error delivered on the
// borrowed txMod's response channel. On a secondary it returns an
// errs.ReadOnly error without running update.
//
// The call blocks while all MaxConcurrentUpdates pooled txMod values are in
// use.
//
// NOTE(review): the send on modChan has no stop case, so if nothing is
// receiving (for example after Close) this looks like it blocks forever.
// Confirm against the aggregator implementation.
func (db *Database) Update(update func(tx *Snapshot) error) error {
	if !db.conf.Primary {
		return errs.ReadOnly.WithMsg("cannot update secondary directly")
	}

	mod := <-db.txModPool
	mod.Update = update
	db.modChan <- mod
	err := <-mod.Resp

	// Hand the txMod back so another caller can proceed.
	db.txModPool <- mod
	return err
}

// Info reports the current snapshot's sequence number and timestamp together
// with the WAL sequence numbers and last timestamp reported by the
// replicator.
func (db *Database) Info() Info {
	tx := db.Snapshot()
	repInfo := db.rep.Info()

	return Info{
		SeqNum:             tx.seqNum,
		TimestampMS:        tx.timestampMS,
		WALFirstSeqNum:     repInfo.WALFirstSeqNum,
		WALLastSeqNum:      repInfo.WALLastSeqNum,
		WALLastTimestampMS: repInfo.WALLastTimestampMS,
	}
}

// addCollection registers c under id and adds collectionState to the current
// snapshot under the same id. It panics if id is already registered.
func (db *Database) addCollection(id uint64, c collection, collectionState any) {
	if _, ok := db.collections[id]; ok {
		panic(fmt.Sprintf("Collection %s uses duplicate ID %d.", c.Name(), id))
	}
	db.collections[id] = c
	db.snapshot.Load().addCollection(id, collectionState)
}

// Handle forwards the HTTP request to the replicator's handler.
func (db *Database) Handle(w http.ResponseWriter, r *http.Request) {
	db.rep.Handle(w, r)
}