This repository has been archived on 2022-07-30. You can view files and clone it, but cannot push or open issues/pull-requests.
mdb/kvstore/wal/writer.go

211 lines
3.8 KiB
Go

package wal
import (
"database/sql"
"encoding/binary"
"log"
"net"
"sync"
"time"
_ "github.com/mattn/go-sqlite3"
)
type insertJob struct {
Collection string
ID uint64
Store bool
Data []byte
Ready *sync.WaitGroup
}
type Writer struct {
primary bool
db *sql.DB
insert *sql.Stmt
lock sync.Mutex
running bool
insertQ chan insertJob
doneWG sync.WaitGroup
recvLock sync.Mutex
}
func NewWriterPrimary(walPath string) *Writer {
return newWriter(walPath, true)
}
func NewWriterSecondary(walPath string) *Writer {
return newWriter(walPath, false)
}
func newWriter(walPath string, primary bool) *Writer {
db := initWAL(walPath)
insert, err := db.Prepare(sqlWALInsert)
must(err)
w := &Writer{
primary: primary,
db: db,
insert: insert,
}
if primary {
w.start()
}
return w
}
func (w *Writer) Close() {
if w.db == nil {
return
}
w.stop()
w.db.Close()
w.db = nil
}
func (w *Writer) Store(collection string, id uint64, data []byte) {
if !w.primary {
panic("Store called on secondary.")
}
job := insertJob{
Collection: collection,
ID: id,
Store: true,
Data: data,
Ready: &sync.WaitGroup{},
}
job.Ready.Add(1)
w.insertQ <- job
job.Ready.Wait()
}
func (w *Writer) Delete(collection string, id uint64) {
if !w.primary {
panic("Delete called on secondary.")
}
job := insertJob{
Collection: collection,
ID: id,
Store: false,
Ready: &sync.WaitGroup{},
}
job.Ready.Add(1)
w.insertQ <- job
job.Ready.Wait()
}
// Called single-threaded from RecvWAL.
func (w *Writer) storeAsync(collection string, id uint64, data []byte) {
w.insertQ <- insertJob{
Collection: collection,
ID: id,
Store: true,
Data: data,
}
}
// Called single-threaded from RecvWAL.
func (w *Writer) deleteAsync(collection string, id uint64) {
w.insertQ <- insertJob{
Collection: collection,
ID: id,
Store: false,
}
}
func (w *Writer) MaxSeqNum() (n uint64) {
w.db.QueryRow(sqlWALMaxSeqNum).Scan(&n)
return
}
func (w *Writer) RecvWAL(conn net.Conn) {
defer conn.Close()
if w.primary {
panic("RecvWAL called on primary.")
}
if !w.recvLock.TryLock() {
log.Printf("Multiple calls to RecvWAL. Dropping connection.")
return
}
defer w.recvLock.Unlock()
headerBuf := make([]byte, recHeaderSize)
buf := make([]byte, 8)
afterSeqNum := w.MaxSeqNum()
expectedSeqNum := afterSeqNum + 1
// Send fromID to the conn.
conn.SetWriteDeadline(time.Now().Add(connTimeout))
binary.LittleEndian.PutUint64(buf, afterSeqNum)
if _, err := conn.Write(buf); err != nil {
log.Printf("RecvWAL failed to send after sequence number: %v", err)
return
}
conn.SetWriteDeadline(time.Time{})
// Start processing inserts.
w.start()
defer w.stop()
for {
conn.SetReadDeadline(time.Now().Add(connTimeout))
if _, err := conn.Read(headerBuf); err != nil {
log.Printf("RecvWAL failed to read header: %v", err)
return
}
rec, colLen, dataLen := decodeRecHeader(headerBuf)
// Heartbeat.
if rec.SeqNum == 0 {
continue
}
if rec.SeqNum != expectedSeqNum {
log.Printf("Expected sequence number %d but got %d.",
expectedSeqNum, rec.SeqNum)
return
}
expectedSeqNum++
if cap(buf) < colLen {
buf = make([]byte, colLen)
}
buf = buf[:colLen]
if _, err := conn.Read(buf); err != nil {
log.Printf("RecvWAL failed to collection name: %v", err)
return
}
rec.Collection = string(buf)
if rec.Store {
rec.Data = make([]byte, dataLen)
if _, err := conn.Read(rec.Data); err != nil {
log.Printf("RecvWAL failed to data: %v", err)
return
}
}
if rec.Store {
w.storeAsync(rec.Collection, rec.ID, rec.Data)
} else {
w.deleteAsync(rec.Collection, rec.ID)
}
}
}
func (w *Writer) DeleteBefore(seconds int64) {
_, err := w.db.Exec(sqlWALDeleteQuery, time.Now().Unix()-seconds)
must(err)
}