129 lines
2.1 KiB
Go
129 lines
2.1 KiB
Go
package kvstore
|
|
|
|
import (
|
|
"database/sql"
|
|
"sync"
|
|
"time"
|
|
|
|
_ "github.com/mattn/go-sqlite3"
|
|
)
|
|
|
|
type KV struct {
|
|
primary bool
|
|
dbPath string
|
|
|
|
db *sql.DB
|
|
|
|
maxSeqNumStmt *sql.Stmt
|
|
logIterateStmt *sql.Stmt
|
|
|
|
w *writer
|
|
|
|
onStore func(string, uint64, []byte)
|
|
onDelete func(string, uint64)
|
|
|
|
closeLock sync.Mutex
|
|
recvLock sync.Mutex
|
|
}
|
|
|
|
func newKV(
|
|
dir string,
|
|
primary bool,
|
|
onStore func(string, uint64, []byte),
|
|
onDelete func(string, uint64),
|
|
) *KV {
|
|
kv := &KV{
|
|
dbPath: dbPath(dir),
|
|
primary: primary,
|
|
onStore: onStore,
|
|
onDelete: onDelete,
|
|
}
|
|
|
|
opts := `?_journal=WAL`
|
|
db, err := sql.Open("sqlite3", kv.dbPath+opts)
|
|
must(err)
|
|
|
|
_, err = db.Exec(sqlSchema)
|
|
must(err)
|
|
|
|
kv.maxSeqNumStmt, err = db.Prepare(sqlMaxSeqNumGet)
|
|
must(err)
|
|
kv.logIterateStmt, err = db.Prepare(sqlLogIterate)
|
|
must(err)
|
|
|
|
_, err = db.Exec(sqlSchema)
|
|
must(err)
|
|
|
|
kv.db = db
|
|
|
|
if kv.primary {
|
|
kv.w = newWriter(kv.db)
|
|
kv.w.Start(kv.MaxSeqNum())
|
|
}
|
|
return kv
|
|
}
|
|
|
|
func NewPrimary(dir string) *KV {
|
|
return newKV(dir, true, nil, nil)
|
|
}
|
|
|
|
func NewSecondary(
|
|
dir string,
|
|
onStore func(collection string, id uint64, data []byte),
|
|
onDelete func(collection string, id uint64),
|
|
) *KV {
|
|
return newKV(dir, false, onStore, onDelete)
|
|
}
|
|
|
|
func (kv *KV) Primary() bool {
|
|
return kv.primary
|
|
}
|
|
|
|
func (kv *KV) MaxSeqNum() (seqNum uint64) {
|
|
must(kv.maxSeqNumStmt.QueryRow().Scan(&seqNum))
|
|
return seqNum
|
|
}
|
|
|
|
func (kv *KV) Iterate(collection string, each func(id uint64, data []byte)) {
|
|
rows, err := kv.db.Query(sqlKVIterate, collection)
|
|
must(err)
|
|
defer rows.Close()
|
|
|
|
var (
|
|
id uint64
|
|
data []byte
|
|
)
|
|
|
|
for rows.Next() {
|
|
must(rows.Scan(&id, &data))
|
|
each(id, data)
|
|
}
|
|
}
|
|
|
|
func (kv *KV) Close() {
|
|
kv.closeLock.Lock()
|
|
defer kv.closeLock.Unlock()
|
|
|
|
if kv.w != nil {
|
|
kv.w.Stop()
|
|
}
|
|
|
|
if kv.db != nil {
|
|
kv.db.Close()
|
|
kv.db = nil
|
|
}
|
|
}
|
|
|
|
func (kv *KV) Store(collection string, id uint64, data []byte) {
|
|
kv.w.Store(collection, id, data)
|
|
}
|
|
|
|
func (kv *KV) Delete(collection string, id uint64) {
|
|
kv.w.Delete(collection, id)
|
|
}
|
|
|
|
func (kv *KV) CleanBefore(seconds int64) {
|
|
_, err := kv.db.Exec(sqlCleanQuery, time.Now().Unix()-seconds)
|
|
must(err)
|
|
}
|