This repository has been archived on 2022-07-30. You can view files and clone it, but cannot push or open issues/pull-requests.
mdb/shipping_test.go

180 lines
3.3 KiB
Go

package mdb
import (
"math/rand"
"os"
"sync"
"testing"
"time"
"git.crumpington.com/private/mdb/testconn"
)
// TestShipping checks that changes applied to one DB are shipped to a second
// DB via SyncSend/SyncRecv over a testconn.Network. Every subtest applies
// random actions to the sending DB and finishes by asserting that the two
// databases compare equal.
func TestShipping(t *testing.T) {
	// withDBs runs body as a named subtest. It creates two temporary
	// directories, opens one DB in each, and hands both DBs plus a fresh
	// network to body. When body returns the DBs are closed and then the
	// directories are removed (deferred calls run in reverse order).
	// NOTE(review): the boolean passed to OpenDB presumably distinguishes the
	// sending (primary) DB from the receiving one -- confirm against OpenDB.
	withDBs := func(name string, body func(t *testing.T, db1 *DB, db2 *DB, network *testconn.Network)) {
		t.Run(name, func(t *testing.T) {
			senderDir, err := os.MkdirTemp("", "")
			must(err)
			defer os.RemoveAll(senderDir)

			receiverDir, err := os.MkdirTemp("", "")
			must(err)
			defer os.RemoveAll(receiverDir)

			sender := OpenDB(senderDir, true)
			defer sender.Close()

			receiver := OpenDB(receiverDir, false)
			defer receiver.Close()

			body(t, sender, receiver, testconn.NewNetwork())
		})
	}

	// startSync launches two goroutines registered on wg: one accepts a
	// connection and runs SyncSend on sender, the other dials a connection
	// and runs SyncRecv on receiver.
	startSync := func(wg *sync.WaitGroup, sender, receiver *DB, network *testconn.Network) {
		wg.Add(2)

		go func() {
			defer wg.Done()
			sender.SyncSend(network.Accept())
		}()

		go func() {
			defer wg.Done()
			receiver.SyncRecv(network.Dial())
		}()
	}

	withDBs("simple", func(t *testing.T, sender, receiver *DB, network *testconn.Network) {
		const actions = 100

		var syncWG sync.WaitGroup
		startSync(&syncWG, sender, receiver, network)

		for n := 0; n < actions; n++ {
			sender.RandAction()
		}

		// Wait for the receiver, then close the client side of the network
		// before waiting on the sync goroutines.
		// NOTE(review): closing the client is presumably what makes
		// SyncSend/SyncRecv return -- confirm against testconn.
		sender.WaitForSync(receiver)
		network.CloseClient()
		syncWG.Wait()

		if err := sender.Equals(receiver); err != nil {
			t.Fatal(err)
		}
	})

	withDBs("simple multiple writers", func(t *testing.T, sender, receiver *DB, network *testconn.Network) {
		const (
			writers          = 64
			actionsPerWriter = 1024
		)

		var syncWG sync.WaitGroup
		startSync(&syncWG, sender, receiver, network)

		// Apply random actions from many goroutines at once.
		var writersWG sync.WaitGroup
		for w := 0; w < writers; w++ {
			writersWG.Add(1)
			go func() {
				defer writersWG.Done()
				for n := 0; n < actionsPerWriter; n++ {
					sender.RandAction()
				}
			}()
		}
		writersWG.Wait()

		sender.WaitForSync(receiver)
		network.CloseClient()
		syncWG.Wait()

		if err := sender.Equals(receiver); err != nil {
			t.Fatal(err)
		}
	})

	withDBs("unstable network", func(t *testing.T, sender, receiver *DB, network *testconn.Network) {
		const (
			writers          = 64
			actionsPerWriter = 4096
		)
		tick := time.Millisecond

		// Writers pause for one tick before every action so that updates
		// keep arriving while the network is being interrupted below.
		var writersWG sync.WaitGroup
		for w := 0; w < writers; w++ {
			writersWG.Add(1)
			go func() {
				defer writersWG.Done()
				for n := 0; n < actionsPerWriter; n++ {
					time.Sleep(tick)
					sender.RandAction()
				}
			}()
		}

		// raised returns a new flag that has been set to true.
		raised := func() *atomicBool {
			flag := &atomicBool{}
			flag.Set(true)
			return flag
		}

		// updating is cleared once every writer has finished.
		updating := raised()
		go func() {
			writersWG.Wait()
			updating.Set(false)
		}()

		// caughtUp reports whether the writers are done and both DBs report
		// the same maximum sequence number. The sequence numbers are only
		// read once updating has been cleared.
		caughtUp := func() bool {
			return !updating.Get() && sender.MaxSeqNum() == receiver.MaxSeqNum()
		}

		// Receive in the background, re-dialing until caught up.
		recving := raised()
		go func() {
			for !caughtUp() {
				if conn := network.Dial(); conn != nil {
					receiver.SyncRecv(conn)
				}
			}
			recving.Set(false)
		}()

		// Send in the background, re-accepting until caught up.
		sending := raised()
		go func() {
			for !caughtUp() {
				if conn := network.Accept(); conn != nil {
					sender.SyncSend(conn)
				}
			}
			sending.Set(false)
		}()

		// While either side is still running, sleep for a random duration
		// below ten ticks and then close one end of the network, chosen at
		// random.
		maxPause := 10 * int(tick)
		for sending.Get() || recving.Get() {
			time.Sleep(time.Duration(rand.Intn(maxPause)))
			if rand.Float64() >= 0.5 {
				network.CloseServer()
			} else {
				network.CloseClient()
			}
		}

		if err := sender.Equals(receiver); err != nil {
			t.Fatal(err)
		}
	})
}