package node import ( "fmt" "log" "net/netip" "sync/atomic" "time" "vppn/m" ) type remotePeers [256]*remotePeer type peerData struct { controlCipher *controlCipher dataCipher *dataCipher remoteAddr netip.AddrPort relayIP byte // Non-zero if we should relay. } type remotePeer struct { // Immutable data. localIP byte remoteIP byte privKey []byte localPublic bool // True if local node is public. iface *ifWriter conn *connWriter // Shared state. shared *atomic.Pointer[peerData] // Only used in HandlePeerUpdate. peerVersion int64 // Only used in HandlePacket / Not synchronized. dupCheck *dupCheck decryptBuf []byte // Only used in SendData / Not synchronized. encryptBuf []byte // Used for sending control and data packets. Atomic access only. counter uint64 // For communicating with the supervisor thread. peerUpdates chan *m.Peer controlPackets chan controlPacket } func newRemotePeer(conf m.PeerConfig, remoteIP byte, iface *ifWriter, conn *connWriter) *remotePeer { rp := &remotePeer{ localIP: conf.PeerIP, remoteIP: remoteIP, privKey: conf.EncPrivKey, localPublic: addrIsValid(conf.PublicIP), iface: iface, conn: conn, shared: &atomic.Pointer[peerData]{}, dupCheck: newDupCheck(0), decryptBuf: make([]byte, bufferSize), encryptBuf: make([]byte, bufferSize), counter: uint64(time.Now().Unix()) << 30, peerUpdates: make(chan *m.Peer), controlPackets: make(chan controlPacket, 512), } pd := peerData{} rp.shared.Store(&pd) go newPeerSuper(rp).Run() return rp } func (rp *remotePeer) logf(msg string, args ...any) { log.Printf(fmt.Sprintf("[%03d] ", rp.remoteIP)+msg, args...) } func (rp *remotePeer) HandlePeerUpdate(peer *m.Peer) { if peer != nil && peer.Version != rp.peerVersion { rp.peerUpdates <- peer rp.peerVersion = peer.Version } } // ---------------------------------------------------------------------------- // HandlePacket accepts a raw data packet coming in from the network. // // This function is called by a single thread. func (rp *remotePeer) HandlePacket(addr netip.AddrPort, h header, data []byte) { switch h.StreamID { case controlStreamID: rp.handleControlPacket(addr, h, data) case dataStreamID: rp.handleDataPacket(data) case forwardStreamID: fallthrough // TODO //rp.handleForwardPacket(h, data) default: rp.logf("Unknown stream ID: %d", h.StreamID) } } // ---------------------------------------------------------------------------- func (rp *remotePeer) handleControlPacket(addr netip.AddrPort, h header, data []byte) { shared := rp.shared.Load() if shared.controlCipher == nil { rp.logf("Not connected (control).") return } out, ok := shared.controlCipher.Decrypt(data, rp.decryptBuf) if !ok { rp.logf("Failed to decrypt control packet.") return } if len(out) == 0 { rp.logf("Empty control packet from: %d", h.SourceIP) return } if rp.dupCheck.IsDup(h.Counter) { rp.logf("Duplicate control packet: %d", h.Counter) return } if h.DestIP != rp.localIP { // TODO: Forward control packet. // TODO: Probably this should be dropped. // Control packets should be forwarded as data for efficiency. return } pkt := controlPacket{ SrcIP: h.SourceIP, RemoteAddr: addr, } var err error switch out[0] { case packetTypePing: pkt.Payload, err = parsePingPacket(out) case packetTypePong: pkt.Payload, err = parsePongPacket(out) default: rp.logf("Unknown control packet type: %d", out[0]) return } if err != nil { rp.logf("Failed to parse control packet: %v", err) return } select { case rp.controlPackets <- pkt: default: rp.logf("Dropping control packet.") } } func (rp *remotePeer) handleDataPacket(data []byte) { shared := rp.shared.Load() if shared.dataCipher == nil { rp.logf("Not connected (recv).") return } dec, ok := shared.dataCipher.Decrypt(data, rp.decryptBuf) if !ok { rp.logf("Failed to decrypt data packet.") return } rp.iface.Write(dec) } // ---------------------------------------------------------------------------- // SendData sends data coming from the interface going to the network. // // This function is called by a single thread. func (rp *remotePeer) SendData(data []byte) { shared := rp.shared.Load() if shared.dataCipher == nil || shared.remoteAddr == zeroAddrPort { rp.logf("Not connected (send).") return } h := header{ StreamID: dataStreamID, Counter: atomic.AddUint64(&rp.counter, 1), SourceIP: rp.localIP, DestIP: rp.remoteIP, } enc := shared.dataCipher.Encrypt(h, data, rp.encryptBuf) rp.conn.WriteTo(enc, shared.remoteAddr) }