package kvstore import ( "database/sql" "sync" "time" _ "github.com/mattn/go-sqlite3" ) type KV struct { primary bool dbPath string db *sql.DB maxSeqNumStmt *sql.Stmt logIterateStmt *sql.Stmt w *writer onStore func(string, uint64, []byte) onDelete func(string, uint64) closeLock sync.Mutex recvLock sync.Mutex } func newKV( dir string, primary bool, onStore func(string, uint64, []byte), onDelete func(string, uint64), ) *KV { kv := &KV{ dbPath: dbPath(dir), primary: primary, onStore: onStore, onDelete: onDelete, } opts := `?_journal=WAL` db, err := sql.Open("sqlite3", kv.dbPath+opts) must(err) _, err = db.Exec(sqlSchema) must(err) kv.maxSeqNumStmt, err = db.Prepare(sqlMaxSeqNumGet) must(err) kv.logIterateStmt, err = db.Prepare(sqlLogIterate) must(err) _, err = db.Exec(sqlSchema) must(err) kv.db = db if kv.primary { kv.w = newWriter(kv.db) kv.w.Start(kv.MaxSeqNum()) } return kv } func NewPrimary(dir string) *KV { return newKV(dir, true, nil, nil) } func NewSecondary( dir string, onStore func(collection string, id uint64, data []byte), onDelete func(collection string, id uint64), ) *KV { return newKV(dir, false, onStore, onDelete) } func (kv *KV) Primary() bool { return kv.primary } func (kv *KV) MaxSeqNum() (seqNum uint64) { must(kv.maxSeqNumStmt.QueryRow().Scan(&seqNum)) return seqNum } func (kv *KV) Iterate(collection string, each func(id uint64, data []byte)) { rows, err := kv.db.Query(sqlKVIterate, collection) must(err) defer rows.Close() var ( id uint64 data []byte ) for rows.Next() { must(rows.Scan(&id, &data)) each(id, data) } } func (kv *KV) Close() { kv.closeLock.Lock() defer kv.closeLock.Unlock() if kv.w != nil { kv.w.Stop() } if kv.db != nil { kv.db.Close() kv.db = nil } } func (kv *KV) Store(collection string, id uint64, data []byte) { kv.w.Store(collection, id, data) } func (kv *KV) Delete(collection string, id uint64) { kv.w.Delete(collection, id) } func (kv *KV) CleanBefore(seconds int64) { _, err := kv.db.Exec(sqlCleanQuery, time.Now().Unix()-seconds) must(err) }