This repository has been archived on 2022-07-30. You can view files and clone it, but cannot push or open issues/pull-requests.
mdb/kvstore/store.go

232 lines
3.8 KiB
Go

package kvstore
import (
"database/sql"
"fmt"
"net"
"sync"
"time"
"git.crumpington.com/private/mdb/kvstore/wal"
_ "github.com/mattn/go-sqlite3"
)
type KV struct {
primary bool
lockPath string
dataPath string
walPath string
w *wal.Writer
f *wal.Follower
db *sql.DB
stop chan struct{}
done sync.WaitGroup
onStore func(string, uint64, []byte)
onDelete func(string, uint64)
closeLock sync.Mutex
shippingLock sync.Mutex
}
func (kv *KV) init() {
opts := `?_journal=WAL`
db, err := sql.Open("sqlite3", kv.dataPath+opts)
must(err)
_, err = db.Exec(sqlSchema)
must(err)
if kv.primary {
kv.w = wal.NewWriterPrimary(kv.walPath)
} else {
kv.w = wal.NewWriterSecondary(kv.walPath)
}
kv.f = wal.NewFollower(kv.walPath)
kv.db = db
kv.stop = make(chan struct{})
kv.commit()
}
func (kv *KV) start() {
// Spawn follower in background to write data from WAL to data.
kv.done.Add(1)
go kv.background()
}
func NewPrimary(dir string) *KV {
kv := &KV{
primary: true,
dataPath: dataPath(dir),
walPath: walPath(dir),
}
kv.init()
kv.start()
return kv
}
func NewSecondary(
dir string,
onStore func(collection string, id uint64, data []byte),
onDelete func(collection string, id uint64),
) *KV {
kv := &KV{
primary: false,
dataPath: dataPath(dir),
walPath: walPath(dir),
}
kv.init()
kv.onStore = onStore
kv.onDelete = onDelete
kv.start()
return kv
}
func (kv *KV) Primary() bool {
return kv.primary
}
func (kv *KV) MaxSeqNum() (seqNum uint64) {
kv.db.QueryRow(sqlMaxSeqNumGet).Scan(&seqNum)
return seqNum
}
func (kv *KV) WALMaxSeqNum() uint64 {
return kv.w.MaxSeqNum()
}
func (kv *KV) Iterate(collection string, each func(id uint64, data []byte)) {
rows, err := kv.db.Query(sqlKVIterate, collection)
must(err)
defer rows.Close()
var (
id uint64
data []byte
)
for rows.Next() {
must(rows.Scan(&id, &data))
each(id, data)
}
}
func (kv *KV) Close() {
kv.closeLock.Lock()
defer kv.closeLock.Unlock()
if kv.w == nil {
return
}
kv.stop <- struct{}{}
kv.done.Wait()
kv.w.Close()
kv.f.Close()
kv.db.Close()
kv.w = nil
kv.f = nil
kv.db = nil
}
func (kv *KV) Store(collection string, id uint64, data []byte) {
if !kv.primary {
panic("Store called on secondary.")
}
kv.w.Store(collection, id, data)
}
func (kv *KV) Delete(collection string, id uint64) {
if !kv.primary {
panic("Delete called on secondary.")
}
kv.w.Delete(collection, id)
}
func (kv *KV) SyncSend(conn net.Conn) {
if !kv.primary {
panic("SyncSend called on secondary.")
}
kv.f.SendWAL(conn)
}
func (kv *KV) SyncRecv(conn net.Conn) {
if kv.primary {
panic("SyncRecv called on primary.")
}
if !kv.shippingLock.TryLock() {
return
}
defer kv.shippingLock.Unlock()
kv.w.RecvWAL(conn)
}
func (kv *KV) background() {
defer kv.done.Done()
commitTicker := time.NewTicker(commitInterval)
defer commitTicker.Stop()
cleanTicker := time.NewTicker(cleanInterval)
defer cleanTicker.Stop()
for {
select {
case <-commitTicker.C:
kv.commit()
case <-cleanTicker.C:
kv.w.DeleteBefore(cleanBeforeSecs)
case <-kv.stop:
return
}
}
}
func (kv *KV) commit() {
maxSeqNum := kv.MaxSeqNum()
if maxSeqNum == kv.f.MaxSeqNum() {
return
}
tx, err := kv.db.Begin()
must(err)
upsert, err := tx.Prepare(sqlKVUpsert)
must(err)
delete, err := tx.Prepare(sqlKVDelete)
must(err)
err = kv.f.Replay(maxSeqNum, func(rec wal.Record) error {
if rec.SeqNum != maxSeqNum+1 {
return fmt.Errorf("expected sequence number %d but got %d", maxSeqNum+1, rec.SeqNum)
}
if rec.Store {
_, err = upsert.Exec(rec.Collection, rec.ID, rec.Data)
} else {
_, err = delete.Exec(rec.Collection, rec.ID)
}
maxSeqNum = rec.SeqNum
return err
})
must(err)
_, err = tx.Exec(sqlMaxSeqNumSet, maxSeqNum)
must(err)
must(tx.Commit())
}