master
jdl 2022-07-25 23:16:58 +02:00
parent 3695fd5018
commit 0e0050db9e
7 changed files with 47 additions and 153 deletions

View File

@ -4,6 +4,7 @@ An in-process, in-memory database for Go.
## TO DO
* [ ] kvstore: tests
* [ ] wal: writer lock via flock?
## Structure

5
wal/dep-graph.sh Executable file
View File

@ -0,0 +1,5 @@
#!/bin/bash
godepgraph -s . > .deps.dot && xdot .deps.dot
rm .deps.dot

View File

@ -36,7 +36,7 @@ func TestFollower(t *testing.T) {
t.Run(name, func(t *testing.T) {
walPath := randPath() + ".wal"
defer os.RemoveAll(walPath)
w := NewWriter(walPath, true)
w := newWriter(walPath, true)
defer w.Close()
f := NewFollower(walPath)
defer f.Close()

View File

@ -2,9 +2,6 @@ package wal
import (
"encoding/binary"
"log"
"net"
"time"
)
const recHeaderSize = 22
@ -48,133 +45,3 @@ func decodeRecHeader(header []byte) (rec Record, colLen, dataLen int) {
return
}
func SendWAL(walPath string, conn net.Conn) {
defer conn.Close()
buf := make([]byte, 8)
headerBuf := make([]byte, recHeaderSize)
empty := make([]byte, recHeaderSize)
// Read the fromID from the conn.
conn.SetReadDeadline(time.Now().Add(16 * time.Second))
if _, err := conn.Read(buf[:8]); err != nil {
log.Printf("SendWAL failed to read from ID: %v", err)
return
}
afterSeqNum := binary.LittleEndian.Uint64(buf[:8])
follower := NewFollower(walPath)
defer follower.Close()
for {
conn.SetWriteDeadline(time.Now().Add(16 * time.Second))
// Nothing to do.
if follower.MaxSeqNum() <= afterSeqNum {
if _, err := conn.Write(empty); err != nil {
log.Printf("SendWAL failed to send heartbeat: %v", err)
return
}
time.Sleep(time.Second)
continue
}
err := follower.Replay(afterSeqNum, func(rec Record) error {
afterSeqNum = rec.SeqNum
encodeRecordHeader(rec, headerBuf)
if _, err := conn.Write(headerBuf); err != nil {
log.Printf("SendWAL failed to send header %v", err)
return err
}
if _, err := conn.Write([]byte(rec.Collection)); err != nil {
log.Printf("SendWAL failed to send collection name %v", err)
return err
}
if !rec.Store {
return nil
}
if _, err := conn.Write(rec.Data); err != nil {
log.Printf("SendWAL failed to send data %v", err)
return err
}
return nil
})
if err != nil {
return
}
}
}
func RecvWAL(walPath string, conn net.Conn) {
defer conn.Close()
headerBuf := make([]byte, recHeaderSize)
buf := make([]byte, 8)
w := NewWriter(walPath, true)
defer w.Close()
afterSeqNum := w.MaxSeqNum()
expectedSeqNum := afterSeqNum + 1
// Send fromID to the conn.
conn.SetWriteDeadline(time.Now().Add(time.Minute))
binary.LittleEndian.PutUint64(buf, afterSeqNum)
if _, err := conn.Write(buf); err != nil {
log.Printf("RecvWAL failed to send after sequence number: %v", err)
return
}
conn.SetWriteDeadline(time.Time{})
for {
conn.SetReadDeadline(time.Now().Add(time.Minute))
if _, err := conn.Read(headerBuf); err != nil {
log.Printf("RecvWAL failed to read header: %v", err)
return
}
rec, colLen, dataLen := decodeRecHeader(headerBuf)
// Heartbeat.
if rec.SeqNum == 0 {
continue
}
if rec.SeqNum != expectedSeqNum {
log.Printf("Expected sequence number %d but got %d.",
expectedSeqNum, rec.SeqNum)
return
}
expectedSeqNum++
if cap(buf) < colLen {
buf = make([]byte, colLen)
}
buf = buf[:colLen]
if _, err := conn.Read(buf); err != nil {
log.Printf("RecvWAL failed to collection name: %v", err)
return
}
rec.Collection = string(buf)
if rec.Store {
rec.Data = make([]byte, dataLen)
if _, err := conn.Read(rec.Data); err != nil {
log.Printf("RecvWAL failed to data: %v", err)
return
}
}
if rec.Store {
w.storeAsync(rec.Collection, rec.ID, rec.Data)
} else {
w.deleteAsync(rec.Collection, rec.ID)
}
}
}

View File

@ -10,7 +10,7 @@ import (
"git.crumpington.com/private/mdb/testconn"
)
func TestShip(t *testing.T) {
func TestShipping(t *testing.T) {
run := func(name string, inner func(
t *testing.T,
wWALPath string,
@ -21,7 +21,7 @@ func TestShip(t *testing.T) {
t.Run(name, func(t *testing.T) {
wWALPath := randPath() + ".wal"
fWALPath := randPath() + ".wal"
w := NewWriter(wWALPath, true)
w := NewWriterPrimary(wWALPath)
defer w.Close()
nw := testconn.NewNetwork()
@ -53,8 +53,9 @@ func TestShip(t *testing.T) {
// Run the follower.
go func() {
w := NewWriterSecondary(fWALPath)
conn := nw.Dial()
RecvWAL(fWALPath, conn)
w.RecvWAL(conn)
}()
time.Sleep(time.Second)
@ -70,10 +71,12 @@ func TestShip(t *testing.T) {
})
run("net failures", func(t *testing.T, wWALPath, fWALPath string, w *Writer, nw *testconn.Network) {
defer nw.CloseClient()
defer nw.CloseServer()
N := 2000
sleepTime := time.Millisecond
go func() {
time.Sleep(4 * time.Second)
for i := 0; i < N; i++ {
time.Sleep(sleepTime)
if rand.Float64() < 0.9 {
@ -88,6 +91,7 @@ func TestShip(t *testing.T) {
go func() {
sender := NewFollower(wWALPath)
f := NewFollower(fWALPath)
defer f.Close()
for f.MaxSeqNum() < uint64(N) {
conn := nw.Accept()
@ -98,16 +102,21 @@ func TestShip(t *testing.T) {
// Run the follower in the background.
go func() {
f := NewFollower(fWALPath)
defer f.Close()
w := NewWriterSecondary(fWALPath)
for f.MaxSeqNum() < uint64(N) {
conn := nw.Dial()
RecvWAL(fWALPath, conn)
w.RecvWAL(conn)
}
}()
// Disconnect the network randomly.
go func() {
f := NewFollower(fWALPath)
defer f.Close()
for f.MaxSeqNum() < uint64(N) {
time.Sleep(time.Duration(rand.Intn(10 * int(sleepTime))))
if rand.Float64() < 0.5 {
@ -118,7 +127,6 @@ func TestShip(t *testing.T) {
}
}()
time.Sleep(time.Second)
f := NewFollower(fWALPath)
defer f.Close()
@ -129,6 +137,11 @@ func TestShip(t *testing.T) {
t.Fatal(err)
}
})
run("TODO: secondary too far behind", func(t *testing.T, wWALPath, fWALPath string, w *Writer, nw *testconn.Network) {
})
}
func TestShippingEncoding(t *testing.T) {

View File

@ -32,7 +32,15 @@ type Writer struct {
recvLock sync.Mutex
}
func NewWriter(walPath string, primary bool) *Writer {
func NewWriterPrimary(walPath string) *Writer {
return newWriter(walPath, true)
}
func NewWriterSecondary(walPath string) *Writer {
return newWriter(walPath, false)
}
func newWriter(walPath string, primary bool) *Writer {
db := initWAL(walPath)
insert, err := db.Prepare(sqlWALInsert)
@ -62,7 +70,7 @@ func (w *Writer) Close() {
func (w *Writer) Store(collection string, id uint64, data []byte) {
if !w.primary {
//panic("Store called on secondary.")
panic("Store called on secondary.")
}
job := insertJob{
@ -79,7 +87,7 @@ func (w *Writer) Store(collection string, id uint64, data []byte) {
func (w *Writer) Delete(collection string, id uint64) {
if !w.primary {
//panic("Delete called on secondary.")
panic("Delete called on secondary.")
}
job := insertJob{
@ -121,7 +129,7 @@ func (w *Writer) RecvWAL(conn net.Conn) {
defer conn.Close()
if w.primary {
//panic("RecvWAL called on primary.")
panic("RecvWAL called on primary.")
}
if !w.recvLock.TryLock() {

View File

@ -29,7 +29,7 @@ func TestWriter(t *testing.T) {
t.Run(name, func(t *testing.T) {
walPath := randPath() + ".wal"
defer os.RemoveAll(walPath)
w := NewWriter(walPath, true)
w := newWriter(walPath, true)
defer w.Close()
inner(t, walPath, w)
})
@ -57,15 +57,15 @@ func TestWriter(t *testing.T) {
w.Store("a", 1, _b("Hello"))
w.Close()
w = NewWriter(walPath, true)
w = newWriter(walPath, true)
w.Delete("b", 1)
w.Close()
w = NewWriter(walPath, true)
w = newWriter(walPath, true)
w.Store("a", 2, _b("World"))
w.Close()
w = NewWriter(walPath, true)
w = newWriter(walPath, true)
w.Store("a", 1, _b("Good bye"))
err := walEqual(walPath, []Record{
@ -168,19 +168,19 @@ func TestWriter(t *testing.T) {
run("store delete async with close", func(t *testing.T, walPath string, w *Writer) {
w.storeAsync("a", 1, _b("hello1"))
w.Close()
w = NewWriter(walPath, true)
w = newWriter(walPath, true)
w.storeAsync("a", 2, _b("hello2"))
w.Close()
w = NewWriter(walPath, true)
w = newWriter(walPath, true)
w.deleteAsync("a", 1)
w.Close()
w = NewWriter(walPath, true)
w = newWriter(walPath, true)
w.storeAsync("a", 3, _b("hello3"))
w.Close()
w = NewWriter(walPath, true)
w = newWriter(walPath, true)
w.storeAsync("b", 1, _b("b1"))
w.Close()
w = NewWriter(walPath, true)
w = newWriter(walPath, true)
w.waitForSeqNum(5)