diff --git a/README.md b/README.md index a40dd4d..37df3fd 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,7 @@ An in-process, in-memory database for Go. ## TO DO +* [ ] wal: delete old entries * [ ] wal: shipping_test: secondary too far behind * [ ] wal: writer_test: call RecvWAL twice * [ ] wal: writer_test double start / stop diff --git a/wal/db-sql.go b/wal/db-sql.go index d8c942c..712952c 100644 --- a/wal/db-sql.go +++ b/wal/db-sql.go @@ -9,6 +9,8 @@ CREATE TABLE IF NOT EXISTS wal( Store INTEGER NOT NULL, Data BLOB ) WITHOUT ROWID; + +CREATE INDEX IF NOT EXISTS wal_created_at_index ON wal(CreatedAt); ` const sqlWALMaxSeqNum = ` @@ -28,3 +30,6 @@ FROM WHERE SeqNum > ? ORDER BY SeqNum ASC` + +const sqlWALDeleteQuery = ` +DELETE FROM wal WHERE CreatedAt < ? AND SeqNum < (SELECT MAX(SeqNum) FROM wal)` diff --git a/wal/follower.go b/wal/follower.go index 98ca580..5dffb44 100644 --- a/wal/follower.go +++ b/wal/follower.go @@ -66,14 +66,11 @@ func (f *Follower) SendWAL(conn net.Conn) { defer conn.Close() var ( - buf = make([]byte, 8) - headerBuf = make([]byte, recHeaderSize) - empty = make([]byte, recHeaderSize) - timeout = 16 * time.Second - heartbeatInterval = time.Second * 2 - pollInterval = 200 * time.Millisecond - tStart time.Time - err error + buf = make([]byte, 8) + headerBuf = make([]byte, recHeaderSize) + empty = make([]byte, recHeaderSize) + tStart time.Time + err error ) // Read the fromID from the conn. @@ -87,7 +84,7 @@ func (f *Follower) SendWAL(conn net.Conn) { POLL: - conn.SetWriteDeadline(time.Now().Add(timeout)) + conn.SetWriteDeadline(time.Now().Add(connTimeout)) tStart = time.Now() for time.Since(tStart) < heartbeatInterval { if f.MaxSeqNum() > afterSeqNum { @@ -99,7 +96,7 @@ POLL: HEARTBEAT: - conn.SetWriteDeadline(time.Now().Add(timeout)) + conn.SetWriteDeadline(time.Now().Add(connTimeout)) if _, err := conn.Write(empty); err != nil { log.Printf("SendWAL failed to send heartbeat: %v", err) return @@ -110,7 +107,7 @@ HEARTBEAT: REPLAY: err = f.Replay(afterSeqNum, func(rec Record) error { - conn.SetWriteDeadline(time.Now().Add(timeout)) + conn.SetWriteDeadline(time.Now().Add(connTimeout)) afterSeqNum = rec.SeqNum encodeRecordHeader(rec, headerBuf) diff --git a/wal/global.go b/wal/global.go new file mode 100644 index 0000000..74a28e1 --- /dev/null +++ b/wal/global.go @@ -0,0 +1,9 @@ +package wal + +import "time" + +var ( + connTimeout = 16 * time.Second // For sending / receiving WAL. + heartbeatInterval = 2 * time.Second // Used in Follower.SendLog + pollInterval = 250 * time.Millisecond // Used in Follower.SendLog +) diff --git a/wal/shipping_test.go b/wal/shipping_test.go index 6dbd3b0..5a6c301 100644 --- a/wal/shipping_test.go +++ b/wal/shipping_test.go @@ -74,7 +74,7 @@ func TestShipping(t *testing.T) { defer nw.CloseClient() defer nw.CloseServer() - N := 2000 + N := 4000 sleepTime := time.Millisecond go func() { for i := 0; i < N; i++ { @@ -138,7 +138,39 @@ func TestShipping(t *testing.T) { } }) - run("TODO: secondary too far behind", func(t *testing.T, wWALPath, fWALPath string, w *Writer, nw *testconn.Network) { + run("secondary too far behind", func(t *testing.T, wWALPath, fWALPath string, w *Writer, nw *testconn.Network) { + // Write some entries to the primary. + // MaxSeqNum will be 10. + for i := 0; i < 10; i++ { + w.Store(randString(), randID(), _b(randString())) + } + + // Delete everything. + w.DeleteBefore(time.Now().Unix() + 2) + + // Run a sender in the background. + go func() { + f := NewFollower(wWALPath) + defer f.Close() + conn := nw.Accept() + f.SendWAL(conn) + }() + + // Run the follower. + go func() { + w := NewWriterSecondary(fWALPath) + defer w.Close() + conn := nw.Dial() + w.RecvWAL(conn) + }() + + time.Sleep(1) + + f := NewFollower(fWALPath) + defer f.Close() + if f.MaxSeqNum() != 0 { + t.Fatal(f.MaxSeqNum()) + } }) diff --git a/wal/writer.go b/wal/writer.go index 6a1ac2e..bb7a686 100644 --- a/wal/writer.go +++ b/wal/writer.go @@ -138,8 +138,6 @@ func (w *Writer) RecvWAL(conn net.Conn) { } defer w.recvLock.Unlock() - timeout := 16 * time.Second - headerBuf := make([]byte, recHeaderSize) buf := make([]byte, 8) @@ -147,7 +145,7 @@ func (w *Writer) RecvWAL(conn net.Conn) { expectedSeqNum := afterSeqNum + 1 // Send fromID to the conn. - conn.SetWriteDeadline(time.Now().Add(timeout)) + conn.SetWriteDeadline(time.Now().Add(connTimeout)) binary.LittleEndian.PutUint64(buf, afterSeqNum) if _, err := conn.Write(buf); err != nil { log.Printf("RecvWAL failed to send after sequence number: %v", err) @@ -160,7 +158,7 @@ func (w *Writer) RecvWAL(conn net.Conn) { defer w.stop() for { - conn.SetReadDeadline(time.Now().Add(timeout)) + conn.SetReadDeadline(time.Now().Add(connTimeout)) if _, err := conn.Read(headerBuf); err != nil { log.Printf("RecvWAL failed to read header: %v", err) return @@ -205,3 +203,8 @@ func (w *Writer) RecvWAL(conn net.Conn) { } } } + +func (w *Writer) DeleteBefore(ts int64) { + _, err := w.db.Exec(sqlWALDeleteQuery, ts) + must(err) +}