package kvstore import ( "database/sql" "fmt" "net" "sync" "time" "git.crumpington.com/private/mdb/kvstore/wal" _ "github.com/mattn/go-sqlite3" ) type KV struct { primary bool lockPath string dataPath string walPath string w *wal.Writer f *wal.Follower db *sql.DB stop chan struct{} done sync.WaitGroup onStore func(string, uint64, []byte) onDelete func(string, uint64) closeLock sync.Mutex shippingLock sync.Mutex } func (kv *KV) init() { opts := `?_journal=WAL` db, err := sql.Open("sqlite3", kv.dataPath+opts) must(err) _, err = db.Exec(sqlSchema) must(err) if kv.primary { kv.w = wal.NewWriterPrimary(kv.walPath) } else { kv.w = wal.NewWriterSecondary(kv.walPath) } kv.f = wal.NewFollower(kv.walPath) kv.db = db kv.stop = make(chan struct{}) kv.commit() } func (kv *KV) start() { // Spawn follower in background to write data from WAL to data. kv.done.Add(1) go kv.background() } func NewPrimary(dir string) *KV { kv := &KV{ primary: true, dataPath: dataPath(dir), walPath: walPath(dir), } kv.init() kv.start() return kv } func NewSecondary( dir string, onStore func(collection string, id uint64, data []byte), onDelete func(collection string, id uint64), ) *KV { kv := &KV{ primary: false, dataPath: dataPath(dir), walPath: walPath(dir), } kv.init() kv.onStore = onStore kv.onDelete = onDelete kv.start() return kv } func (kv *KV) Primary() bool { return kv.primary } func (kv *KV) MaxSeqNum() (seqNum uint64) { kv.db.QueryRow(sqlMaxSeqNumGet).Scan(&seqNum) return seqNum } func (kv *KV) WALMaxSeqNum() uint64 { return kv.w.MaxSeqNum() } func (kv *KV) Iterate(collection string, each func(id uint64, data []byte)) { rows, err := kv.db.Query(sqlKVIterate, collection) must(err) defer rows.Close() var ( id uint64 data []byte ) for rows.Next() { must(rows.Scan(&id, &data)) each(id, data) } } func (kv *KV) Close() { kv.closeLock.Lock() defer kv.closeLock.Unlock() if kv.w == nil { return } kv.stop <- struct{}{} kv.done.Wait() kv.w.Close() kv.f.Close() kv.db.Close() kv.w = nil kv.f = nil kv.db = nil } func (kv *KV) Store(collection string, id uint64, data []byte) { if !kv.primary { panic("Store called on secondary.") } kv.w.Store(collection, id, data) } func (kv *KV) Delete(collection string, id uint64) { if !kv.primary { panic("Delete called on secondary.") } kv.w.Delete(collection, id) } func (kv *KV) SyncSend(conn net.Conn) { if !kv.primary { panic("SyncSend called on secondary.") } kv.f.SendWAL(conn) } func (kv *KV) SyncRecv(conn net.Conn) { if kv.primary { panic("SyncRecv called on primary.") } if !kv.shippingLock.TryLock() { return } defer kv.shippingLock.Unlock() kv.w.RecvWAL(conn) } func (kv *KV) background() { defer kv.done.Done() commitTicker := time.NewTicker(commitInterval) defer commitTicker.Stop() cleanTicker := time.NewTicker(cleanInterval) defer cleanTicker.Stop() for { select { case <-commitTicker.C: kv.commit() case <-cleanTicker.C: kv.w.DeleteBefore(cleanBeforeSecs) case <-kv.stop: return } } } func (kv *KV) commit() { maxSeqNum := kv.MaxSeqNum() if maxSeqNum == kv.f.MaxSeqNum() { return } tx, err := kv.db.Begin() must(err) upsert, err := tx.Prepare(sqlKVUpsert) must(err) delete, err := tx.Prepare(sqlKVDelete) must(err) err = kv.f.Replay(maxSeqNum, func(rec wal.Record) error { if rec.SeqNum != maxSeqNum+1 { return fmt.Errorf("expected sequence number %d but got %d", maxSeqNum+1, rec.SeqNum) } if rec.Store { _, err = upsert.Exec(rec.Collection, rec.ID, rec.Data) } else { _, err = delete.Exec(rec.Collection, rec.ID) } maxSeqNum = rec.SeqNum return err }) must(err) _, err = tx.Exec(sqlMaxSeqNumSet, maxSeqNum) must(err) must(tx.Commit()) }