From b797c5b321869c7de9a58068857a8ea2abb5ed2b Mon Sep 17 00:00:00 2001 From: jdl Date: Wed, 19 Feb 2025 16:34:03 +0100 Subject: [PATCH] Working --- peer/connreader.go | 2 - peer/hubpoller.go | 2 - peer/mcreader.go | 96 +++++++++++++++++++++++++------------------ peer/mcwriter.go | 30 +++++++------- peer/peer.go | 6 +++ peer/peerstates.go | 1 - peer/pubaddrs.go | 12 +++++- peer/pubaddrs_test.go | 2 +- peer/routingtable.go | 3 -- 9 files changed, 89 insertions(+), 65 deletions(-) diff --git a/peer/connreader.go b/peer/connreader.go index a07275e..b78e58f 100644 --- a/peer/connreader.go +++ b/peer/connreader.go @@ -50,9 +50,7 @@ func (r *connReader) Run() { func (r *connReader) handleNextPacket() { buf := r.buf[:bufferSize] - log.Printf("Getting next packet...") n, remoteAddr, err := r.readFromUDPAddrPort(buf) - log.Printf("Packet from %v...", remoteAddr) if err != nil { log.Fatalf("Failed to read from UDP port: %v", err) } diff --git a/peer/hubpoller.go b/peer/hubpoller.go index 572cb74..2b50495 100644 --- a/peer/hubpoller.go +++ b/peer/hubpoller.go @@ -51,9 +51,7 @@ func newHubPoller( } func (hp *hubPoller) Run() { - log.Printf("Running hub poller...") state, err := loadNetworkState(hp.netName) - log.Printf("Got state (%s) : %v", hp.netName, state) if err != nil { log.Printf("Failed to load network state: %v", err) log.Printf("Polling hub...") diff --git a/peer/mcreader.go b/peer/mcreader.go index a56576e..7c63f26 100644 --- a/peer/mcreader.go +++ b/peer/mcreader.go @@ -1,54 +1,70 @@ package peer -/* -type mcReader struct { - conn udpReader - super controlMsgHandler - peers [256]*atomic.Pointer[remotePeer] +import ( + "log" + "net" + "sync/atomic" + "time" +) - incoming []byte - buf []byte -} - -func newMCReader( - conn udpReader, - super controlMsgHandler, - peers [256]*atomic.Pointer[remotePeer], -) *mcReader { - return &mcReader{conn, super, peers, newBuf(), newBuf()} -} - -func (r *mcReader) Run() { +func runMCReader( + rt *atomic.Pointer[routingTable], + handleControlMsg func(destIP byte, msg any), +) { for { - r.handleNextPacket() + runMCReader2(rt, handleControlMsg) + time.Sleep(8 * time.Second) } } -func (r *mcReader) handleNextPacket() { - incoming := r.incoming[:bufferSize] - n, remoteAddr, err := r.conn.ReadFromUDPAddrPort(incoming) +func runMCReader2( + rt *atomic.Pointer[routingTable], + handleControlMsg func(destIP byte, msg any), +) { + var ( + raw = newBuf() + buf = newBuf() + logf = func(s string, args ...any) { + log.Printf("[MCReader] "+s, args...) + } + ) + + conn, err := net.ListenMulticastUDP("udp", nil, multicastAddr) if err != nil { - log.Fatalf("Failed to read from UDP multicast port: %v", err) - } - incoming = incoming[:n] - - h, ok := headerFromLocalDiscoveryPacket(incoming) - if !ok { + logf("Failed to bind to multicast address: %v", err) return } - peer := r.peers[h.SourceIP].Load() - if peer == nil || peer.PubSignKey == nil { - return - } + for { + conn.SetReadDeadline(time.Now().Add(32 * time.Second)) + n, remoteAddr, err := conn.ReadFromUDPAddrPort(raw[:bufferSize]) + if err != nil { + logf("Failed to read from UDP port): %v", err) + return + } - if !verifyLocalDiscoveryPacket(incoming, r.buf, peer.PubSignKey) { - return - } + raw = raw[:n] + h, ok := headerFromLocalDiscoveryPacket(raw) + if !ok { + logf("Failed to open discovery packet?") + continue + } - r.super.HandleControlMsg(controlMsg[packetLocalDiscovery]{ - SrcIP: h.SourceIP, - SrcAddr: remoteAddr, - }) + peer := rt.Load().Peers[h.SourceIP] + if peer.PubSignKey == nil { + logf("No signing key for peer %d.", h.SourceIP) + continue + } + + if !verifyLocalDiscoveryPacket(raw, buf, peer.PubSignKey) { + logf("Invalid signature from peer: %d", h.SourceIP) + continue + } + + msg := controlMsg[packetLocalDiscovery]{ + SrcIP: h.SourceIP, + SrcAddr: remoteAddr, + } + handleControlMsg(h.SourceIP, msg) + } } -*/ diff --git a/peer/mcwriter.go b/peer/mcwriter.go index c26c2c8..5559547 100644 --- a/peer/mcwriter.go +++ b/peer/mcwriter.go @@ -1,6 +1,10 @@ package peer import ( + "log" + "net" + "time" + "golang.org/x/crypto/nacl/sign" ) @@ -32,22 +36,18 @@ func verifyLocalDiscoveryPacket(pkt, buf []byte, pubSignKey []byte) bool { // ---------------------------------------------------------------------------- -/* -type mcWriter struct { +func runMCWriter(localIP byte, signingKey []byte) { + discoveryPacket := createLocalDiscoveryPacket(localIP, signingKey) - conn mcUDPWriter - discoveryPacket []byte -} + conn, err := net.ListenMulticastUDP("udp", nil, multicastAddr) + if err != nil { + log.Fatalf("Failed to bind to multicast address: %v", err) + } -func newMCWriter(conn mcUDPWriter, localIP byte, signingKey []byte) *mcWriter { - return &mcWriter{ - conn: conn, - discoveryPacket: createLocalDiscoveryPacket(localIP, signingKey), + for range time.Tick(16 * time.Second) { + _, err := conn.WriteToUDP(discoveryPacket, multicastAddr) + if err != nil { + log.Printf("[MCWriter] Failed to write multicast: %v", err) + } } } - -func (w *mcWriter) SendLocalDiscovery() { - if _, err := w.conn.WriteToUDP(w.discoveryPacket, multicastAddr); err != nil { - log.Printf("[MCWriter] Failed to write multicast UDP packet: %v", err) - } - }*/ diff --git a/peer/peer.go b/peer/peer.go index a0afc3b..45627b0 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -16,6 +16,8 @@ import ( ) type peerMain struct { + conf localConfig + rt *atomic.Pointer[routingTable] ifReader *ifReader connReader *connReader iface io.Writer @@ -92,6 +94,8 @@ func newPeerMain(conf peerConfig) *peerMain { } return &peerMain{ + conf: config, + rt: rtPtr, iface: iface, ifReader: ifReader, connReader: connReader, @@ -104,6 +108,8 @@ func (p *peerMain) Run() { go p.ifReader.Run() go p.connReader.Run() p.super.Start() + go runMCWriter(p.conf.PeerIP, p.conf.PrivSignKey) + go runMCReader(p.rt, p.super.HandleControlMsg) p.hubPoller.Run() } diff --git a/peer/peerstates.go b/peer/peerstates.go index 6a13f9f..a68afb1 100644 --- a/peer/peerstates.go +++ b/peer/peerstates.go @@ -70,7 +70,6 @@ func (s *pState) OnPeerUpdate(peer *m.Peer) peerState { s.staged.IP = peer.PeerIP s.staged.PubSignKey = peer.PubSignKey - log.Printf("New cipher: %x, %x", s.privKey, peer.PubKey) s.staged.ControlCipher = newControlCipher(s.privKey, peer.PubKey) s.staged.DataCipher = newDataCipher() diff --git a/peer/pubaddrs.go b/peer/pubaddrs.go index 13ab66f..027057a 100644 --- a/peer/pubaddrs.go +++ b/peer/pubaddrs.go @@ -5,10 +5,12 @@ import ( "net/netip" "runtime/debug" "sort" + "sync" "time" ) type pubAddrStore struct { + lock sync.Mutex localPub bool localAddr netip.AddrPort lastSeen map[netip.AddrPort]time.Time @@ -25,6 +27,9 @@ func newPubAddrStore(localAddr netip.AddrPort) *pubAddrStore { } func (store *pubAddrStore) Store(add netip.AddrPort) { + store.lock.Lock() + defer store.lock.Unlock() + if store.localPub { log.Printf("OOPS: Local pub but storage attempt: %s", debug.Stack()) return @@ -42,6 +47,11 @@ func (store *pubAddrStore) Store(add netip.AddrPort) { } func (store *pubAddrStore) Get() (addrs [8]netip.AddrPort) { + store.lock.Lock() + defer store.lock.Unlock() + + store.clean() + if store.localPub { addrs[0] = store.localAddr return @@ -51,7 +61,7 @@ func (store *pubAddrStore) Get() (addrs [8]netip.AddrPort) { return } -func (store *pubAddrStore) Clean() { +func (store *pubAddrStore) clean() { if store.localPub { return } diff --git a/peer/pubaddrs_test.go b/peer/pubaddrs_test.go index b79e854..fa47c22 100644 --- a/peer/pubaddrs_test.go +++ b/peer/pubaddrs_test.go @@ -20,7 +20,7 @@ func TestPubAddrStore(t *testing.T) { time.Sleep(time.Millisecond) } - s.Clean() + s.clean() l2 := s.Get() if l2[0] != l[2] || l2[1] != l[1] || l2[2] != l[0] { diff --git a/peer/routingtable.go b/peer/routingtable.go index 8caa380..3f0aac3 100644 --- a/peer/routingtable.go +++ b/peer/routingtable.go @@ -1,7 +1,6 @@ package peer import ( - "log" "net/netip" "sync/atomic" "time" @@ -68,8 +67,6 @@ func (p remotePeer) EncryptControlPacket(pkt marshaller, tmp, out []byte) []byte DestIP: p.IP, } - log.Printf("Encrypting with header: %#v", h) - return p.ControlCipher.Encrypt(h, tmp, out) }