This repository has been archived on 2022-07-30. You can view files and clone it, but cannot push or open issues/pull-requests.
mdb/wal/follower.go

142 lines
2.5 KiB
Go

package wal
import (
"database/sql"
"encoding/binary"
"log"
"net"
"time"
)
// Record is one WAL entry as scanned from a row by Follower.Replay and
// streamed to peers by Follower.SendWAL.
type Record struct {
// SeqNum is the record's sequence number. SendWAL tracks the last SeqNum
// it has sent and resumes replay from there.
SeqNum uint64
// Collection is the collection name. SendWAL writes it to the connection
// as raw bytes immediately after the record header.
Collection string
// ID is scanned from the WAL row.
// NOTE(review): presumably the item ID within Collection — confirm.
ID uint64
// Store controls whether Data is transmitted: SendWAL sends Data only when
// Store is true.
// NOTE(review): presumably false marks a delete — confirm.
Store bool
// Data is the record payload.
Data []byte
}
// Follower reads records from a WAL database (see Replay) and can stream them
// to a network peer (see SendWAL). Create one with NewFollower.
type Follower struct {
// db is the database handle returned by initWAL.
db *sql.DB
// selectStmt is prepared from sqlWALFollowQuery; Replay executes it with
// the sequence number to resume from as its only argument.
selectStmt *sql.Stmt
}
// NewFollower opens the WAL at walPath via initWAL and prepares the follow
// query that Replay executes. An error from preparing the statement is handed
// to must.
func NewFollower(walPath string) *Follower {
	follower := &Follower{db: initWAL(walPath)}

	stmt, err := follower.db.Prepare(sqlWALFollowQuery)
	must(err)
	follower.selectStmt = stmt

	return follower
}
// Close releases the prepared follow statement and then closes the underlying
// WAL database handle.
//
// The method has no return value, so close errors are logged instead of being
// silently discarded.
func (f *Follower) Close() {
	// Close the statement before the handle it was prepared on.
	if f.selectStmt != nil {
		if err := f.selectStmt.Close(); err != nil {
			log.Printf("Follower failed to close follow statement: %v", err)
		}
	}
	if err := f.db.Close(); err != nil {
		log.Printf("Follower failed to close WAL database: %v", err)
	}
}
// MaxSeqNum runs sqlWALMaxSeqNum and returns the single value it yields.
// A query or scan error is handed to must.
func (f *Follower) MaxSeqNum() (n uint64) {
	row := f.db.QueryRow(sqlWALMaxSeqNum)
	scanErr := row.Scan(&n)
	must(scanErr)
	return n
}
// Replay executes the prepared follow query with afterSeqNum as its argument
// and invokes each once per returned row, in the order the query yields them.
//
// The returned error is always the first non-nil error returned by each;
// iteration stops at that point. Database failures (running the query,
// scanning a row, or an error that interrupts row iteration) are handed to
// must rather than returned.
func (f *Follower) Replay(afterSeqNum uint64, each func(rec Record) error) error {
	rows, err := f.selectStmt.Query(afterSeqNum)
	must(err)
	defer rows.Close()

	for rows.Next() {
		// A fresh Record per row so nothing carries over between callbacks.
		var rec Record
		must(rows.Scan(
			&rec.SeqNum,
			&rec.Collection,
			&rec.ID,
			&rec.Store,
			&rec.Data))
		if err := each(rec); err != nil {
			return err
		}
	}

	// rows.Next returns false both at the end of the result set and when
	// iteration fails; without this check a failure would look like a clean
	// end of data and the remaining records would be silently skipped.
	must(rows.Err())
	return nil
}
// SendWAL streams WAL records to conn until a read or write fails, then
// closes conn.
//
// Protocol, as implemented here:
//   - The peer first sends an 8-byte little-endian sequence number; only
//     records after it are sent. The peer has 16 seconds to send it.
//   - While no newer records exist, the WAL is polled every pollInterval.
//     After heartbeatInterval without new records, a heartbeat consisting of
//     recHeaderSize zero bytes is written.
//   - Each record is written as: header (encodeRecordHeader), the collection
//     name bytes, and — only when rec.Store is true — the record data.
//
// Every write is preceded by a fresh write deadline of connTimeout.
func (f *Follower) SendWAL(conn net.Conn) {
	defer conn.Close()

	var (
		headerBuf = make([]byte, recHeaderSize)
		empty     = make([]byte, recHeaderSize) // all-zero heartbeat frame
	)

	// Read the starting sequence number. binary.Read keeps reading until all
	// 8 bytes have arrived; a single conn.Read may legally return fewer, which
	// would decode a partly filled buffer.
	conn.SetReadDeadline(time.Now().Add(16 * time.Second))
	var afterSeqNum uint64
	if err := binary.Read(conn, binary.LittleEndian, &afterSeqNum); err != nil {
		log.Printf("SendWAL failed to read from ID: %v", err)
		return
	}

	// sendRecord writes one record and advances afterSeqNum so the next
	// Replay call resumes after it.
	sendRecord := func(rec Record) error {
		conn.SetWriteDeadline(time.Now().Add(connTimeout))
		afterSeqNum = rec.SeqNum
		encodeRecordHeader(rec, headerBuf)
		if _, err := conn.Write(headerBuf); err != nil {
			log.Printf("SendWAL failed to send header %v", err)
			return err
		}
		if _, err := conn.Write([]byte(rec.Collection)); err != nil {
			log.Printf("SendWAL failed to send collection name %v", err)
			return err
		}
		if !rec.Store {
			// No payload is transmitted for records with Store == false.
			return nil
		}
		if _, err := conn.Write(rec.Data); err != nil {
			log.Printf("SendWAL failed to send data %v", err)
			return err
		}
		return nil
	}

	for {
		// Poll for new records for at most one heartbeat interval.
		pending := false
		pollStart := time.Now()
		for time.Since(pollStart) < heartbeatInterval {
			if f.MaxSeqNum() > afterSeqNum {
				pending = true
				break
			}
			time.Sleep(pollInterval)
		}

		if !pending {
			// Nothing new: send a heartbeat and go back to polling.
			conn.SetWriteDeadline(time.Now().Add(connTimeout))
			if _, err := conn.Write(empty); err != nil {
				log.Printf("SendWAL failed to send heartbeat: %v", err)
				return
			}
			continue
		}

		// The failing write has already been logged inside sendRecord.
		if err := f.Replay(afterSeqNum, sendRecord); err != nil {
			return
		}
	}
}