192 lines
3.0 KiB
Go
192 lines
3.0 KiB
Go
package kvstore
|
|
|
|
import (
|
|
"database/sql"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
type writer struct {
|
|
db *sql.DB
|
|
modQ chan modJob
|
|
stop chan struct{}
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func newWriter(db *sql.DB) *writer {
|
|
return &writer{
|
|
db: db,
|
|
stop: make(chan struct{}, 1),
|
|
modQ: make(chan modJob, modQSize),
|
|
}
|
|
}
|
|
|
|
func (w *writer) Start(maxSeqNum uint64) {
|
|
w.wg.Add(1)
|
|
go w.run(maxSeqNum)
|
|
}
|
|
|
|
func (w *writer) Stop() {
|
|
select {
|
|
case w.stop <- struct{}{}:
|
|
default:
|
|
}
|
|
w.wg.Wait()
|
|
}
|
|
|
|
// Takes ownership of the incoming data.
|
|
func (w *writer) Store(collection string, id uint64, data []byte) {
|
|
job := modJob{
|
|
Collection: collection,
|
|
ID: id,
|
|
Store: true,
|
|
Data: data,
|
|
Ready: &sync.WaitGroup{},
|
|
}
|
|
job.Ready.Add(1)
|
|
w.modQ <- job
|
|
job.Ready.Wait()
|
|
}
|
|
|
|
func (w *writer) Delete(collection string, id uint64) {
|
|
job := modJob{
|
|
Collection: collection,
|
|
ID: id,
|
|
Store: false,
|
|
Ready: &sync.WaitGroup{},
|
|
}
|
|
job.Ready.Add(1)
|
|
w.modQ <- job
|
|
job.Ready.Wait()
|
|
}
|
|
|
|
// Takes ownership of the incoming data.
|
|
func (w *writer) StoreAsync(collection string, id uint64, data []byte) {
|
|
w.modQ <- modJob{
|
|
Collection: collection,
|
|
ID: id,
|
|
Store: true,
|
|
Data: data,
|
|
}
|
|
}
|
|
|
|
func (w *writer) DeleteAsync(collection string, id uint64) {
|
|
w.modQ <- modJob{
|
|
Collection: collection,
|
|
ID: id,
|
|
Store: false,
|
|
}
|
|
}
|
|
|
|
func (w *writer) run(maxSeqNum uint64) {
|
|
defer w.wg.Done()
|
|
|
|
var (
|
|
mod modJob
|
|
tx *sql.Tx
|
|
insertData *sql.Stmt
|
|
insertKV *sql.Stmt
|
|
deleteData *sql.Stmt
|
|
deleteKV *sql.Stmt
|
|
insertLog *sql.Stmt
|
|
err error
|
|
newSeqNum uint64
|
|
now int64
|
|
wgs = make([]*sync.WaitGroup, 10)
|
|
)
|
|
|
|
BEGIN:
|
|
|
|
insertData = nil
|
|
deleteData = nil
|
|
|
|
newSeqNum = maxSeqNum
|
|
wgs = wgs[:0]
|
|
|
|
select {
|
|
case mod = <-w.modQ:
|
|
case <-w.stop:
|
|
return
|
|
}
|
|
|
|
tx, err = w.db.Begin()
|
|
must(err)
|
|
|
|
now = time.Now().Unix()
|
|
|
|
insertLog, err = tx.Prepare(sqlInsertLog)
|
|
must(err)
|
|
|
|
LOOP:
|
|
|
|
if mod.Ready != nil {
|
|
wgs = append(wgs, mod.Ready)
|
|
}
|
|
|
|
newSeqNum++
|
|
if mod.Store {
|
|
goto STORE
|
|
} else {
|
|
goto DELETE
|
|
}
|
|
|
|
STORE:
|
|
|
|
if insertData == nil {
|
|
insertData, err = tx.Prepare(sqlInsertData)
|
|
must(err)
|
|
insertKV, err = tx.Prepare(sqlInsertKV)
|
|
must(err)
|
|
}
|
|
|
|
_, err = insertData.Exec(newSeqNum, mod.Data)
|
|
must(err)
|
|
_, err = insertKV.Exec(mod.Collection, mod.ID, newSeqNum)
|
|
must(err)
|
|
_, err = insertLog.Exec(newSeqNum, now, mod.Collection, mod.ID, true)
|
|
must(err)
|
|
|
|
RecycleDataBuf(mod.Data)
|
|
|
|
goto NEXT
|
|
|
|
DELETE:
|
|
|
|
if deleteData == nil {
|
|
deleteData, err = tx.Prepare(sqlDeleteData)
|
|
must(err)
|
|
deleteKV, err = tx.Prepare(sqlDeleteKV)
|
|
must(err)
|
|
}
|
|
|
|
_, err = deleteData.Exec(mod.Collection, mod.ID)
|
|
must(err)
|
|
_, err = deleteKV.Exec(mod.Collection, mod.ID)
|
|
must(err)
|
|
_, err = insertLog.Exec(newSeqNum, now, mod.Collection, mod.ID, false)
|
|
must(err)
|
|
|
|
goto NEXT
|
|
|
|
NEXT:
|
|
|
|
select {
|
|
case mod = <-w.modQ:
|
|
goto LOOP
|
|
default:
|
|
}
|
|
|
|
goto COMMIT
|
|
|
|
COMMIT:
|
|
|
|
must(tx.Commit())
|
|
maxSeqNum = newSeqNum
|
|
|
|
for i := range wgs {
|
|
wgs[i].Done()
|
|
}
|
|
|
|
goto BEGIN
|
|
}
|