Compare commits
3 Commits
Author | SHA1 | Date |
---|---|---|
jdl | 4aec388a15 | |
jdl | 0c8281a1c0 | |
jdl | 9107aa8818 |
|
@ -1,6 +1,7 @@
|
||||||
package kvstore
|
package kvstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -8,29 +9,24 @@ var (
|
||||||
connTimeout = 16 * time.Second
|
connTimeout = 16 * time.Second
|
||||||
pollInterval = 500 * time.Millisecond
|
pollInterval = 500 * time.Millisecond
|
||||||
modQSize = 1024
|
modQSize = 1024
|
||||||
|
poolBufSize = 8192
|
||||||
bufferPool = make(chan []byte, 1024)
|
bufferPool = sync.Pool{
|
||||||
poolBufSize = 4096
|
New: func() any {
|
||||||
|
return make([]byte, poolBufSize)
|
||||||
|
},
|
||||||
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
func GetDataBuf(size int) []byte {
|
func GetDataBuf(size int) []byte {
|
||||||
if size > poolBufSize {
|
if size > poolBufSize {
|
||||||
return make([]byte, size)
|
return make([]byte, size)
|
||||||
}
|
}
|
||||||
select {
|
return bufferPool.Get().([]byte)[:size]
|
||||||
case b := <-bufferPool:
|
|
||||||
return b[:size]
|
|
||||||
default:
|
|
||||||
return make([]byte, poolBufSize)[:size]
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func RecycleDataBuf(b []byte) {
|
func RecycleDataBuf(b []byte) {
|
||||||
if cap(b) != poolBufSize {
|
if cap(b) != poolBufSize {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
select {
|
bufferPool.Put(b)
|
||||||
case bufferPool <- b:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -124,6 +124,57 @@ func TestShipping(t *testing.T) {
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
run("simple concurrent", func(t *testing.T, pDir, sDir string, prim, sec *KV, cbs *callbacks, nw *testconn.Network) {
|
||||||
|
M := 64
|
||||||
|
N := 128
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
|
||||||
|
// Store M*N values in the background.
|
||||||
|
for i := 0; i < M; i++ {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
for i := 0; i < N; i++ {
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
prim.randAction()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send in the background.
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
conn := nw.Accept()
|
||||||
|
prim.SyncSend(conn)
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Recv in the background.
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
defer wg.Done()
|
||||||
|
conn := nw.Dial()
|
||||||
|
sec.SyncRecv(conn)
|
||||||
|
}()
|
||||||
|
|
||||||
|
sec.waitForSeqNum(uint64(M * N))
|
||||||
|
|
||||||
|
nw.CloseServer()
|
||||||
|
nw.CloseClient()
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
if err := prim.equalsKV("a", sec); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := prim.equalsKV("b", sec); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err := prim.equalsKV("c", sec); err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
run("net failures", func(t *testing.T, pDir, sDir string, prim, sec *KV, cbs *callbacks, nw *testconn.Network) {
|
run("net failures", func(t *testing.T, pDir, sDir string, prim, sec *KV, cbs *callbacks, nw *testconn.Network) {
|
||||||
M := 10
|
M := 10
|
||||||
N := 1000
|
N := 1000
|
||||||
|
|
|
@ -2,7 +2,6 @@ package kvstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"os"
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
|
@ -15,7 +14,6 @@ import (
|
||||||
func (kv *KV) waitForSeqNum(x uint64) {
|
func (kv *KV) waitForSeqNum(x uint64) {
|
||||||
for {
|
for {
|
||||||
seqNum := kv.MaxSeqNum()
|
seqNum := kv.MaxSeqNum()
|
||||||
log.Printf("%d/%d", seqNum, x)
|
|
||||||
if seqNum >= x {
|
if seqNum >= x {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
@ -75,7 +73,7 @@ func (kv *KV) equalsKV(collection string, rhs *KV) error {
|
||||||
// ID one of [1,10]
|
// ID one of [1,10]
|
||||||
var (
|
var (
|
||||||
randCollections = []string{"a", "b", "c"}
|
randCollections = []string{"a", "b", "c"}
|
||||||
randIDs = []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
|
randIDs = []uint64{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}
|
||||||
)
|
)
|
||||||
|
|
||||||
func (kv *KV) randAction() {
|
func (kv *KV) randAction() {
|
||||||
|
|
|
@ -28,14 +28,15 @@ func (kv *KV) SyncRecv(conn net.Conn) {
|
||||||
|
|
||||||
headerBuf := make([]byte, recHeaderSize)
|
headerBuf := make([]byte, recHeaderSize)
|
||||||
nameBuf := make([]byte, 32)
|
nameBuf := make([]byte, 32)
|
||||||
|
afterSeqNumBuf := make([]byte, 8)
|
||||||
|
|
||||||
afterSeqNum := kv.MaxSeqNum()
|
afterSeqNum := kv.MaxSeqNum()
|
||||||
expectedSeqNum := afterSeqNum + 1
|
expectedSeqNum := afterSeqNum + 1
|
||||||
|
|
||||||
// Send fromID to the conn.
|
// Send fromID to the conn.
|
||||||
conn.SetWriteDeadline(time.Now().Add(connTimeout))
|
conn.SetWriteDeadline(time.Now().Add(connTimeout))
|
||||||
binary.LittleEndian.PutUint64(nameBuf, afterSeqNum)
|
binary.LittleEndian.PutUint64(afterSeqNumBuf, afterSeqNum)
|
||||||
if _, err := conn.Write(nameBuf); err != nil {
|
if _, err := conn.Write(afterSeqNumBuf); err != nil {
|
||||||
log.Printf("RecvWAL failed to send after sequence number: %v", err)
|
log.Printf("RecvWAL failed to send after sequence number: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,7 @@ func (kv *KV) SyncSend(conn net.Conn) {
|
||||||
|
|
||||||
POLL:
|
POLL:
|
||||||
|
|
||||||
for i := 0; i < 8; i++ {
|
for i := 0; i < 4; i++ {
|
||||||
if kv.MaxSeqNum() > afterSeqNum {
|
if kv.MaxSeqNum() > afterSeqNum {
|
||||||
goto REPLAY
|
goto REPLAY
|
||||||
}
|
}
|
||||||
|
|
Reference in New Issue