package kvstore import ( "database/sql" "sync" "time" ) type writer struct { db *sql.DB modQ chan modJob stop chan struct{} wg sync.WaitGroup } func newWriter(db *sql.DB) *writer { return &writer{ db: db, stop: make(chan struct{}, 1), modQ: make(chan modJob, modQSize), } } func (w *writer) Start(maxSeqNum uint64) { w.wg.Add(1) go w.run(maxSeqNum) } func (w *writer) Stop() { select { case w.stop <- struct{}{}: default: } w.wg.Wait() } // Takes ownership of the incoming data. func (w *writer) Store(collection string, id uint64, data []byte) { job := modJob{ Collection: collection, ID: id, Store: true, Data: data, Ready: &sync.WaitGroup{}, } job.Ready.Add(1) w.modQ <- job job.Ready.Wait() } func (w *writer) Delete(collection string, id uint64) { job := modJob{ Collection: collection, ID: id, Store: false, Ready: &sync.WaitGroup{}, } job.Ready.Add(1) w.modQ <- job job.Ready.Wait() } // Takes ownership of the incoming data. func (w *writer) StoreAsync(collection string, id uint64, data []byte) { w.modQ <- modJob{ Collection: collection, ID: id, Store: true, Data: data, } } func (w *writer) DeleteAsync(collection string, id uint64) { w.modQ <- modJob{ Collection: collection, ID: id, Store: false, } } func (w *writer) run(maxSeqNum uint64) { defer w.wg.Done() var ( mod modJob tx *sql.Tx insertData *sql.Stmt insertKV *sql.Stmt deleteData *sql.Stmt deleteKV *sql.Stmt insertLog *sql.Stmt err error newSeqNum uint64 now int64 wgs = make([]*sync.WaitGroup, 10) ) BEGIN: insertData = nil deleteData = nil newSeqNum = maxSeqNum wgs = wgs[:0] select { case mod = <-w.modQ: case <-w.stop: return } tx, err = w.db.Begin() must(err) now = time.Now().Unix() insertLog, err = tx.Prepare(sqlInsertLog) must(err) LOOP: if mod.Ready != nil { wgs = append(wgs, mod.Ready) } newSeqNum++ if mod.Store { goto STORE } else { goto DELETE } STORE: if insertData == nil { insertData, err = tx.Prepare(sqlInsertData) must(err) insertKV, err = tx.Prepare(sqlInsertKV) must(err) } _, err = insertData.Exec(newSeqNum, mod.Data) must(err) _, err = insertKV.Exec(mod.Collection, mod.ID, newSeqNum) must(err) _, err = insertLog.Exec(newSeqNum, now, mod.Collection, mod.ID, true) must(err) RecycleDataBuf(mod.Data) goto NEXT DELETE: if deleteData == nil { deleteData, err = tx.Prepare(sqlDeleteData) must(err) deleteKV, err = tx.Prepare(sqlDeleteKV) must(err) } _, err = deleteData.Exec(mod.Collection, mod.ID) must(err) _, err = deleteKV.Exec(mod.Collection, mod.ID) must(err) _, err = insertLog.Exec(newSeqNum, now, mod.Collection, mod.ID, false) must(err) goto NEXT NEXT: select { case mod = <-w.modQ: goto LOOP default: } goto COMMIT COMMIT: must(tx.Commit()) maxSeqNum = newSeqNum for i := range wgs { wgs[i].Done() } goto BEGIN }