From f625c92c72a4b3ba9176b95fa0672bdca8143ca0 Mon Sep 17 00:00:00 2001 From: jdl Date: Wed, 27 Jul 2022 21:16:53 +0200 Subject: [PATCH] testing: wip --- README.md | 4 +++- kvstore/store.go | 34 +++++++++++++++++++++++++++++-- shipping_test.go | 52 ++++++++++++++++++++++++++++++++++++++++++++++-- testdb_test.go | 27 ++++++++++++++++--------- 4 files changed, 103 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index de8b31a..87e3150 100644 --- a/README.md +++ b/README.md @@ -4,10 +4,12 @@ An in-process, in-memory database for Go. ## TO DO +* database: first WAL shipping test. +* database: race test - multiple routines writing the same key set. * database: test for concurrent writers * Create writers in different ID ranges of Users and Accounts * Check results at end. * database: WAL shipping * database: WAL shipping with network disconnects * BTreeIndex: - * Should insert panic if item is replaced? + * Panic if insert or update replaces an item diff --git a/kvstore/store.go b/kvstore/store.go index 81a22be..8a62dcc 100644 --- a/kvstore/store.go +++ b/kvstore/store.go @@ -209,16 +209,46 @@ func (kv *KV) commit() { delete, err := tx.Prepare(sqlKVDelete) must(err) + var ( + doUpsert func(wal.Record) error + doDelete func(wal.Record) error + ) + + if kv.primary { + doUpsert = func(rec wal.Record) (err error) { + _, err = upsert.Exec(rec.Collection, rec.ID, rec.Data) + return err + } + + doDelete = func(rec wal.Record) (err error) { + _, err = delete.Exec(rec.Collection, rec.ID) + return err + } + } else { + doUpsert = func(rec wal.Record) (err error) { + kv.onStore(rec.Collection, rec.ID, rec.Data) + _, err = upsert.Exec(rec.Collection, rec.ID, rec.Data) + return err + } + + doDelete = func(rec wal.Record) (err error) { + kv.onDelete(rec.Collection, rec.ID) + _, err = delete.Exec(rec.Collection, rec.ID) + return err + } + } + err = kv.f.Replay(maxSeqNum, func(rec wal.Record) error { if rec.SeqNum != maxSeqNum+1 { return fmt.Errorf("expected sequence number %d but got %d", maxSeqNum+1, rec.SeqNum) } if rec.Store { - _, err = upsert.Exec(rec.Collection, rec.ID, rec.Data) + err = doUpsert(rec) } else { - _, err = delete.Exec(rec.Collection, rec.ID) + err = doDelete(rec) } + maxSeqNum = rec.SeqNum return err }) diff --git a/shipping_test.go b/shipping_test.go index 917487d..4690f59 100644 --- a/shipping_test.go +++ b/shipping_test.go @@ -2,6 +2,7 @@ package mdb import ( "os" + "sync" "testing" "git.crumpington.com/private/mdb/testconn" @@ -26,7 +27,54 @@ func TestShipping(t *testing.T) { }) } - run("simple", func(t *testing.T, db1, db2 *DB, network *testconn.Network) { - // TODO + run("simple", func(t *testing.T, db, db2 *DB, network *testconn.Network) { + wg := sync.WaitGroup{} + wg.Add(2) + + // Send in background. + go func() { + defer wg.Done() + conn := network.Accept() + db.SyncSend(conn) + }() + + // Recv in background. + go func() { + defer wg.Done() + conn := network.Dial() + db2.SyncRecv(conn) + }() + + users := []User{ + {ID: db.Users.c.NextID(), Email: "a@b.com", Name: "xxx"}, + {ID: db.Users.c.NextID(), Email: "c@d.com", Name: "ggg"}, + {ID: db.Users.c.NextID(), Email: "e@f.com", Name: "aaa"}, + } + + for _, user := range users { + _, err := db.Users.c.Insert(user) + if err != nil { + t.Fatal(err) + } + } + + err := db.Users.c.Update(users[1].ID, func(u User) (User, error) { + u.Name = "hello" + return u, nil + }) + if err != nil { + t.Fatal(err) + } + + db.Users.c.Delete(users[0].ID) + db.Users.c.Delete(users[2].ID) + + db.WaitForSync(db2) + network.CloseClient() + wg.Wait() + + if err := db.Equals(db2); err != nil { + t.Fatal(err) + } }) } diff --git a/testdb_test.go b/testdb_test.go index ac162eb..a566067 100644 --- a/testdb_test.go +++ b/testdb_test.go @@ -2,6 +2,8 @@ package mdb import ( "errors" + "fmt" + "log" "net/mail" "strings" "time" @@ -125,6 +127,12 @@ func OpenDB(root string, primary bool) *DB { db.Accounts = Accounts{} db.Accounts.c = NewCollection(db.Database, "accounts", accountGetID) + db.Accounts.nameMap = NewMapIndex( + db.Accounts.c, + "name", + func(a *Account) string { return a.Name }, + nil) + db.Start() return db @@ -135,42 +143,42 @@ func (db *DB) Equals(rhs *DB) error { // Users: itemMap. if err := db.Users.c.items.Equals(rhs.Users.c.items); err != nil { - return err + return fmt.Errorf("%w: Users.c.items not equal", err) } // Users: emailMap if err := db.Users.emailMap.Equals(rhs.Users.emailMap); err != nil { - return err + return fmt.Errorf("%w: Users.emailMap not equal", err) } // Users: emailBTree if err := db.Users.emailBTree.Equals(rhs.Users.emailBTree); err != nil { - return err + return fmt.Errorf("%w: Users.emailBTree not equal", err) } // Users: nameBTree if err := db.Users.nameBTree.Equals(rhs.Users.nameBTree); err != nil { - return err + return fmt.Errorf("%w: Users.nameBTree not equal", err) } // Users: extIDMap if err := db.Users.extIDMap.Equals(rhs.Users.extIDMap); err != nil { - return err + return fmt.Errorf("%w: Users.extIDMap not equal", err) } // Users: extIDBTree if err := db.Users.extIDBTree.Equals(rhs.Users.extIDBTree); err != nil { - return err + return fmt.Errorf("%w: Users.extIDBTree not equal", err) } // Accounts: itemMap if err := db.Accounts.c.items.Equals(rhs.Accounts.c.items); err != nil { - return err + return fmt.Errorf("%w: Accounts.c.items not equal", err) } // Accounts: nameMap if err := db.Accounts.nameMap.Equals(rhs.Accounts.nameMap); err != nil { - return err + return fmt.Errorf("%w: Accounts.nameMap not equal", err) } return nil @@ -181,7 +189,8 @@ func (db *DB) WaitForSync(rhs *DB) { for { s1 := db.WALStatus() s2 := rhs.WALStatus() - if s1 == s2 && s1.MaxSeqNumKV == s1.MaxSeqNumWAL { + log.Print(s1, s2) + if s1.MaxSeqNumKV == s1.MaxSeqNumWAL && s1.MaxSeqNumKV == s2.MaxSeqNumKV { return } time.Sleep(100 * time.Millisecond)