package mdb

import (
	"math/rand"
	"os"
	"sync"
	"testing"
	"time"

	"git.crumpington.com/private/mdb/testconn"
)

// TestShipping checks that changes applied to one DB are shipped to a second
// DB over a testconn.Network, so that both compare equal (db.Equals) once
// shipping has caught up. It covers a single writer, many concurrent writers,
// and a network whose client and server sides are closed at random while
// writers are active.
func TestShipping(t *testing.T) {
	// run executes inner as a subtest named name. Each subtest gets two
	// databases opened in fresh temporary directories (the first opened with
	// true, the second with false as OpenDB's second argument) and a new test
	// network. The databases are closed and the directories removed when the
	// subtest returns.
	run := func(name string, inner func(t *testing.T, db1 *DB, db2 *DB, network *testconn.Network)) {
		t.Run(name, func(t *testing.T) {
			srcDir, err := os.MkdirTemp("", "")
			must(err)
			defer os.RemoveAll(srcDir)

			dstDir, err := os.MkdirTemp("", "")
			must(err)
			defer os.RemoveAll(dstDir)

			src := OpenDB(srcDir, true)
			defer src.Close()

			dst := OpenDB(dstDir, false)
			defer dst.Close()

			inner(t, src, dst, testconn.NewNetwork())
		})
	}

	// shipOnce starts one sender goroutine (accepts a connection and calls
	// src.SyncSend on it) and one receiver goroutine (dials a connection and
	// calls dst.SyncRecv on it). The returned WaitGroup is done once both
	// calls have returned.
	shipOnce := func(src, dst *DB, network *testconn.Network) *sync.WaitGroup {
		pair := &sync.WaitGroup{}
		pair.Add(2)

		// Send in background.
		go func() {
			defer pair.Done()
			src.SyncSend(network.Accept())
		}()

		// Recv in background.
		go func() {
			defer pair.Done()
			dst.SyncRecv(network.Dial())
		}()

		return pair
	}

	// spawnWriters starts count goroutines that each call step exactly steps
	// times. The returned WaitGroup is done once every goroutine has finished.
	spawnWriters := func(count, steps int, step func()) *sync.WaitGroup {
		writers := &sync.WaitGroup{}
		writers.Add(count)
		for w := 0; w < count; w++ {
			go func() {
				defer writers.Done()
				for s := 0; s < steps; s++ {
					step()
				}
			}()
		}
		return writers
	}

	run("simple", func(t *testing.T, db, db2 *DB, network *testconn.Network) {
		shipping := shipOnce(db, db2, network)

		for remaining := 100; remaining > 0; remaining-- {
			db.RandAction()
		}

		// Close the client side only after db2 has caught up, then wait for
		// both background goroutines to return before comparing.
		db.WaitForSync(db2)
		network.CloseClient()
		shipping.Wait()

		if err := db.Equals(db2); err != nil {
			t.Fatal(err)
		}
	})

	run("simple multiple writers", func(t *testing.T, db, db2 *DB, network *testconn.Network) {
		shipping := shipOnce(db, db2, network)

		// 64 concurrent writers, 1024 random actions each.
		spawnWriters(64, 1024, func() { db.RandAction() }).Wait()

		db.WaitForSync(db2)
		network.CloseClient()
		shipping.Wait()

		if err := db.Equals(db2); err != nil {
			t.Fatal(err)
		}
	})

	run("unstable network", func(t *testing.T, db, db2 *DB, network *testconn.Network) {
		const tick = time.Millisecond

		// 64 concurrent writers, 4096 random actions each, pausing one tick
		// before every action.
		writers := spawnWriters(64, 4096, func() {
			time.Sleep(tick)
			db.RandAction()
		})

		// updating stays true until every writer has finished.
		updating := new(atomicBool)
		updating.Set(true)
		go func() {
			writers.Wait()
			updating.Set(false)
		}()

		// caughtUp reports whether the writers are done and both databases
		// report the same maximum sequence number. The sequence numbers are
		// only read once updating is false.
		caughtUp := func() bool {
			return !updating.Get() && db.MaxSeqNum() == db2.MaxSeqNum()
		}

		// pump returns a flag that is true while a background goroutine keeps
		// calling attempt. The goroutine stops, and clears the flag, the first
		// time caughtUp holds at the top of its loop.
		pump := func(attempt func()) *atomicBool {
			running := new(atomicBool)
			running.Set(true)
			go func() {
				for !caughtUp() {
					attempt()
				}
				running.Set(false)
			}()
			return running
		}

		// Recv in background, redialing whenever a sync call returns.
		recving := pump(func() {
			if conn := network.Dial(); conn != nil {
				db2.SyncRecv(conn)
			}
		})

		// Send in background, re-accepting whenever a sync call returns.
		sending := pump(func() {
			if conn := network.Accept(); conn != nil {
				db.SyncSend(conn)
			}
		})

		// Interrupt the network periodically for as long as either side is
		// still running: sleep a random duration below ten ticks, then close
		// either the client or the server side with equal probability.
		maxPause := 10 * int(tick)
		for sending.Get() || recving.Get() {
			time.Sleep(time.Duration(rand.Intn(maxPause)))
			if rand.Float64() < 0.5 {
				network.CloseClient()
				continue
			}
			network.CloseServer()
		}

		if err := db.Equals(db2); err != nil {
			t.Fatal(err)
		}
	})
}