package peer import ( "fmt" "log" "net/netip" "strings" "sync/atomic" "time" "vppn/m" "git.crumpington.com/lib/go/ratelimiter" ) // ---------------------------------------------------------------------------- // The remoteConfig is the shared, immutable configuration for a remote // peer. It's read and written atomically. See remote.config. // ---------------------------------------------------------------------------- type remoteConfig struct { Up bool // True if peer is up and we can send data. Server bool // True if role is server. Direct bool // True if this is a direct connection. DirectAddr netip.AddrPort // Remote address if directly connected. ControlCipher *controlCipher DataCipher *dataCipher Peer *m.Peer } // CanRelay returns true if the remote configuration is able to relay packets. // to other hosts. func (rc remoteConfig) CanRelay() bool { return rc.Up && rc.Direct && rc.Peer.Relay } // A Remote represents a remote peer and contains functions for handling // incoming control, data, and multicast packets, peer udpates, as well as // sending, forwarding, and relaying packets. type Remote struct { Globals RemotePeerIP byte // Immutable. limiter *ratelimiter.Limiter dupCheck *dupCheck sendCounter uint64 // init to startupCount << 48. Atomic access only. // config should be accessed via conf() and updateConf(...) methods. config atomic.Pointer[remoteConfig] messages chan any } func newRemote(g Globals, remotePeerIP byte) *Remote { r := &Remote{ Globals: g, RemotePeerIP: remotePeerIP, limiter: ratelimiter.New(ratelimiter.Config{ FillPeriod: 20 * time.Millisecond, MaxWaitCount: 1, }), dupCheck: newDupCheck(0), sendCounter: uint64(time.Now().Unix()<<30) + 1, messages: make(chan any, 8), } r.config.Store(&remoteConfig{}) return r } // ---------------------------------------------------------------------------- func (r *Remote) conf() remoteConfig { return *(r.config.Load()) } func (r *Remote) updateConf(conf remoteConfig) { old := r.config.Load() r.config.Store(&conf) if !old.CanRelay() && conf.CanRelay() { r.RelayHandler.Add(r) } if old.CanRelay() && !conf.CanRelay() { r.RelayHandler.Remove(r) } } // ---------------------------------------------------------------------------- func (r *Remote) sendUDP(b []byte, addr netip.AddrPort) { if _, err := r.SendUDP(b, addr); err != nil { r.logf("Failed to send URP packet: %v", err) } } // ---------------------------------------------------------------------------- func (r *Remote) encryptData(conf remoteConfig, destIP byte, packet []byte) []byte { h := Header{ StreamID: dataStreamID, Counter: atomic.AddUint64(&r.sendCounter, 1), SourceIP: r.Globals.LocalPeerIP, DestIP: destIP, } return conf.DataCipher.Encrypt(h, packet, packet[len(packet):cap(packet)]) } func (r *Remote) encryptControl(conf remoteConfig, packet []byte) []byte { h := Header{ StreamID: controlStreamID, Counter: atomic.AddUint64(&r.sendCounter, 1), SourceIP: r.LocalPeerIP, DestIP: r.RemotePeerIP, } return conf.ControlCipher.Encrypt(h, packet, packet[len(packet):cap(packet)]) } // ---------------------------------------------------------------------------- // SendDataTo sends a data packet to the remote, called by the IFReader. func (r *Remote) SendDataTo(data []byte) { conf := r.conf() if !conf.Up { r.logf("Cannot send: link down") return } if conf.Direct { r.sendDataDirect(conf, data) } else { r.sendDataRelayed(conf, data) } } // sendDataRelayed sends data to the remote via the relay. func (r *Remote) sendDataRelayed(conf remoteConfig, data []byte) { relay := r.RelayHandler.Load() if relay == nil { r.logf("Connot send: no relay") return } relay.relayData(conf.Peer.PeerIP, r.encryptData(conf, conf.Peer.PeerIP, data)) } // sendDataDirect sends data to the remote directly. func (r *Remote) sendDataDirect(conf remoteConfig, data []byte) { r.sendUDP(r.encryptData(conf, conf.Peer.PeerIP, data), conf.DirectAddr) } func (r *Remote) relayData(toIP byte, enc []byte) { conf := r.conf() if !conf.Up || !conf.Direct { r.logf("Cannot relay: not up or not a direct connection") return } r.sendUDP(r.encryptData(conf, toIP, enc), conf.DirectAddr) } func (r *Remote) sendControl(conf remoteConfig, data []byte) { if err := r.limiter.Limit(); err != nil { r.logf("Rate limiter") return } // Direct: if conf.Direct { enc := r.encryptControl(conf, data) r.sendUDP(enc, conf.DirectAddr) return } // Relayed: relay := r.RelayHandler.Load() if relay == nil { r.logf("Connot send: no relay") return } relay.relayData(conf.Peer.PeerIP, r.encryptControl(conf, data)) } func (r *Remote) sendControlToAddr(buf []byte, addr netip.AddrPort) { enc := r.encryptControl(r.conf(), buf) r.sendUDP(enc, addr) } func (r *Remote) forwardPacket(data []byte) { conf := r.conf() if !conf.Up || !conf.Direct { r.logf("Cannot forward to %d: not a direct connection", conf.Peer.PeerIP) return } r.sendUDP(data, conf.DirectAddr) } // ---------------------------------------------------------------------------- // HandlePacket is called by the ConnReader to handle an incoming packet. func (r *Remote) HandlePacket(h Header, srcAddr netip.AddrPort, data []byte) { switch h.StreamID { case controlStreamID: r.handleControlPacket(h, srcAddr, data) case dataStreamID: r.handleDataPacket(h, data) default: r.logf("Unknown stream ID: %d", h.StreamID) } } // Handle a control packet. Decrypt, verify, etc. func (r *Remote) handleControlPacket(h Header, srcAddr netip.AddrPort, data []byte) { conf := r.conf() if conf.ControlCipher == nil { r.logf("No control cipher") return } dec, ok := conf.ControlCipher.Decrypt(data, data[len(data):cap(data)]) if !ok { r.logf("Failed to decrypt control packet") return } if r.dupCheck.IsDup(h.Counter) { r.logf("Dropping control packet as duplicate: %d", h.Counter) return } msg, err := parseControlMsg(h.SourceIP, srcAddr, dec) if err != nil { r.logf("Failed to parse control packet: %v", err) return } select { case r.messages <- msg: default: r.logf("Dropping control message") } } func (r *Remote) handleDataPacket(h Header, data []byte) { conf := r.conf() if conf.DataCipher == nil { return } dec, ok := conf.DataCipher.Decrypt(data, data[len(data):cap(data)]) if !ok { r.logf("Failed to decrypt data packet") return } if r.dupCheck.IsDup(h.Counter) { r.logf("Dropping data packet as duplicate: %d", h.Counter) return } // For local. if h.DestIP == r.LocalPeerIP { if _, err := r.IFace.Write(dec); err != nil { // This could be a malformed packet from a peer, so we don't crash if it // happens. r.logf("Failed to write to interface: %v", err) } return } // Forward. dest := r.RemotePeers[h.DestIP].Load() dest.forwardPacket(dec) } // ---------------------------------------------------------------------------- // HandleLocalDiscoveryPacket is called by the MCReader. func (r *Remote) HandleLocalDiscoveryPacket(h Header, srcAddr netip.AddrPort, data []byte) { conf := r.conf() if conf.Peer.PubSignKey == nil { r.logf("No signing key for discovery packet.") return } if !verifyLocalDiscoveryPacket(data, data[len(data):cap(data)], conf.Peer.PubSignKey) { r.logf("Invalid signature on discovery packet.") return } msg := controlMsg[packetLocalDiscovery]{ SrcIP: h.SourceIP, SrcAddr: srcAddr, } select { case r.messages <- msg: default: r.logf("Dropping discovery message.") } } // ---------------------------------------------------------------------------- // HandlePeerUpdate is called by the HubPoller when it gets a new version of // the associated peer configuration. func (r *Remote) HandlePeerUpdate(msg peerUpdateMsg) { r.messages <- msg } // ---------------------------------------------------------------------------- func (s *Remote) logf(format string, args ...any) { conf := s.conf() b := strings.Builder{} name := "" if conf.Peer != nil { name = conf.Peer.Name } b.WriteString(fmt.Sprintf("%03d", s.RemotePeerIP)) b.WriteString(fmt.Sprintf("%30s: ", name)) if conf.Server { b.WriteString("SERVER | ") } else { b.WriteString("CLIENT | ") } if conf.Direct { b.WriteString("DIRECT | ") } else { b.WriteString("RELAYED | ") } if conf.Up { b.WriteString("UP | ") } else { b.WriteString("DOWN | ") } log.Printf(b.String()+format, args...) }