From 0c8281a1c01d57f9038e7bb6db0d4b9853177fff Mon Sep 17 00:00:00 2001 From: jdl Date: Sat, 30 Jul 2022 09:32:27 +0200 Subject: [PATCH] Added tests, wip. --- kvstore/shipping_test.go | 51 ++++++++++++++++++++++++++++++++++++++++ kvstore/store_test.go | 4 +--- kvstore/sync-recv.go | 6 ++--- 3 files changed, 55 insertions(+), 6 deletions(-) diff --git a/kvstore/shipping_test.go b/kvstore/shipping_test.go index e7585c8..ba5097c 100644 --- a/kvstore/shipping_test.go +++ b/kvstore/shipping_test.go @@ -124,6 +124,57 @@ func TestShipping(t *testing.T) { } }) + run("simple concurrent", func(t *testing.T, pDir, sDir string, prim, sec *KV, cbs *callbacks, nw *testconn.Network) { + M := 1000 + N := 100 + + wg := sync.WaitGroup{} + + // Store M*N values in the background. + for i := 0; i < M; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for i := 0; i < N; i++ { + time.Sleep(time.Millisecond) + prim.randAction() + } + }() + } + + // Send in the background. + wg.Add(1) + go func() { + defer wg.Done() + conn := nw.Accept() + prim.SyncSend(conn) + }() + + // Recv in the background. + wg.Add(1) + go func() { + defer wg.Done() + conn := nw.Dial() + sec.SyncRecv(conn) + }() + + sec.waitForSeqNum(uint64(M * N)) + + nw.CloseServer() + nw.CloseClient() + wg.Wait() + + if err := prim.equalsKV("a", sec); err != nil { + t.Fatal(err) + } + if err := prim.equalsKV("b", sec); err != nil { + t.Fatal(err) + } + if err := prim.equalsKV("c", sec); err != nil { + t.Fatal(err) + } + }) + run("net failures", func(t *testing.T, pDir, sDir string, prim, sec *KV, cbs *callbacks, nw *testconn.Network) { M := 10 N := 1000 diff --git a/kvstore/store_test.go b/kvstore/store_test.go index 05ac7fe..4c0f1b3 100644 --- a/kvstore/store_test.go +++ b/kvstore/store_test.go @@ -2,7 +2,6 @@ package kvstore import ( "fmt" - "log" "math/rand" "os" "reflect" @@ -15,7 +14,6 @@ import ( func (kv *KV) waitForSeqNum(x uint64) { for { seqNum := kv.MaxSeqNum() - log.Printf("%d/%d", seqNum, x) if seqNum >= x { return } @@ -75,7 +73,7 @@ func (kv *KV) equalsKV(collection string, rhs *KV) error { // ID one of [1,10] var ( randCollections = []string{"a", "b", "c"} - randIDs = []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} + randIDs = []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16} ) func (kv *KV) randAction() { diff --git a/kvstore/sync-recv.go b/kvstore/sync-recv.go index 8d0a7b9..f4acf1a 100644 --- a/kvstore/sync-recv.go +++ b/kvstore/sync-recv.go @@ -28,15 +28,15 @@ func (kv *KV) SyncRecv(conn net.Conn) { headerBuf := make([]byte, recHeaderSize) nameBuf := make([]byte, 32) - afterIDBuf := make([]byte, 8) + afterSeqNumBuf := make([]byte, 8) afterSeqNum := kv.MaxSeqNum() expectedSeqNum := afterSeqNum + 1 // Send fromID to the conn. conn.SetWriteDeadline(time.Now().Add(connTimeout)) - binary.LittleEndian.PutUint64(afterIDBuf, afterSeqNum) - if _, err := conn.Write(afterIDBuf); err != nil { + binary.LittleEndian.PutUint64(afterSeqNumBuf, afterSeqNum) + if _, err := conn.Write(afterSeqNumBuf); err != nil { log.Printf("RecvWAL failed to send after sequence number: %v", err) return }