This repository has been archived on 2022-07-30. You can view files and clone it, but cannot push or open issues/pull-requests.
mdb/shipping_test.go

180 lines
3.3 KiB
Go
Raw Permalink Normal View History

2022-07-26 12:02:32 +00:00
package mdb
2022-07-27 13:22:20 +00:00
import (
2022-07-28 05:29:10 +00:00
"math/rand"
2022-07-27 13:22:20 +00:00
"os"
2022-07-27 19:16:53 +00:00
"sync"
2022-07-27 13:22:20 +00:00
"testing"
2022-07-28 05:29:10 +00:00
"time"
2022-07-27 13:22:20 +00:00
"git.crumpington.com/private/mdb/testconn"
)
func TestShipping(t *testing.T) {
run := func(name string, inner func(t *testing.T, db1 *DB, db2 *DB, network *testconn.Network)) {
t.Run(name, func(t *testing.T) {
root1, err := os.MkdirTemp("", "")
must(err)
defer os.RemoveAll(root1)
root2, err := os.MkdirTemp("", "")
must(err)
defer os.RemoveAll(root2)
db1 := OpenDB(root1, true)
defer db1.Close()
db2 := OpenDB(root2, false)
defer db2.Close()
inner(t, db1, db2, testconn.NewNetwork())
})
2022-07-26 12:02:32 +00:00
}
2022-07-27 19:16:53 +00:00
run("simple", func(t *testing.T, db, db2 *DB, network *testconn.Network) {
wg := sync.WaitGroup{}
wg.Add(2)
// Send in background.
go func() {
defer wg.Done()
conn := network.Accept()
db.SyncSend(conn)
}()
// Recv in background.
go func() {
defer wg.Done()
conn := network.Dial()
db2.SyncRecv(conn)
}()
2022-07-28 05:29:10 +00:00
for i := 0; i < 100; i++ {
db.RandAction()
2022-07-27 19:16:53 +00:00
}
2022-07-28 05:29:10 +00:00
db.WaitForSync(db2)
network.CloseClient()
wg.Wait()
2022-07-27 19:16:53 +00:00
2022-07-28 05:29:10 +00:00
if err := db.Equals(db2); err != nil {
2022-07-27 19:16:53 +00:00
t.Fatal(err)
}
2022-07-28 05:29:10 +00:00
})
2022-07-27 19:16:53 +00:00
2022-07-28 05:29:10 +00:00
run("simple multiple writers", func(t *testing.T, db, db2 *DB, network *testconn.Network) {
wg := sync.WaitGroup{}
// Send in background.
wg.Add(1)
go func() {
defer wg.Done()
conn := network.Accept()
db.SyncSend(conn)
}()
// Recv in background.
wg.Add(1)
go func() {
defer wg.Done()
conn := network.Dial()
db2.SyncRecv(conn)
}()
updateWG := sync.WaitGroup{}
updateWG.Add(64)
for i := 0; i < 64; i++ {
go func() {
defer updateWG.Done()
for j := 0; j < 1024; j++ {
db.RandAction()
}
}()
}
updateWG.Wait()
2022-07-27 19:16:53 +00:00
db.WaitForSync(db2)
network.CloseClient()
wg.Wait()
if err := db.Equals(db2); err != nil {
t.Fatal(err)
}
2022-07-26 12:02:32 +00:00
})
2022-07-28 05:29:10 +00:00
2022-07-29 19:36:42 +00:00
run("unstable network", func(t *testing.T, db, db2 *DB, network *testconn.Network) {
2022-07-28 05:29:10 +00:00
sleepTimeout := time.Millisecond
updateWG := sync.WaitGroup{}
updateWG.Add(64)
for i := 0; i < 64; i++ {
go func() {
defer updateWG.Done()
for j := 0; j < 4096; j++ {
2022-07-28 05:29:10 +00:00
time.Sleep(sleepTimeout)
db.RandAction()
}
}()
}
updating := &atomicBool{}
updating.Set(true)
go func() {
updateWG.Wait()
updating.Set(false)
}()
// Recv in background.
recving := &atomicBool{}
recving.Set(true)
go func() {
for {
// Stop when no longer updating and WAL files match.
if !updating.Get() {
2022-07-29 19:36:42 +00:00
if db.MaxSeqNum() == db2.MaxSeqNum() {
2022-07-28 05:29:10 +00:00
recving.Set(false)
return
}
}
if conn := network.Dial(); conn != nil {
db2.SyncRecv(conn)
}
}
}()
// Send in background.
sending := &atomicBool{}
sending.Set(true)
go func() {
for {
// Stop when no longer updating and WAL files match.
if !updating.Get() {
2022-07-29 19:36:42 +00:00
if db.MaxSeqNum() == db2.MaxSeqNum() {
2022-07-28 05:29:10 +00:00
sending.Set(false)
return
}
}
if conn := network.Accept(); conn != nil {
db.SyncSend(conn)
}
}
}()
// Interrupt network periodically as long as sending or receiving.
for sending.Get() || recving.Get() {
time.Sleep(time.Duration(rand.Intn(10 * int(sleepTimeout))))
if rand.Float64() < 0.5 {
network.CloseClient()
} else {
network.CloseServer()
}
}
if err := db.Equals(db2); err != nil {
t.Fatal(err)
}
})
2022-07-26 12:02:32 +00:00
}