This repository has been archived on 2022-07-30. You can view files and clone it, but cannot push or open issues/pull-requests.
mdb/kvstore/sync-send.go

109 lines
1.9 KiB
Go

package kvstore
import (
	"encoding/binary"
	"io"
	"log"
	"net"
	"time"
)
// SyncSend serves one replication stream on conn and closes conn when it
// returns.
//
// Protocol, as implemented here:
//
//  1. The peer sends 8 bytes: a little-endian uint64 "afterSeqNum".
//  2. SyncSend then loops forever. It polls kv.MaxSeqNum up to four times,
//     sleeping pollInterval between polls. If MaxSeqNum exceeds afterSeqNum,
//     the records produced by kv.replay(afterSeqNum, ...) are written to
//     conn; otherwise a heartbeat is written.
//
// A heartbeat is recHeaderSize zero bytes. A record is written as its encoded
// header (recHeaderSize bytes), followed by the collection name, followed by
// rec.Data only when rec.Store is true.
//
// The function returns on the first read/write failure or replay error.
func (kv *KV) SyncSend(conn net.Conn) {
	defer conn.Close()

	// Number of MaxSeqNum polls between heartbeats when nothing is pending.
	const pollsPerHeartbeat = 4

	var (
		seqNumBuf = make([]byte, 8)
		headerBuf = make([]byte, recHeaderSize)
		empty     = make([]byte, recHeaderSize) // all-zero header used as heartbeat
	)

	// Read afterSeqNum from the conn. io.ReadFull is required because a single
	// Read on a stream connection may legally return fewer than 8 bytes.
	conn.SetReadDeadline(time.Now().Add(connTimeout))
	if _, err := io.ReadFull(conn, seqNumBuf); err != nil {
		log.Printf("SyncSend failed to read afterSeqNum: %v", err)
		return
	}
	afterSeqNum := binary.LittleEndian.Uint64(seqNumBuf)

	// sendRecord writes one record to conn and advances afterSeqNum so that
	// the next replay resumes from this record's sequence number.
	sendRecord := func(rec record) error {
		// One deadline covers all writes belonging to this record.
		conn.SetWriteDeadline(time.Now().Add(connTimeout))
		afterSeqNum = rec.SeqNum
		encodeRecordHeader(rec, headerBuf)
		if _, err := conn.Write(headerBuf); err != nil {
			log.Printf("SyncSend failed to send header %v", err)
			return err
		}
		if _, err := conn.Write([]byte(rec.Collection)); err != nil {
			log.Printf("SyncSend failed to send collection name %v", err)
			return err
		}
		if !rec.Store {
			// Nothing follows the collection name for non-store records.
			return nil
		}
		if _, err := conn.Write(rec.Data); err != nil {
			log.Printf("SyncSend failed to send data %v", err)
			return err
		}
		return nil
	}

	for {
		// Poll for records newer than what the peer already has.
		pending := false
		for i := 0; i < pollsPerHeartbeat; i++ {
			if kv.MaxSeqNum() > afterSeqNum {
				pending = true
				break
			}
			time.Sleep(pollInterval)
		}

		if !pending {
			// Nothing new: send a heartbeat, then go back to polling.
			conn.SetWriteDeadline(time.Now().Add(connTimeout))
			if _, err := conn.Write(empty); err != nil {
				log.Printf("SyncSend failed to send heartbeat: %v", err)
				return
			}
			continue
		}

		// Write failures are already logged inside sendRecord.
		if err := kv.replay(afterSeqNum, sendRecord); err != nil {
			return
		}
	}
}
// replay executes kv.logIterateStmt with afterSeqNum and calls each once for
// every row returned, in the order the rows are returned.
//
// Iteration stops at the first non-nil error from each, and that error is
// returned unchanged. Failures from Query and Scan are handed to must. An
// error that terminates row iteration early is returned via rows.Err, so a
// truncated result set is not mistaken for a complete one.
//
// A single record value is reused across rows; each receives a copy of it
// whose fields were filled by the most recent Scan.
func (kv *KV) replay(afterSeqNum uint64, each func(rec record) error) error {
	rows, err := kv.logIterateStmt.Query(afterSeqNum)
	must(err)
	defer rows.Close()

	rec := record{}
	rec.Data = GetDataBuf(0)
	// The deferred call's argument is evaluated here, so the buffer obtained
	// above is the one recycled even if Scan later stores a different slice
	// in rec.Data.
	// NOTE(review): whether Scan reuses this buffer or replaces it depends on
	// the driver and on rec.Data's type — confirm the pooling is effective.
	defer RecycleDataBuf(rec.Data)

	for rows.Next() {
		must(rows.Scan(
			&rec.SeqNum,
			&rec.Collection,
			&rec.ID,
			&rec.Store,
			&rec.Data))
		if err := each(rec); err != nil {
			return err
		}
	}
	// Next returns false both at end-of-rows and on failure; Err tells the
	// two apart.
	return rows.Err()
}