From 9b47a64827ac4bffeedabd50119d10881716b12a Mon Sep 17 00:00:00 2001 From: jdl Date: Sat, 30 Jul 2022 05:58:35 +0200 Subject: [PATCH] Code cleanup, buffer pool --- codec.go | 9 ++++++--- kvstore/globals.go | 35 +++++++++++++++++++++++++++++++---- kvstore/sync-recv.go | 27 ++++++++++++--------------- kvstore/sync-send.go | 10 ++++++---- kvstore/writer.go | 30 +++++++++++++++++------------- 5 files changed, 72 insertions(+), 39 deletions(-) diff --git a/codec.go b/codec.go index d8f4ad6..0a9c48b 100644 --- a/codec.go +++ b/codec.go @@ -1,7 +1,10 @@ package mdb import ( + "bytes" "encoding/json" + + "git.crumpington.com/private/mdb/kvstore" ) func decode[T any](data []byte) *T { @@ -11,7 +14,7 @@ func decode[T any](data []byte) *T { } func encode(item any) []byte { - buf, err := json.Marshal(item) - must(err) - return buf + w := bytes.NewBuffer(kvstore.GetDataBuf(0)[:0]) + must(json.NewEncoder(w).Encode(item)) + return w.Bytes() } diff --git a/kvstore/globals.go b/kvstore/globals.go index 2b648ad..2436b37 100644 --- a/kvstore/globals.go +++ b/kvstore/globals.go @@ -1,9 +1,36 @@ package kvstore -import "time" +import ( + "time" +) var ( - connTimeout = 16 * time.Second - heartbeatInterval = 4 * time.Second - pollInterval = 500 * time.Millisecond + connTimeout = 16 * time.Second + pollInterval = 500 * time.Millisecond + modQSize = 1024 + + bufferPool = make(chan []byte, 1024) + poolBufSize = 4096 ) + +func GetDataBuf(size int) []byte { + if size > poolBufSize { + return make([]byte, size) + } + select { + case b := <-bufferPool: + return b[:size] + default: + return make([]byte, poolBufSize)[:size] + } +} + +func RecycleDataBuf(b []byte) { + if cap(b) != poolBufSize { + return + } + select { + case bufferPool <- b: + default: + } +} diff --git a/kvstore/sync-recv.go b/kvstore/sync-recv.go index b7c54ed..c016800 100644 --- a/kvstore/sync-recv.go +++ b/kvstore/sync-recv.go @@ -22,23 +22,20 @@ func (kv *KV) SyncRecv(conn net.Conn) { // It's important that we stop when this routine exits so that // all queued writes are committed to the database before syncing // has a chance to restart. - //kv.startWriteLoop() - //defer kv.stopWriteLoop() - w := newWriter(kv.db) w.Start(kv.MaxSeqNum()) defer w.Stop() headerBuf := make([]byte, recHeaderSize) - buf := make([]byte, 8) + nameBuf := make([]byte, 32) afterSeqNum := kv.MaxSeqNum() expectedSeqNum := afterSeqNum + 1 // Send fromID to the conn. conn.SetWriteDeadline(time.Now().Add(connTimeout)) - binary.LittleEndian.PutUint64(buf, afterSeqNum) - if _, err := conn.Write(buf); err != nil { + binary.LittleEndian.PutUint64(nameBuf, afterSeqNum) + if _, err := conn.Write(nameBuf); err != nil { log.Printf("RecvWAL failed to send after sequence number: %v", err) return } @@ -51,7 +48,7 @@ func (kv *KV) SyncRecv(conn net.Conn) { log.Printf("RecvWAL failed to read header: %v", err) return } - rec, colLen, dataLen := decodeRecHeader(headerBuf) + rec, nameLen, dataLen := decodeRecHeader(headerBuf) // Heartbeat. if rec.SeqNum == 0 { @@ -65,29 +62,29 @@ func (kv *KV) SyncRecv(conn net.Conn) { } expectedSeqNum++ - if cap(buf) < colLen { - buf = make([]byte, colLen) + if cap(nameBuf) < nameLen { + nameBuf = make([]byte, nameLen) } - buf = buf[:colLen] + nameBuf = nameBuf[:nameLen] - if _, err := conn.Read(buf); err != nil { + if _, err := conn.Read(nameBuf); err != nil { log.Printf("RecvWAL failed to read collection name: %v", err) return } - rec.Collection = string(buf) + rec.Collection = string(nameBuf) if rec.Store { - rec.Data = make([]byte, dataLen) + rec.Data = GetDataBuf(dataLen) if _, err := conn.Read(rec.Data); err != nil { log.Printf("RecvWAL failed to read data: %v", err) return } - w.StoreAsync(rec.Collection, rec.ID, rec.Data) kv.onStore(rec.Collection, rec.ID, rec.Data) + w.StoreAsync(rec.Collection, rec.ID, rec.Data) } else { - w.DeleteAsync(rec.Collection, rec.ID) kv.onDelete(rec.Collection, rec.ID) + w.DeleteAsync(rec.Collection, rec.ID) } } } diff --git a/kvstore/sync-send.go b/kvstore/sync-send.go index 71438ad..0737c42 100644 --- a/kvstore/sync-send.go +++ b/kvstore/sync-send.go @@ -14,7 +14,6 @@ func (kv *KV) SyncSend(conn net.Conn) { seqNumBuf = make([]byte, 8) headerBuf = make([]byte, recHeaderSize) empty = make([]byte, recHeaderSize) - tStart time.Time err error ) @@ -29,14 +28,13 @@ func (kv *KV) SyncSend(conn net.Conn) { POLL: - conn.SetWriteDeadline(time.Now().Add(connTimeout)) - tStart = time.Now() - for time.Since(tStart) < heartbeatInterval { + for i := 0; i < 8; i++ { if kv.MaxSeqNum() > afterSeqNum { goto REPLAY } time.Sleep(pollInterval) } + goto HEARTBEAT HEARTBEAT: @@ -91,6 +89,9 @@ func (kv *KV) replay(afterSeqNum uint64, each func(rec record) error) error { must(err) defer rows.Close() + rec.Data = GetDataBuf(0) + defer RecycleDataBuf(rec.Data) + for rows.Next() { must(rows.Scan( &rec.SeqNum, @@ -98,6 +99,7 @@ func (kv *KV) replay(afterSeqNum uint64, each func(rec record) error) error { &rec.ID, &rec.Store, &rec.Data)) + if err = each(rec); err != nil { return err } diff --git a/kvstore/writer.go b/kvstore/writer.go index 6b2a722..9fed4a2 100644 --- a/kvstore/writer.go +++ b/kvstore/writer.go @@ -17,7 +17,7 @@ func newWriter(db *sql.DB) *writer { return &writer{ db: db, stop: make(chan struct{}, 1), - modQ: make(chan modJob, 1024), + modQ: make(chan modJob, modQSize), } } @@ -34,6 +34,7 @@ func (w *writer) Stop() { w.wg.Wait() } +// Takes ownership of the incoming data. func (w *writer) Store(collection string, id uint64, data []byte) { job := modJob{ Collection: collection, @@ -59,6 +60,7 @@ func (w *writer) Delete(collection string, id uint64) { job.Ready.Wait() } +// Takes ownership of the incoming data. func (w *writer) StoreAsync(collection string, id uint64, data []byte) { w.modQ <- modJob{ Collection: collection, @@ -80,7 +82,7 @@ func (w *writer) run(maxSeqNum uint64) { defer w.wg.Done() var ( - job modJob + mod modJob tx *sql.Tx insertData *sql.Stmt insertKV *sql.Stmt @@ -102,7 +104,7 @@ BEGIN: wgs = wgs[:0] select { - case job = <-w.modQ: + case mod = <-w.modQ: case <-w.stop: return } @@ -117,12 +119,12 @@ BEGIN: LOOP: - if job.Ready != nil { - wgs = append(wgs, job.Ready) + if mod.Ready != nil { + wgs = append(wgs, mod.Ready) } newSeqNum++ - if job.Store { + if mod.Store { goto STORE } else { goto DELETE @@ -137,13 +139,15 @@ STORE: must(err) } - _, err = insertData.Exec(newSeqNum, job.Data) + _, err = insertData.Exec(newSeqNum, mod.Data) must(err) - _, err = insertKV.Exec(job.Collection, job.ID, newSeqNum) + _, err = insertKV.Exec(mod.Collection, mod.ID, newSeqNum) must(err) - _, err = insertLog.Exec(newSeqNum, now, job.Collection, job.ID, true) + _, err = insertLog.Exec(newSeqNum, now, mod.Collection, mod.ID, true) must(err) + RecycleDataBuf(mod.Data) + goto NEXT DELETE: @@ -155,11 +159,11 @@ DELETE: must(err) } - _, err = deleteData.Exec(job.Collection, job.ID) + _, err = deleteData.Exec(mod.Collection, mod.ID) must(err) - _, err = deleteKV.Exec(job.Collection, job.ID) + _, err = deleteKV.Exec(mod.Collection, mod.ID) must(err) - _, err = insertLog.Exec(newSeqNum, now, job.Collection, job.ID, false) + _, err = insertLog.Exec(newSeqNum, now, mod.Collection, mod.ID, false) must(err) goto NEXT @@ -167,7 +171,7 @@ DELETE: NEXT: select { - case job = <-w.modQ: + case mod = <-w.modQ: goto LOOP default: }