package wal import ( "encoding/binary" "net" "time" "git.crumpington.com/public/jldb/lib/errs" ) const ( msgTypeRec = 8 msgTypeHeartbeat = 16 msgTypeError = 32 ) func (wal *WAL) Send(conn net.Conn, timeout time.Duration) error { defer conn.Close() var ( seqNum int64 heartbeatTimeout = timeout / 8 ) conn.SetReadDeadline(time.Now().Add(timeout)) if err := binary.Read(conn, binary.LittleEndian, &seqNum); err != nil { return errs.IO.WithErr(err) } conn.SetReadDeadline(time.Time{}) it, err := wal.Iterator(seqNum) if err != nil { return err } defer it.Close() for { if it.Next(heartbeatTimeout) { rec := it.Record() conn.SetWriteDeadline(time.Now().Add(timeout)) if _, err := conn.Write([]byte{msgTypeRec}); err != nil { return errs.IO.WithErr(err) } if _, err := rec.writeTo(conn); err != nil { return err } continue } if it.Error() != nil { conn.SetWriteDeadline(time.Now().Add(timeout)) if _, err := conn.Write([]byte{msgTypeError}); err != nil { return errs.IO.WithErr(err) } err, ok := it.Error().(*errs.Error) if !ok { err = errs.Unexpected.WithErr(err) } err.Write(conn) // w.Flush() return err } conn.SetWriteDeadline(time.Now().Add(timeout)) if _, err := conn.Write([]byte{msgTypeHeartbeat}); err != nil { return errs.IO.WithErr(err) } } }