185 lines
4.0 KiB
Go
185 lines
4.0 KiB
Go
|
package mdb
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"git.crumpington.com/public/jldb/lib/errs"
|
||
|
"git.crumpington.com/public/jldb/lib/rep"
|
||
|
"git.crumpington.com/public/jldb/mdb/change"
|
||
|
"git.crumpington.com/public/jldb/mdb/pfile"
|
||
|
"net/http"
|
||
|
"os"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
type Config struct {
|
||
|
RootDir string
|
||
|
Primary bool
|
||
|
ReplicationPSK string
|
||
|
NetTimeout time.Duration // Default is 1 minute.
|
||
|
|
||
|
// WAL settings.
|
||
|
WALSegMinCount int64 // Minimum Change sets in a segment. Default is 1024.
|
||
|
WALSegMaxAgeSec int64 // Maximum age of a segment. Default is 1 hour.
|
||
|
WALSegGCAgeSec int64 // Segment age for garbage collection. Default is 7 days.
|
||
|
|
||
|
// Necessary for secondary.
|
||
|
PrimaryEndpoint string
|
||
|
|
||
|
// MaxConcurrentUpdates restricts the number of concurently running updates,
|
||
|
// and also limits the maximum number of changes that may be aggregated in
|
||
|
// the WAL.
|
||
|
//
|
||
|
// Default is 32.
|
||
|
MaxConcurrentUpdates int
|
||
|
}
|
||
|
|
||
|
func (c Config) repConfig() rep.Config {
|
||
|
return rep.Config{
|
||
|
RootDir: repDirPath(c.RootDir),
|
||
|
Primary: c.Primary,
|
||
|
ReplicationPSK: c.ReplicationPSK,
|
||
|
NetTimeout: c.NetTimeout,
|
||
|
WALSegMinCount: c.WALSegMinCount,
|
||
|
WALSegMaxAgeSec: c.WALSegMaxAgeSec,
|
||
|
WALSegGCAgeSec: c.WALSegGCAgeSec,
|
||
|
PrimaryEndpoint: c.PrimaryEndpoint,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type Database struct {
|
||
|
rep *rep.Replicator
|
||
|
rootDir string
|
||
|
conf Config
|
||
|
|
||
|
pf *pfile.File
|
||
|
idx *pfile.Index
|
||
|
|
||
|
changes []change.Change
|
||
|
|
||
|
// The Snapshot stored here is read-only. It will be replaced as needed by
|
||
|
// the txAggregator (primary), or the follower (secondary).
|
||
|
snapshot *atomic.Pointer[Snapshot]
|
||
|
collections map[uint64]collection
|
||
|
|
||
|
stop chan struct{}
|
||
|
done *sync.WaitGroup
|
||
|
|
||
|
txModPool chan txMod
|
||
|
|
||
|
modChan chan txMod
|
||
|
}
|
||
|
|
||
|
func New(conf Config) *Database {
|
||
|
if conf.MaxConcurrentUpdates <= 0 {
|
||
|
conf.MaxConcurrentUpdates = 32
|
||
|
}
|
||
|
|
||
|
db := &Database{
|
||
|
rootDir: conf.RootDir,
|
||
|
conf: conf,
|
||
|
snapshot: &atomic.Pointer[Snapshot]{},
|
||
|
collections: map[uint64]collection{},
|
||
|
stop: make(chan struct{}),
|
||
|
done: &sync.WaitGroup{},
|
||
|
txModPool: make(chan txMod, conf.MaxConcurrentUpdates),
|
||
|
modChan: make(chan txMod),
|
||
|
}
|
||
|
|
||
|
db.snapshot.Store(newSnapshot())
|
||
|
|
||
|
for i := 0; i < conf.MaxConcurrentUpdates; i++ {
|
||
|
db.txModPool <- txMod{Resp: make(chan error, 1)}
|
||
|
}
|
||
|
|
||
|
return db
|
||
|
}
|
||
|
|
||
|
func (db *Database) Open() (err error) {
|
||
|
if err := os.MkdirAll(db.rootDir, 0700); err != nil {
|
||
|
return errs.IO.WithErr(err)
|
||
|
}
|
||
|
|
||
|
db.rep, err = rep.Open(
|
||
|
rep.App{
|
||
|
SendState: db.repSendState,
|
||
|
RecvState: db.repRecvState,
|
||
|
InitStorage: db.repInitStorage,
|
||
|
Replay: db.repReplay,
|
||
|
LoadFromStorage: db.repLoadFromStorage,
|
||
|
Apply: db.repApply,
|
||
|
},
|
||
|
db.conf.repConfig())
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
if db.conf.Primary {
|
||
|
db.done.Add(1)
|
||
|
go db.runTXAggreagtor()
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (db *Database) Close() error {
|
||
|
select {
|
||
|
case <-db.stop:
|
||
|
return nil
|
||
|
default:
|
||
|
}
|
||
|
|
||
|
close(db.stop)
|
||
|
db.rep.Close()
|
||
|
db.done.Wait()
|
||
|
|
||
|
db.snapshot = nil
|
||
|
db.collections = nil
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (db *Database) Snapshot() *Snapshot {
|
||
|
return db.snapshot.Load()
|
||
|
}
|
||
|
|
||
|
func (db *Database) Update(update func(tx *Snapshot) error) error {
|
||
|
if !db.conf.Primary {
|
||
|
return errs.ReadOnly.WithMsg("cannot update secondary directly")
|
||
|
}
|
||
|
|
||
|
mod := <-db.txModPool
|
||
|
mod.Update = update
|
||
|
|
||
|
db.modChan <- mod
|
||
|
err := <-mod.Resp
|
||
|
db.txModPool <- mod
|
||
|
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
func (db *Database) Info() Info {
|
||
|
tx := db.Snapshot()
|
||
|
repInfo := db.rep.Info()
|
||
|
|
||
|
return Info{
|
||
|
SeqNum: tx.seqNum,
|
||
|
TimestampMS: tx.timestampMS,
|
||
|
WALFirstSeqNum: repInfo.WALFirstSeqNum,
|
||
|
WALLastSeqNum: repInfo.WALLastSeqNum,
|
||
|
WALLastTimestampMS: repInfo.WALLastTimestampMS,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (db *Database) addCollection(id uint64, c collection, collectionState any) {
|
||
|
if _, ok := db.collections[id]; ok {
|
||
|
panic(fmt.Sprintf("Collection %s uses duplicate ID %d.", c.Name(), id))
|
||
|
}
|
||
|
db.collections[id] = c
|
||
|
db.snapshot.Load().addCollection(id, collectionState)
|
||
|
}
|
||
|
|
||
|
func (db *Database) Handle(w http.ResponseWriter, r *http.Request) {
|
||
|
db.rep.Handle(w, r)
|
||
|
}
|