From db30ce80d60c9680d2c7e7cd17f1ccc7a33d44eb Mon Sep 17 00:00:00 2001 From: jdl Date: Thu, 28 Jul 2022 07:29:10 +0200 Subject: [PATCH] Testing --- README.md | 7 -- collection.go | 11 ++-- database_test.go | 23 +++++++ kvstore/wal/global.go | 2 +- shipping_test.go | 148 +++++++++++++++++++++++++++++++++++------- testdb_test.go | 73 ++++++++++++++++++++- util_test.go | 17 +++++ 7 files changed, 242 insertions(+), 39 deletions(-) create mode 100644 database_test.go diff --git a/README.md b/README.md index 87e3150..fce0ea3 100644 --- a/README.md +++ b/README.md @@ -4,12 +4,5 @@ An in-process, in-memory database for Go. ## TO DO -* database: first WAL shipping test. -* database: race test - multiple routines writing the same key set. -* database: test for concurrent writers - * Create writers in different ID ranges of Users and Accounts - * Check results at end. -* database: WAL shipping -* database: WAL shipping with network disconnects * BTreeIndex: * Panic if insert or update replaces an item diff --git a/collection.go b/collection.go index 9e2bd1b..4625adc 100644 --- a/collection.go +++ b/collection.go @@ -197,12 +197,9 @@ func (c *Collection[T]) onStore(collection string, id uint64, data []byte) { } func (c *Collection[T]) onDelete(collection string, id uint64) { - item, ok := c.items.Get(id) - if !ok { - return - } - - for _, idx := range c.indices { - idx.delete(item) + if item, ok := c.items.Get(id); ok { + for _, idx := range c.indices { + idx.delete(item) + } } } diff --git a/database_test.go b/database_test.go new file mode 100644 index 0000000..4e26aa9 --- /dev/null +++ b/database_test.go @@ -0,0 +1,23 @@ +package mdb + +import ( + "sync" + "testing" +) + +func TestDatabase(t *testing.T) { + testWithDB(t, "multiple writers", func(t *testing.T, db *DB) { + wg := sync.WaitGroup{} + N := 64 + wg.Add(64) + for i := 0; i < N; i++ { + go func() { + defer wg.Done() + for j := 0; j < 1024; j++ { + db.RandAction() + } + }() + } + wg.Wait() + }) +} diff --git a/kvstore/wal/global.go b/kvstore/wal/global.go index 74a28e1..899f4fe 100644 --- a/kvstore/wal/global.go +++ b/kvstore/wal/global.go @@ -5,5 +5,5 @@ import "time" var ( connTimeout = 16 * time.Second // For sending / receiving WAL. heartbeatInterval = 2 * time.Second // Used in Follower.SendLog - pollInterval = 250 * time.Millisecond // Used in Follower.SendLog + pollInterval = 500 * time.Millisecond // Used in Follower.SendLog ) diff --git a/shipping_test.go b/shipping_test.go index 4690f59..fa866f5 100644 --- a/shipping_test.go +++ b/shipping_test.go @@ -1,9 +1,11 @@ package mdb import ( + "math/rand" "os" "sync" "testing" + "time" "git.crumpington.com/private/mdb/testconn" ) @@ -45,30 +47,10 @@ func TestShipping(t *testing.T) { db2.SyncRecv(conn) }() - users := []User{ - {ID: db.Users.c.NextID(), Email: "a@b.com", Name: "xxx"}, - {ID: db.Users.c.NextID(), Email: "c@d.com", Name: "ggg"}, - {ID: db.Users.c.NextID(), Email: "e@f.com", Name: "aaa"}, + for i := 0; i < 100; i++ { + db.RandAction() } - for _, user := range users { - _, err := db.Users.c.Insert(user) - if err != nil { - t.Fatal(err) - } - } - - err := db.Users.c.Update(users[1].ID, func(u User) (User, error) { - u.Name = "hello" - return u, nil - }) - if err != nil { - t.Fatal(err) - } - - db.Users.c.Delete(users[0].ID) - db.Users.c.Delete(users[2].ID) - db.WaitForSync(db2) network.CloseClient() wg.Wait() @@ -77,4 +59,126 @@ func TestShipping(t *testing.T) { t.Fatal(err) } }) + + run("simple multiple writers", func(t *testing.T, db, db2 *DB, network *testconn.Network) { + wg := sync.WaitGroup{} + + // Send in background. + wg.Add(1) + go func() { + defer wg.Done() + conn := network.Accept() + db.SyncSend(conn) + }() + + // Recv in background. + wg.Add(1) + go func() { + defer wg.Done() + conn := network.Dial() + db2.SyncRecv(conn) + }() + + updateWG := sync.WaitGroup{} + updateWG.Add(64) + for i := 0; i < 64; i++ { + go func() { + defer updateWG.Done() + for j := 0; j < 1024; j++ { + db.RandAction() + } + }() + } + + updateWG.Wait() + + db.WaitForSync(db2) + network.CloseClient() + wg.Wait() + + if err := db.Equals(db2); err != nil { + t.Fatal(err) + } + }) + + run("flakey network", func(t *testing.T, db, db2 *DB, network *testconn.Network) { + sleepTimeout := time.Millisecond + + updateWG := sync.WaitGroup{} + updateWG.Add(64) + for i := 0; i < 64; i++ { + go func() { + defer updateWG.Done() + for j := 0; j < 1024; j++ { + time.Sleep(sleepTimeout) + db.RandAction() + } + }() + } + + updating := &atomicBool{} + updating.Set(true) + + go func() { + updateWG.Wait() + updating.Set(false) + }() + + // Recv in background. + recving := &atomicBool{} + recving.Set(true) + go func() { + for { + // Stop when no longer updating and WAL files match. + if !updating.Get() { + ws := db.WALStatus() + ws2 := db2.WALStatus() + if ws.MaxSeqNumWAL == ws2.MaxSeqNumWAL { + recving.Set(false) + return + } + } + + if conn := network.Dial(); conn != nil { + db2.SyncRecv(conn) + } + } + }() + + // Send in background. + sending := &atomicBool{} + sending.Set(true) + go func() { + for { + // Stop when no longer updating and WAL files match. + if !updating.Get() { + ws := db.WALStatus() + ws2 := db2.WALStatus() + if ws.MaxSeqNumWAL == ws2.MaxSeqNumWAL { + sending.Set(false) + return + } + } + + if conn := network.Accept(); conn != nil { + db.SyncSend(conn) + } + } + }() + + // Interrupt network periodically as long as sending or receiving. + for sending.Get() || recving.Get() { + time.Sleep(time.Duration(rand.Intn(10 * int(sleepTimeout)))) + if rand.Float64() < 0.5 { + network.CloseClient() + } else { + network.CloseServer() + } + } + + if err := db.Equals(db2); err != nil { + t.Fatal(err) + } + }) + } diff --git a/testdb_test.go b/testdb_test.go index a566067..ac94ed2 100644 --- a/testdb_test.go +++ b/testdb_test.go @@ -3,7 +3,7 @@ package mdb import ( "errors" "fmt" - "log" + "math/rand" "net/mail" "strings" "time" @@ -189,10 +189,79 @@ func (db *DB) WaitForSync(rhs *DB) { for { s1 := db.WALStatus() s2 := rhs.WALStatus() - log.Print(s1, s2) if s1.MaxSeqNumKV == s1.MaxSeqNumWAL && s1.MaxSeqNumKV == s2.MaxSeqNumKV { return } time.Sleep(100 * time.Millisecond) } } + +var ( + randIDs = []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 2, 13, 14, 15, 16} +) + +func (db *DB) RandAction() { + if rand.Float32() < 0.3 { + db.randActionAccount() + } else { + db.randActionUser() + } +} + +func (db *DB) randActionAccount() { + id := randIDs[rand.Intn(len(randIDs))] + f := rand.Float32() + + _, exists := db.Accounts.c.Get(id) + if !exists { + db.Accounts.c.Insert(Account{ + ID: id, + Name: randString(), + }) + return + } + + if f < 0.05 { + db.Accounts.c.Delete(id) + return + } + db.Accounts.c.Update(id, func(a Account) (Account, error) { + a.Name = randString() + return a, nil + }) +} + +func (db *DB) randActionUser() { + id := randIDs[rand.Intn(len(randIDs))] + f := rand.Float32() + + _, exists := db.Users.c.Get(id) + if !exists { + user := User{ + ID: id, + Email: randString() + "@domain.com", + Name: randString(), + } + if f < 0.1 { + user.ExtID = randString() + } + db.Users.c.Insert(user) + return + } + + if f < 0.05 { + db.Users.c.Delete(id) + return + } + + db.Users.c.Update(id, func(a User) (User, error) { + a.Name = randString() + if f < 0.1 { + a.ExtID = randString() + } else { + a.ExtID = "" + } + a.Email = randString() + "@domain.com" + return a, nil + }) +} diff --git a/util_test.go b/util_test.go index 08fc1e7..2f8746b 100644 --- a/util_test.go +++ b/util_test.go @@ -4,6 +4,7 @@ import ( "crypto/rand" "encoding/hex" mrand "math/rand" + "sync/atomic" ) func randString() string { @@ -13,3 +14,19 @@ func randString() string { } return hex.EncodeToString(buf) } + +type atomicBool struct { + i int64 +} + +func (a *atomicBool) Get() bool { + return atomic.LoadInt64(&a.i) == 1 +} + +func (a *atomicBool) Set(b bool) { + if b { + atomic.StoreInt64(&a.i, 1) + } else { + atomic.StoreInt64(&a.i, 0) + } +}