From 485003204f8388d994678b999a6e99fcf14b5a92 Mon Sep 17 00:00:00 2001 From: jdl Date: Sat, 14 Dec 2024 07:38:06 +0100 Subject: [PATCH] wip --- peer/conndata.go | 14 ++-- peer/connhandler.go | 26 ++----- peer/connstate.go | 149 +++++++++++++---------------------------- peer/duplist.go | 6 +- peer/peer-netreader.go | 5 +- peer/router-types.go | 11 +-- peer/router.go | 31 +++------ 7 files changed, 77 insertions(+), 165 deletions(-) diff --git a/peer/conndata.go b/peer/conndata.go index 24d680a..8b118c6 100644 --- a/peer/conndata.go +++ b/peer/conndata.go @@ -64,17 +64,19 @@ func (d *connData) HandlePeerUpdate(state connState, update peerUpdate) connStat func (d *connData) HandleSendPing() { route := d.route.Load() req := Ping{SentAt: time.Now().UnixMilli()} - req.Marshal(d.buf[:PING_SIZE]) - d.sender.send(PACKET_TYPE_PING, d.buf[:PING_SIZE], route, nil) + d.buf = req.Marshal(d.buf) + d.sender.send(PACKET_TYPE_PING, d.buf, route, nil) d.pingTimer.Reset(pingInterval) } -func (d *connData) sendPong(w wrapper[Ping]) { +func (d *connData) sendPong(w wrapper) { + ping := w.Packet.(*Ping) route := d.route.Load() pong := Pong{ - SentAt: w.T.SentAt, + SentAt: ping.SentAt, RecvdAt: time.Now().UnixMilli(), } - pong.Marshal(d.buf[:PONG_SIZE]) - d.sender.send(PACKET_TYPE_PONG, d.buf[:PONG_SIZE], route, nil) + + d.buf = pong.Marshal(d.buf) + d.sender.send(PACKET_TYPE_PONG, d.buf, route, nil) } diff --git a/peer/connhandler.go b/peer/connhandler.go index a5dea4e..cce64c0 100644 --- a/peer/connhandler.go +++ b/peer/connhandler.go @@ -12,8 +12,7 @@ type connHandler struct { // Communication. mediatorUpdates chan byte peerUpdates chan peerUpdate - pings chan wrapper[Ping] - pongs chan wrapper[Pong] + packets chan wrapper data *connData } @@ -40,8 +39,7 @@ func newConnHandler( h := &connHandler{ mediatorUpdates: make(chan byte, 1), peerUpdates: make(chan peerUpdate, 1), - pings: make(chan wrapper[Ping], 1), - pongs: make(chan wrapper[Pong], 1), + packets: make(chan wrapper, 1), data: d, } @@ -68,15 +66,12 @@ func (h *connHandler) mainLoop() { case update := <-h.peerUpdates: state = data.HandlePeerUpdate(state, update) - case w := <-h.pings: - state = state.HandlePing(w) - - case w := <-h.pongs: - state = state.HandlePong(w) - case <-data.pingTimer.C: data.HandleSendPing() + case w := <-h.packets: + state = state.HandlePacket(w) + case <-data.timeoutTimer.C: log.Printf("[%s] Connection timeout.", state.Name()) state = state.HandleTimeout() @@ -89,16 +84,9 @@ func (h *connHandler) mainLoop() { } } -func (c *connHandler) HandlePing(w wrapper[Ping]) { +func (c *connHandler) HandlePacket(w wrapper) { select { - case c.pings <- w: - default: - } -} - -func (c *connHandler) HandlePong(w wrapper[Pong]) { - select { - case c.pongs <- w: + case c.packets <- w: default: } } diff --git a/peer/connstate.go b/peer/connstate.go index 73853a8..87a322b 100644 --- a/peer/connstate.go +++ b/peer/connstate.go @@ -14,9 +14,7 @@ func logState(s connState, msg string, args ...any) { // The connection state corresponds to what we're connected TO. type connState interface { Name() string - //HandleConnReq(wrapper[ConnReq]) connState - HandlePing(wrapper[Ping]) connState - HandlePong(wrapper[Pong]) connState + HandlePacket(wrapper) connState HandleTimeout() connState } @@ -43,9 +41,7 @@ func newStateFromPeer(peer *m.Peer, data *connData) connState { // Null Connection // ///////////////////// -type connNull struct { - *connData -} +type connNull struct{ *connData } func newConnNull(data *connData) connState { c := connNull{data} @@ -61,24 +57,9 @@ func newConnNull(data *connData) connState { return c } -func (c connNull) Name() string { - return "NoPeer" -} +func (c connNull) Name() string { return "NoPeer" } -func (c connNull) HandleConnReq(w wrapper[ConnReq]) connState { - logState(c, "Ignoring conn request.") - return c -} - -func (c connNull) HandlePing(w wrapper[Ping]) connState { - logState(c, "Ignoring ping.") - return c -} - -func (c connNull) HandlePong(w wrapper[Pong]) connState { - logState(c, "Ignoring pong.") - return c -} +func (c connNull) HandlePacket(w wrapper) connState { return c } func (c connNull) HandleTimeout() connState { logState(c, "Unexpected timeout.") @@ -89,9 +70,7 @@ func (c connNull) HandleTimeout() connState { // Unconnected Server // //////////////////////// -type stateServerDown struct { - *connData -} +type stateServerDown struct{ *connData } func newStateServerDown(data *connData, peer *m.Peer) connState { addr, _ := netip.AddrFromSlice(peer.PublicIP) @@ -111,25 +90,16 @@ func newStateServerDown(data *connData, peer *m.Peer) connState { return c } -func (c stateServerDown) Name() string { - return "Server:DOWN" -} +func (c stateServerDown) Name() string { return "Server:DOWN" } -func (c stateServerDown) HandleConnReq(w wrapper[ConnReq]) connState { - // Send ConnResp. - // TODO +func (c stateServerDown) HandlePacket(w wrapper) connState { + switch p := w.Packet.(type) { + case *Pong: + return newStateServerUp(c.connData, w, p) + } return c } -func (c stateServerDown) HandlePing(w wrapper[Ping]) connState { - logState(c, "Ignoring ping.") - return c -} - -func (c stateServerDown) HandlePong(w wrapper[Pong]) connState { - return newStateServerUp(c.connData, w) -} - func (c stateServerDown) HandleTimeout() connState { logState(c, "Unexpected timeout.") return c @@ -139,11 +109,9 @@ func (c stateServerDown) HandleTimeout() connState { // Connected Server // ////////////////////// -type stateServerUp struct { - *connData -} +type stateServerUp struct{ *connData } -func newStateServerUp(data *connData, w wrapper[Pong]) connState { +func newStateServerUp(data *connData, w wrapper, pong *Pong) connState { c := stateServerUp{data} c.pingTimer.Reset(pingInterval) c.timeoutTimer.Reset(timeoutInterval) @@ -154,18 +122,15 @@ func newStateServerUp(data *connData, w wrapper[Pong]) connState { return c } -func (c stateServerUp) Name() string { - return "Server:UP" -} +func (c stateServerUp) Name() string { return "Server:UP" } -func (c stateServerUp) HandlePing(w wrapper[Ping]) connState { - logState(c, "Ignoring ping.") +func (c stateServerUp) HandlePacket(w wrapper) connState { + switch w.Packet.(type) { + case *Pong: + c.timeoutTimer.Reset(timeoutInterval) + } return c -} -func (c stateServerUp) HandlePong(w wrapper[Pong]) connState { - c.timeoutTimer.Reset(timeoutInterval) - return c } func (c stateServerUp) HandleTimeout() connState { @@ -176,9 +141,7 @@ func (c stateServerUp) HandleTimeout() connState { // Unconnected Client // //////////////////////// -type stateClientDown struct { - *connData -} +type stateClientDown struct{ *connData } func newStateClientDown(data *connData, peer *m.Peer) connState { addr, _ := netip.AddrFromSlice(peer.PublicIP) @@ -200,19 +163,15 @@ func newStateClientDown(data *connData, peer *m.Peer) connState { return c } -func (c stateClientDown) Name() string { - return "Client:DOWN" -} +func (c stateClientDown) Name() string { return "Client:DOWN" } -func (c stateClientDown) HandlePing(w wrapper[Ping]) connState { - log.Printf("Got ping...") - next := newStateClientUp(c.connData, w) - c.sendPong(w) // Have to send after transitionsing so route is ok. - return next -} - -func (c stateClientDown) HandlePong(w wrapper[Pong]) connState { - logState(c, "Ignorning pong.") +func (c stateClientDown) HandlePacket(w wrapper) connState { + switch w.Packet.(type) { + case *Ping: + next := newStateClientUp(c.connData, w) + c.sendPong(w) + return next + } return c } @@ -225,11 +184,9 @@ func (c stateClientDown) HandleTimeout() connState { // Connected Client // ////////////////////// -type stateClientUp struct { - *connData -} +type stateClientUp struct{ *connData } -func newStateClientUp(data *connData, w wrapper[Ping]) connState { +func newStateClientUp(data *connData, w wrapper) connState { c := stateClientUp{data} c.addr = w.SrcAddr c.useMediator = false @@ -241,24 +198,20 @@ func newStateClientUp(data *connData, w wrapper[Ping]) connState { return c } -func (c stateClientUp) Name() string { - return "Client:UP" -} +func (c stateClientUp) Name() string { return "Client:UP" } -func (c stateClientUp) HandlePing(w wrapper[Ping]) connState { - // The connection is from a client. If the client's address changes, we - // should follow that change. - if c.addr != w.SrcAddr { - c.addr = w.SrcAddr - c.route.Store(c.Route()) +func (c stateClientUp) HandlePacket(w wrapper) connState { + switch w.Packet.(type) { + case *Ping: + // The connection is from a client. If the client's address changes, we + // should follow that change. + if c.addr != w.SrcAddr { + c.addr = w.SrcAddr + c.route.Store(c.Route()) + } + c.sendPong(w) + c.timeoutTimer.Reset(timeoutInterval) } - c.sendPong(w) - c.timeoutTimer.Reset(timeoutInterval) - return c -} - -func (c stateClientUp) HandlePong(w wrapper[Pong]) connState { - logState(c, "Ignoring pong.") return c } @@ -270,9 +223,7 @@ func (c stateClientUp) HandleTimeout() connState { // Mediated // ////////////// -type stateMediated struct { - *connData -} +type stateMediated struct{ *connData } func newStateMediated(data *connData, peer *m.Peer) connState { addr, _ := netip.AddrFromSlice(peer.PublicIP) @@ -293,19 +244,9 @@ func newStateMediated(data *connData, peer *m.Peer) connState { return c } -func (c stateMediated) Name() string { - return "Mediated:UP" -} +func (c stateMediated) Name() string { return "Mediated:UP" } -func (c stateMediated) HandlePing(w wrapper[Ping]) connState { - logState(c, "Ignorning ping.") - return c -} - -func (c stateMediated) HandlePong(w wrapper[Pong]) connState { - logState(c, "Ignorning pong.") - return c -} +func (c stateMediated) HandlePacket(w wrapper) connState { return c } func (c stateMediated) HandleTimeout() connState { logState(c, "Unexpected timeout.") diff --git a/peer/duplist.go b/peer/duplist.go index 622efdc..3e6f3d9 100644 --- a/peer/duplist.go +++ b/peer/duplist.go @@ -1,7 +1,9 @@ package peer +const DUP_LIST_SIZE = 32 + type dupList struct { - items [64]uint64 + items [DUP_LIST_SIZE]uint64 index int } @@ -12,6 +14,6 @@ func (l *dupList) isDuplicate(in uint64) bool { } } l.items[l.index] = in - l.index = (l.index + 1) % 64 + l.index = (l.index + 1) % DUP_LIST_SIZE return false } diff --git a/peer/peer-netreader.go b/peer/peer-netreader.go index 2137df9..9586210 100644 --- a/peer/peer-netreader.go +++ b/peer/peer-netreader.go @@ -16,7 +16,7 @@ func (peer *Peer) netReader() { }() var ( - dupList = &dupList{} + dupLists = [MAX_IP]dupList{} n int srcAddr netip.AddrPort nonce Nonce @@ -57,8 +57,7 @@ NEXT_PACKET: goto NEXT_PACKET } - if dupList.isDuplicate(nonce.Counter) { - //if nonce.Counter+64 <= counters[nonce.StreamID][nonce.SourceIP] { + if nonce.Counter <= counters[nonce.StreamID][nonce.SourceIP] { log.Printf("Dropping packet with bad counter: %d (-%d) - %v", nonce.Counter, counters[nonce.StreamID][nonce.SourceIP]-nonce.Counter, srcAddr) goto NEXT_PACKET } diff --git a/peer/router-types.go b/peer/router-types.go index f5242d6..aa1ce40 100644 --- a/peer/router-types.go +++ b/peer/router-types.go @@ -27,15 +27,8 @@ type peerUpdate struct { // ---------------------------------------------------------------------------- // Wrapper for routing packets. -type wrapper[T any] struct { - T T +type wrapper struct { + Packet Packet Nonce Nonce SrcAddr netip.AddrPort } - -func newWrapper[T any](srcAddr netip.AddrPort, nonce Nonce) wrapper[T] { - return wrapper[T]{ - SrcAddr: srcAddr, - Nonce: nonce, - } -} diff --git a/peer/router.go b/peer/router.go index 6786ee5..3a4c90c 100644 --- a/peer/router.go +++ b/peer/router.go @@ -72,27 +72,14 @@ func (r *Router) HandlePacket(src netip.AddrPort, nonce Nonce, data []byte) { return } - switch nonce.PacketType { - case PACKET_TYPE_PING: - if len(data) < PING_SIZE { - log.Printf("Short ping request: %d", len(data)) - return - } - w := newWrapper[Ping](src, nonce) - w.T.Parse(data) - - r.conns[nonce.SourceIP].HandlePing(w) - - case PACKET_TYPE_PONG: - if len(data) < PONG_SIZE { - log.Printf("Short ping response: %d", len(data)) - return - } - w := newWrapper[Pong](src, nonce) - w.T.Parse(data) - r.conns[nonce.SourceIP].HandlePong(w) - - default: - log.Printf("Unknown routing packet type: %d", nonce.PacketType) + packet := unmarshalPacket(nonce, data) + if packet == nil { + return } + + r.conns[nonce.SourceIP].HandlePacket(wrapper{ + Packet: packet, + Nonce: nonce, + SrcAddr: src, + }) }