This repository has been archived on 2022-07-30. You can view files and clone it, but cannot push or open issues/pull-requests.
mdb/kvstore/writer.go

192 lines
3.0 KiB
Go

package kvstore
import (
"database/sql"
"sync"
"time"
)
// writer funnels all modifications of the store through a single goroutine
// (see run), which applies queued jobs to db in batched transactions.
type writer struct {
	// db is the database the writer goroutine opens its transactions on.
	db *sql.DB
	// modQ carries pending store/delete jobs to the writer goroutine.
	modQ chan modJob
	// stop signals the writer goroutine to exit once it is idle.
	stop chan struct{}
	// wg tracks the writer goroutine so Stop can wait for it to return.
	wg sync.WaitGroup
}
// newWriter returns a writer bound to db. The job queue is buffered with
// modQSize slots and the stop channel with one slot, so a stop request can be
// posted without blocking. The writer goroutine is not started here; call
// Start for that.
func newWriter(db *sql.DB) *writer {
	w := new(writer)
	w.db = db
	w.modQ = make(chan modJob, modQSize)
	w.stop = make(chan struct{}, 1)
	return w
}
// Start launches the writer goroutine. maxSeqNum is the sequence number the
// goroutine continues from: the first job it processes is assigned
// maxSeqNum+1.
func (w *writer) Start(maxSeqNum uint64) {
	// Register with the wait group before spawning so that Stop always
	// observes the goroutine.
	w.wg.Add(1)
	go w.run(maxSeqNum)
}
// Stop asks the writer goroutine to exit and blocks until it has returned.
//
// The stop request is posted without blocking: if one is already pending in
// the channel's buffer, no second request is queued.
func (w *writer) Stop() {
	select {
	case w.stop <- struct{}{}:
		// Request queued.
	default:
		// A request is already pending; nothing more to do.
	}

	w.wg.Wait()
}
// Store queues data to be written under (collection, id) and blocks until the
// writer goroutine has committed the transaction containing the job.
//
// Store takes ownership of data: the writer goroutine hands the buffer to
// RecycleDataBuf once it has been written, so the caller must not use it
// afterwards.
func (w *writer) Store(collection string, id uint64, data []byte) {
	done := new(sync.WaitGroup)
	done.Add(1)

	w.modQ <- modJob{
		Collection: collection,
		ID:         id,
		Store:      true,
		Data:       data,
		Ready:      done,
	}

	// Released by the writer goroutine after the commit.
	done.Wait()
}
// Delete queues removal of the entry stored under (collection, id) and blocks
// until the writer goroutine has committed the transaction containing the job.
func (w *writer) Delete(collection string, id uint64) {
	done := new(sync.WaitGroup)
	done.Add(1)

	w.modQ <- modJob{
		Collection: collection,
		ID:         id,
		Store:      false,
		Ready:      done,
	}

	// Released by the writer goroutine after the commit.
	done.Wait()
}
// StoreAsync queues data to be written under (collection, id) and returns as
// soon as the job has been placed on the queue; it does not wait for the
// commit. The send blocks only while the queue is full.
//
// StoreAsync takes ownership of data: the writer goroutine hands the buffer
// to RecycleDataBuf once it has been written, so the caller must not use it
// afterwards.
func (w *writer) StoreAsync(collection string, id uint64, data []byte) {
	job := modJob{
		Collection: collection,
		ID:         id,
		Store:      true,
		Data:       data,
	}
	w.modQ <- job
}
// DeleteAsync queues removal of the entry stored under (collection, id) and
// returns as soon as the job has been placed on the queue; it does not wait
// for the commit. The send blocks only while the queue is full.
func (w *writer) DeleteAsync(collection string, id uint64) {
	job := modJob{
		Collection: collection,
		ID:         id,
		Store:      false,
	}
	w.modQ <- job
}
// run is the body of the writer goroutine started by Start.
//
// It repeatedly waits for a job on modQ (or a stop request), opens a
// transaction, and then applies that job plus every further job that is
// already queued, committing once the queue is momentarily empty. Each job is
// assigned the next sequence number after maxSeqNum, and every job (store or
// delete) is also recorded through the sqlInsertLog statement together with
// the batch timestamp.
//
// After a successful commit, maxSeqNum is advanced to the last sequence
// number used and the Ready wait groups of all synchronous jobs in the batch
// are released.
//
// The stop channel is only consulted while idle between batches, so a stop
// request never interrupts an open transaction. Errors are passed to must,
// which is defined elsewhere in the package (presumably it panics on a
// non-nil error; confirm against its definition).
func (w *writer) run(maxSeqNum uint64) {
	defer w.wg.Done()

	// Wait groups of the synchronous jobs in the current batch. Only the
	// backing capacity is wanted up front; the slice is refilled per batch.
	waiters := make([]*sync.WaitGroup, 0, 10)

	for {
		// Idle: block until there is work to do or we are told to stop.
		var mod modJob
		select {
		case mod = <-w.modQ:
		case <-w.stop:
			return
		}

		tx, err := w.db.Begin()
		must(err)

		// One timestamp is shared by all log rows of this transaction.
		now := time.Now().Unix()

		insertLog, err := tx.Prepare(sqlInsertLog)
		must(err)

		// The store and delete statements are prepared lazily, at most once
		// per transaction, the first time a job of that kind shows up.
		var insertData, insertKV, deleteData, deleteKV *sql.Stmt

		newSeqNum := maxSeqNum
		waiters = waiters[:0]

	batch:
		for {
			if mod.Ready != nil {
				waiters = append(waiters, mod.Ready)
			}
			newSeqNum++

			if mod.Store {
				if insertData == nil {
					insertData, err = tx.Prepare(sqlInsertData)
					must(err)
					insertKV, err = tx.Prepare(sqlInsertKV)
					must(err)
				}
				_, err = insertData.Exec(newSeqNum, mod.Data)
				must(err)
				_, err = insertKV.Exec(mod.Collection, mod.ID, newSeqNum)
				must(err)
				_, err = insertLog.Exec(newSeqNum, now, mod.Collection, mod.ID, true)
				must(err)
				// The writer owns the buffer; give it back once written.
				RecycleDataBuf(mod.Data)
			} else {
				if deleteData == nil {
					deleteData, err = tx.Prepare(sqlDeleteData)
					must(err)
					deleteKV, err = tx.Prepare(sqlDeleteKV)
					must(err)
				}
				_, err = deleteData.Exec(mod.Collection, mod.ID)
				must(err)
				_, err = deleteKV.Exec(mod.Collection, mod.ID)
				must(err)
				_, err = insertLog.Exec(newSeqNum, now, mod.Collection, mod.ID, false)
				must(err)
			}

			// Keep batching while more jobs are immediately available;
			// otherwise close out the transaction.
			select {
			case mod = <-w.modQ:
			default:
				break batch
			}
		}

		must(tx.Commit())

		// Only advance the sequence number once the batch is durable.
		maxSeqNum = newSeqNum

		// Wake the synchronous callers whose jobs were in this batch.
		for _, ready := range waiters {
			ready.Done()
		}
	}
}