This repository has been archived on 2022-07-30. You can view files and clone it, but cannot push or open issues/pull-requests.
mdb/kvstore/sync-send.go

109 lines
1.9 KiB
Go
Raw Normal View History

2022-07-29 19:36:42 +00:00
package kvstore
2022-07-25 05:39:16 +00:00
import (
	"encoding/binary"
	"io"
	"log"
	"net"
	"time"
)
2022-07-29 19:36:42 +00:00
func (kv *KV) SyncSend(conn net.Conn) {
2022-07-25 20:47:26 +00:00
defer conn.Close()
var (
2022-07-29 19:36:42 +00:00
seqNumBuf = make([]byte, 8)
2022-07-26 05:58:53 +00:00
headerBuf = make([]byte, recHeaderSize)
empty = make([]byte, recHeaderSize)
err error
2022-07-25 20:47:26 +00:00
)
2022-07-29 19:36:42 +00:00
// Read afterSeqNum from the conn.
conn.SetReadDeadline(time.Now().Add(connTimeout))
if _, err := conn.Read(seqNumBuf[:8]); err != nil {
log.Printf("SyncSend failed to read afterSeqNum: %v", err)
2022-07-25 20:47:26 +00:00
return
}
2022-07-29 19:36:42 +00:00
afterSeqNum := binary.LittleEndian.Uint64(seqNumBuf[:8])
2022-07-25 20:47:26 +00:00
POLL:
2022-07-30 07:34:30 +00:00
for i := 0; i < 4; i++ {
2022-07-29 19:36:42 +00:00
if kv.MaxSeqNum() > afterSeqNum {
2022-07-25 20:47:26 +00:00
goto REPLAY
}
time.Sleep(pollInterval)
}
2022-07-30 03:58:35 +00:00
2022-07-25 20:47:26 +00:00
goto HEARTBEAT
HEARTBEAT:
2022-07-26 05:58:53 +00:00
conn.SetWriteDeadline(time.Now().Add(connTimeout))
2022-07-25 20:47:26 +00:00
if _, err := conn.Write(empty); err != nil {
log.Printf("SendWAL failed to send heartbeat: %v", err)
return
}
goto POLL
REPLAY:
2022-07-29 19:36:42 +00:00
err = kv.replay(afterSeqNum, func(rec record) error {
2022-07-26 05:58:53 +00:00
conn.SetWriteDeadline(time.Now().Add(connTimeout))
2022-07-25 20:47:26 +00:00
afterSeqNum = rec.SeqNum
encodeRecordHeader(rec, headerBuf)
if _, err := conn.Write(headerBuf); err != nil {
log.Printf("SendWAL failed to send header %v", err)
return err
}
if _, err := conn.Write([]byte(rec.Collection)); err != nil {
log.Printf("SendWAL failed to send collection name %v", err)
return err
}
if !rec.Store {
return nil
}
if _, err := conn.Write(rec.Data); err != nil {
log.Printf("SendWAL failed to send data %v", err)
return err
}
return nil
})
if err != nil {
return
}
goto POLL
}
2022-07-29 19:36:42 +00:00
func (kv *KV) replay(afterSeqNum uint64, each func(rec record) error) error {
rec := record{}
rows, err := kv.logIterateStmt.Query(afterSeqNum)
must(err)
defer rows.Close()
2022-07-30 03:58:35 +00:00
rec.Data = GetDataBuf(0)
defer RecycleDataBuf(rec.Data)
2022-07-29 19:36:42 +00:00
for rows.Next() {
must(rows.Scan(
&rec.SeqNum,
&rec.Collection,
&rec.ID,
&rec.Store,
&rec.Data))
2022-07-30 03:58:35 +00:00
2022-07-29 19:36:42 +00:00
if err = each(rec); err != nil {
return err
}
}
return nil
}