package node

import (
	"fmt"
	"log"
	"net/netip"
	"sync/atomic"
	"time"

	"vppn/m"
)

// remotePeers is the table of all remote peers, indexed by the peer's IP
// byte. Slots may be nil; callers must check before dereferencing.
type remotePeers [256]*remotePeer

// peerRoutingData is the routing snapshot for a peer. Readers in this file
// obtain it via remotePeer.published.Load() and only read from it.
type peerRoutingData struct {
	up            bool
	relay         bool
	controlCipher *controlCipher
	dataCipher    *dataCipher
	remoteAddr    netip.AddrPort
	relayIP       byte // Non-zero if we should relay.
}

// remotePeer holds the state needed to exchange packets with one remote peer.
type remotePeer struct {
	// Immutable data.
	localIP  byte
	remoteIP byte
	iface    *ifWriter
	conn     *connWriter

	// Shared state.
	peers     *remotePeers
	published *atomic.Pointer[peerRoutingData]

	// Only used in HandlePacket / Not synchronized.
	dupCheck   *dupCheck
	decryptBuf []byte

	// Only used on the outgoing path (HandleInterfacePacket, and
	// SendData / RelayTo via encryptAndSend) / Not synchronized.
	encryptBuf []byte

	// Used for sending control and data packets. Atomic access only.
	//
	// NOTE(review): accessed with atomic.AddUint64; on 32-bit platforms the
	// field must be 64-bit aligned - confirm target platforms.
	counter uint64

	// Only accessed in HandlePeerUpdate. Used to determine if we should send
	// the peer update to the peerSuper.
	peerVersion int64

	// For communicating with the supervisor thread.
	peerUpdates    chan *m.Peer
	controlPackets chan controlPacket
}

// newRemotePeer builds the remotePeer for remoteIP, publishes an empty
// (not connected) routing snapshot, and starts the peer's supervisor
// goroutine before returning.
func newRemotePeer(conf m.PeerConfig, remoteIP byte, iface *ifWriter, conn *connWriter, peers *remotePeers) *remotePeer {
	rp := &remotePeer{
		localIP:        conf.PeerIP,
		remoteIP:       remoteIP,
		iface:          iface,
		conn:           conn,
		peers:          peers,
		published:      &atomic.Pointer[peerRoutingData]{},
		dupCheck:       newDupCheck(0),
		decryptBuf:     make([]byte, bufferSize),
		encryptBuf:     make([]byte, bufferSize),
		counter:        uint64(time.Now().Unix()) << 30,
		peerUpdates:    make(chan *m.Peer),
		controlPackets: make(chan controlPacket, 512),
	}

	// Publish a zero-valued snapshot so that Load never returns nil.
	rp.published.Store(&peerRoutingData{})

	go rp.supervise(conf)
	return rp
}

// logf logs a formatted message prefixed with the remote peer's IP byte.
func (rp *remotePeer) logf(msg string, args ...any) {
	log.Printf(fmt.Sprintf("[%03d] ", rp.remoteIP)+msg, args...)
}

// HandlePeerUpdate forwards a peer update to the supervisor. A nil peer is
// always forwarded; a non-nil peer is forwarded only when its version
// differs from the last one seen.
//
// The send on peerUpdates is unbuffered and blocks until it is received.
func (rp *remotePeer) HandlePeerUpdate(peer *m.Peer) {
	if peer == nil {
		rp.peerUpdates <- peer
		return
	}
	if peer.Version == rp.peerVersion {
		return
	}
	rp.peerVersion = peer.Version
	rp.peerUpdates <- peer
}

// ----------------------------------------------------------------------------

// HandlePacket accepts a raw data packet coming in from the network and
// dispatches it on the header's stream ID.
//
// This function is called by a single thread.
func (rp *remotePeer) HandlePacket(addr netip.AddrPort, h header, data []byte) {
	switch h.StreamID {
	case controlStreamID:
		rp.handleControlPacket(addr, h, data)
	case dataStreamID:
		rp.handleDataPacket(data)
	case relayStreamID:
		rp.handleRelayPacket(h, data)
	default:
		rp.logf("Unknown stream ID: %d", h.StreamID)
	}
}

// ----------------------------------------------------------------------------

// handleControlPacket decrypts and parses a control packet, then queues it
// for the supervisor. The packet is dropped (with a log line) if no control
// cipher is published, the destination is not us, decryption or parsing
// fails, the payload is empty, the counter is a duplicate, or the queue is
// full.
func (rp *remotePeer) handleControlPacket(addr netip.AddrPort, h header, data []byte) {
	routingData := rp.published.Load()
	if routingData.controlCipher == nil {
		rp.logf("Not connected (control).")
		return
	}

	if h.DestIP != rp.localIP {
		rp.logf("Incorrect destination IP on control packet.")
		return
	}

	out, ok := routingData.controlCipher.Decrypt(data, rp.decryptBuf)
	if !ok {
		rp.logf("Failed to decrypt control packet.")
		return
	}

	if len(out) == 0 {
		rp.logf("Empty control packet from: %d", h.SourceIP)
		return
	}

	if rp.dupCheck.IsDup(h.Counter) {
		rp.logf("Duplicate control packet: %d", h.Counter)
		return
	}

	pkt := controlPacket{
		SrcIP:      h.SourceIP,
		RemoteAddr: addr,
	}

	// NOTE(review): out was decrypted using rp.decryptBuf, which is reused
	// for the next packet - confirm ParsePayload does not retain references
	// into it, since pkt is handed to another goroutine below.
	if err := pkt.ParsePayload(out); err != nil {
		rp.logf("Failed to parse control packet: %v", err)
		return
	}

	// Never block the receive path: drop the packet if the queue is full.
	select {
	case rp.controlPackets <- pkt:
	default:
		rp.logf("Dropping control packet.")
	}
}

// ----------------------------------------------------------------------------

// handleDataPacket decrypts a data packet and writes the plaintext to the
// local interface.
func (rp *remotePeer) handleDataPacket(data []byte) {
	routingData := rp.published.Load()
	if routingData.dataCipher == nil {
		rp.logf("Not connected (recv).")
		return
	}

	dec, ok := routingData.dataCipher.Decrypt(data, rp.decryptBuf)
	if !ok {
		rp.logf("Failed to decrypt data packet.")
		return
	}

	rp.iface.Write(dec)
}

// ----------------------------------------------------------------------------

// handleRelayPacket decrypts the outer layer of a relayed packet and
// forwards the inner payload, unchanged, to the peer named by h.DestIP.
func (rp *remotePeer) handleRelayPacket(h header, data []byte) {
	routingData := rp.published.Load()
	if routingData.dataCipher == nil {
		rp.logf("Not connected (recv).")
		return
	}

	dec, ok := routingData.dataCipher.Decrypt(data, rp.decryptBuf)
	if !ok {
		rp.logf("Failed to decrypt data packet.")
		return
	}

	// h.DestIP comes off the wire; an empty slot in the peer table must not
	// crash the process.
	dest := rp.peers[h.DestIP]
	if dest == nil {
		rp.logf("Unknown relay destination: %d", h.DestIP)
		return
	}

	dest.SendAsIs(dec)
}

// ----------------------------------------------------------------------------

// SendData sends data coming from the interface going to the network.
//
// This function is called by a single thread.
func (rp *remotePeer) SendData(data []byte) {
	rp.encryptAndSend(dataStreamID, rp.remoteIP, data)
}

// HandleInterfacePacket sends a packet read from the local interface to the
// remote peer, either directly or wrapped through the configured relay peer.
func (rp *remotePeer) HandleInterfacePacket(data []byte) {
	routingData := rp.published.Load()
	if routingData.dataCipher == nil {
		rp.logf("Not connected (handle interface).")
		return
	}

	// Direct path: SendData encrypts and sends. Encrypting here as well would
	// waste a cipher pass and a counter value on every packet.
	if routingData.relayIP == 0 {
		rp.SendData(data)
		return
	}

	relay := rp.peers[routingData.relayIP]
	if relay == nil {
		rp.logf("Unknown relay peer: %d", routingData.relayIP)
		return
	}

	// Relay path: encrypt for the final destination first, then let the relay
	// peer wrap the result in its own encrypted relay packet.
	h := header{
		StreamID: dataStreamID,
		Counter:  atomic.AddUint64(&rp.counter, 1),
		SourceIP: rp.localIP,
		DestIP:   rp.remoteIP,
	}

	enc := routingData.dataCipher.Encrypt(h, data, rp.encryptBuf)
	relay.RelayTo(rp.remoteIP, enc)
}

// ----------------------------------------------------------------------------

// CanRelay reports whether the published routing data marks this peer as
// both a relay and up.
func (rp *remotePeer) CanRelay() bool {
	data := rp.published.Load()
	return data.relay && data.up
}

// ----------------------------------------------------------------------------

// RelayTo encrypts data on the relay stream, addressed to destIP, and sends
// it to this peer.
func (rp *remotePeer) RelayTo(destIP byte, data []byte) {
	rp.encryptAndSend(relayStreamID, destIP, data)
}

// ----------------------------------------------------------------------------

// encryptAndSend encrypts data with the published data cipher under a fresh
// counter value and writes it to the published remote address. It logs and
// returns without sending if there is no data cipher or no remote address.
func (rp *remotePeer) encryptAndSend(streamID byte, destIP byte, data []byte) {
	routingData := rp.published.Load()
	if routingData.dataCipher == nil || routingData.remoteAddr == zeroAddrPort {
		rp.logf("Not connected (encrypt and send).")
		return
	}

	h := header{
		StreamID: streamID,
		Counter:  atomic.AddUint64(&rp.counter, 1),
		SourceIP: rp.localIP,
		DestIP:   destIP,
	}

	enc := routingData.dataCipher.Encrypt(h, data, rp.encryptBuf)
	rp.conn.WriteTo(enc, routingData.remoteAddr)
}

// ----------------------------------------------------------------------------

// SendAsIs is used when forwarding already-encrypted data from one peer to
// another. It logs and returns without sending if no remote address is
// published.
func (rp *remotePeer) SendAsIs(data []byte) {
	routingData := rp.published.Load()
	if routingData.remoteAddr == zeroAddrPort {
		rp.logf("Not connected (send direct).")
		return
	}

	rp.conn.WriteTo(data, routingData.remoteAddr)
}