From 68cc5195b8b69eb0dfdb42ea673d09edc5867a1c Mon Sep 17 00:00:00 2001 From: jdl Date: Tue, 25 Feb 2025 19:23:44 +0100 Subject: [PATCH] wip --- peer/connreader.go | 2 +- peer/globals.go | 6 +- peer/hubpoller.go | 16 +++-- peer/logging.go | 13 ---- peer/mcreader.go | 6 +- peer/mcwriter.go | 4 +- peer/peer.go | 28 +++++--- peer/state-client.go | 162 +++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 201 insertions(+), 36 deletions(-) delete mode 100644 peer/logging.go create mode 100644 peer/state-client.go diff --git a/peer/connreader.go b/peer/connreader.go index 0727ced..4c156f4 100644 --- a/peer/connreader.go +++ b/peer/connreader.go @@ -84,7 +84,7 @@ func (r *connReader) handleControlPacket( enc []byte, ) { if peer.ControlCipher == nil { - log.Printf("No control cipher for peer: %v", h) + r.logf("No control cipher for peer: %d", h.SourceIP) return } diff --git a/peer/globals.go b/peer/globals.go index cd0e1f6..6dd26eb 100644 --- a/peer/globals.go +++ b/peer/globals.go @@ -18,8 +18,10 @@ const ( dataCipherOverhead = 16 signOverhead = 64 - pingInterval = 8 * time.Second - timeoutInterval = 30 * time.Second + pingInterval = 8 * time.Second + timeoutInterval = 30 * time.Second + broadcastInterval = 16 * time.Second + broadcastErrorTimeoutInterval = 8 * time.Second ) var multicastAddr = net.UDPAddrFromAddrPort(netip.AddrPortFrom( diff --git a/peer/hubpoller.go b/peer/hubpoller.go index 238dfda..0082989 100644 --- a/peer/hubpoller.go +++ b/peer/hubpoller.go @@ -50,11 +50,15 @@ func newHubPoller( }, nil } +func (hp *hubPoller) logf(s string, args ...any) { + log.Printf("[HubPoller] "+s, args...) +} + func (hp *hubPoller) Run() { state, err := loadNetworkState(hp.netName) if err != nil { - log.Printf("Failed to load network state: %v", err) - log.Printf("Polling hub...") + hp.logf("Failed to load network state: %v", err) + hp.logf("Polling hub...") hp.pollHub() } else { hp.applyNetworkState(state) @@ -70,25 +74,25 @@ func (hp *hubPoller) pollHub() { resp, err := hp.client.Do(hp.req) if err != nil { - log.Printf("Failed to fetch peer state: %v", err) + hp.logf("Failed to fetch peer state: %v", err) return } body, err := io.ReadAll(resp.Body) _ = resp.Body.Close() if err != nil { - log.Printf("Failed to read body from hub: %v", err) + hp.logf("Failed to read body from hub: %v", err) return } if err := json.Unmarshal(body, &state); err != nil { - log.Printf("Failed to unmarshal response from hub: %v\n%s", err, body) + hp.logf("Failed to unmarshal response from hub: %v\n%s", err, body) return } hp.applyNetworkState(state) if err := storeNetworkState(hp.netName, state); err != nil { - log.Printf("Failed to store network state: %v", err) + hp.logf("Failed to store network state: %v", err) } } diff --git a/peer/logging.go b/peer/logging.go deleted file mode 100644 index 4906b04..0000000 --- a/peer/logging.go +++ /dev/null @@ -1,13 +0,0 @@ -package peer - -import "log" - -func logPacket(p []byte, notes string) { - h := parseHeader(p) - log.Printf(`Sending: Data: %v | From: %d | To: %d | %s -`, - h.StreamID == dataStreamID, - h.SourceIP, - h.DestIP, - notes) -} diff --git a/peer/mcreader.go b/peer/mcreader.go index 7c63f26..7b8af27 100644 --- a/peer/mcreader.go +++ b/peer/mcreader.go @@ -12,12 +12,12 @@ func runMCReader( handleControlMsg func(destIP byte, msg any), ) { for { - runMCReader2(rt, handleControlMsg) - time.Sleep(8 * time.Second) + runMCReaderInner(rt, handleControlMsg) + time.Sleep(broadcastErrorTimeoutInterval) } } -func runMCReader2( +func runMCReaderInner( rt *atomic.Pointer[routingTable], handleControlMsg func(destIP byte, msg any), ) { diff --git a/peer/mcwriter.go b/peer/mcwriter.go index 29cf2be..eb53af4 100644 --- a/peer/mcwriter.go +++ b/peer/mcwriter.go @@ -41,10 +41,10 @@ func runMCWriter(localIP byte, signingKey []byte) { conn, err := net.ListenMulticastUDP("udp", nil, multicastAddr) if err != nil { - log.Fatalf("Failed to bind to multicast address: %v", err) + log.Fatalf("[MCWriter] Failed to bind to multicast address: %v", err) } - for range time.Tick(8 * time.Second) { + for range time.Tick(broadcastInterval) { _, err := conn.WriteToUDP(discoveryPacket, multicastAddr) if err != nil { log.Printf("[MCWriter] Failed to write multicast: %v", err) diff --git a/peer/peer.go b/peer/peer.go index 45627b0..c210af4 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -32,10 +32,14 @@ type peerConfig struct { } func newPeerMain(conf peerConfig) *peerMain { + logf := func(s string, args ...any) { + log.Printf("[Main] "+s, args...) + } + config, err := loadPeerConfig(conf.NetName) if err != nil { - log.Printf("Failed to load configuration: %v", err) - log.Printf("Initializing...") + logf("Failed to load configuration: %v", err) + logf("Initializing...") initPeerWithHub(conf) config, err = loadPeerConfig(conf.NetName) @@ -54,7 +58,7 @@ func newPeerMain(conf peerConfig) *peerMain { log.Fatalf("Failed to resolve UDP address: %v", err) } - log.Printf("Listening on %v...", myAddr) + logf("Listening on %v...", myAddr) conn, err := net.ListenUDP("udp", myAddr) if err != nil { log.Fatalf("Failed to open UDP port: %v", err) @@ -69,15 +73,15 @@ func newPeerMain(conf peerConfig) *peerMain { writeLock.Lock() n, err = conn.WriteToUDPAddrPort(b, addr) if err != nil { - log.Printf("Failed to write packet: %v", err) + logf("Failed to write packet: %v", err) } writeLock.Unlock() return n, err } var localAddr netip.AddrPort - ip, ok := netip.AddrFromSlice(config.PublicIP) - if ok { + ip, localAddrValid := netip.AddrFromSlice(config.PublicIP) + if localAddrValid { localAddr = netip.AddrPortFrom(ip, config.Port) } @@ -105,12 +109,18 @@ func newPeerMain(conf peerConfig) *peerMain { } func (p *peerMain) Run() { + go p.ifReader.Run() go p.connReader.Run() p.super.Start() - go runMCWriter(p.conf.PeerIP, p.conf.PrivSignKey) - go runMCReader(p.rt, p.super.HandleControlMsg) - p.hubPoller.Run() + + if !p.rt.Load().LocalAddr.IsValid() { + go runMCWriter(p.conf.PeerIP, p.conf.PrivSignKey) + go runMCReader(p.rt, p.super.HandleControlMsg) + } + + go p.hubPoller.Run() + select {} } func initPeerWithHub(conf peerConfig) { diff --git a/peer/state-client.go b/peer/state-client.go new file mode 100644 index 0000000..49e4375 --- /dev/null +++ b/peer/state-client.go @@ -0,0 +1,162 @@ +package peer + +import ( + "net/netip" + "time" +) + +type sentProbe struct { + SentAt time.Time + Addr netip.AddrPort +} + +type stateClient struct { + *peerData + lastSeen time.Time + syn packetSyn + probes map[uint64]sentProbe +} + +func enterStateClient(data *peerData) peerState { + ip, ipValid := netip.AddrFromSlice(data.peer.PublicIP) + + data.staged.Relay = data.peer.Relay && ipValid + data.staged.Direct = ipValid + data.staged.DirectAddr = netip.AddrPortFrom(ip, data.peer.Port) + data.publish(data.staged) + + state := &stateClient{ + peerData: data, + lastSeen: time.Now(), + syn: packetSyn{ + TraceID: newTraceID(), + SharedKey: data.staged.DataCipher.Key(), + Direct: data.staged.Direct, + PossibleAddrs: data.pubAddrs.Get(), + }, + probes: map[uint64]sentProbe{}, + } + + state.Send(state.staged, state.syn) + + data.pingTimer.Reset(pingInterval) + + state.logf("==> Client") + return state +} + +func (s *stateClient) logf(str string, args ...any) { + s.peerData.logf("CLNT | "+str, args...) +} + +func (s *stateClient) OnMsg(raw any) peerState { + switch msg := raw.(type) { + case peerUpdateMsg: + return initPeerState(s.peerData, msg.Peer) + case controlMsg[packetAck]: + s.onAck(msg) + case controlMsg[packetProbe]: + return s.onProbe(msg) + case controlMsg[packetLocalDiscovery]: + s.onLocalDiscovery(msg) + case pingTimerMsg: + return s.onPingTimer() + default: + s.logf("Ignoring message: %v", raw) + } + return s +} + +func (s *stateClient) onAck(msg controlMsg[packetAck]) { + if msg.Packet.TraceID != s.syn.TraceID { + return + } + + s.lastSeen = time.Now() + + if !s.staged.Up { + s.staged.Up = true + s.publish(s.staged) + s.logf("Got ACK.") + } + + if s.staged.Direct { + s.pubAddrs.Store(msg.Packet.ToAddr) + return + } + + // Relayed below. + + s.cleanProbes() + + for _, addr := range msg.Packet.PossibleAddrs { + if !addr.IsValid() { + break + } + s.sendProbeTo(addr) + } +} + +func (s *stateClient) onPingTimer() peerState { + if time.Since(s.lastSeen) > timeoutInterval { + if s.staged.Up { + s.logf("Timeout.") + } + return initPeerState(s.peerData, s.peer) + } + + s.Send(s.staged, s.syn) + return s +} + +func (s *stateClient) onProbe(msg controlMsg[packetProbe]) peerState { + if s.staged.Direct { + return s + } + + s.cleanProbes() + + sent, ok := s.probes[msg.Packet.TraceID] + if !ok { + return s + } + + s.staged.Direct = true + s.staged.DirectAddr = sent.Addr + s.publish(s.staged) + + s.syn.TraceID = newTraceID() + s.syn.Direct = true + s.Send(s.staged, s.syn) + s.logf("Successful probe.") + return s +} + +func (s *stateClient) onLocalDiscovery(msg controlMsg[packetLocalDiscovery]) { + if s.staged.Direct { + return + } + + // The source port will be the multicast port, so we'll have to + // construct the correct address using the peer's listed port. + addr := netip.AddrPortFrom(msg.SrcAddr.Addr(), s.peer.Port) + s.sendProbeTo(addr) +} + +func (s *stateClient) cleanProbes() { + for key, sent := range s.probes { + if time.Since(sent.SentAt) > pingInterval { + delete(s.probes, key) + } + } +} + +func (s *stateClient) sendProbeTo(addr netip.AddrPort) { + probe := packetProbe{TraceID: newTraceID()} + s.probes[probe.TraceID] = sentProbe{ + SentAt: time.Now(), + Addr: addr, + } + s.logf("Probing %v...", addr) + s.SendTo(probe, addr) +}