testing: wip

master
jdl 2022-07-27 21:16:53 +02:00
parent 2f0ab90271
commit f625c92c72
4 changed files with 103 additions and 14 deletions

View File

@ -4,10 +4,12 @@ An in-process, in-memory database for Go.
## TO DO ## TO DO
* database: first WAL shipping test.
* database: race test - multiple routines writing the same key set.
* database: test for concurrent writers * database: test for concurrent writers
* Create writers in different ID ranges of Users and Accounts * Create writers in different ID ranges of Users and Accounts
* Check results at end. * Check results at end.
* database: WAL shipping * database: WAL shipping
* database: WAL shipping with network disconnects * database: WAL shipping with network disconnects
* BTreeIndex: * BTreeIndex:
* Should insert panic if item is replaced? * Panic if insert or update replaces an item

View File

@ -209,16 +209,46 @@ func (kv *KV) commit() {
delete, err := tx.Prepare(sqlKVDelete) delete, err := tx.Prepare(sqlKVDelete)
must(err) must(err)
var (
doUpsert func(wal.Record) error
doDelete func(wal.Record) error
)
if kv.primary {
doUpsert = func(rec wal.Record) (err error) {
_, err = upsert.Exec(rec.Collection, rec.ID, rec.Data)
return err
}
doDelete = func(rec wal.Record) (err error) {
_, err = delete.Exec(rec.Collection, rec.ID)
return err
}
} else {
doUpsert = func(rec wal.Record) (err error) {
kv.onStore(rec.Collection, rec.ID, rec.Data)
_, err = upsert.Exec(rec.Collection, rec.ID, rec.Data)
return err
}
doDelete = func(rec wal.Record) (err error) {
kv.onDelete(rec.Collection, rec.ID)
_, err = delete.Exec(rec.Collection, rec.ID)
return err
}
}
err = kv.f.Replay(maxSeqNum, func(rec wal.Record) error { err = kv.f.Replay(maxSeqNum, func(rec wal.Record) error {
if rec.SeqNum != maxSeqNum+1 { if rec.SeqNum != maxSeqNum+1 {
return fmt.Errorf("expected sequence number %d but got %d", maxSeqNum+1, rec.SeqNum) return fmt.Errorf("expected sequence number %d but got %d", maxSeqNum+1, rec.SeqNum)
} }
if rec.Store { if rec.Store {
_, err = upsert.Exec(rec.Collection, rec.ID, rec.Data) err = doUpsert(rec)
} else { } else {
_, err = delete.Exec(rec.Collection, rec.ID) err = doDelete(rec)
} }
maxSeqNum = rec.SeqNum maxSeqNum = rec.SeqNum
return err return err
}) })

View File

@ -2,6 +2,7 @@ package mdb
import ( import (
"os" "os"
"sync"
"testing" "testing"
"git.crumpington.com/private/mdb/testconn" "git.crumpington.com/private/mdb/testconn"
@ -26,7 +27,54 @@ func TestShipping(t *testing.T) {
}) })
} }
run("simple", func(t *testing.T, db1, db2 *DB, network *testconn.Network) { run("simple", func(t *testing.T, db, db2 *DB, network *testconn.Network) {
// TODO wg := sync.WaitGroup{}
wg.Add(2)
// Send in background.
go func() {
defer wg.Done()
conn := network.Accept()
db.SyncSend(conn)
}()
// Recv in background.
go func() {
defer wg.Done()
conn := network.Dial()
db2.SyncRecv(conn)
}()
users := []User{
{ID: db.Users.c.NextID(), Email: "a@b.com", Name: "xxx"},
{ID: db.Users.c.NextID(), Email: "c@d.com", Name: "ggg"},
{ID: db.Users.c.NextID(), Email: "e@f.com", Name: "aaa"},
}
for _, user := range users {
_, err := db.Users.c.Insert(user)
if err != nil {
t.Fatal(err)
}
}
err := db.Users.c.Update(users[1].ID, func(u User) (User, error) {
u.Name = "hello"
return u, nil
})
if err != nil {
t.Fatal(err)
}
db.Users.c.Delete(users[0].ID)
db.Users.c.Delete(users[2].ID)
db.WaitForSync(db2)
network.CloseClient()
wg.Wait()
if err := db.Equals(db2); err != nil {
t.Fatal(err)
}
}) })
} }

View File

@ -2,6 +2,8 @@ package mdb
import ( import (
"errors" "errors"
"fmt"
"log"
"net/mail" "net/mail"
"strings" "strings"
"time" "time"
@ -125,6 +127,12 @@ func OpenDB(root string, primary bool) *DB {
db.Accounts = Accounts{} db.Accounts = Accounts{}
db.Accounts.c = NewCollection(db.Database, "accounts", accountGetID) db.Accounts.c = NewCollection(db.Database, "accounts", accountGetID)
db.Accounts.nameMap = NewMapIndex(
db.Accounts.c,
"name",
func(a *Account) string { return a.Name },
nil)
db.Start() db.Start()
return db return db
@ -135,42 +143,42 @@ func (db *DB) Equals(rhs *DB) error {
// Users: itemMap. // Users: itemMap.
if err := db.Users.c.items.Equals(rhs.Users.c.items); err != nil { if err := db.Users.c.items.Equals(rhs.Users.c.items); err != nil {
return err return fmt.Errorf("%w: Users.c.items not equal", err)
} }
// Users: emailMap // Users: emailMap
if err := db.Users.emailMap.Equals(rhs.Users.emailMap); err != nil { if err := db.Users.emailMap.Equals(rhs.Users.emailMap); err != nil {
return err return fmt.Errorf("%w: Users.emailMap not equal", err)
} }
// Users: emailBTree // Users: emailBTree
if err := db.Users.emailBTree.Equals(rhs.Users.emailBTree); err != nil { if err := db.Users.emailBTree.Equals(rhs.Users.emailBTree); err != nil {
return err return fmt.Errorf("%w: Users.emailBTree not equal", err)
} }
// Users: nameBTree // Users: nameBTree
if err := db.Users.nameBTree.Equals(rhs.Users.nameBTree); err != nil { if err := db.Users.nameBTree.Equals(rhs.Users.nameBTree); err != nil {
return err return fmt.Errorf("%w: Users.nameBTree not equal", err)
} }
// Users: extIDMap // Users: extIDMap
if err := db.Users.extIDMap.Equals(rhs.Users.extIDMap); err != nil { if err := db.Users.extIDMap.Equals(rhs.Users.extIDMap); err != nil {
return err return fmt.Errorf("%w: Users.extIDMap not equal", err)
} }
// Users: extIDBTree // Users: extIDBTree
if err := db.Users.extIDBTree.Equals(rhs.Users.extIDBTree); err != nil { if err := db.Users.extIDBTree.Equals(rhs.Users.extIDBTree); err != nil {
return err return fmt.Errorf("%w: Users.extIDBTree not equal", err)
} }
// Accounts: itemMap // Accounts: itemMap
if err := db.Accounts.c.items.Equals(rhs.Accounts.c.items); err != nil { if err := db.Accounts.c.items.Equals(rhs.Accounts.c.items); err != nil {
return err return fmt.Errorf("%w: Accounts.c.items not equal", err)
} }
// Accounts: nameMap // Accounts: nameMap
if err := db.Accounts.nameMap.Equals(rhs.Accounts.nameMap); err != nil { if err := db.Accounts.nameMap.Equals(rhs.Accounts.nameMap); err != nil {
return err return fmt.Errorf("%w: Accounts.nameMap not equal", err)
} }
return nil return nil
@ -181,7 +189,8 @@ func (db *DB) WaitForSync(rhs *DB) {
for { for {
s1 := db.WALStatus() s1 := db.WALStatus()
s2 := rhs.WALStatus() s2 := rhs.WALStatus()
if s1 == s2 && s1.MaxSeqNumKV == s1.MaxSeqNumWAL { log.Print(s1, s2)
if s1.MaxSeqNumKV == s1.MaxSeqNumWAL && s1.MaxSeqNumKV == s2.MaxSeqNumKV {
return return
} }
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)