187 lines
3.4 KiB
Go
187 lines
3.4 KiB
Go
|
package mdb
|
||
|
|
||
|
/*Copyright (c) 2022, John David Lee
All rights reserved.

This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.
*/
|
||
|
|
||
|
import (
|
||
|
"math/rand"
|
||
|
"os"
|
||
|
"sync"
|
||
|
"testing"
|
||
|
"time"
|
||
|
|
||
|
"git.crumpington.com/public/mdb/testconn"
|
||
|
)
|
||
|
|
||
|
func TestShipping(t *testing.T) {
|
||
|
run := func(name string, inner func(t *testing.T, db1 *DB, db2 *DB, network *testconn.Network)) {
|
||
|
t.Run(name, func(t *testing.T) {
|
||
|
root1, err := os.MkdirTemp("", "")
|
||
|
must(err)
|
||
|
defer os.RemoveAll(root1)
|
||
|
root2, err := os.MkdirTemp("", "")
|
||
|
must(err)
|
||
|
defer os.RemoveAll(root2)
|
||
|
|
||
|
db1 := OpenDB(root1, true)
|
||
|
defer db1.Close()
|
||
|
db2 := OpenDB(root2, false)
|
||
|
defer db2.Close()
|
||
|
|
||
|
inner(t, db1, db2, testconn.NewNetwork())
|
||
|
})
|
||
|
}
|
||
|
|
||
|
run("simple", func(t *testing.T, db, db2 *DB, network *testconn.Network) {
|
||
|
wg := sync.WaitGroup{}
|
||
|
wg.Add(2)
|
||
|
|
||
|
// Send in background.
|
||
|
go func() {
|
||
|
defer wg.Done()
|
||
|
conn := network.Accept()
|
||
|
db.SyncSend(conn)
|
||
|
}()
|
||
|
|
||
|
// Recv in background.
|
||
|
go func() {
|
||
|
defer wg.Done()
|
||
|
conn := network.Dial()
|
||
|
db2.SyncRecv(conn)
|
||
|
}()
|
||
|
|
||
|
for i := 0; i < 100; i++ {
|
||
|
db.RandAction()
|
||
|
}
|
||
|
|
||
|
db.WaitForSync(db2)
|
||
|
network.CloseClient()
|
||
|
wg.Wait()
|
||
|
|
||
|
if err := db.Equals(db2); err != nil {
|
||
|
t.Fatal(err)
|
||
|
}
|
||
|
})
|
||
|
|
||
|
run("simple multiple writers", func(t *testing.T, db, db2 *DB, network *testconn.Network) {
|
||
|
wg := sync.WaitGroup{}
|
||
|
|
||
|
// Send in background.
|
||
|
wg.Add(1)
|
||
|
go func() {
|
||
|
defer wg.Done()
|
||
|
conn := network.Accept()
|
||
|
db.SyncSend(conn)
|
||
|
}()
|
||
|
|
||
|
// Recv in background.
|
||
|
wg.Add(1)
|
||
|
go func() {
|
||
|
defer wg.Done()
|
||
|
conn := network.Dial()
|
||
|
db2.SyncRecv(conn)
|
||
|
}()
|
||
|
|
||
|
updateWG := sync.WaitGroup{}
|
||
|
updateWG.Add(64)
|
||
|
for i := 0; i < 64; i++ {
|
||
|
go func() {
|
||
|
defer updateWG.Done()
|
||
|
for j := 0; j < 1024; j++ {
|
||
|
db.RandAction()
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
updateWG.Wait()
|
||
|
|
||
|
db.WaitForSync(db2)
|
||
|
network.CloseClient()
|
||
|
wg.Wait()
|
||
|
|
||
|
if err := db.Equals(db2); err != nil {
|
||
|
t.Fatal(err)
|
||
|
}
|
||
|
})
|
||
|
|
||
|
run("unstable network", func(t *testing.T, db, db2 *DB, network *testconn.Network) {
|
||
|
sleepTimeout := time.Millisecond
|
||
|
|
||
|
updateWG := sync.WaitGroup{}
|
||
|
updateWG.Add(64)
|
||
|
for i := 0; i < 64; i++ {
|
||
|
go func() {
|
||
|
defer updateWG.Done()
|
||
|
for j := 0; j < 4096; j++ {
|
||
|
time.Sleep(sleepTimeout)
|
||
|
db.RandAction()
|
||
|
}
|
||
|
}()
|
||
|
}
|
||
|
|
||
|
updating := &atomicBool{}
|
||
|
updating.Set(true)
|
||
|
|
||
|
go func() {
|
||
|
updateWG.Wait()
|
||
|
updating.Set(false)
|
||
|
}()
|
||
|
|
||
|
// Recv in background.
|
||
|
recving := &atomicBool{}
|
||
|
recving.Set(true)
|
||
|
go func() {
|
||
|
for {
|
||
|
// Stop when no longer updating and WAL files match.
|
||
|
if !updating.Get() {
|
||
|
if db.MaxSeqNum() == db2.MaxSeqNum() {
|
||
|
recving.Set(false)
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if conn := network.Dial(); conn != nil {
|
||
|
db2.SyncRecv(conn)
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
// Send in background.
|
||
|
sending := &atomicBool{}
|
||
|
sending.Set(true)
|
||
|
go func() {
|
||
|
for {
|
||
|
// Stop when no longer updating and WAL files match.
|
||
|
if !updating.Get() {
|
||
|
if db.MaxSeqNum() == db2.MaxSeqNum() {
|
||
|
sending.Set(false)
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if conn := network.Accept(); conn != nil {
|
||
|
db.SyncSend(conn)
|
||
|
}
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
// Interrupt network periodically as long as sending or receiving.
|
||
|
for sending.Get() || recving.Get() {
|
||
|
time.Sleep(time.Duration(rand.Intn(10 * int(sleepTimeout))))
|
||
|
if rand.Float64() < 0.5 {
|
||
|
network.CloseClient()
|
||
|
} else {
|
||
|
network.CloseServer()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if err := db.Equals(db2); err != nil {
|
||
|
t.Fatal(err)
|
||
|
}
|
||
|
})
|
||
|
}
|