This repository has been archived on 2022-07-30. You can view files and clone it, but cannot push or open issues/pull-requests.
mdb/wal/writer.go

177 lines
2.6 KiB
Go

package wal
import (
"database/sql"
"sync"
"time"
_ "github.com/mattn/go-sqlite3"
)
type insertJob struct {
Collection string
ID uint64
Store bool
Data []byte
Ready *sync.WaitGroup
}
type Writer struct {
db *sql.DB
insert *sql.Stmt
insertQ chan insertJob
doneWG sync.WaitGroup
}
func NewWriter(walPath string) *Writer {
db := initWAL(walPath)
insert, err := db.Prepare(sqlWALInsert)
must(err)
w := &Writer{
db: db,
insert: insert,
insertQ: make(chan insertJob, 1024),
}
var maxSeqNum uint64
row := db.QueryRow(sqlWALMaxSeqNum)
must(row.Scan(&maxSeqNum))
w.doneWG.Add(1)
go w.insertProc(maxSeqNum)
return w
}
func (w *Writer) Close() {
if w.db == nil {
return
}
close(w.insertQ)
w.doneWG.Wait()
w.db.Close()
w.db = nil
}
func (w *Writer) Store(collection string, id uint64, data []byte) {
job := insertJob{
Collection: collection,
ID: id,
Store: true,
Data: data,
Ready: &sync.WaitGroup{},
}
job.Ready.Add(1)
w.insertQ <- job
job.Ready.Wait()
}
func (w *Writer) Delete(collection string, id uint64) {
job := insertJob{
Collection: collection,
ID: id,
Store: false,
Ready: &sync.WaitGroup{},
}
job.Ready.Add(1)
w.insertQ <- job
job.Ready.Wait()
}
// Called single-threaded from RecvWAL.
func (w *Writer) storeAsync(collection string, id uint64, data []byte) {
w.insertQ <- insertJob{
Collection: collection,
ID: id,
Store: true,
Data: data,
}
}
// Called single-threaded from RecvWAL.
func (w *Writer) deleteAsync(collection string, id uint64) {
w.insertQ <- insertJob{
Collection: collection,
ID: id,
Store: false,
}
}
func (w *Writer) MaxSeqNum() (n uint64) {
w.db.QueryRow(sqlWALMaxSeqNum).Scan(&n)
return
}
func (w *Writer) insertProc(maxSeqNum uint64) {
defer w.doneWG.Done()
var (
job insertJob
tx *sql.Tx
insert *sql.Stmt
ok bool
err error
newSeqNum uint64
now int64
wgs = make([]*sync.WaitGroup, 10)
)
var ()
BEGIN:
newSeqNum = maxSeqNum
wgs = wgs[:0]
job, ok = <-w.insertQ
if !ok {
return
}
tx, err = w.db.Begin()
must(err)
insert, err = tx.Prepare(sqlWALInsert)
must(err)
now = time.Now().Unix()
LOOP:
newSeqNum++
_, err = insert.Exec(
newSeqNum,
now,
job.Collection,
job.ID,
job.Store,
job.Data)
must(err)
if job.Ready != nil {
wgs = append(wgs, job.Ready)
}
select {
case job, ok = <-w.insertQ:
if ok {
goto LOOP
}
default:
}
goto COMMIT
COMMIT:
must(tx.Commit())
maxSeqNum = newSeqNum
for i := range wgs {
wgs[i].Done()
}
goto BEGIN
}