This commit is contained in:
jdl 2024-12-14 07:38:06 +01:00
parent 6fcd8637a3
commit 485003204f
7 changed files with 77 additions and 165 deletions

View File

@ -64,17 +64,19 @@ func (d *connData) HandlePeerUpdate(state connState, update peerUpdate) connStat
func (d *connData) HandleSendPing() { func (d *connData) HandleSendPing() {
route := d.route.Load() route := d.route.Load()
req := Ping{SentAt: time.Now().UnixMilli()} req := Ping{SentAt: time.Now().UnixMilli()}
req.Marshal(d.buf[:PING_SIZE]) d.buf = req.Marshal(d.buf)
d.sender.send(PACKET_TYPE_PING, d.buf[:PING_SIZE], route, nil) d.sender.send(PACKET_TYPE_PING, d.buf, route, nil)
d.pingTimer.Reset(pingInterval) d.pingTimer.Reset(pingInterval)
} }
func (d *connData) sendPong(w wrapper[Ping]) { func (d *connData) sendPong(w wrapper) {
ping := w.Packet.(*Ping)
route := d.route.Load() route := d.route.Load()
pong := Pong{ pong := Pong{
SentAt: w.T.SentAt, SentAt: ping.SentAt,
RecvdAt: time.Now().UnixMilli(), RecvdAt: time.Now().UnixMilli(),
} }
pong.Marshal(d.buf[:PONG_SIZE])
d.sender.send(PACKET_TYPE_PONG, d.buf[:PONG_SIZE], route, nil) d.buf = pong.Marshal(d.buf)
d.sender.send(PACKET_TYPE_PONG, d.buf, route, nil)
} }

View File

@ -12,8 +12,7 @@ type connHandler struct {
// Communication. // Communication.
mediatorUpdates chan byte mediatorUpdates chan byte
peerUpdates chan peerUpdate peerUpdates chan peerUpdate
pings chan wrapper[Ping] packets chan wrapper
pongs chan wrapper[Pong]
data *connData data *connData
} }
@ -40,8 +39,7 @@ func newConnHandler(
h := &connHandler{ h := &connHandler{
mediatorUpdates: make(chan byte, 1), mediatorUpdates: make(chan byte, 1),
peerUpdates: make(chan peerUpdate, 1), peerUpdates: make(chan peerUpdate, 1),
pings: make(chan wrapper[Ping], 1), packets: make(chan wrapper, 1),
pongs: make(chan wrapper[Pong], 1),
data: d, data: d,
} }
@ -68,15 +66,12 @@ func (h *connHandler) mainLoop() {
case update := <-h.peerUpdates: case update := <-h.peerUpdates:
state = data.HandlePeerUpdate(state, update) state = data.HandlePeerUpdate(state, update)
case w := <-h.pings:
state = state.HandlePing(w)
case w := <-h.pongs:
state = state.HandlePong(w)
case <-data.pingTimer.C: case <-data.pingTimer.C:
data.HandleSendPing() data.HandleSendPing()
case w := <-h.packets:
state = state.HandlePacket(w)
case <-data.timeoutTimer.C: case <-data.timeoutTimer.C:
log.Printf("[%s] Connection timeout.", state.Name()) log.Printf("[%s] Connection timeout.", state.Name())
state = state.HandleTimeout() state = state.HandleTimeout()
@ -89,16 +84,9 @@ func (h *connHandler) mainLoop() {
} }
} }
func (c *connHandler) HandlePing(w wrapper[Ping]) { func (c *connHandler) HandlePacket(w wrapper) {
select { select {
case c.pings <- w: case c.packets <- w:
default:
}
}
func (c *connHandler) HandlePong(w wrapper[Pong]) {
select {
case c.pongs <- w:
default: default:
} }
} }

View File

@ -14,9 +14,7 @@ func logState(s connState, msg string, args ...any) {
// The connection state corresponds to what we're connected TO. // The connection state corresponds to what we're connected TO.
type connState interface { type connState interface {
Name() string Name() string
//HandleConnReq(wrapper[ConnReq]) connState HandlePacket(wrapper) connState
HandlePing(wrapper[Ping]) connState
HandlePong(wrapper[Pong]) connState
HandleTimeout() connState HandleTimeout() connState
} }
@ -43,9 +41,7 @@ func newStateFromPeer(peer *m.Peer, data *connData) connState {
// Null Connection // // Null Connection //
///////////////////// /////////////////////
type connNull struct { type connNull struct{ *connData }
*connData
}
func newConnNull(data *connData) connState { func newConnNull(data *connData) connState {
c := connNull{data} c := connNull{data}
@ -61,24 +57,9 @@ func newConnNull(data *connData) connState {
return c return c
} }
func (c connNull) Name() string { func (c connNull) Name() string { return "NoPeer" }
return "NoPeer"
}
func (c connNull) HandleConnReq(w wrapper[ConnReq]) connState { func (c connNull) HandlePacket(w wrapper) connState { return c }
logState(c, "Ignoring conn request.")
return c
}
func (c connNull) HandlePing(w wrapper[Ping]) connState {
logState(c, "Ignoring ping.")
return c
}
func (c connNull) HandlePong(w wrapper[Pong]) connState {
logState(c, "Ignoring pong.")
return c
}
func (c connNull) HandleTimeout() connState { func (c connNull) HandleTimeout() connState {
logState(c, "Unexpected timeout.") logState(c, "Unexpected timeout.")
@ -89,9 +70,7 @@ func (c connNull) HandleTimeout() connState {
// Unconnected Server // // Unconnected Server //
//////////////////////// ////////////////////////
type stateServerDown struct { type stateServerDown struct{ *connData }
*connData
}
func newStateServerDown(data *connData, peer *m.Peer) connState { func newStateServerDown(data *connData, peer *m.Peer) connState {
addr, _ := netip.AddrFromSlice(peer.PublicIP) addr, _ := netip.AddrFromSlice(peer.PublicIP)
@ -111,25 +90,16 @@ func newStateServerDown(data *connData, peer *m.Peer) connState {
return c return c
} }
func (c stateServerDown) Name() string { func (c stateServerDown) Name() string { return "Server:DOWN" }
return "Server:DOWN"
}
func (c stateServerDown) HandleConnReq(w wrapper[ConnReq]) connState { func (c stateServerDown) HandlePacket(w wrapper) connState {
// Send ConnResp. switch p := w.Packet.(type) {
// TODO case *Pong:
return newStateServerUp(c.connData, w, p)
}
return c return c
} }
func (c stateServerDown) HandlePing(w wrapper[Ping]) connState {
logState(c, "Ignoring ping.")
return c
}
func (c stateServerDown) HandlePong(w wrapper[Pong]) connState {
return newStateServerUp(c.connData, w)
}
func (c stateServerDown) HandleTimeout() connState { func (c stateServerDown) HandleTimeout() connState {
logState(c, "Unexpected timeout.") logState(c, "Unexpected timeout.")
return c return c
@ -139,11 +109,9 @@ func (c stateServerDown) HandleTimeout() connState {
// Connected Server // // Connected Server //
////////////////////// //////////////////////
type stateServerUp struct { type stateServerUp struct{ *connData }
*connData
}
func newStateServerUp(data *connData, w wrapper[Pong]) connState { func newStateServerUp(data *connData, w wrapper, pong *Pong) connState {
c := stateServerUp{data} c := stateServerUp{data}
c.pingTimer.Reset(pingInterval) c.pingTimer.Reset(pingInterval)
c.timeoutTimer.Reset(timeoutInterval) c.timeoutTimer.Reset(timeoutInterval)
@ -154,18 +122,15 @@ func newStateServerUp(data *connData, w wrapper[Pong]) connState {
return c return c
} }
func (c stateServerUp) Name() string { func (c stateServerUp) Name() string { return "Server:UP" }
return "Server:UP"
}
func (c stateServerUp) HandlePing(w wrapper[Ping]) connState { func (c stateServerUp) HandlePacket(w wrapper) connState {
logState(c, "Ignoring ping.") switch w.Packet.(type) {
return c case *Pong:
}
func (c stateServerUp) HandlePong(w wrapper[Pong]) connState {
c.timeoutTimer.Reset(timeoutInterval) c.timeoutTimer.Reset(timeoutInterval)
}
return c return c
} }
func (c stateServerUp) HandleTimeout() connState { func (c stateServerUp) HandleTimeout() connState {
@ -176,9 +141,7 @@ func (c stateServerUp) HandleTimeout() connState {
// Unconnected Client // // Unconnected Client //
//////////////////////// ////////////////////////
type stateClientDown struct { type stateClientDown struct{ *connData }
*connData
}
func newStateClientDown(data *connData, peer *m.Peer) connState { func newStateClientDown(data *connData, peer *m.Peer) connState {
addr, _ := netip.AddrFromSlice(peer.PublicIP) addr, _ := netip.AddrFromSlice(peer.PublicIP)
@ -200,19 +163,15 @@ func newStateClientDown(data *connData, peer *m.Peer) connState {
return c return c
} }
func (c stateClientDown) Name() string { func (c stateClientDown) Name() string { return "Client:DOWN" }
return "Client:DOWN"
}
func (c stateClientDown) HandlePing(w wrapper[Ping]) connState { func (c stateClientDown) HandlePacket(w wrapper) connState {
log.Printf("Got ping...") switch w.Packet.(type) {
case *Ping:
next := newStateClientUp(c.connData, w) next := newStateClientUp(c.connData, w)
c.sendPong(w) // Have to send after transitionsing so route is ok. c.sendPong(w)
return next return next
} }
func (c stateClientDown) HandlePong(w wrapper[Pong]) connState {
logState(c, "Ignorning pong.")
return c return c
} }
@ -225,11 +184,9 @@ func (c stateClientDown) HandleTimeout() connState {
// Connected Client // // Connected Client //
////////////////////// //////////////////////
type stateClientUp struct { type stateClientUp struct{ *connData }
*connData
}
func newStateClientUp(data *connData, w wrapper[Ping]) connState { func newStateClientUp(data *connData, w wrapper) connState {
c := stateClientUp{data} c := stateClientUp{data}
c.addr = w.SrcAddr c.addr = w.SrcAddr
c.useMediator = false c.useMediator = false
@ -241,11 +198,11 @@ func newStateClientUp(data *connData, w wrapper[Ping]) connState {
return c return c
} }
func (c stateClientUp) Name() string { func (c stateClientUp) Name() string { return "Client:UP" }
return "Client:UP"
}
func (c stateClientUp) HandlePing(w wrapper[Ping]) connState { func (c stateClientUp) HandlePacket(w wrapper) connState {
switch w.Packet.(type) {
case *Ping:
// The connection is from a client. If the client's address changes, we // The connection is from a client. If the client's address changes, we
// should follow that change. // should follow that change.
if c.addr != w.SrcAddr { if c.addr != w.SrcAddr {
@ -254,11 +211,7 @@ func (c stateClientUp) HandlePing(w wrapper[Ping]) connState {
} }
c.sendPong(w) c.sendPong(w)
c.timeoutTimer.Reset(timeoutInterval) c.timeoutTimer.Reset(timeoutInterval)
return c
} }
func (c stateClientUp) HandlePong(w wrapper[Pong]) connState {
logState(c, "Ignoring pong.")
return c return c
} }
@ -270,9 +223,7 @@ func (c stateClientUp) HandleTimeout() connState {
// Mediated // // Mediated //
////////////// //////////////
type stateMediated struct { type stateMediated struct{ *connData }
*connData
}
func newStateMediated(data *connData, peer *m.Peer) connState { func newStateMediated(data *connData, peer *m.Peer) connState {
addr, _ := netip.AddrFromSlice(peer.PublicIP) addr, _ := netip.AddrFromSlice(peer.PublicIP)
@ -293,19 +244,9 @@ func newStateMediated(data *connData, peer *m.Peer) connState {
return c return c
} }
func (c stateMediated) Name() string { func (c stateMediated) Name() string { return "Mediated:UP" }
return "Mediated:UP"
}
func (c stateMediated) HandlePing(w wrapper[Ping]) connState { func (c stateMediated) HandlePacket(w wrapper) connState { return c }
logState(c, "Ignorning ping.")
return c
}
func (c stateMediated) HandlePong(w wrapper[Pong]) connState {
logState(c, "Ignorning pong.")
return c
}
func (c stateMediated) HandleTimeout() connState { func (c stateMediated) HandleTimeout() connState {
logState(c, "Unexpected timeout.") logState(c, "Unexpected timeout.")

View File

@ -1,7 +1,9 @@
package peer package peer
const DUP_LIST_SIZE = 32
type dupList struct { type dupList struct {
items [64]uint64 items [DUP_LIST_SIZE]uint64
index int index int
} }
@ -12,6 +14,6 @@ func (l *dupList) isDuplicate(in uint64) bool {
} }
} }
l.items[l.index] = in l.items[l.index] = in
l.index = (l.index + 1) % 64 l.index = (l.index + 1) % DUP_LIST_SIZE
return false return false
} }

View File

@ -16,7 +16,7 @@ func (peer *Peer) netReader() {
}() }()
var ( var (
dupList = &dupList{} dupLists = [MAX_IP]dupList{}
n int n int
srcAddr netip.AddrPort srcAddr netip.AddrPort
nonce Nonce nonce Nonce
@ -57,8 +57,7 @@ NEXT_PACKET:
goto NEXT_PACKET goto NEXT_PACKET
} }
if dupList.isDuplicate(nonce.Counter) { if nonce.Counter <= counters[nonce.StreamID][nonce.SourceIP] {
//if nonce.Counter+64 <= counters[nonce.StreamID][nonce.SourceIP] {
log.Printf("Dropping packet with bad counter: %d (-%d) - %v", nonce.Counter, counters[nonce.StreamID][nonce.SourceIP]-nonce.Counter, srcAddr) log.Printf("Dropping packet with bad counter: %d (-%d) - %v", nonce.Counter, counters[nonce.StreamID][nonce.SourceIP]-nonce.Counter, srcAddr)
goto NEXT_PACKET goto NEXT_PACKET
} }

View File

@ -27,15 +27,8 @@ type peerUpdate struct {
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// Wrapper for routing packets. // Wrapper for routing packets.
type wrapper[T any] struct { type wrapper struct {
T T Packet Packet
Nonce Nonce Nonce Nonce
SrcAddr netip.AddrPort SrcAddr netip.AddrPort
} }
func newWrapper[T any](srcAddr netip.AddrPort, nonce Nonce) wrapper[T] {
return wrapper[T]{
SrcAddr: srcAddr,
Nonce: nonce,
}
}

View File

@ -72,27 +72,14 @@ func (r *Router) HandlePacket(src netip.AddrPort, nonce Nonce, data []byte) {
return return
} }
switch nonce.PacketType { packet := unmarshalPacket(nonce, data)
case PACKET_TYPE_PING: if packet == nil {
if len(data) < PING_SIZE {
log.Printf("Short ping request: %d", len(data))
return return
} }
w := newWrapper[Ping](src, nonce)
w.T.Parse(data)
r.conns[nonce.SourceIP].HandlePing(w) r.conns[nonce.SourceIP].HandlePacket(wrapper{
Packet: packet,
case PACKET_TYPE_PONG: Nonce: nonce,
if len(data) < PONG_SIZE { SrcAddr: src,
log.Printf("Short ping response: %d", len(data)) })
return
}
w := newWrapper[Pong](src, nonce)
w.T.Parse(data)
r.conns[nonce.SourceIP].HandlePong(w)
default:
log.Printf("Unknown routing packet type: %d", nonce.PacketType)
}
} }