Compare commits

...

3 Commits

Author SHA1 Message Date
jdl 4aec388a15 Cleanup 2022-07-30 09:34:30 +02:00
jdl 0c8281a1c0 Added tests, wip. 2022-07-30 09:32:27 +02:00
jdl 9107aa8818 Code cleanup. 2022-07-30 09:20:54 +02:00
5 changed files with 65 additions and 19 deletions

View File

@ -1,6 +1,7 @@
package kvstore
import (
"sync"
"time"
)
@ -8,29 +9,24 @@ var (
connTimeout = 16 * time.Second
pollInterval = 500 * time.Millisecond
modQSize = 1024
bufferPool = make(chan []byte, 1024)
poolBufSize = 4096
poolBufSize = 8192
bufferPool = sync.Pool{
New: func() any {
return make([]byte, poolBufSize)
},
}
)
func GetDataBuf(size int) []byte {
if size > poolBufSize {
return make([]byte, size)
}
select {
case b := <-bufferPool:
return b[:size]
default:
return make([]byte, poolBufSize)[:size]
}
return bufferPool.Get().([]byte)[:size]
}
func RecycleDataBuf(b []byte) {
if cap(b) != poolBufSize {
return
}
select {
case bufferPool <- b:
default:
}
bufferPool.Put(b)
}

View File

@ -124,6 +124,57 @@ func TestShipping(t *testing.T) {
}
})
run("simple concurrent", func(t *testing.T, pDir, sDir string, prim, sec *KV, cbs *callbacks, nw *testconn.Network) {
M := 64
N := 128
wg := sync.WaitGroup{}
// Store M*N values in the background.
for i := 0; i < M; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for i := 0; i < N; i++ {
time.Sleep(time.Millisecond)
prim.randAction()
}
}()
}
// Send in the background.
wg.Add(1)
go func() {
defer wg.Done()
conn := nw.Accept()
prim.SyncSend(conn)
}()
// Recv in the background.
wg.Add(1)
go func() {
defer wg.Done()
conn := nw.Dial()
sec.SyncRecv(conn)
}()
sec.waitForSeqNum(uint64(M * N))
nw.CloseServer()
nw.CloseClient()
wg.Wait()
if err := prim.equalsKV("a", sec); err != nil {
t.Fatal(err)
}
if err := prim.equalsKV("b", sec); err != nil {
t.Fatal(err)
}
if err := prim.equalsKV("c", sec); err != nil {
t.Fatal(err)
}
})
run("net failures", func(t *testing.T, pDir, sDir string, prim, sec *KV, cbs *callbacks, nw *testconn.Network) {
M := 10
N := 1000

View File

@ -2,7 +2,6 @@ package kvstore
import (
"fmt"
"log"
"math/rand"
"os"
"reflect"
@ -15,7 +14,6 @@ import (
func (kv *KV) waitForSeqNum(x uint64) {
for {
seqNum := kv.MaxSeqNum()
log.Printf("%d/%d", seqNum, x)
if seqNum >= x {
return
}
@ -75,7 +73,7 @@ func (kv *KV) equalsKV(collection string, rhs *KV) error {
// ID one of [1,10]
var (
randCollections = []string{"a", "b", "c"}
randIDs = []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
randIDs = []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
)
func (kv *KV) randAction() {

View File

@ -28,14 +28,15 @@ func (kv *KV) SyncRecv(conn net.Conn) {
headerBuf := make([]byte, recHeaderSize)
nameBuf := make([]byte, 32)
afterSeqNumBuf := make([]byte, 8)
afterSeqNum := kv.MaxSeqNum()
expectedSeqNum := afterSeqNum + 1
// Send fromID to the conn.
conn.SetWriteDeadline(time.Now().Add(connTimeout))
binary.LittleEndian.PutUint64(nameBuf, afterSeqNum)
if _, err := conn.Write(nameBuf); err != nil {
binary.LittleEndian.PutUint64(afterSeqNumBuf, afterSeqNum)
if _, err := conn.Write(afterSeqNumBuf); err != nil {
log.Printf("RecvWAL failed to send after sequence number: %v", err)
return
}

View File

@ -28,7 +28,7 @@ func (kv *KV) SyncSend(conn net.Conn) {
POLL:
for i := 0; i < 8; i++ {
for i := 0; i < 4; i++ {
if kv.MaxSeqNum() > afterSeqNum {
goto REPLAY
}