From 869bbfb3d623e4206b968b23bbe10bfb50002b2f Mon Sep 17 00:00:00 2001 From: jdl Date: Mon, 23 Dec 2024 08:08:23 +0100 Subject: [PATCH] wip --- node/globalfuncs.go | 79 +++++++++ node/globals.go | 48 +++++- node/header.go | 1 - node/main.go | 158 ++++++++++++++--- node/peer-pollhub.go | 30 ++-- node/peer-super-states.go | 280 ------------------------------ node/peer-super.go | 95 ----------- node/peer-supervisor.go | 350 ++++++++++++++++++++++++++++++++++++-- node/peer.go | 274 ----------------------------- 9 files changed, 606 insertions(+), 709 deletions(-) create mode 100644 node/globalfuncs.go delete mode 100644 node/peer-super-states.go delete mode 100644 node/peer-super.go delete mode 100644 node/peer.go diff --git a/node/globalfuncs.go b/node/globalfuncs.go new file mode 100644 index 0000000..9ddf90c --- /dev/null +++ b/node/globalfuncs.go @@ -0,0 +1,79 @@ +package node + +import ( + "log" + "sync/atomic" +) + +func _sendControlPacket( + pkt interface{ Marshal([]byte) []byte }, + route peerRoute, + buf1 []byte, + buf2 []byte, +) { + buf := pkt.Marshal(buf1) + h1 := header{ + StreamID: controlStreamID, + Counter: atomic.AddUint64(&sendCounters[route.IP], 1), + SourceIP: localIP, + DestIP: route.IP, + } + buf = route.ControlCipher.Encrypt(h1, buf, buf2) + + if route.RelayIP == 0 { + _conn.WriteTo(buf, route.RemoteAddr) + return + } + + relayRoute := routingTable[route.RelayIP].Load() + if !relayRoute.Up || !relayRoute.Relay { + log.Print("Failed to send control packet: relay not available.") + return + } + + h2 := header{ + StreamID: dataStreamID, + Counter: atomic.AddUint64(&sendCounters[relayRoute.IP], 1), + SourceIP: localIP, + DestIP: route.IP, + } + buf = relayRoute.DataCipher.Encrypt(h2, buf, buf1) + _conn.WriteTo(buf, relayRoute.RemoteAddr) +} + +func _sendDataPacket( + pkt []byte, + route *peerRoute, + buf1 []byte, + buf2 []byte, +) { + h := header{ + StreamID: dataStreamID, + Counter: atomic.AddUint64(&sendCounters[route.IP], 1), + SourceIP: localIP, + DestIP: route.IP, + } + + enc := route.DataCipher.Encrypt(h, pkt, buf1) + + if route.RelayIP == 0 { + _conn.WriteTo(enc, route.RemoteAddr) + return + } + + relayRoute := routingTable[route.RelayIP].Load() + if !relayRoute.Up || !relayRoute.Relay { + log.Print("Failed to send data packet: relay not available.") + return + } + + h2 := header{ + StreamID: dataStreamID, + Counter: atomic.AddUint64(&sendCounters[relayRoute.IP], 1), + SourceIP: localIP, + DestIP: route.IP, + } + + enc = relayRoute.DataCipher.Encrypt(h2, enc, buf2) + _conn.WriteTo(enc, relayRoute.RemoteAddr) +} diff --git a/node/globals.go b/node/globals.go index b78c2c9..db1e792 100644 --- a/node/globals.go +++ b/node/globals.go @@ -1,15 +1,57 @@ package node -import "net/netip" +import ( + "net/netip" + "sync/atomic" + "vppn/m" +) + +var zeroAddrPort = netip.AddrPort{} const ( bufferSize = 1536 - if_mtu = 1400 + if_mtu = 1200 if_queue_len = 2048 controlCipherOverhead = 16 dataCipherOverhead = 16 ) +type peerRoute struct { + IP byte + Up bool // True if data can be sent on the route. + Relay bool // True if the peer is a relay. + ControlCipher *controlCipher + DataCipher *dataCipher + RemoteAddr netip.AddrPort // Remote address if directly connected. + LocalAddr netip.AddrPort // Local address as seen by the remote. + RelayIP byte // Non-zero if we should relay. +} + +// Configuration for this peer. var ( - zeroAddrPort = netip.AddrPort{} + netName string + localIP byte + localPub bool + privateKey []byte ) + +// Shared interface for writing. +var _iface *ifWriter + +// Shared connection for writing. +var _conn *connWriter + +// Counters for sending to each peer. +var sendCounters [256]uint64 + +// Duplicate checkers for incoming packets. +var dupChecks [256]*dupCheck + +// Channels for incoming control packets. +var controlPackets [256]chan controlPacket + +// Channels for incoming peer updates from the hub. +var peerUpdates [256]chan *m.Peer + +// Global routing table. +var routingTable [256]*atomic.Pointer[peerRoute] diff --git a/node/header.go b/node/header.go index 1a022a2..fd28962 100644 --- a/node/header.go +++ b/node/header.go @@ -10,7 +10,6 @@ const ( controlHeaderSize = 24 dataStreamID = 1 dataHeaderSize = 12 - relayStreamID = 3 ) type header struct { diff --git a/node/main.go b/node/main.go index c291e73..e2e5c42 100644 --- a/node/main.go +++ b/node/main.go @@ -11,6 +11,8 @@ import ( "net/netip" "os" "runtime/debug" + "sync/atomic" + "time" "vppn/m" ) @@ -24,7 +26,6 @@ func Main() { defer panicHandler() var ( - netName string initURL string listenIP string port int @@ -42,14 +43,14 @@ func Main() { } if initURL != "" { - mainInit(netName, initURL) + mainInit(initURL) return } - main(netName, listenIP, uint16(port)) + main(listenIP, uint16(port)) } -func mainInit(netName, initURL string) { +func mainInit(initURL string) { if _, err := loadPeerConfig(netName); err == nil { log.Fatalf("Network is already initialized.") } @@ -79,15 +80,15 @@ func mainInit(netName, initURL string) { // ---------------------------------------------------------------------------- -func main(netName, listenIP string, port uint16) { - conf, err := loadPeerConfig(netName) +func main(listenIP string, port uint16) { + config, err := loadPeerConfig(netName) if err != nil { log.Fatalf("Failed to load configuration: %v", err) } - port = determinePort(conf.Port, port) + port = determinePort(config.Port, port) - iface, err := openInterface(conf.Network, conf.PeerIP, netName) + iface, err := openInterface(config.Network, config.PeerIP, netName) if err != nil { log.Fatalf("Failed to open interface: %v", err) } @@ -102,18 +103,34 @@ func main(netName, listenIP string, port uint16) { log.Fatalf("Failed to open UDP port: %v", err) } - connWriter := newConnWriter(conn) - ifWriter := newIFWriter(iface) + // Intialize globals. + localIP = config.PeerIP + localPub = addrIsValid(config.PublicIP) + privateKey = config.EncPrivKey - peers := remotePeers{} + _iface = newIFWriter(iface) + _conn = newConnWriter(conn) - for i := range peers { - peers[i] = newRemotePeer(conf, byte(i), ifWriter, connWriter, &peers) + for i := range 256 { + sendCounters[i] = uint64(time.Now().Unix()<<30) + 1 + dupChecks[i] = newDupCheck(0) + controlPackets[i] = make(chan controlPacket, 256) + peerUpdates[i] = make(chan *m.Peer) + routingTable[i] = &atomic.Pointer[peerRoute]{} + route := peerRoute{IP: byte(i)} + routingTable[i].Store(&route) } - go newHubPoller(netName, conf, peers).Run() - go readFromConn(conn, peers) - readFromIFace(iface, peers) + // Start supervisors. + for i := range 256 { + go newPeerSupervisor(i).Run() + } + + // -------------------- + + go newHubPoller(config).Run() + go readFromConn(conn) + readFromIFace(iface) } // ---------------------------------------------------------------------------- @@ -130,7 +147,7 @@ func determinePort(confPort, portFromCommandLine uint16) uint16 { // ---------------------------------------------------------------------------- -func readFromConn(conn *net.UDPConn, peers remotePeers) { +func readFromConn(conn *net.UDPConn) { defer panicHandler() @@ -139,6 +156,7 @@ func readFromConn(conn *net.UDPConn, peers remotePeers) { n int err error buf = make([]byte, bufferSize) + decBuf = make([]byte, bufferSize) data []byte h header ) @@ -156,27 +174,119 @@ func readFromConn(conn *net.UDPConn, peers remotePeers) { } h.Parse(data) - peers[h.SourceIP].HandlePacket(remoteAddr, h, data) + switch h.StreamID { + case controlStreamID: + handleControlPacket(remoteAddr, h, data, decBuf) + + case dataStreamID: + handleDataPacket(h, data, decBuf) + + default: + log.Printf("Unknown stream ID: %d", h.StreamID) + } } } +func handleControlPacket(addr netip.AddrPort, h header, data, decBuf []byte) { + route := routingTable[h.SourceIP].Load() + if route.ControlCipher == nil { + log.Printf("Not connected (control).") + return + } + + if h.DestIP != localIP { + log.Printf("Incorrect destination IP on control packet: %d != %d", h.DestIP, localIP) + return + } + + out, ok := route.ControlCipher.Decrypt(data, decBuf) + if !ok { + log.Printf("Failed to decrypt control packet.") + return + } + + if len(out) == 0 { + log.Printf("Empty control packet from: %d", h.SourceIP) + return + } + + if dupChecks[h.SourceIP].IsDup(h.Counter) { + log.Printf("[%03d] Duplicate control packet: %d", h.SourceIP, h.Counter) + return + } + + pkt := controlPacket{ + SrcIP: h.SourceIP, + RemoteAddr: addr, + } + + if err := pkt.ParsePayload(out); err != nil { + log.Printf("Failed to parse control packet: %v", err) + return + } + + select { + case controlPackets[h.SourceIP] <- pkt: + default: + log.Printf("Dropping control packet.") + } +} + +func handleDataPacket(h header, data []byte, decBuf []byte) { + route := routingTable[h.SourceIP].Load() + if !route.Up { + log.Printf("Not connected (recv).") + return + } + + dec, ok := route.DataCipher.Decrypt(data, decBuf) + if !ok { + log.Printf("Failed to decrypt data packet.") + return + } + + if dupChecks[h.SourceIP].IsDup(h.Counter) { + log.Printf("[%03d] Duplicate data packet: %d", h.SourceIP, h.Counter) + return + } + + if h.DestIP == localIP { + _iface.Write(dec) + return + } + + destRoute := routingTable[h.DestIP].Load() + if !destRoute.Up || destRoute.RelayIP != 0 { + log.Printf("Not connected (relay)") + return + } + + _conn.WriteTo(dec, destRoute.RemoteAddr) +} + // ---------------------------------------------------------------------------- -func readFromIFace(iface io.ReadWriteCloser, peers remotePeers) { - +func readFromIFace(iface io.ReadWriteCloser) { var ( - buf = make([]byte, bufferSize) - packet []byte + packet = make([]byte, bufferSize) + buf1 = make([]byte, bufferSize) + buf2 = make([]byte, bufferSize) remoteIP byte err error ) for { - packet, remoteIP, err = readNextPacket(iface, buf) + packet, remoteIP, err = readNextPacket(iface, packet) if err != nil { log.Fatalf("Failed to read from interface: %v", err) } - peers[remoteIP].HandleInterfacePacket(packet) + route := routingTable[remoteIP].Load() + if !route.Up { + log.Printf("Route not connected: %d", remoteIP) + continue + } + + _sendDataPacket(packet, route, buf1, buf2) } } diff --git a/node/peer-pollhub.go b/node/peer-pollhub.go index aa1c91b..ef36431 100644 --- a/node/peer-pollhub.go +++ b/node/peer-pollhub.go @@ -11,14 +11,12 @@ import ( ) type hubPoller struct { - netName string - localIP byte - client *http.Client - req *http.Request - peers remotePeers + client *http.Client + req *http.Request + versions [256]int64 } -func newHubPoller(netName string, conf m.PeerConfig, peers remotePeers) *hubPoller { +func newHubPoller(conf m.PeerConfig) *hubPoller { u, err := url.Parse(conf.HubAddress) if err != nil { log.Fatalf("Failed to parse hub address %s: %v", conf.HubAddress, err) @@ -35,18 +33,15 @@ func newHubPoller(netName string, conf m.PeerConfig, peers remotePeers) *hubPoll req.SetBasicAuth("", conf.APIKey) return &hubPoller{ - netName: netName, - localIP: conf.PeerIP, - client: client, - req: req, - peers: peers, + client: client, + req: req, } } func (hp *hubPoller) Run() { defer panicHandler() - state, err := loadNetworkState(hp.netName) + state, err := loadNetworkState(netName) if err != nil { log.Printf("Failed to load network state: %v", err) log.Printf("Polling hub...") @@ -83,15 +78,18 @@ func (hp *hubPoller) pollHub() { hp.applyNetworkState(state) - if err := storeNetworkState(hp.netName, state); err != nil { + if err := storeNetworkState(netName, state); err != nil { log.Printf("Failed to store network state: %v", err) } } func (hp *hubPoller) applyNetworkState(state m.NetworkState) { - for i := range state.Peers { - if i != int(hp.localIP) { - hp.peers[i].HandlePeerUpdate(state.Peers[i]) + for i, peer := range state.Peers { + if i != int(localIP) { + if peer != nil && peer.Version != hp.versions[i] { + peerUpdates[i] <- state.Peers[i] + hp.versions[i] = peer.Version + } } } } diff --git a/node/peer-super-states.go b/node/peer-super-states.go deleted file mode 100644 index 2d888df..0000000 --- a/node/peer-super-states.go +++ /dev/null @@ -1,280 +0,0 @@ -package node - -import ( - "math/rand" - "net/netip" - "time" - "vppn/m" -) - -// ---------------------------------------------------------------------------- - -func (s *peerSuper) noPeer() stateFunc { - return s.peerUpdate(<-s.peerUpdates) -} - -// ---------------------------------------------------------------------------- - -func (s *peerSuper) peerUpdate(peer *m.Peer) stateFunc { - return func() stateFunc { return s._peerUpdate(peer) } -} - -func (s *peerSuper) _peerUpdate(peer *m.Peer) stateFunc { - defer s.publish() - - s.peer = peer - s.staged = peerRouteInfo{} - - if s.peer == nil { - return s.noPeer - } - - s.staged.controlCipher = newControlCipher(s.privKey, peer.EncPubKey) - s.staged.dataCipher = newDataCipher() - - if ip, isValid := netip.AddrFromSlice(peer.PublicIP); isValid { - s.remotePub = true - s.staged.relay = peer.Mediator - s.staged.remoteAddr = netip.AddrPortFrom(ip, peer.Port) - } - - if s.remotePub == s.localPub { - if s.localIP < s.remoteIP { - return s.serverAccept - } - return s.clientInit - } - - if s.remotePub { - return s.clientInit - } - return s.serverAccept -} - -// ---------------------------------------------------------------------------- - -func (s *peerSuper) serverAccept() stateFunc { - s.logf("STATE: server-accept") - s.staged.up = false - s.staged.dataCipher = nil - s.staged.remoteAddr = zeroAddrPort - s.staged.relayIP = 0 - s.publish() - - var syn synPacket - - for { - select { - case peer := <-s.peerUpdates: - return s.peerUpdate(peer) - - case pkt := <-s.controlPackets: - switch p := pkt.Payload.(type) { - - case synPacket: - syn = p - s.staged.remoteAddr = pkt.RemoteAddr - s.staged.dataCipher = newDataCipherFromKey(syn.SharedKey) - s.staged.relayIP = syn.RelayIP - s.publish() - s.sendControlPacket(synAckPacket{ - TraceID: syn.TraceID, - RecvAddr: pkt.RemoteAddr, - }) - - case ackPacket: - if p.TraceID != syn.TraceID { - continue - } - - // Publish. - return s.serverConnected(syn.TraceID) - } - } - } -} - -// ---------------------------------------------------------------------------- - -func (s *peerSuper) serverConnected(traceID uint64) stateFunc { - s.logf("STATE: server-connected") - s.staged.up = true - s.publish() - return func() stateFunc { - return s._serverConnected(traceID) - } -} - -func (s *peerSuper) _serverConnected(traceID uint64) stateFunc { - - timeoutTimer := time.NewTimer(timeoutInterval) - defer timeoutTimer.Stop() - - for { - select { - case peer := <-s.peerUpdates: - return s.peerUpdate(peer) - - case pkt := <-s.controlPackets: - switch p := pkt.Payload.(type) { - - case ackPacket: - if p.TraceID != traceID { - return s.serverAccept - } - - s.sendControlPacket(ackPacket{TraceID: traceID, RecvAddr: pkt.RemoteAddr}) - timeoutTimer.Reset(timeoutInterval) - } - - case <-timeoutTimer.C: - s.logf("server timeout") - return s.serverAccept - } - } -} - -// ---------------------------------------------------------------------------- - -func (s *peerSuper) clientInit() stateFunc { - s.logf("STATE: client-init") - if !s.remotePub { - // TODO: Check local discovery for IP. - // TODO: Attempt UDP hole punch. - // TODO: client-relayed - return s.clientSelectRelay - } - - return s.clientDial -} - -// ---------------------------------------------------------------------------- - -func (s *peerSuper) clientSelectRelay() stateFunc { - s.logf("STATE: client-select-relay") - - timer := time.NewTimer(0) - defer timer.Stop() - - for { - select { - case peer := <-s.peerUpdates: - return s.peerUpdate(peer) - - case <-timer.C: - ip := s.selectRelayIP() - if ip != 0 { - s.logf("Got relay: %d", ip) - s.staged.relayIP = ip - s.publish() - return s.clientDial - } - - s.logf("No relay available.") - timer.Reset(pingInterval) - } - } -} - -func (s *peerSuper) selectRelayIP() byte { - possible := make([]byte, 0, 8) - for i, peer := range s.peers { - if peer.CanRelay() { - possible = append(possible, byte(i)) - } - } - - if len(possible) == 0 { - return 0 - } - return possible[rand.Intn(len(possible))] -} - -// ---------------------------------------------------------------------------- - -func (s *peerSuper) clientDial() stateFunc { - s.logf("STATE: client-dial") - - var ( - syn = synPacket{ - TraceID: newTraceID(), - SharedKey: s.staged.dataCipher.Key(), - RelayIP: s.staged.relayIP, - } - - timeout = time.NewTimer(dialTimeout) - ) - - defer timeout.Stop() - - s.sendControlPacket(syn) - - for { - select { - - case peer := <-s.peerUpdates: - return s.peerUpdate(peer) - - case pkt := <-s.controlPackets: - switch p := pkt.Payload.(type) { - case synAckPacket: - if p.TraceID != syn.TraceID { - continue // Hmm... - } - s.sendControlPacket(ackPacket{TraceID: syn.TraceID, RecvAddr: pkt.RemoteAddr}) - return s.clientConnected(p) - } - - case <-timeout.C: - return s.clientInit - } - } -} - -// ---------------------------------------------------------------------------- - -func (s *peerSuper) clientConnected(p synAckPacket) stateFunc { - s.logf("STATE: client-connected") - s.staged.up = true - s.staged.localAddr = p.RecvAddr - s.publish() - - return func() stateFunc { - return s._clientConnected(p.TraceID) - } -} - -func (s *peerSuper) _clientConnected(traceID uint64) stateFunc { - - pingTimer := time.NewTimer(pingInterval) - timeoutTimer := time.NewTimer(timeoutInterval) - - defer pingTimer.Stop() - defer timeoutTimer.Stop() - - for { - select { - case peer := <-s.peerUpdates: - return s.peerUpdate(peer) - - case pkt := <-s.controlPackets: - switch p := pkt.Payload.(type) { - - case ackPacket: - if p.TraceID != traceID { - return s.clientInit - } - timeoutTimer.Reset(timeoutInterval) - } - - case <-pingTimer.C: - s.sendControlPacket(ackPacket{TraceID: traceID}) - pingTimer.Reset(pingInterval) - - case <-timeoutTimer.C: - s.logf("client timeout") - return s.clientInit - - } - } -} diff --git a/node/peer-super.go b/node/peer-super.go deleted file mode 100644 index f5e2436..0000000 --- a/node/peer-super.go +++ /dev/null @@ -1,95 +0,0 @@ -package node - -import ( - "fmt" - "log" - "sync/atomic" - "vppn/m" -) - -type peerSuper struct { - // The purpose of this state machine is to manage this published data. - published *atomic.Pointer[peerRouteInfo] - staged peerRouteInfo // Local copy of shared data. See publish(). - - // The other remote peers. - peers *remotePeers - - // Immutable data. - localIP byte - localPub bool - remoteIP byte - privKey []byte - conn *connWriter - - // For sending to peer. - counter *uint64 - - // Mutable peer data. - peer *m.Peer - remotePub bool - - // Incoming events. - peerUpdates chan *m.Peer - controlPackets chan controlPacket - - // Buffers - buf []byte - encBuf []byte -} - -type stateFunc func() stateFunc - -func (s *peerSuper) Run() { - state := s.noPeer - for { - state = state() - } -} - -// ---------------------------------------------------------------------------- - -func (s *peerSuper) logf(msg string, args ...any) { - log.Printf(fmt.Sprintf("[%03d] ", s.remoteIP)+msg, args...) -} - -// ---------------------------------------------------------------------------- - -func (s *peerSuper) publish() { - data := s.staged - s.published.Store(&data) -} - -// ---------------------------------------------------------------------------- - -func (s *peerSuper) sendControlPacket(pkt interface{ Marshal([]byte) []byte }) { - buf := pkt.Marshal(s.buf) - h := header{ - StreamID: controlStreamID, - Counter: atomic.AddUint64(s.counter, 1), - SourceIP: s.localIP, - DestIP: s.remoteIP, - } - - buf = s.staged.controlCipher.Encrypt(h, buf, s.encBuf) - if s.staged.relayIP != 0 { - s.peers[s.staged.relayIP].RelayTo(s.remoteIP, buf) - } else { - s.conn.WriteTo(buf, s.staged.remoteAddr) - } -} - -// ---------------------------------------------------------------------------- - -func (s *peerSuper) sendControlPacketDirect(pkt interface{ Marshal([]byte) []byte }) { - buf := pkt.Marshal(s.buf) - h := header{ - StreamID: controlStreamID, - Counter: atomic.AddUint64(s.counter, 1), - SourceIP: s.localIP, - DestIP: s.remoteIP, - } - - buf = s.staged.controlCipher.Encrypt(h, buf, s.encBuf) - s.conn.WriteTo(buf, s.staged.remoteAddr) -} diff --git a/node/peer-supervisor.go b/node/peer-supervisor.go index dc7d2c6..6741f48 100644 --- a/node/peer-supervisor.go +++ b/node/peer-supervisor.go @@ -1,6 +1,11 @@ package node import ( + "fmt" + "log" + "math/rand" + "net/netip" + "sync/atomic" "time" "vppn/m" ) @@ -12,23 +17,336 @@ const ( timeoutInterval = 20 * time.Second ) -func (rp *remotePeer) supervise(conf m.PeerConfig) { - defer panicHandler() +// ---------------------------------------------------------------------------- - super := &peerSuper{ - published: rp.route, - peers: rp.peers, - localIP: rp.localIP, - localPub: addrIsValid(conf.PublicIP), - remoteIP: rp.remoteIP, - privKey: conf.EncPrivKey, - conn: rp.conn, - counter: &rp.counter, - peerUpdates: rp.peerUpdates, - controlPackets: rp.controlPackets, - buf: make([]byte, bufferSize), - encBuf: make([]byte, bufferSize), +type peerSupervisor struct { + // The purpose of this state machine is to manage this published data. + published *atomic.Pointer[peerRoute] + staged peerRoute // Local copy of shared data. See publish(). + + // Immutable data. + remoteIP byte // Remote VPN IP. + + // Mutable peer data. + peer *m.Peer + remotePub bool + + // Incoming events. + peerUpdates chan *m.Peer + controlPackets chan controlPacket + + // Buffers for sending control packets. + buf1 []byte + buf2 []byte +} + +func newPeerSupervisor(i int) *peerSupervisor { + return &peerSupervisor{ + published: routingTable[i], + remoteIP: byte(i), + peerUpdates: peerUpdates[i], + controlPackets: controlPackets[i], + buf1: make([]byte, bufferSize), + buf2: make([]byte, bufferSize), + } +} + +type stateFunc func() stateFunc + +func (s *peerSupervisor) Run() { + state := s.noPeer + for { + state = state() + } +} + +// ---------------------------------------------------------------------------- + +func (s *peerSupervisor) sendControlPacket(pkt interface{ Marshal([]byte) []byte }) { + _sendControlPacket(pkt, s.staged, s.buf1, s.buf2) +} + +// ---------------------------------------------------------------------------- + +func (s *peerSupervisor) logf(msg string, args ...any) { + log.Printf(fmt.Sprintf("[%03d] ", s.remoteIP)+msg, args...) +} + +// ---------------------------------------------------------------------------- + +func (s *peerSupervisor) publish() { + data := s.staged + s.published.Store(&data) +} + +// ---------------------------------------------------------------------------- + +func (s *peerSupervisor) noPeer() stateFunc { + return s.peerUpdate(<-s.peerUpdates) +} + +// ---------------------------------------------------------------------------- + +func (s *peerSupervisor) peerUpdate(peer *m.Peer) stateFunc { + return func() stateFunc { return s._peerUpdate(peer) } +} + +func (s *peerSupervisor) _peerUpdate(peer *m.Peer) stateFunc { + defer s.publish() + + s.peer = peer + s.staged = peerRoute{} + + if s.peer == nil { + return s.noPeer } - go super.Run() + s.staged.IP = s.remoteIP + s.staged.ControlCipher = newControlCipher(privateKey, peer.EncPubKey) + s.staged.DataCipher = newDataCipher() + + if ip, isValid := netip.AddrFromSlice(peer.PublicIP); isValid { + s.remotePub = true + s.staged.Relay = peer.Mediator + s.staged.RemoteAddr = netip.AddrPortFrom(ip, peer.Port) + } + + if s.remotePub == localPub { + if localIP < s.remoteIP { + return s.serverAccept + } + return s.clientInit + } + + if s.remotePub { + return s.clientInit + } + return s.serverAccept +} + +// ---------------------------------------------------------------------------- + +func (s *peerSupervisor) serverAccept() stateFunc { + s.logf("STATE: server-accept") + s.staged.Up = false + s.staged.DataCipher = nil + s.staged.RemoteAddr = zeroAddrPort + s.staged.RelayIP = 0 + s.publish() + + var syn synPacket + + for { + select { + case peer := <-s.peerUpdates: + return s.peerUpdate(peer) + + case pkt := <-s.controlPackets: + switch p := pkt.Payload.(type) { + + case synPacket: + syn = p + s.staged.RemoteAddr = pkt.RemoteAddr + s.staged.DataCipher = newDataCipherFromKey(syn.SharedKey) + s.staged.RelayIP = syn.RelayIP + s.publish() + s.sendControlPacket(synAckPacket{TraceID: syn.TraceID, RecvAddr: pkt.RemoteAddr}) + + case ackPacket: + if p.TraceID != syn.TraceID { + continue + } + + // Publish. + return s.serverConnected(syn.TraceID) + } + } + } +} + +// ---------------------------------------------------------------------------- + +func (s *peerSupervisor) serverConnected(traceID uint64) stateFunc { + s.logf("STATE: server-connected") + s.staged.Up = true + s.publish() + return func() stateFunc { + return s._serverConnected(traceID) + } +} + +func (s *peerSupervisor) _serverConnected(traceID uint64) stateFunc { + + timeoutTimer := time.NewTimer(timeoutInterval) + defer timeoutTimer.Stop() + + for { + select { + case peer := <-s.peerUpdates: + return s.peerUpdate(peer) + + case pkt := <-s.controlPackets: + switch p := pkt.Payload.(type) { + + case ackPacket: + if p.TraceID != traceID { + return s.serverAccept + } + s.sendControlPacket(ackPacket{TraceID: traceID, RecvAddr: pkt.RemoteAddr}) + timeoutTimer.Reset(timeoutInterval) + } + + case <-timeoutTimer.C: + s.logf("server timeout") + return s.serverAccept + } + } +} + +// ---------------------------------------------------------------------------- + +func (s *peerSupervisor) clientInit() stateFunc { + s.logf("STATE: client-init") + if !s.remotePub { + // TODO: Check local discovery for IP. + // TODO: Attempt UDP hole punch. + // TODO: client-relayed + return s.clientSelectRelay + } + + return s.clientDial +} + +// ---------------------------------------------------------------------------- + +func (s *peerSupervisor) clientSelectRelay() stateFunc { + s.logf("STATE: client-select-relay") + + timer := time.NewTimer(0) + defer timer.Stop() + + for { + select { + case peer := <-s.peerUpdates: + return s.peerUpdate(peer) + + case <-timer.C: + relay := s.selectRelay() + if relay != nil { + s.logf("Got relay: %d", relay.IP) + s.staged.RelayIP = relay.IP + s.staged.LocalAddr = relay.LocalAddr + s.publish() + return s.clientDial + } + + s.logf("No relay available.") + timer.Reset(pingInterval) + } + } +} + +func (s *peerSupervisor) selectRelay() *peerRoute { + possible := make([]*peerRoute, 0, 8) + for i := range routingTable { + route := routingTable[i].Load() + if !route.Up || !route.Relay { + continue + } + possible = append(possible, route) + } + + if len(possible) == 0 { + return nil + } + return possible[rand.Intn(len(possible))] +} + +// ---------------------------------------------------------------------------- + +func (s *peerSupervisor) clientDial() stateFunc { + s.logf("STATE: client-dial") + + var ( + syn = synPacket{ + TraceID: newTraceID(), + SharedKey: s.staged.DataCipher.Key(), + RelayIP: s.staged.RelayIP, + } + + timeout = time.NewTimer(dialTimeout) + ) + + defer timeout.Stop() + + s.sendControlPacket(syn) + + for { + select { + + case peer := <-s.peerUpdates: + return s.peerUpdate(peer) + + case pkt := <-s.controlPackets: + switch p := pkt.Payload.(type) { + case synAckPacket: + if p.TraceID != syn.TraceID { + continue // Hmm... + } + s.sendControlPacket(ackPacket{TraceID: syn.TraceID, RecvAddr: pkt.RemoteAddr}) + return s.clientConnected(p) + } + + case <-timeout.C: + return s.clientInit + } + } +} + +// ---------------------------------------------------------------------------- + +func (s *peerSupervisor) clientConnected(p synAckPacket) stateFunc { + s.logf("STATE: client-connected") + s.staged.Up = true + s.staged.LocalAddr = p.RecvAddr + s.publish() + + return func() stateFunc { + return s._clientConnected(p.TraceID) + } +} + +func (s *peerSupervisor) _clientConnected(traceID uint64) stateFunc { + + pingTimer := time.NewTimer(pingInterval) + timeoutTimer := time.NewTimer(timeoutInterval) + + defer pingTimer.Stop() + defer timeoutTimer.Stop() + + for { + select { + case peer := <-s.peerUpdates: + return s.peerUpdate(peer) + + case pkt := <-s.controlPackets: + switch p := pkt.Payload.(type) { + + case ackPacket: + if p.TraceID != traceID { + return s.clientInit + } + timeoutTimer.Reset(timeoutInterval) + } + + case <-pingTimer.C: + s.sendControlPacket(ackPacket{TraceID: traceID}) + pingTimer.Reset(pingInterval) + + case <-timeoutTimer.C: + s.logf("client timeout") + return s.clientInit + + } + } } diff --git a/node/peer.go b/node/peer.go deleted file mode 100644 index b829b39..0000000 --- a/node/peer.go +++ /dev/null @@ -1,274 +0,0 @@ -package node - -import ( - "fmt" - "log" - "net/netip" - "sync/atomic" - "time" - "vppn/m" -) - -type remotePeers [256]*remotePeer - -// ---------------------------------------------------------------------------- - -type peerRouteInfo struct { - up bool - relay bool - controlCipher *controlCipher - dataCipher *dataCipher - remoteAddr netip.AddrPort - localAddr netip.AddrPort // Local address as seen by the remote. - relayIP byte // Non-zero if we should relay. -} - -type remotePeer struct { - // Immutable data. - localIP byte - remoteIP byte - iface *ifWriter - conn *connWriter - - // Shared state. - peers *remotePeers - route *atomic.Pointer[peerRouteInfo] - - // Only used in HandlePacket / Not synchronized. - dupCheck *dupCheck - decryptBuf []byte - - // Only used in SendData / Not synchronized. - encryptBuf []byte - - // Used for sending control and data packets. Atomic access only. - counter uint64 - - // Only accessed in HandlePeerUpdate. Used to determine if we should send - // the peer update to the peerSuper. - peerVersion int64 - - // For communicating with the supervisor thread. - peerUpdates chan *m.Peer - controlPackets chan controlPacket -} - -func newRemotePeer(conf m.PeerConfig, remoteIP byte, iface *ifWriter, conn *connWriter, peers *remotePeers) *remotePeer { - rp := &remotePeer{ - localIP: conf.PeerIP, - remoteIP: remoteIP, - iface: iface, - conn: conn, - peers: peers, - route: &atomic.Pointer[peerRouteInfo]{}, - dupCheck: newDupCheck(0), - decryptBuf: make([]byte, bufferSize), - encryptBuf: make([]byte, bufferSize), - counter: uint64(time.Now().Unix()) << 30, - peerUpdates: make(chan *m.Peer), - controlPackets: make(chan controlPacket, 512), - } - - pd := peerRouteInfo{} - rp.route.Store(&pd) - - //go newPeerSuper(rp).Run() - go rp.supervise(conf) - return rp -} - -func (rp *remotePeer) logf(msg string, args ...any) { - log.Printf(fmt.Sprintf("[%03d] ", rp.remoteIP)+msg, args...) -} - -func (rp *remotePeer) HandlePeerUpdate(peer *m.Peer) { - if peer == nil { - rp.peerUpdates <- peer - } else if peer.Version != rp.peerVersion { - rp.peerVersion = peer.Version - rp.peerUpdates <- peer - } -} - -// ---------------------------------------------------------------------------- - -// HandlePacket accepts a raw data packet coming in from the network. -// -// This function is called by a single thread. -func (rp *remotePeer) HandlePacket(addr netip.AddrPort, h header, data []byte) { - switch h.StreamID { - case controlStreamID: - rp.handleControlPacket(addr, h, data) - - case dataStreamID: - rp.handleDataPacket(data) - - case relayStreamID: - rp.handleRelayPacket(h, data) - - default: - rp.logf("Unknown stream ID: %d", h.StreamID) - } -} - -// ---------------------------------------------------------------------------- - -func (rp *remotePeer) handleControlPacket(addr netip.AddrPort, h header, data []byte) { - routingData := rp.route.Load() - if routingData.controlCipher == nil { - rp.logf("Not connected (control).") - return - } - - if h.DestIP != rp.localIP { - rp.logf("Incorrect destination IP on control packet.") - return - } - - out, ok := routingData.controlCipher.Decrypt(data, rp.decryptBuf) - if !ok { - rp.logf("Failed to decrypt control packet.") - return - } - - if len(out) == 0 { - rp.logf("Empty control packet from: %d", h.SourceIP) - return - } - - if rp.dupCheck.IsDup(h.Counter) { - rp.logf("Duplicate control packet: %d", h.Counter) - return - } - - pkt := controlPacket{ - SrcIP: h.SourceIP, - RemoteAddr: addr, - } - - if err := pkt.ParsePayload(out); err != nil { - rp.logf("Failed to parse control packet: %v", err) - return - } - - select { - case rp.controlPackets <- pkt: - default: - rp.logf("Dropping control packet.") - } -} - -// ---------------------------------------------------------------------------- - -func (rp *remotePeer) handleDataPacket(data []byte) { - routingData := rp.route.Load() - if routingData.dataCipher == nil { - rp.logf("Not connected (recv).") - return - } - - dec, ok := routingData.dataCipher.Decrypt(data, rp.decryptBuf) - if !ok { - rp.logf("Failed to decrypt data packet.") - return - } - - rp.iface.Write(dec) -} - -// ---------------------------------------------------------------------------- - -func (rp *remotePeer) handleRelayPacket(h header, data []byte) { - routingData := rp.route.Load() - if routingData.dataCipher == nil { - rp.logf("Not connected (recv).") - return - } - - dec, ok := routingData.dataCipher.Decrypt(data, rp.decryptBuf) - if !ok { - rp.logf("Failed to decrypt data packet.") - return - } - - rp.peers[h.DestIP].SendAsIs(dec) -} - -// ---------------------------------------------------------------------------- - -// SendData sends data coming from the interface going to the network. -// -// This function is called by a single thread. -func (rp *remotePeer) SendData(data []byte) { - rp.encryptAndSend(dataStreamID, rp.remoteIP, data) -} - -func (rp *remotePeer) HandleInterfacePacket(data []byte) { - routingData := rp.route.Load() - - if routingData.dataCipher == nil { - rp.logf("Not connected (handle interface).") - return - } - - h := header{ - StreamID: dataStreamID, - Counter: atomic.AddUint64(&rp.counter, 1), - SourceIP: rp.localIP, - DestIP: rp.remoteIP, - } - - enc := routingData.dataCipher.Encrypt(h, data, rp.encryptBuf) - - if routingData.relayIP != 0 { - rp.peers[routingData.relayIP].RelayTo(rp.remoteIP, enc) - } else { - rp.SendData(data) - } -} - -// ---------------------------------------------------------------------------- - -func (rp *remotePeer) CanRelay() bool { - data := rp.route.Load() - return data.relay && data.up -} - -// ---------------------------------------------------------------------------- - -func (rp *remotePeer) RelayTo(destIP byte, data []byte) { - rp.encryptAndSend(relayStreamID, destIP, data) -} - -// ---------------------------------------------------------------------------- - -func (rp *remotePeer) encryptAndSend(streamID byte, destIP byte, data []byte) { - routingData := rp.route.Load() - if routingData.dataCipher == nil || routingData.remoteAddr == zeroAddrPort { - rp.logf("Not connected (encrypt and send).") - return - } - - h := header{ - StreamID: streamID, - Counter: atomic.AddUint64(&rp.counter, 1), - SourceIP: rp.localIP, - DestIP: destIP, - } - - enc := routingData.dataCipher.Encrypt(h, data, rp.encryptBuf) - rp.conn.WriteTo(enc, routingData.remoteAddr) -} - -// ---------------------------------------------------------------------------- - -// SendAsIs is used when forwarding already-encrypted data from one peer to -// another. -func (rp *remotePeer) SendAsIs(data []byte) { - routingData := rp.route.Load() - if routingData.remoteAddr == zeroAddrPort { - rp.logf("Not connected (send direct).") - return - } - rp.conn.WriteTo(data, routingData.remoteAddr) -}