parent
d90be1a6b5
commit
9b47a64827
9
codec.go
9
codec.go
|
@ -1,7 +1,10 @@
|
|||
package mdb
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
|
||||
"git.crumpington.com/private/mdb/kvstore"
|
||||
)
|
||||
|
||||
func decode[T any](data []byte) *T {
|
||||
|
@ -11,7 +14,7 @@ func decode[T any](data []byte) *T {
|
|||
}
|
||||
|
||||
func encode(item any) []byte {
|
||||
buf, err := json.Marshal(item)
|
||||
must(err)
|
||||
return buf
|
||||
w := bytes.NewBuffer(kvstore.GetDataBuf(0)[:0])
|
||||
must(json.NewEncoder(w).Encode(item))
|
||||
return w.Bytes()
|
||||
}
|
||||
|
|
|
@ -1,9 +1,36 @@
|
|||
package kvstore
|
||||
|
||||
import "time"
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
connTimeout = 16 * time.Second
|
||||
heartbeatInterval = 4 * time.Second
|
||||
pollInterval = 500 * time.Millisecond
|
||||
connTimeout = 16 * time.Second
|
||||
pollInterval = 500 * time.Millisecond
|
||||
modQSize = 1024
|
||||
|
||||
bufferPool = make(chan []byte, 1024)
|
||||
poolBufSize = 4096
|
||||
)
|
||||
|
||||
func GetDataBuf(size int) []byte {
|
||||
if size > poolBufSize {
|
||||
return make([]byte, size)
|
||||
}
|
||||
select {
|
||||
case b := <-bufferPool:
|
||||
return b[:size]
|
||||
default:
|
||||
return make([]byte, poolBufSize)[:size]
|
||||
}
|
||||
}
|
||||
|
||||
func RecycleDataBuf(b []byte) {
|
||||
if cap(b) != poolBufSize {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case bufferPool <- b:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,23 +22,20 @@ func (kv *KV) SyncRecv(conn net.Conn) {
|
|||
// It's important that we stop when this routine exits so that
|
||||
// all queued writes are committed to the database before syncing
|
||||
// has a chance to restart.
|
||||
//kv.startWriteLoop()
|
||||
//defer kv.stopWriteLoop()
|
||||
|
||||
w := newWriter(kv.db)
|
||||
w.Start(kv.MaxSeqNum())
|
||||
defer w.Stop()
|
||||
|
||||
headerBuf := make([]byte, recHeaderSize)
|
||||
buf := make([]byte, 8)
|
||||
nameBuf := make([]byte, 32)
|
||||
|
||||
afterSeqNum := kv.MaxSeqNum()
|
||||
expectedSeqNum := afterSeqNum + 1
|
||||
|
||||
// Send fromID to the conn.
|
||||
conn.SetWriteDeadline(time.Now().Add(connTimeout))
|
||||
binary.LittleEndian.PutUint64(buf, afterSeqNum)
|
||||
if _, err := conn.Write(buf); err != nil {
|
||||
binary.LittleEndian.PutUint64(nameBuf, afterSeqNum)
|
||||
if _, err := conn.Write(nameBuf); err != nil {
|
||||
log.Printf("RecvWAL failed to send after sequence number: %v", err)
|
||||
return
|
||||
}
|
||||
|
@ -51,7 +48,7 @@ func (kv *KV) SyncRecv(conn net.Conn) {
|
|||
log.Printf("RecvWAL failed to read header: %v", err)
|
||||
return
|
||||
}
|
||||
rec, colLen, dataLen := decodeRecHeader(headerBuf)
|
||||
rec, nameLen, dataLen := decodeRecHeader(headerBuf)
|
||||
|
||||
// Heartbeat.
|
||||
if rec.SeqNum == 0 {
|
||||
|
@ -65,29 +62,29 @@ func (kv *KV) SyncRecv(conn net.Conn) {
|
|||
}
|
||||
expectedSeqNum++
|
||||
|
||||
if cap(buf) < colLen {
|
||||
buf = make([]byte, colLen)
|
||||
if cap(nameBuf) < nameLen {
|
||||
nameBuf = make([]byte, nameLen)
|
||||
}
|
||||
buf = buf[:colLen]
|
||||
nameBuf = nameBuf[:nameLen]
|
||||
|
||||
if _, err := conn.Read(buf); err != nil {
|
||||
if _, err := conn.Read(nameBuf); err != nil {
|
||||
log.Printf("RecvWAL failed to read collection name: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
rec.Collection = string(buf)
|
||||
rec.Collection = string(nameBuf)
|
||||
|
||||
if rec.Store {
|
||||
rec.Data = make([]byte, dataLen)
|
||||
rec.Data = GetDataBuf(dataLen)
|
||||
if _, err := conn.Read(rec.Data); err != nil {
|
||||
log.Printf("RecvWAL failed to read data: %v", err)
|
||||
return
|
||||
}
|
||||
w.StoreAsync(rec.Collection, rec.ID, rec.Data)
|
||||
kv.onStore(rec.Collection, rec.ID, rec.Data)
|
||||
w.StoreAsync(rec.Collection, rec.ID, rec.Data)
|
||||
} else {
|
||||
w.DeleteAsync(rec.Collection, rec.ID)
|
||||
kv.onDelete(rec.Collection, rec.ID)
|
||||
w.DeleteAsync(rec.Collection, rec.ID)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -14,7 +14,6 @@ func (kv *KV) SyncSend(conn net.Conn) {
|
|||
seqNumBuf = make([]byte, 8)
|
||||
headerBuf = make([]byte, recHeaderSize)
|
||||
empty = make([]byte, recHeaderSize)
|
||||
tStart time.Time
|
||||
err error
|
||||
)
|
||||
|
||||
|
@ -29,14 +28,13 @@ func (kv *KV) SyncSend(conn net.Conn) {
|
|||
|
||||
POLL:
|
||||
|
||||
conn.SetWriteDeadline(time.Now().Add(connTimeout))
|
||||
tStart = time.Now()
|
||||
for time.Since(tStart) < heartbeatInterval {
|
||||
for i := 0; i < 8; i++ {
|
||||
if kv.MaxSeqNum() > afterSeqNum {
|
||||
goto REPLAY
|
||||
}
|
||||
time.Sleep(pollInterval)
|
||||
}
|
||||
|
||||
goto HEARTBEAT
|
||||
|
||||
HEARTBEAT:
|
||||
|
@ -91,6 +89,9 @@ func (kv *KV) replay(afterSeqNum uint64, each func(rec record) error) error {
|
|||
must(err)
|
||||
defer rows.Close()
|
||||
|
||||
rec.Data = GetDataBuf(0)
|
||||
defer RecycleDataBuf(rec.Data)
|
||||
|
||||
for rows.Next() {
|
||||
must(rows.Scan(
|
||||
&rec.SeqNum,
|
||||
|
@ -98,6 +99,7 @@ func (kv *KV) replay(afterSeqNum uint64, each func(rec record) error) error {
|
|||
&rec.ID,
|
||||
&rec.Store,
|
||||
&rec.Data))
|
||||
|
||||
if err = each(rec); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@ func newWriter(db *sql.DB) *writer {
|
|||
return &writer{
|
||||
db: db,
|
||||
stop: make(chan struct{}, 1),
|
||||
modQ: make(chan modJob, 1024),
|
||||
modQ: make(chan modJob, modQSize),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -34,6 +34,7 @@ func (w *writer) Stop() {
|
|||
w.wg.Wait()
|
||||
}
|
||||
|
||||
// Takes ownership of the incoming data.
|
||||
func (w *writer) Store(collection string, id uint64, data []byte) {
|
||||
job := modJob{
|
||||
Collection: collection,
|
||||
|
@ -59,6 +60,7 @@ func (w *writer) Delete(collection string, id uint64) {
|
|||
job.Ready.Wait()
|
||||
}
|
||||
|
||||
// Takes ownership of the incoming data.
|
||||
func (w *writer) StoreAsync(collection string, id uint64, data []byte) {
|
||||
w.modQ <- modJob{
|
||||
Collection: collection,
|
||||
|
@ -80,7 +82,7 @@ func (w *writer) run(maxSeqNum uint64) {
|
|||
defer w.wg.Done()
|
||||
|
||||
var (
|
||||
job modJob
|
||||
mod modJob
|
||||
tx *sql.Tx
|
||||
insertData *sql.Stmt
|
||||
insertKV *sql.Stmt
|
||||
|
@ -102,7 +104,7 @@ BEGIN:
|
|||
wgs = wgs[:0]
|
||||
|
||||
select {
|
||||
case job = <-w.modQ:
|
||||
case mod = <-w.modQ:
|
||||
case <-w.stop:
|
||||
return
|
||||
}
|
||||
|
@ -117,12 +119,12 @@ BEGIN:
|
|||
|
||||
LOOP:
|
||||
|
||||
if job.Ready != nil {
|
||||
wgs = append(wgs, job.Ready)
|
||||
if mod.Ready != nil {
|
||||
wgs = append(wgs, mod.Ready)
|
||||
}
|
||||
|
||||
newSeqNum++
|
||||
if job.Store {
|
||||
if mod.Store {
|
||||
goto STORE
|
||||
} else {
|
||||
goto DELETE
|
||||
|
@ -137,13 +139,15 @@ STORE:
|
|||
must(err)
|
||||
}
|
||||
|
||||
_, err = insertData.Exec(newSeqNum, job.Data)
|
||||
_, err = insertData.Exec(newSeqNum, mod.Data)
|
||||
must(err)
|
||||
_, err = insertKV.Exec(job.Collection, job.ID, newSeqNum)
|
||||
_, err = insertKV.Exec(mod.Collection, mod.ID, newSeqNum)
|
||||
must(err)
|
||||
_, err = insertLog.Exec(newSeqNum, now, job.Collection, job.ID, true)
|
||||
_, err = insertLog.Exec(newSeqNum, now, mod.Collection, mod.ID, true)
|
||||
must(err)
|
||||
|
||||
RecycleDataBuf(mod.Data)
|
||||
|
||||
goto NEXT
|
||||
|
||||
DELETE:
|
||||
|
@ -155,11 +159,11 @@ DELETE:
|
|||
must(err)
|
||||
}
|
||||
|
||||
_, err = deleteData.Exec(job.Collection, job.ID)
|
||||
_, err = deleteData.Exec(mod.Collection, mod.ID)
|
||||
must(err)
|
||||
_, err = deleteKV.Exec(job.Collection, job.ID)
|
||||
_, err = deleteKV.Exec(mod.Collection, mod.ID)
|
||||
must(err)
|
||||
_, err = insertLog.Exec(newSeqNum, now, job.Collection, job.ID, false)
|
||||
_, err = insertLog.Exec(newSeqNum, now, mod.Collection, mod.ID, false)
|
||||
must(err)
|
||||
|
||||
goto NEXT
|
||||
|
@ -167,7 +171,7 @@ DELETE:
|
|||
NEXT:
|
||||
|
||||
select {
|
||||
case job = <-w.modQ:
|
||||
case mod = <-w.modQ:
|
||||
goto LOOP
|
||||
default:
|
||||
}
|
||||
|
|
Reference in New Issue