From 88f12256e7477a30219340df065aaaf578b3a796 Mon Sep 17 00:00:00 2001 From: jdl Date: Mon, 25 Jul 2022 07:39:16 +0200 Subject: [PATCH] WAL written and tested. --- README.md | 50 ++++++++++- wal/db-sql.go | 30 +++++++ wal/follower.go | 59 ++++++++++++ wal/follower_test.go | 123 +++++++++++++++++++++++++ wal/init.go | 20 +++++ wal/shipping.go | 180 +++++++++++++++++++++++++++++++++++++ wal/shipping_test.go | 155 ++++++++++++++++++++++++++++++++ wal/util.go | 7 ++ wal/util_test.go | 66 ++++++++++++++ wal/writer.go | 176 ++++++++++++++++++++++++++++++++++++ wal/writer_test.go | 208 +++++++++++++++++++++++++++++++++++++++++++ 11 files changed, 1073 insertions(+), 1 deletion(-) create mode 100644 wal/db-sql.go create mode 100644 wal/follower.go create mode 100644 wal/follower_test.go create mode 100644 wal/init.go create mode 100644 wal/shipping.go create mode 100644 wal/shipping_test.go create mode 100644 wal/util.go create mode 100644 wal/util_test.go create mode 100644 wal/writer.go create mode 100644 wal/writer_test.go diff --git a/README.md b/README.md index ea962f4..44186f3 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,51 @@ # mdb -In-memory database code. \ No newline at end of file +An in-process, in-memory database for Go. + +## TO DO + +* [ ] wal: writer lock via flock? + +## Structure + +### Files + +``` +wal//data +wal//length +wal//archived +max-committed-id +write-lock +commit-lock +``` + +`` is the unix timestamp / 86400. Therefore a new segment will be +created every day. The data file is append-only, and the length is written +atomically. + +`write-lock` is used by `flock` to ensure there's only one +writer. `commit-lock` is used by the commit routine. This routine will commit +data to the database and update `max-committed-id` atomically. 
+ +### Code + +``` +retry/ +wal/ + segment/ +storage/ +``` + +## WAL File Format + +The WAL file consists of an 8 byte minimum ID followed by a stream of entries +of the form + +``` +(4) Data length +(Data length) Data +(8) ID. +``` + +This allows us to quickly get the min and max IDs from the file, and iterate +through the segment from the beginning. diff --git a/wal/db-sql.go b/wal/db-sql.go new file mode 100644 index 0000000..d8c942c --- /dev/null +++ b/wal/db-sql.go @@ -0,0 +1,30 @@ +package wal + +const sqlSchema = ` +CREATE TABLE IF NOT EXISTS wal( + SeqNum INTEGER NOT NULL PRIMARY KEY, + CreatedAt INTEGER NOT NULL, + Collection TEXT NOT NULL, + ID INTEGER NOT NULL, + Store INTEGER NOT NULL, + Data BLOB +) WITHOUT ROWID; +` + +const sqlWALMaxSeqNum = ` +SELECT COALESCE(MAX(SeqNum), 0) FROM wal; +` + +const sqlWALInsert = ` +INSERT INTO wal( + SeqNum,CreatedAt,Collection,ID,Store,Data +) VALUES (?,?,?,?,?,?)` + +const sqlWALFollowQuery = ` +SELECT + SeqNum,Collection,ID,Store,Data +FROM + wal +WHERE + SeqNum > ? 
+ORDER BY SeqNum ASC` diff --git a/wal/follower.go b/wal/follower.go new file mode 100644 index 0000000..53af3c1 --- /dev/null +++ b/wal/follower.go @@ -0,0 +1,59 @@ +package wal + +import ( + "database/sql" +) + +type Record struct { + SeqNum uint64 + Collection string + ID uint64 + Store bool + Data []byte +} + +type Follower struct { + db *sql.DB + selectStmt *sql.Stmt +} + +func NewFollower(walPath string) *Follower { + db := initWAL(walPath) + + selectStmt, err := db.Prepare(sqlWALFollowQuery) + must(err) + + return &Follower{ + db: db, + selectStmt: selectStmt, + } +} + +func (f *Follower) Close() { + f.db.Close() +} + +func (f *Follower) MaxSeqNum() (n uint64) { + must(f.db.QueryRow(sqlWALMaxSeqNum).Scan(&n)) + return +} + +func (f *Follower) Replay(afterSeqNum uint64, each func(rec Record) error) error { + rec := Record{} + rows, err := f.selectStmt.Query(afterSeqNum) + must(err) + defer rows.Close() + + for rows.Next() { + must(rows.Scan( + &rec.SeqNum, + &rec.Collection, + &rec.ID, + &rec.Store, + &rec.Data)) + if err = each(rec); err != nil { + return err + } + } + return nil +} diff --git a/wal/follower_test.go b/wal/follower_test.go new file mode 100644 index 0000000..bfa419a --- /dev/null +++ b/wal/follower_test.go @@ -0,0 +1,123 @@ +package wal + +import ( + "errors" + "log" + "os" + "testing" + "time" +) + +// ---------------------------------------------------------------------------- + +func (f *Follower) getReplay(afterSeqNum uint64) (l []Record) { + f.Replay(afterSeqNum, func(rec Record) error { + l = append(l, rec) + return nil + }) + return l +} + +func (f *Follower) waitForSeqNum(n uint64) { + for { + if f.MaxSeqNum() == n { + return + } + time.Sleep(time.Millisecond) + } +} + +// ---------------------------------------------------------------------------- + +func TestFollower(t *testing.T) { + + run := func(name string, inner func(t *testing.T, walPath string, w *Writer, f *Follower)) { + t.Run(name, func(t *testing.T) { + walPath := 
randPath() + ".wal" + log.Print(walPath) + defer os.RemoveAll(walPath) + w := NewWriter(walPath) + defer w.Close() + f := NewFollower(walPath) + defer f.Close() + inner(t, walPath, w, f) + }) + } + + run("simple", func(t *testing.T, walPath string, w *Writer, f *Follower) { + w.Store("a", 1, _b("Hello")) + w.Delete("b", 1) + w.Store("a", 2, _b("World")) + w.Store("a", 1, _b("Good bye")) + + expected := []Record{ + {SeqNum: 1, Collection: "a", ID: 1, Store: true, Data: _b("Hello")}, + {SeqNum: 2, Collection: "b", ID: 1}, + {SeqNum: 3, Collection: "a", ID: 2, Store: true, Data: _b("World")}, + {SeqNum: 4, Collection: "a", ID: 1, Store: true, Data: _b("Good bye")}, + } + + recs := f.getReplay(0) + if err := recsEqual(recs, expected); err != nil { + t.Fatal(err) + } + + for i := 1; i < 4; i++ { + recs = f.getReplay(uint64(i)) + if err := recsEqual(recs, expected[i:]); err != nil { + t.Fatal(err) + } + } + }) + + run("write async", func(t *testing.T, walPath string, w *Writer, f *Follower) { + w.storeAsync("a", 1, _b("hello1")) + w.storeAsync("a", 2, _b("hello2")) + w.deleteAsync("a", 1) + w.storeAsync("a", 3, _b("hello3")) + w.storeAsync("b", 1, _b("b1")) + + f.waitForSeqNum(5) + + expected := []Record{ + {SeqNum: 1, Collection: "a", ID: 1, Store: true, Data: _b("hello1")}, + {SeqNum: 2, Collection: "a", ID: 2, Store: true, Data: _b("hello2")}, + {SeqNum: 3, Collection: "a", ID: 1, Store: false}, + {SeqNum: 4, Collection: "a", ID: 3, Store: true, Data: _b("hello3")}, + {SeqNum: 5, Collection: "b", ID: 1, Store: true, Data: _b("b1")}, + } + + recs := f.getReplay(0) + if err := recsEqual(recs, expected); err != nil { + t.Fatal(err) + } + + for i := 1; i < 4; i++ { + recs = f.getReplay(uint64(i)) + if err := recsEqual(recs, expected[i:]); err != nil { + t.Fatal(err) + } + } + }) + + run("replay error", func(t *testing.T, walPath string, w *Writer, f *Follower) { + expectedErr := errors.New("My error") + + w.Store("a", 1, _b("Hello")) + w.Delete("b", 1) + w.Store("a", 2, 
_b("World")) + w.Store("a", 1, _b("Good bye")) + + err := f.Replay(0, func(rec Record) error { + if rec.Collection == "b" { + return expectedErr + } + return nil + }) + + if err != expectedErr { + t.Fatal(err) + } + }) + +} diff --git a/wal/init.go b/wal/init.go new file mode 100644 index 0000000..0b0331d --- /dev/null +++ b/wal/init.go @@ -0,0 +1,20 @@ +package wal + +import ( + "database/sql" + "sync" +) + +var initLock sync.Mutex + +func initWAL(walPath string) *sql.DB { + initLock.Lock() + defer initLock.Unlock() + + db, err := sql.Open("sqlite3", walPath+"?_journal=WAL") + must(err) + + _, err = db.Exec(sqlSchema) + must(err) + return db +} diff --git a/wal/shipping.go b/wal/shipping.go new file mode 100644 index 0000000..c4196ca --- /dev/null +++ b/wal/shipping.go @@ -0,0 +1,180 @@ +package wal + +import ( + "encoding/binary" + "log" + "net" + "time" +) + +const recHeaderSize = 22 + +func encodeRecordHeader(rec Record, buf []byte) { + // SeqNum (8) + // ID (8) + // DataLen (4) + // Store (1) + // CollectionLen (1) + + binary.LittleEndian.PutUint64(buf[:8], rec.SeqNum) + buf = buf[8:] + binary.LittleEndian.PutUint64(buf[:8], rec.ID) + buf = buf[8:] + + if rec.Store { + binary.LittleEndian.PutUint32(buf[:4], uint32(len(rec.Data))) + buf[4] = 1 + } else { + binary.LittleEndian.PutUint32(buf[:4], 0) + buf[4] = 0 + } + buf = buf[5:] + + buf[0] = byte(len(rec.Collection)) +} + +func decodeRecHeader(header []byte) (rec Record, colLen, dataLen int) { + buf := header + + rec.SeqNum = binary.LittleEndian.Uint64(buf[:8]) + buf = buf[8:] + rec.ID = binary.LittleEndian.Uint64(buf[:8]) + buf = buf[8:] + dataLen = int(binary.LittleEndian.Uint32(buf[:4])) + buf = buf[4:] + rec.Store = buf[0] != 0 + buf = buf[1:] + colLen = int(buf[0]) + + return +} + +func SendWAL(walPath string, conn net.Conn) { + defer conn.Close() + + buf := make([]byte, 8) + headerBuf := make([]byte, recHeaderSize) + empty := make([]byte, recHeaderSize) + + // Read the fromID from the conn. 
+ conn.SetReadDeadline(time.Now().Add(16 * time.Second)) + if _, err := conn.Read(buf[:8]); err != nil { + log.Printf("SendWAL failed to read from ID: %v", err) + return + } + + afterSeqNum := binary.LittleEndian.Uint64(buf[:8]) + + follower := NewFollower(walPath) + defer follower.Close() + + for { + conn.SetWriteDeadline(time.Now().Add(16 * time.Second)) + + // Nothing to do. + if follower.MaxSeqNum() <= afterSeqNum { + if _, err := conn.Write(empty); err != nil { + log.Printf("SendWAL failed to send heartbeat: %v", err) + return + } + time.Sleep(time.Second) + continue + } + + err := follower.Replay(afterSeqNum, func(rec Record) error { + afterSeqNum = rec.SeqNum + encodeRecordHeader(rec, headerBuf) + if _, err := conn.Write(headerBuf); err != nil { + log.Printf("SendWAL failed to send header %v", err) + return err + } + + if _, err := conn.Write([]byte(rec.Collection)); err != nil { + log.Printf("SendWAL failed to send collection name %v", err) + return err + } + + if !rec.Store { + return nil + } + + if _, err := conn.Write(rec.Data); err != nil { + log.Printf("SendWAL failed to send data %v", err) + return err + } + return nil + }) + + if err != nil { + return + } + } +} + +func RecvWAL(walPath string, conn net.Conn) { + defer conn.Close() + + headerBuf := make([]byte, recHeaderSize) + buf := make([]byte, 8) + + w := NewWriter(walPath) + defer w.Close() + + afterSeqNum := w.MaxSeqNum() + expectedSeqNum := afterSeqNum + 1 + + // Send fromID to the conn. 
+ conn.SetWriteDeadline(time.Now().Add(time.Minute)) + binary.LittleEndian.PutUint64(buf, afterSeqNum) + if _, err := conn.Write(buf); err != nil { + log.Printf("RecvWAL failed to send after sequence number: %v", err) + return + } + conn.SetWriteDeadline(time.Time{}) + + for { + conn.SetReadDeadline(time.Now().Add(time.Minute)) + if _, err := conn.Read(headerBuf); err != nil { + log.Printf("RecvWAL failed to read header: %v", err) + return + } + rec, colLen, dataLen := decodeRecHeader(headerBuf) + + // Heartbeat. + if rec.SeqNum == 0 { + continue + } + + if rec.SeqNum != expectedSeqNum { + log.Printf("Expected sequence number %d but got %d.", + expectedSeqNum, rec.SeqNum) + return + } + expectedSeqNum++ + + if cap(buf) < colLen { + buf = make([]byte, colLen) + } + buf = buf[:colLen] + + if _, err := conn.Read(buf); err != nil { + log.Printf("RecvWAL failed to collection name: %v", err) + return + } + + rec.Collection = string(buf) + + if rec.Store { + rec.Data = make([]byte, dataLen) + if _, err := conn.Read(rec.Data); err != nil { + log.Printf("RecvWAL failed to data: %v", err) + return + } + } + if rec.Store { + w.storeAsync(rec.Collection, rec.ID, rec.Data) + } else { + w.deleteAsync(rec.Collection, rec.ID) + } + } +} diff --git a/wal/shipping_test.go b/wal/shipping_test.go new file mode 100644 index 0000000..6270c5d --- /dev/null +++ b/wal/shipping_test.go @@ -0,0 +1,155 @@ +package wal + +import ( + "fmt" + "math/rand" + "mdb/testconn" + "os" + "testing" + "time" +) + +func TestShipp(t *testing.T) { + run := func(name string, inner func( + t *testing.T, + wWALPath string, + fWALPath string, + w *Writer, + nw *testconn.Network, + )) { + t.Run(name, func(t *testing.T) { + wWALPath := randPath() + ".wal" + fWALPath := randPath() + ".wal" + w := NewWriter(wWALPath) + defer w.Close() + + nw := testconn.NewNetwork() + defer nw.CloseClient() + defer nw.CloseServer() + + defer os.RemoveAll(wWALPath) + defer os.RemoveAll(fWALPath) + + inner(t, wWALPath, fWALPath, w, 
nw) + }) + } + + run("simple", func(t *testing.T, wWALPath, fWALPath string, w *Writer, nw *testconn.Network) { + // Write 100 entries in background. + go func() { + for i := 0; i < 100; i++ { + time.Sleep(10 * time.Millisecond) + w.Store("x", uint64(i+10), _b(fmt.Sprintf("data %d", i))) + } + }() + + // Run a sender in the background. + go func() { + conn := nw.Accept() + SendWAL(wWALPath, conn) + }() + + // Run the follower. + go func() { + conn := nw.Dial() + RecvWAL(fWALPath, conn) + }() + + time.Sleep(time.Second) + + // Wait for follower to get 100 entries, then close connection. + f := NewFollower(fWALPath) + defer f.Close() + f.waitForSeqNum(100) + + if err := walsEqual(wWALPath, fWALPath); err != nil { + t.Fatal(err) + } + }) + + run("net failures", func(t *testing.T, wWALPath, fWALPath string, w *Writer, nw *testconn.Network) { + N := 10000 + sleepTime := time.Millisecond + go func() { + for i := 0; i < N; i++ { + time.Sleep(sleepTime) + if rand.Float64() < 0.9 { + w.Store(randString(), randID(), _b(randString())) + } else { + w.Delete(randString(), randID()) + } + } + }() + + // Run a sender in the background. + go func() { + f := NewFollower(fWALPath) + for f.MaxSeqNum() < uint64(N) { + conn := nw.Accept() + SendWAL(wWALPath, conn) + } + }() + + // Run the follower in the background. + go func() { + f := NewFollower(fWALPath) + for f.MaxSeqNum() < uint64(N) { + conn := nw.Dial() + RecvWAL(fWALPath, conn) + } + }() + + // Disconnect the network randomly. + go func() { + f := NewFollower(fWALPath) + for f.MaxSeqNum() < uint64(N) { + time.Sleep(time.Duration(rand.Intn(2 * int(sleepTime)))) + if rand.Float64() < 0.5 { + nw.CloseClient() + } else { + nw.CloseServer() + } + } + }() + + time.Sleep(time.Second) + + // Wait for follower to get 100 entries, then close connection. 
+ f := NewFollower(fWALPath) + defer f.Close() + f.waitForSeqNum(uint64(N)) + + if err := walsEqual(wWALPath, fWALPath); err != nil { + t.Fatal(err) + } + }) +} + +func TestShippingEncoding(t *testing.T) { + recs := []Record{ + {SeqNum: 10, Collection: "x", ID: 44, Store: true, Data: _b("Hello")}, + {SeqNum: 24, Collection: "abc", ID: 3, Store: true, Data: _b("x")}, + {SeqNum: 81, Collection: "qrs", ID: 102, Store: false}, + } + + buf := make([]byte, recHeaderSize) + for _, rec := range recs { + encodeRecordHeader(rec, buf) + out, colLen, dataLen := decodeRecHeader(buf) + if out.SeqNum != rec.SeqNum { + t.Fatal(out, rec) + } + if out.ID != rec.ID { + t.Fatal(out, rec) + } + if out.Store != rec.Store { + t.Fatal(out, rec) + } + if colLen != len(rec.Collection) { + t.Fatal(out, rec) + } + if dataLen != len(rec.Data) { + t.Fatal(out, rec) + } + } +} diff --git a/wal/util.go b/wal/util.go new file mode 100644 index 0000000..58e1ae4 --- /dev/null +++ b/wal/util.go @@ -0,0 +1,7 @@ +package wal + +func must(err error) { + if err != nil { + panic(err) + } +} diff --git a/wal/util_test.go b/wal/util_test.go new file mode 100644 index 0000000..658c223 --- /dev/null +++ b/wal/util_test.go @@ -0,0 +1,66 @@ +package wal + +import ( + "crypto/rand" + "encoding/hex" + "fmt" + mrand "math/rand" + "os" + "path/filepath" + "reflect" +) + +func _b(in string) []byte { + return []byte(in) +} + +func randString() string { + buf := make([]byte, 1+mrand.Intn(20)) + rand.Read(buf) + return hex.EncodeToString(buf) +} + +func randID() uint64 { + return uint64(mrand.Uint32()) +} + +func randPath() string { + buf := make([]byte, 8) + rand.Read(buf) + return filepath.Join(os.TempDir(), hex.EncodeToString(buf)) +} + +func readWAL(walPath string) (l []Record) { + f := NewFollower(walPath) + defer f.Close() + f.Replay(0, func(rec Record) error { + l = append(l, rec) + return nil + }) + return l +} + +func walEqual(walPath string, expected []Record) error { + recs := readWAL(walPath) + return 
recsEqual(recs, expected) +} + +func recsEqual(recs, expected []Record) error { + if len(recs) != len(expected) { + return fmt.Errorf("Expected %d records but found %d", + len(expected), len(recs)) + } + + for i, rec := range recs { + exp := expected[i] + if !reflect.DeepEqual(rec, exp) { + return fmt.Errorf("Mismatched records: %v != %v", rec, exp) + } + } + + return nil +} + +func walsEqual(path1, path2 string) error { + return recsEqual(readWAL(path1), readWAL(path2)) +} diff --git a/wal/writer.go b/wal/writer.go new file mode 100644 index 0000000..4462cc2 --- /dev/null +++ b/wal/writer.go @@ -0,0 +1,176 @@ +package wal + +import ( + "database/sql" + "sync" + "time" + + _ "github.com/mattn/go-sqlite3" +) + +type insertJob struct { + Collection string + ID uint64 + Store bool + Data []byte + Ready *sync.WaitGroup +} + +type Writer struct { + db *sql.DB + insert *sql.Stmt + insertQ chan insertJob + doneWG sync.WaitGroup +} + +func NewWriter(walPath string) *Writer { + db := initWAL(walPath) + + insert, err := db.Prepare(sqlWALInsert) + must(err) + w := &Writer{ + db: db, + insert: insert, + insertQ: make(chan insertJob, 1024), + } + + var maxSeqNum uint64 + row := db.QueryRow(sqlWALMaxSeqNum) + must(row.Scan(&maxSeqNum)) + + w.doneWG.Add(1) + go w.insertProc(maxSeqNum) + return w +} + +func (w *Writer) Close() { + if w.db == nil { + return + } + + close(w.insertQ) + w.doneWG.Wait() + w.db.Close() + w.db = nil +} + +func (w *Writer) Store(collection string, id uint64, data []byte) { + job := insertJob{ + Collection: collection, + ID: id, + Store: true, + Data: data, + Ready: &sync.WaitGroup{}, + } + job.Ready.Add(1) + w.insertQ <- job + job.Ready.Wait() +} + +func (w *Writer) Delete(collection string, id uint64) { + job := insertJob{ + Collection: collection, + ID: id, + Store: false, + Ready: &sync.WaitGroup{}, + } + job.Ready.Add(1) + w.insertQ <- job + job.Ready.Wait() +} + +// Called single-threaded from RecvWAL. 
+func (w *Writer) storeAsync(collection string, id uint64, data []byte) { + w.insertQ <- insertJob{ + Collection: collection, + ID: id, + Store: true, + Data: data, + } +} + +// Called single-threaded from RecvWAL. +func (w *Writer) deleteAsync(collection string, id uint64) { + w.insertQ <- insertJob{ + Collection: collection, + ID: id, + Store: false, + } +} + +func (w *Writer) MaxSeqNum() (n uint64) { + w.db.QueryRow(sqlWALMaxSeqNum).Scan(&n) + return +} + +func (w *Writer) insertProc(maxSeqNum uint64) { + defer w.doneWG.Done() + + var ( + job insertJob + tx *sql.Tx + insert *sql.Stmt + ok bool + err error + newSeqNum uint64 + now int64 + wgs = make([]*sync.WaitGroup, 10) + ) + + var () + +BEGIN: + + newSeqNum = maxSeqNum + wgs = wgs[:0] + + job, ok = <-w.insertQ + if !ok { + return + } + + tx, err = w.db.Begin() + must(err) + + insert, err = tx.Prepare(sqlWALInsert) + must(err) + + now = time.Now().Unix() + +LOOP: + + newSeqNum++ + _, err = insert.Exec( + newSeqNum, + now, + job.Collection, + job.ID, + job.Store, + job.Data) + + must(err) + if job.Ready != nil { + wgs = append(wgs, job.Ready) + } + + select { + case job, ok = <-w.insertQ: + if ok { + goto LOOP + } + default: + } + + goto COMMIT + +COMMIT: + + must(tx.Commit()) + + maxSeqNum = newSeqNum + for i := range wgs { + wgs[i].Done() + } + + goto BEGIN +} diff --git a/wal/writer_test.go b/wal/writer_test.go new file mode 100644 index 0000000..c4e74e3 --- /dev/null +++ b/wal/writer_test.go @@ -0,0 +1,208 @@ +package wal + +import ( + "bytes" + "fmt" + "os" + "strconv" + "sync" + "testing" + "time" +) + +// ---------------------------------------------------------------------------- + +func (w *Writer) waitForSeqNum(n uint64) { + for { + if w.MaxSeqNum() == n { + return + } + time.Sleep(time.Millisecond) + } +} + +// ---------------------------------------------------------------------------- + +func TestWriter(t *testing.T) { + + run := func(name string, inner func(t *testing.T, walPath string, w *Writer)) 
{ + t.Run(name, func(t *testing.T) { + walPath := randPath() + ".wal" + defer os.RemoveAll(walPath) + w := NewWriter(walPath) + defer w.Close() + inner(t, walPath, w) + }) + } + + run("simple", func(t *testing.T, walPath string, w *Writer) { + w.Store("a", 1, _b("Hello")) + w.Delete("b", 1) + w.Store("a", 2, _b("World")) + w.Store("a", 1, _b("Good bye")) + + err := walEqual(walPath, []Record{ + {SeqNum: 1, Collection: "a", ID: 1, Store: true, Data: _b("Hello")}, + {SeqNum: 2, Collection: "b", ID: 1}, + {SeqNum: 3, Collection: "a", ID: 2, Store: true, Data: _b("World")}, + {SeqNum: 4, Collection: "a", ID: 1, Store: true, Data: _b("Good bye")}, + }) + + if err != nil { + t.Fatal(err) + } + }) + + run("write close write", func(t *testing.T, walPath string, w *Writer) { + w.Store("a", 1, _b("Hello")) + w.Close() + + w = NewWriter(walPath) + w.Delete("b", 1) + w.Close() + + w = NewWriter(walPath) + w.Store("a", 2, _b("World")) + w.Close() + + w = NewWriter(walPath) + w.Store("a", 1, _b("Good bye")) + + err := walEqual(walPath, []Record{ + {SeqNum: 1, Collection: "a", ID: 1, Store: true, Data: _b("Hello")}, + {SeqNum: 2, Collection: "b", ID: 1}, + {SeqNum: 3, Collection: "a", ID: 2, Store: true, Data: _b("World")}, + {SeqNum: 4, Collection: "a", ID: 1, Store: true, Data: _b("Good bye")}, + }) + + if err != nil { + t.Fatal(err) + } + }) + + run("write concurrent", func(t *testing.T, walPath string, w *Writer) { + N := 32 + wg := sync.WaitGroup{} + + expected := make([][]Record, N) + + for i := 0; i < N; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + collection := fmt.Sprintf("%d", i) + for j := 0; j < 1024; j++ { + rec := Record{ + Collection: collection, + ID: uint64(j + 1), + Store: true, + Data: _b(fmt.Sprintf("%d", j)), + } + + w.Store(rec.Collection, rec.ID, rec.Data) + expected[i] = append(expected[i], rec) + } + }(i) + } + + wg.Wait() + + recs := readWAL(walPath) + found := make([][]Record, N) + for _, rec := range recs { + rec := rec + index, err := 
strconv.ParseInt(rec.Collection, 10, 64) + if err != nil { + t.Fatal(err) + } + found[index] = append(found[index], rec) + } + + if len(found) != len(expected) { + t.Fatal(len(found), len(expected)) + } + + for i := range found { + fnd := found[i] + exp := expected[i] + if len(fnd) != len(exp) { + t.Fatal(i, len(fnd), len(exp)) + } + + for j := range fnd { + f := fnd[j] + e := exp[j] + ok := f.Collection == e.Collection && + f.ID == e.ID && + f.Store == e.Store && + bytes.Equal(f.Data, e.Data) + if !ok { + t.Fatal(i, j, f, e) + } + } + } + }) + + run("store delete async", func(t *testing.T, walPath string, w *Writer) { + w.storeAsync("a", 1, _b("hello1")) + w.storeAsync("a", 2, _b("hello2")) + w.deleteAsync("a", 1) + w.storeAsync("a", 3, _b("hello3")) + w.storeAsync("b", 1, _b("b1")) + + w.waitForSeqNum(5) + + err := walEqual(walPath, []Record{ + {SeqNum: 1, Collection: "a", ID: 1, Store: true, Data: _b("hello1")}, + {SeqNum: 2, Collection: "a", ID: 2, Store: true, Data: _b("hello2")}, + {SeqNum: 3, Collection: "a", ID: 1, Store: false}, + {SeqNum: 4, Collection: "a", ID: 3, Store: true, Data: _b("hello3")}, + {SeqNum: 5, Collection: "b", ID: 1, Store: true, Data: _b("b1")}, + }) + + if err != nil { + t.Fatal(err) + } + }) + + run("store delete async with close", func(t *testing.T, walPath string, w *Writer) { + w.storeAsync("a", 1, _b("hello1")) + w.Close() + w = NewWriter(walPath) + w.storeAsync("a", 2, _b("hello2")) + w.Close() + w = NewWriter(walPath) + w.deleteAsync("a", 1) + w.Close() + w = NewWriter(walPath) + w.storeAsync("a", 3, _b("hello3")) + w.Close() + w = NewWriter(walPath) + w.storeAsync("b", 1, _b("b1")) + w.Close() + w = NewWriter(walPath) + + w.waitForSeqNum(5) + + err := walEqual(walPath, []Record{ + {SeqNum: 1, Collection: "a", ID: 1, Store: true, Data: _b("hello1")}, + {SeqNum: 2, Collection: "a", ID: 2, Store: true, Data: _b("hello2")}, + {SeqNum: 3, Collection: "a", ID: 1, Store: false}, + {SeqNum: 4, Collection: "a", ID: 3, Store: true, 
Data: _b("hello3")}, + {SeqNum: 5, Collection: "b", ID: 1, Store: true, Data: _b("b1")}, + }) + + if err != nil { + t.Fatal(err) + } + }) + + // This is really just a benchmark. + run("store async many", func(t *testing.T, walPath string, w *Writer) { + N := 32768 + for i := 0; i < N; i++ { + w.storeAsync("a", 1, _b("x")) + } + w.waitForSeqNum(uint64(N)) + }) +}