This repository has been archived on 2022-07-30. You can view files and clone it, but cannot push or open issues/pull-requests.
mdb/kvstore/shipping_test.go

247 lines
4.5 KiB
Go

package kvstore
import (
"math/rand"
"os"
"sync"
"testing"
"time"
"git.crumpington.com/private/mdb/testconn"
)
// ----------------------------------------------------------------------------
// callbacks stores the information reported through a secondary's store and
// delete callbacks so that a test can inspect it afterwards.
//
// m is keyed first by the callback's c argument (presumably the collection
// name) and then by item id; the value is the stored payload converted to a
// string. Every access to m is guarded by lock. The zero value is ready to
// use: m is allocated on the first call to onStore.
type callbacks struct {
	lock sync.Mutex
	m    map[string]map[uint64]string
}

// onStore records data under (c, id), replacing any previous value for that
// pair. The outer and inner maps are created on demand.
func (sc *callbacks) onStore(c string, id uint64, data []byte) {
	sc.lock.Lock()
	defer sc.lock.Unlock()

	// Writing to a nil map panics, so make the zero-value struct usable.
	if sc.m == nil {
		sc.m = make(map[string]map[uint64]string)
	}

	items, ok := sc.m[c]
	if !ok {
		items = make(map[uint64]string)
		sc.m[c] = items
	}
	items[id] = string(data)
}

// onDelete forgets the value recorded under (c, id), if any. Unknown c or id
// values are ignored and never create new entries.
func (sc *callbacks) onDelete(c string, id uint64) {
	sc.lock.Lock()
	defer sc.lock.Unlock()

	// Indexing a nil or missing outer entry yields a nil inner map, and
	// delete on a nil map is a no-op, so no existence check is needed.
	delete(sc.m[c], id)
}
// ----------------------------------------------------------------------------
// TestShipping checks that a secondary KV fed by a primary KV over a test
// network ends up equal to the primary in collections "a", "b" and "c".
//
// Three subtests are run, each with fresh directories, network and stores:
//   - "simple":            10 writers x 1000 random actions, one connection.
//   - "simple concurrent": 64 writers x 128 random actions, one connection.
//   - "net failures":      10 writers x 1000 random actions while a background
//     goroutine keeps closing one side of the network.
func TestShipping(t *testing.T) {
	// scenario is the body of one subtest; it receives the fixtures that run
	// creates for it.
	type scenario func(
		t *testing.T,
		pDir string,
		sDir string,
		primary *KV,
		secondary *KV,
		cbs *callbacks,
		nw *testconn.Network,
	)

	// run executes inner as a subtest called name. All fixtures are torn down
	// when the subtest returns: the stores are closed first, then the network,
	// then the temporary directories are removed (defers run in reverse order).
	run := func(name string, inner scenario) {
		t.Run(name, func(t *testing.T) {
			pDir, err := os.MkdirTemp("", "")
			if err != nil {
				t.Fatal(err)
			}
			defer os.RemoveAll(pDir)

			sDir, err := os.MkdirTemp("", "")
			if err != nil {
				t.Fatal(err)
			}
			defer os.RemoveAll(sDir)

			nw := testconn.NewNetwork()
			defer func() {
				nw.CloseServer()
				nw.CloseClient()
			}()

			cbs := &callbacks{
				m: map[string]map[uint64]string{},
			}

			prim := NewPrimary(pDir)
			defer prim.Close()

			sec := NewSecondary(sDir, cbs.onStore, cbs.onDelete)
			defer sec.Close()

			inner(t, pDir, sDir, prim, sec, cbs, nw)
		})
	}

	// startWriters launches writers goroutines registered on wg. Each one
	// performs actions random actions on prim, sleeping pause before each.
	startWriters := func(wg *sync.WaitGroup, prim *KV, writers, actions int, pause time.Duration) {
		for i := 0; i < writers; i++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for j := 0; j < actions; j++ {
					time.Sleep(pause)
					prim.randAction()
				}
			}()
		}
	}

	// requireEqual fails the test on the first of the collections "a", "b",
	// "c" for which prim.equalsKV reports a difference from sec.
	requireEqual := func(t *testing.T, prim, sec *KV) {
		t.Helper()
		for _, c := range []string{"a", "b", "c"} {
			if err := prim.equalsKV(c, sec); err != nil {
				t.Fatal(err)
			}
		}
	}

	// steady builds a scenario with writers*actions random actions shipped
	// over a single, uninterrupted connection.
	steady := func(writers, actions int) scenario {
		return func(t *testing.T, pDir, sDir string, prim, sec *KV, cbs *callbacks, nw *testconn.Network) {
			var wg sync.WaitGroup

			// Store writers*actions values in the background.
			startWriters(&wg, prim, writers, actions, time.Millisecond)

			// Send in the background.
			wg.Add(1)
			go func() {
				defer wg.Done()
				conn := nw.Accept()
				prim.SyncSend(conn)
			}()

			// Recv in the background.
			wg.Add(1)
			go func() {
				defer wg.Done()
				conn := nw.Dial()
				sec.SyncRecv(conn)
			}()

			// NOTE(review): assumes every randAction yields exactly one
			// sequence number, so the secondary is caught up at
			// writers*actions — confirm against randAction.
			sec.waitForSeqNum(uint64(writers * actions))

			// Close both ends before waiting; presumably this is what makes
			// SyncSend/SyncRecv return — verify against their implementation.
			nw.CloseServer()
			nw.CloseClient()
			wg.Wait()

			requireEqual(t, prim, sec)
		}
	}

	run("simple", steady(10, 1000))

	run("simple concurrent", steady(64, 128))

	run("net failures", func(t *testing.T, pDir, sDir string, prim, sec *KV, cbs *callbacks, nw *testconn.Network) {
		const (
			M         = 10
			N         = 1000
			sleepTime = time.Millisecond
		)
		target := uint64(M * N)

		var wg sync.WaitGroup

		// Store M*N values in the background.
		startWriters(&wg, prim, M, N, sleepTime)

		// Send in the background, re-accepting after every dropped connection
		// until the secondary has reached the target sequence number.
		wg.Add(1)
		go func() {
			defer wg.Done()
			for sec.MaxSeqNum() < target {
				if conn := nw.Accept(); conn != nil {
					prim.SyncSend(conn)
				}
			}
		}()

		// Recv in the background, re-dialling after every dropped connection.
		wg.Add(1)
		go func() {
			defer wg.Done()
			for sec.MaxSeqNum() < target {
				if conn := nw.Dial(); conn != nil {
					sec.SyncRecv(conn)
				}
			}
		}()

		// Inject failures: after a random pause of up to 10*sleepTime, close
		// either the client or the server side, chosen with equal probability.
		wg.Add(1)
		go func() {
			defer wg.Done()
			for sec.MaxSeqNum() < target {
				time.Sleep(time.Duration(rand.Intn(10 * int(sleepTime))))
				if rand.Float64() < 0.5 {
					nw.CloseClient()
				} else {
					nw.CloseServer()
				}
			}
		}()

		// NOTE(review): prim.MaxSeqNum() is sampled here while the writers may
		// still be running; the wg.Wait below, whose loops only exit once the
		// secondary reaches M*N, is what gates the final comparison.
		sec.waitForSeqNum(prim.MaxSeqNum())
		wg.Wait()

		requireEqual(t, prim, sec)
	})
}