package node import ( "fmt" "log" "math/rand" "net/netip" "sync/atomic" "time" "vppn/m" ) type peerState interface { Name() string OnSyn(netip.AddrPort, synPacket) peerState OnSynAck(netip.AddrPort, synAckPacket) peerState OnAck(netip.AddrPort, ackPacket) peerState OnPingTimer() peerState OnTimeoutTimer() peerState // When the peer is updated, we reset. Handled by base state. OnPeerUpdate(*m.Peer) peerState } // ---------------------------------------------------------------------------- type stateBase struct { // The purpose of this state machine is to manage this published data. published *atomic.Pointer[peerRoutingData] staged peerRoutingData // Local copy of shared data. See publish(). // The other remote peers. peers *remotePeers // Immutable data. localIP byte localPub bool remoteIP byte privKey []byte conn *connWriter // For sending to peer. counter *uint64 // Mutable peer data. peer *m.Peer remotePub bool // Timers pingTimer *time.Timer timeoutTimer *time.Timer buf []byte encBuf []byte } func (sb *stateBase) Name() string { return "idle" } func (s *stateBase) OnPeerUpdate(peer *m.Peer) peerState { // Both nil: no change. if peer == nil && s.peer == nil { return nil } // No change. if peer != nil && s.peer != nil && s.peer.Version == peer.Version { return nil } return s.selectStateFromPeer(peer) } func (s *stateBase) selectStateFromPeer(peer *m.Peer) peerState { s.peer = peer s.staged = peerRoutingData{} defer s.publish() if peer == nil { return newStateNoPeer(s) } s.staged.controlCipher = newControlCipher(s.privKey, peer.EncPubKey) s.staged.dataCipher = newDataCipher() s.resetPingTimer() s.resetTimeoutTimer() ip, isValid := netip.AddrFromSlice(peer.PublicIP) if isValid { s.remotePub = true s.staged.remoteAddr = netip.AddrPortFrom(ip, peer.Port) s.staged.relay = peer.Mediator } if s.remotePub == s.localPub { if s.localIP < s.remoteIP { return newStateServer2(s) } return newStateDialLocal(s) } if s.remotePub { return newStateDialLocal(s) } return newStateServer2(s) } func (s *stateBase) OnSyn(rAddr netip.AddrPort, p synPacket) peerState { return nil } func (s *stateBase) OnSynAck(rAddr netip.AddrPort, p synAckPacket) peerState { return nil } func (s *stateBase) OnAck(rAddr netip.AddrPort, p ackPacket) peerState { return nil } func (s *stateBase) OnPingTimer() peerState { return nil } func (s *stateBase) OnTimeoutTimer() peerState { return nil } // Helpers. func (s *stateBase) resetPingTimer() { s.pingTimer.Reset(pingInterval) } func (s *stateBase) resetTimeoutTimer() { s.timeoutTimer.Reset(timeoutInterval) } func (s *stateBase) stopPingTimer() { s.pingTimer.Stop() } func (s *stateBase) stopTimeoutTimer() { s.timeoutTimer.Stop() } func (s *stateBase) logf(msg string, args ...any) { log.Printf(fmt.Sprintf("[%03d] ", s.remoteIP)+msg, args...) } func (s *stateBase) publish() { data := s.staged s.published.Store(&data) } func (s *stateBase) selectRelay() byte { possible := make([]byte, 0, 8) for i, peer := range s.peers { if peer.CanRelay() { possible = append(possible, byte(i)) } } if len(possible) == 0 { return 0 } return possible[rand.Intn(len(possible))] } func (s *stateBase) sendControlPacket(pkt interface{ Marshal([]byte) []byte }) { buf := pkt.Marshal(s.buf) h := header{ StreamID: controlStreamID, Counter: atomic.AddUint64(s.counter, 1), SourceIP: s.localIP, DestIP: s.remoteIP, } buf = s.staged.controlCipher.Encrypt(h, buf, s.encBuf) if s.staged.relayIP != 0 { s.peers[s.staged.relayIP].RelayFor(s.remoteIP, buf) } else { s.conn.WriteTo(buf, s.staged.remoteAddr) } } // ---------------------------------------------------------------------------- type stateNoPeer struct{ *stateBase } func newStateNoPeer(b *stateBase) *stateNoPeer { s := &stateNoPeer{b} s.pingTimer.Stop() s.timeoutTimer.Stop() s.publish() return s } // ---------------------------------------------------------------------------- type stateServer2 struct { *stateBase syn synPacket publishedTraceID uint64 } // TODO: Server should send SynAck packets on a loop. func newStateServer2(b *stateBase) peerState { s := &stateServer2{stateBase: b} s.resetTimeoutTimer() return s } func (s *stateServer2) Name() string { return "server" } func (s *stateServer2) OnSyn(remoteAddr netip.AddrPort, p synPacket) peerState { s.syn = p s.sendControlPacket(newSynAckPacket(p.TraceID)) return nil } func (s *stateServer2) OnAck(remoteAddr netip.AddrPort, p ackPacket) peerState { if p.TraceID != s.syn.TraceID { return nil } s.resetTimeoutTimer() if p.TraceID == s.publishedTraceID { return nil } // Pubish staged s.staged.remoteAddr = remoteAddr s.staged.dataCipher = newDataCipherFromKey(s.syn.SharedKey) s.staged.relayIP = s.syn.RelayIP s.staged.up = true s.publish() s.publishedTraceID = p.TraceID return nil } func (s *stateServer) OnTimeoutTimer() peerState { // TODO: We're down. return nil } // ---------------------------------------------------------------------------- type stateDialLocal struct { *stateBase syn synPacket } func newStateDialLocal(b *stateBase) peerState { // s := stateDialLocal{stateBase: b} // TODO: check for peer local address. return newStateDialDirect(b) } func (s *stateDialLocal) Name() string { return "dial-local" } // ---------------------------------------------------------------------------- type stateDialDirect struct { *stateBase syn synPacket } func newStateDialDirect(b *stateBase) peerState { // If we don't have an address, dial via relay. if b.staged.remoteAddr == zeroAddrPort { return newStateNoPeer(b) } s := &stateDialDirect{stateBase: b} s.syn = synPacket{ TraceID: newTraceID(), SharedKey: s.staged.dataCipher.Key(), ServerAddr: b.staged.remoteAddr, } s.sendControlPacket(s.syn) s.resetTimeoutTimer() return s } func (s *stateDialDirect) Name() string { return "dial-direct" } func (s *stateDialDirect) OnSynAck(remoteAddr netip.AddrPort, p synAckPacket) peerState { if p.TraceID != s.syn.TraceID { // Hmm... return nil } s.sendControlPacket(ackPacket{TraceID: s.syn.TraceID}) s.logf("GOT SYN-ACK! TODO!") // client should continue to respond to synAck packets from server. // return newStateClientConnected(s.stateBase, s.syn.TraceID) ... return nil } func (s *stateDialDirect) OnTimeoutTimer() peerState { s.logf("Timeout when dialing") return newStateDialLocal(s.stateBase) } // ---------------------------------------------------------------------------- type stateClient struct { sharedKey [32]byte *stateBase } func newStateClient(b *stateBase) peerState { s := &stateClient{stateBase: b} s.publish() s.staged.dataCipher = newDataCipher() s.sharedKey = s.staged.dataCipher.Key() s.sendControlPacket(newPingPacket()) s.resetPingTimer() s.resetTimeoutTimer() return s } func (s *stateClient) Name() string { return "client" } // ---------------------------------------------------------------------------- type stateServer struct { *stateBase } func newStateServer(b *stateBase) peerState { s := &stateServer{b} s.publish() s.stopPingTimer() s.stopTimeoutTimer() return s } func (s *stateServer) Name() string { return "server" } // ---------------------------------------------------------------------------- type stateSelectRelay struct { *stateBase } func newStateSelectRelay(b *stateBase) peerState { s := &stateSelectRelay{stateBase: b} s.staged.dataCipher = nil s.staged.up = false s.publish() if relay := s.selectRelay(); relay != 0 { s.staged.up = false s.staged.relayIP = relay return s.selectRole() } s.resetPingTimer() s.stopTimeoutTimer() return s } func (s *stateSelectRelay) selectRole() peerState { if s.localIP < s.remoteIP { return newStateServerRelayed(s.stateBase) } return newStateClientRelayed(s.stateBase) } func (s *stateSelectRelay) Name() string { return "select-relay" } func (s *stateSelectRelay) OnPingTimer() peerState { if relay := s.selectRelay(); relay != 0 { s.logf("Got relay IP: %d", relay) s.staged.relayIP = relay return s.selectRole() } s.resetPingTimer() return nil } // ---------------------------------------------------------------------------- type stateClientRelayed struct { sharedKey [32]byte *stateBase } func newStateClientRelayed(b *stateBase) peerState { s := &stateClientRelayed{stateBase: b} s.staged.dataCipher = newDataCipher() s.sharedKey = s.staged.dataCipher.Key() s.publish() s.sendControlPacket(newPingPacket()) s.resetPingTimer() s.resetTimeoutTimer() return s } func (s *stateClientRelayed) Name() string { return "client-relayed" } // ---------------------------------------------------------------------------- type stateServerRelayed struct { *stateBase } func newStateServerRelayed(b *stateBase) peerState { s := &stateServerRelayed{b} s.stopPingTimer() s.resetTimeoutTimer() return s } func (s *stateServerRelayed) Name() string { return "server-relayed" } func (s *stateServerRelayed) OnTimeoutTimer() peerState { return newStateSelectRelay(s.stateBase) }