2023-10-13 09:43:27 +00:00
|
|
|
package wal
|
|
|
|
|
|
|
|
import (
|
|
|
|
"encoding/binary"
|
|
|
|
"io"
|
|
|
|
"net"
|
|
|
|
"time"
|
2023-10-16 08:50:19 +00:00
|
|
|
|
|
|
|
"git.crumpington.com/public/jldb/lib/errs"
|
2023-10-13 09:43:27 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
func (wal *WAL) Recv(conn net.Conn, timeout time.Duration) error {
|
|
|
|
defer conn.Close()
|
|
|
|
|
|
|
|
var (
|
|
|
|
rec Record
|
|
|
|
msgType = make([]byte, 1)
|
|
|
|
)
|
|
|
|
|
|
|
|
// Send sequence number.
|
|
|
|
seqNum := wal.Info().LastSeqNum + 1
|
|
|
|
conn.SetWriteDeadline(time.Now().Add(timeout))
|
|
|
|
if err := binary.Write(conn, binary.LittleEndian, seqNum); err != nil {
|
|
|
|
return errs.IO.WithErr(err)
|
|
|
|
}
|
|
|
|
conn.SetWriteDeadline(time.Time{})
|
|
|
|
|
|
|
|
for {
|
|
|
|
conn.SetReadDeadline(time.Now().Add(timeout))
|
|
|
|
|
|
|
|
if _, err := io.ReadFull(conn, msgType); err != nil {
|
|
|
|
return errs.IO.WithErr(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
switch msgType[0] {
|
|
|
|
|
|
|
|
case msgTypeHeartbeat:
|
|
|
|
// Nothing to do.
|
|
|
|
|
|
|
|
case msgTypeError:
|
|
|
|
e := &errs.Error{}
|
|
|
|
if err := e.Read(conn); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
return e
|
|
|
|
|
|
|
|
case msgTypeRec:
|
|
|
|
if err := rec.readFrom(conn); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if _, _, err := wal.appendRecord(rec); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
default:
|
|
|
|
return errs.Unexpected.WithMsg("Unknown message type: %d", msgType[0])
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|