Added tests, wip.
parent
9107aa8818
commit
0c8281a1c0
|
@ -124,6 +124,57 @@ func TestShipping(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
run("simple concurrent", func(t *testing.T, pDir, sDir string, prim, sec *KV, cbs *callbacks, nw *testconn.Network) {
|
||||
M := 1000
|
||||
N := 100
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
|
||||
// Store M*N values in the background.
|
||||
for i := 0; i < M; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < N; i++ {
|
||||
time.Sleep(time.Millisecond)
|
||||
prim.randAction()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Send in the background.
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
conn := nw.Accept()
|
||||
prim.SyncSend(conn)
|
||||
}()
|
||||
|
||||
// Recv in the background.
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
conn := nw.Dial()
|
||||
sec.SyncRecv(conn)
|
||||
}()
|
||||
|
||||
sec.waitForSeqNum(uint64(M * N))
|
||||
|
||||
nw.CloseServer()
|
||||
nw.CloseClient()
|
||||
wg.Wait()
|
||||
|
||||
if err := prim.equalsKV("a", sec); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := prim.equalsKV("b", sec); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := prim.equalsKV("c", sec); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
})
|
||||
|
||||
run("net failures", func(t *testing.T, pDir, sDir string, prim, sec *KV, cbs *callbacks, nw *testconn.Network) {
|
||||
M := 10
|
||||
N := 1000
|
||||
|
|
|
@ -2,7 +2,6 @@ package kvstore
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand"
|
||||
"os"
|
||||
"reflect"
|
||||
|
@ -15,7 +14,6 @@ import (
|
|||
func (kv *KV) waitForSeqNum(x uint64) {
|
||||
for {
|
||||
seqNum := kv.MaxSeqNum()
|
||||
log.Printf("%d/%d", seqNum, x)
|
||||
if seqNum >= x {
|
||||
return
|
||||
}
|
||||
|
@ -75,7 +73,7 @@ func (kv *KV) equalsKV(collection string, rhs *KV) error {
|
|||
// ID one of [1,10]
|
||||
var (
|
||||
randCollections = []string{"a", "b", "c"}
|
||||
randIDs = []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
|
||||
randIDs = []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
|
||||
)
|
||||
|
||||
func (kv *KV) randAction() {
|
||||
|
|
|
@ -28,15 +28,15 @@ func (kv *KV) SyncRecv(conn net.Conn) {
|
|||
|
||||
headerBuf := make([]byte, recHeaderSize)
|
||||
nameBuf := make([]byte, 32)
|
||||
afterIDBuf := make([]byte, 8)
|
||||
afterSeqNumBuf := make([]byte, 8)
|
||||
|
||||
afterSeqNum := kv.MaxSeqNum()
|
||||
expectedSeqNum := afterSeqNum + 1
|
||||
|
||||
// Send fromID to the conn.
|
||||
conn.SetWriteDeadline(time.Now().Add(connTimeout))
|
||||
binary.LittleEndian.PutUint64(afterIDBuf, afterSeqNum)
|
||||
if _, err := conn.Write(afterIDBuf); err != nil {
|
||||
binary.LittleEndian.PutUint64(afterSeqNumBuf, afterSeqNum)
|
||||
if _, err := conn.Write(afterSeqNumBuf); err != nil {
|
||||
log.Printf("RecvWAL failed to send after sequence number: %v", err)
|
||||
return
|
||||
}
|
||||
|
|
Reference in New Issue