package kvstore import ( "encoding/binary" "log" "net" "time" ) func (kv *KV) SyncSend(conn net.Conn) { defer conn.Close() var ( seqNumBuf = make([]byte, 8) headerBuf = make([]byte, recHeaderSize) empty = make([]byte, recHeaderSize) err error ) // Read afterSeqNum from the conn. conn.SetReadDeadline(time.Now().Add(connTimeout)) if _, err := conn.Read(seqNumBuf[:8]); err != nil { log.Printf("SyncSend failed to read afterSeqNum: %v", err) return } afterSeqNum := binary.LittleEndian.Uint64(seqNumBuf[:8]) POLL: for i := 0; i < 4; i++ { if kv.MaxSeqNum() > afterSeqNum { goto REPLAY } time.Sleep(pollInterval) } goto HEARTBEAT HEARTBEAT: conn.SetWriteDeadline(time.Now().Add(connTimeout)) if _, err := conn.Write(empty); err != nil { log.Printf("SendWAL failed to send heartbeat: %v", err) return } goto POLL REPLAY: err = kv.replay(afterSeqNum, func(rec record) error { conn.SetWriteDeadline(time.Now().Add(connTimeout)) afterSeqNum = rec.SeqNum encodeRecordHeader(rec, headerBuf) if _, err := conn.Write(headerBuf); err != nil { log.Printf("SendWAL failed to send header %v", err) return err } if _, err := conn.Write([]byte(rec.Collection)); err != nil { log.Printf("SendWAL failed to send collection name %v", err) return err } if !rec.Store { return nil } if _, err := conn.Write(rec.Data); err != nil { log.Printf("SendWAL failed to send data %v", err) return err } return nil }) if err != nil { return } goto POLL } func (kv *KV) replay(afterSeqNum uint64, each func(rec record) error) error { rec := record{} rows, err := kv.logIterateStmt.Query(afterSeqNum) must(err) defer rows.Close() rec.Data = GetDataBuf(0) defer RecycleDataBuf(rec.Data) for rows.Next() { must(rows.Scan( &rec.SeqNum, &rec.Collection, &rec.ID, &rec.Store, &rec.Data)) if err = each(rec); err != nil { return err } } return nil }