master
jdl 2022-07-28 07:29:10 +02:00
parent f625c92c72
commit db30ce80d6
7 changed files with 242 additions and 39 deletions

View File

@ -4,12 +4,5 @@ An in-process, in-memory database for Go.
## TO DO
* database: first WAL shipping test.
* database: race test - multiple routines writing the same key set.
* database: test for concurrent writers
* Create writers in different ID ranges of Users and Accounts
* Check results at end.
* database: WAL shipping
* database: WAL shipping with network disconnects
* BTreeIndex:
* Panic if insert or update replaces an item

View File

@ -197,12 +197,9 @@ func (c *Collection[T]) onStore(collection string, id uint64, data []byte) {
}
func (c *Collection[T]) onDelete(collection string, id uint64) {
item, ok := c.items.Get(id)
if !ok {
return
}
for _, idx := range c.indices {
idx.delete(item)
if item, ok := c.items.Get(id); ok {
for _, idx := range c.indices {
idx.delete(item)
}
}
}

23
database_test.go Normal file
View File

@ -0,0 +1,23 @@
package mdb
import (
"sync"
"testing"
)
func TestDatabase(t *testing.T) {
testWithDB(t, "multiple writers", func(t *testing.T, db *DB) {
wg := sync.WaitGroup{}
N := 64
wg.Add(64)
for i := 0; i < N; i++ {
go func() {
defer wg.Done()
for j := 0; j < 1024; j++ {
db.RandAction()
}
}()
}
wg.Wait()
})
}

View File

@ -5,5 +5,5 @@ import "time"
var (
connTimeout = 16 * time.Second // For sending / receiving WAL.
heartbeatInterval = 2 * time.Second // Used in Follower.SendLog
pollInterval = 250 * time.Millisecond // Used in Follower.SendLog
pollInterval = 500 * time.Millisecond // Used in Follower.SendLog
)

View File

@ -1,9 +1,11 @@
package mdb
import (
"math/rand"
"os"
"sync"
"testing"
"time"
"git.crumpington.com/private/mdb/testconn"
)
@ -45,30 +47,10 @@ func TestShipping(t *testing.T) {
db2.SyncRecv(conn)
}()
users := []User{
{ID: db.Users.c.NextID(), Email: "a@b.com", Name: "xxx"},
{ID: db.Users.c.NextID(), Email: "c@d.com", Name: "ggg"},
{ID: db.Users.c.NextID(), Email: "e@f.com", Name: "aaa"},
for i := 0; i < 100; i++ {
db.RandAction()
}
for _, user := range users {
_, err := db.Users.c.Insert(user)
if err != nil {
t.Fatal(err)
}
}
err := db.Users.c.Update(users[1].ID, func(u User) (User, error) {
u.Name = "hello"
return u, nil
})
if err != nil {
t.Fatal(err)
}
db.Users.c.Delete(users[0].ID)
db.Users.c.Delete(users[2].ID)
db.WaitForSync(db2)
network.CloseClient()
wg.Wait()
@ -77,4 +59,126 @@ func TestShipping(t *testing.T) {
t.Fatal(err)
}
})
run("simple multiple writers", func(t *testing.T, db, db2 *DB, network *testconn.Network) {
wg := sync.WaitGroup{}
// Send in background.
wg.Add(1)
go func() {
defer wg.Done()
conn := network.Accept()
db.SyncSend(conn)
}()
// Recv in background.
wg.Add(1)
go func() {
defer wg.Done()
conn := network.Dial()
db2.SyncRecv(conn)
}()
updateWG := sync.WaitGroup{}
updateWG.Add(64)
for i := 0; i < 64; i++ {
go func() {
defer updateWG.Done()
for j := 0; j < 1024; j++ {
db.RandAction()
}
}()
}
updateWG.Wait()
db.WaitForSync(db2)
network.CloseClient()
wg.Wait()
if err := db.Equals(db2); err != nil {
t.Fatal(err)
}
})
run("flakey network", func(t *testing.T, db, db2 *DB, network *testconn.Network) {
sleepTimeout := time.Millisecond
updateWG := sync.WaitGroup{}
updateWG.Add(64)
for i := 0; i < 64; i++ {
go func() {
defer updateWG.Done()
for j := 0; j < 1024; j++ {
time.Sleep(sleepTimeout)
db.RandAction()
}
}()
}
updating := &atomicBool{}
updating.Set(true)
go func() {
updateWG.Wait()
updating.Set(false)
}()
// Recv in background.
recving := &atomicBool{}
recving.Set(true)
go func() {
for {
// Stop when no longer updating and WAL files match.
if !updating.Get() {
ws := db.WALStatus()
ws2 := db2.WALStatus()
if ws.MaxSeqNumWAL == ws2.MaxSeqNumWAL {
recving.Set(false)
return
}
}
if conn := network.Dial(); conn != nil {
db2.SyncRecv(conn)
}
}
}()
// Send in background.
sending := &atomicBool{}
sending.Set(true)
go func() {
for {
// Stop when no longer updating and WAL files match.
if !updating.Get() {
ws := db.WALStatus()
ws2 := db2.WALStatus()
if ws.MaxSeqNumWAL == ws2.MaxSeqNumWAL {
sending.Set(false)
return
}
}
if conn := network.Accept(); conn != nil {
db.SyncSend(conn)
}
}
}()
// Interrupt network periodically as long as sending or receiving.
for sending.Get() || recving.Get() {
time.Sleep(time.Duration(rand.Intn(10 * int(sleepTimeout))))
if rand.Float64() < 0.5 {
network.CloseClient()
} else {
network.CloseServer()
}
}
if err := db.Equals(db2); err != nil {
t.Fatal(err)
}
})
}

View File

@ -3,7 +3,7 @@ package mdb
import (
"errors"
"fmt"
"log"
"math/rand"
"net/mail"
"strings"
"time"
@ -189,10 +189,79 @@ func (db *DB) WaitForSync(rhs *DB) {
for {
s1 := db.WALStatus()
s2 := rhs.WALStatus()
log.Print(s1, s2)
if s1.MaxSeqNumKV == s1.MaxSeqNumWAL && s1.MaxSeqNumKV == s2.MaxSeqNumKV {
return
}
time.Sleep(100 * time.Millisecond)
}
}
var (
randIDs = []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 2, 13, 14, 15, 16}
)
func (db *DB) RandAction() {
if rand.Float32() < 0.3 {
db.randActionAccount()
} else {
db.randActionUser()
}
}
func (db *DB) randActionAccount() {
id := randIDs[rand.Intn(len(randIDs))]
f := rand.Float32()
_, exists := db.Accounts.c.Get(id)
if !exists {
db.Accounts.c.Insert(Account{
ID: id,
Name: randString(),
})
return
}
if f < 0.05 {
db.Accounts.c.Delete(id)
return
}
db.Accounts.c.Update(id, func(a Account) (Account, error) {
a.Name = randString()
return a, nil
})
}
func (db *DB) randActionUser() {
id := randIDs[rand.Intn(len(randIDs))]
f := rand.Float32()
_, exists := db.Users.c.Get(id)
if !exists {
user := User{
ID: id,
Email: randString() + "@domain.com",
Name: randString(),
}
if f < 0.1 {
user.ExtID = randString()
}
db.Users.c.Insert(user)
return
}
if f < 0.05 {
db.Users.c.Delete(id)
return
}
db.Users.c.Update(id, func(a User) (User, error) {
a.Name = randString()
if f < 0.1 {
a.ExtID = randString()
} else {
a.ExtID = ""
}
a.Email = randString() + "@domain.com"
return a, nil
})
}

View File

@ -4,6 +4,7 @@ import (
"crypto/rand"
"encoding/hex"
mrand "math/rand"
"sync/atomic"
)
func randString() string {
@ -13,3 +14,19 @@ func randString() string {
}
return hex.EncodeToString(buf)
}
type atomicBool struct {
i int64
}
func (a *atomicBool) Get() bool {
return atomic.LoadInt64(&a.i) == 1
}
func (a *atomicBool) Set(b bool) {
if b {
atomic.StoreInt64(&a.i, 1)
} else {
atomic.StoreInt64(&a.i, 0)
}
}