From a263f65c5d036d72f9e732a98046f25bd1d4a1b7 Mon Sep 17 00:00:00 2001 From: jdl Date: Sun, 22 Dec 2024 13:58:09 +0100 Subject: [PATCH] wip --- node/packets.go | 24 +++-- node/peer-states.go | 233 ++++++++++++++++++++++------------------ node/peer-supervisor.go | 10 +- 3 files changed, 147 insertions(+), 120 deletions(-) diff --git a/node/packets.go b/node/packets.go index bbc1262..ffda859 100644 --- a/node/packets.go +++ b/node/packets.go @@ -37,6 +37,10 @@ func (p *controlPacket) ParsePayload(buf []byte) (err error) { p.Payload, err = parsePongPacket(buf) case packetTypeSyn: p.Payload, err = parseSynPacket(buf) + case packetTypeSynAck: + p.Payload, err = parseSynAckPacket(buf) + case packetTypeAck: + p.Payload, err = parseAckPacket(buf) default: return errUnknownPacketType } @@ -49,7 +53,7 @@ type synPacket struct { TraceID uint64 // TraceID to match response w/ request. SharedKey [32]byte // Our shared key. ServerAddr netip.AddrPort // The address we're sending to. - Direct bool // True if this is request isn't relayed. + RelayIP byte } func (p synPacket) Marshal(buf []byte) []byte { @@ -58,7 +62,7 @@ func (p synPacket) Marshal(buf []byte) []byte { Uint64(p.TraceID). SharedKey(p.SharedKey). AddrPort(p.ServerAddr). - Bool(p.Direct). + Byte(p.RelayIP). Build() } @@ -67,7 +71,7 @@ func parseSynPacket(buf []byte) (p synPacket, err error) { Uint64(&p.TraceID). SharedKey(&p.SharedKey). AddrPort(&p.ServerAddr). - Bool(&p.Direct). + Byte(&p.RelayIP). Error() return } @@ -78,6 +82,10 @@ type synAckPacket struct { TraceID uint64 } +func newSynAckPacket(traceID uint64) synAckPacket { + return synAckPacket{traceID} +} + func (p synAckPacket) Marshal(buf []byte) []byte { return newBinWriter(buf). Byte(packetTypeSynAck). @@ -100,7 +108,7 @@ type ackPacket struct { func (p ackPacket) Marshal(buf []byte) []byte { return newBinWriter(buf). - Byte(packetTypeSynAck). + Byte(packetTypeAck). Uint64(p.TraceID). Build() } @@ -118,13 +126,11 @@ func parseAckPacket(buf []byte) (p ackPacket, err error) { // as a server. It always contains the shared key the client is expecting // to use for data encryption with the server. type pingPacket struct { - SentAt int64 // UnixMilli. // Not used. Use traceID. - SharedKey [32]byte + SentAt int64 // UnixMilli. // Not used. Use traceID. } -func newPingPacket(sharedKey [32]byte) (pp pingPacket) { +func newPingPacket() (pp pingPacket) { pp.SentAt = time.Now().UnixMilli() - pp.SharedKey = sharedKey return } @@ -132,14 +138,12 @@ func (p pingPacket) Marshal(buf []byte) []byte { return newBinWriter(buf). Byte(packetTypePing). Int64(p.SentAt). - SharedKey(p.SharedKey). Build() } func parsePingPacket(buf []byte) (p pingPacket, err error) { err = newBinReader(buf[1:]). Int64(&p.SentAt). - SharedKey(&p.SharedKey). Error() return } diff --git a/node/peer-states.go b/node/peer-states.go index 35ebc0b..39990bd 100644 --- a/node/peer-states.go +++ b/node/peer-states.go @@ -16,14 +16,11 @@ type peerState interface { OnSynAck(netip.AddrPort, synAckPacket) peerState OnAck(netip.AddrPort, ackPacket) peerState - // When the peer is updated, we reset. Handled by base state. - OnPeerUpdate(*m.Peer) peerState - - // To determe up / dataCipher. Handled by base state. - OnPing(netip.AddrPort, pingPacket) peerState - OnPong(netip.AddrPort, pongPacket) peerState OnPingTimer() peerState OnTimeoutTimer() peerState + + // When the peer is updated, we reset. Handled by base state. + OnPeerUpdate(*m.Peer) peerState } // ---------------------------------------------------------------------------- @@ -82,37 +79,39 @@ func (s *stateBase) selectStateFromPeer(peer *m.Peer) peerState { if peer == nil { return newStateNoPeer(s) } + s.staged.controlCipher = newControlCipher(s.privKey, peer.EncPubKey) + s.staged.dataCipher = newDataCipher() + + s.resetPingTimer() + s.resetTimeoutTimer() ip, isValid := netip.AddrFromSlice(peer.PublicIP) if isValid { s.remotePub = true s.staged.remoteAddr = netip.AddrPortFrom(ip, peer.Port) s.staged.relay = peer.Mediator + } - if s.localPub && s.localIP < s.remoteIP { - return newStateServer(s) + if s.remotePub == s.localPub { + if s.localIP < s.remoteIP { + return newStateServer2(s) } - return newStateClient(s) + return newStateDialLocal(s) } - if s.localPub { - return newStateServer(s) + if s.remotePub { + return newStateDialLocal(s) } - - return newStateSelectRelay(s) + return newStateServer2(s) } func (s *stateBase) OnSyn(rAddr netip.AddrPort, p synPacket) peerState { return nil } func (s *stateBase) OnSynAck(rAddr netip.AddrPort, p synAckPacket) peerState { return nil } func (s *stateBase) OnAck(rAddr netip.AddrPort, p ackPacket) peerState { return nil } -func (s *stateBase) OnPing(rAddr netip.AddrPort, p pingPacket) peerState { return nil } -func (s *stateBase) OnPong(rAddr netip.AddrPort, p pongPacket) peerState { return nil } -func (s *stateBase) OnPingTimer() peerState { return nil } -func (s *stateBase) OnTimeoutTimer() peerState { - return s.selectStateFromPeer(s.peer) -} +func (s *stateBase) OnPingTimer() peerState { return nil } +func (s *stateBase) OnTimeoutTimer() peerState { return nil } // Helpers. @@ -144,14 +143,6 @@ func (s *stateBase) selectRelay() byte { return possible[rand.Intn(len(possible))] } -func (s *stateBase) sendPing(sharedKey [32]byte) { - s.sendControlPacket(newPingPacket(sharedKey)) -} - -func (s *stateBase) sendPong(ping pingPacket) { - s.sendControlPacket(newPongPacket(ping.SentAt)) -} - func (s *stateBase) sendControlPacket(pkt interface{ Marshal([]byte) []byte }) { buf := pkt.Marshal(s.buf) h := header{ @@ -183,6 +174,117 @@ func newStateNoPeer(b *stateBase) *stateNoPeer { // ---------------------------------------------------------------------------- +type stateServer2 struct { + *stateBase + syn synPacket + publishedTraceID uint64 +} + +// TODO: Server should send SynAck packets on a loop. +func newStateServer2(b *stateBase) peerState { + s := &stateServer2{stateBase: b} + s.resetTimeoutTimer() + return s +} + +func (s *stateServer2) Name() string { return "server" } + +func (s *stateServer2) OnSyn(remoteAddr netip.AddrPort, p synPacket) peerState { + s.syn = p + s.sendControlPacket(newSynAckPacket(p.TraceID)) + return nil +} + +func (s *stateServer2) OnAck(remoteAddr netip.AddrPort, p ackPacket) peerState { + if p.TraceID != s.syn.TraceID { + return nil + } + + s.resetTimeoutTimer() + + if p.TraceID == s.publishedTraceID { + return nil + } + + // Pubish staged + s.staged.remoteAddr = remoteAddr + s.staged.dataCipher = newDataCipherFromKey(s.syn.SharedKey) + s.staged.relayIP = s.syn.RelayIP + s.staged.up = true + s.publish() + + s.publishedTraceID = p.TraceID + return nil +} + +func (s *stateServer) OnTimeoutTimer() peerState { + // TODO: We're down. + return nil +} + +// ---------------------------------------------------------------------------- + +type stateDialLocal struct { + *stateBase + syn synPacket +} + +func newStateDialLocal(b *stateBase) peerState { + // s := stateDialLocal{stateBase: b} + // TODO: check for peer local address. + return newStateDialDirect(b) +} + +func (s *stateDialLocal) Name() string { return "dial-local" } + +// ---------------------------------------------------------------------------- + +type stateDialDirect struct { + *stateBase + syn synPacket +} + +func newStateDialDirect(b *stateBase) peerState { + // If we don't have an address, dial via relay. + if b.staged.remoteAddr == zeroAddrPort { + return newStateNoPeer(b) + } + + s := &stateDialDirect{stateBase: b} + s.syn = synPacket{ + TraceID: newTraceID(), + SharedKey: s.staged.dataCipher.Key(), + ServerAddr: b.staged.remoteAddr, + } + + s.sendControlPacket(s.syn) + s.resetTimeoutTimer() + + return s +} + +func (s *stateDialDirect) Name() string { return "dial-direct" } + +func (s *stateDialDirect) OnSynAck(remoteAddr netip.AddrPort, p synAckPacket) peerState { + if p.TraceID != s.syn.TraceID { + // Hmm... + return nil + } + + s.sendControlPacket(ackPacket{TraceID: s.syn.TraceID}) + s.logf("GOT SYN-ACK! TODO!") + // client should continue to respond to synAck packets from server. + // return newStateClientConnected(s.stateBase, s.syn.TraceID) ... + return nil +} + +func (s *stateDialDirect) OnTimeoutTimer() peerState { + s.logf("Timeout when dialing") + return newStateDialLocal(s.stateBase) +} + +// ---------------------------------------------------------------------------- + type stateClient struct { sharedKey [32]byte *stateBase @@ -195,7 +297,7 @@ func newStateClient(b *stateBase) peerState { s.staged.dataCipher = newDataCipher() s.sharedKey = s.staged.dataCipher.Key() - s.sendPing(s.sharedKey) + s.sendControlPacket(newPingPacket()) s.resetPingTimer() s.resetTimeoutTimer() return s @@ -203,27 +305,6 @@ func newStateClient(b *stateBase) peerState { func (s *stateClient) Name() string { return "client" } -func (s *stateClient) OnPong(addr netip.AddrPort, p pongPacket) peerState { - if !s.staged.up { - s.staged.up = true - s.publish() - } - s.resetTimeoutTimer() - return nil -} - -func (s *stateClient) OnPingTimer() peerState { - s.sendPing(s.sharedKey) - s.resetPingTimer() - return nil -} - -func (s *stateClient) OnTimeoutTimer() peerState { - s.staged.up = false - s.publish() - return nil -} - // ---------------------------------------------------------------------------- type stateServer struct { @@ -240,25 +321,6 @@ func newStateServer(b *stateBase) peerState { func (s *stateServer) Name() string { return "server" } -func (s *stateServer) OnPing(addr netip.AddrPort, p pingPacket) peerState { - if addr != s.staged.remoteAddr { - s.logf("Got new peer address: %v", addr) - s.staged.remoteAddr = addr - s.staged.up = true - s.publish() - } - - if s.staged.dataCipher == nil || p.SharedKey != s.staged.dataCipher.Key() { - s.logf("Got new shared key.") - s.staged.dataCipher = newDataCipherFromKey(p.SharedKey) - s.staged.up = true - s.publish() - } - - s.sendPong(p) - return nil -} - // ---------------------------------------------------------------------------- type stateSelectRelay struct { @@ -315,7 +377,7 @@ func newStateClientRelayed(b *stateBase) peerState { s.sharedKey = s.staged.dataCipher.Key() s.publish() - s.sendPing(s.sharedKey) + s.sendControlPacket(newPingPacket()) s.resetPingTimer() s.resetTimeoutTimer() return s @@ -323,26 +385,6 @@ func newStateClientRelayed(b *stateBase) peerState { func (s *stateClientRelayed) Name() string { return "client-relayed" } -func (s *stateClientRelayed) OnPong(addr netip.AddrPort, p pongPacket) peerState { - if !s.staged.up { - s.staged.up = true - s.publish() - } - - s.resetTimeoutTimer() - return nil -} - -func (s *stateClientRelayed) OnPingTimer() peerState { - s.sendPing(s.sharedKey) - s.resetPingTimer() - return nil -} - -func (s *stateClientRelayed) OnTimeoutTimer() peerState { - return newStateSelectRelay(s.stateBase) -} - // ---------------------------------------------------------------------------- type stateServerRelayed struct { @@ -358,19 +400,6 @@ func newStateServerRelayed(b *stateBase) peerState { func (s *stateServerRelayed) Name() string { return "server-relayed" } -func (s *stateServerRelayed) OnPing(addr netip.AddrPort, p pingPacket) peerState { - if s.staged.dataCipher == nil || p.SharedKey != s.staged.dataCipher.Key() { - s.logf("Got new shared key.") - s.staged.up = true - s.staged.dataCipher = newDataCipherFromKey(p.SharedKey) - s.publish() - } - - s.sendPong(p) - s.resetTimeoutTimer() - return nil -} - func (s *stateServerRelayed) OnTimeoutTimer() peerState { return newStateSelectRelay(s.stateBase) } diff --git a/node/peer-supervisor.go b/node/peer-supervisor.go index 08691aa..3f3e0a0 100644 --- a/node/peer-supervisor.go +++ b/node/peer-supervisor.go @@ -6,6 +6,7 @@ import ( ) const ( + dialTimeout = 8 * time.Second connectTimeout = 6 * time.Second pingInterval = 6 * time.Second timeoutInterval = 20 * time.Second @@ -29,11 +30,8 @@ func (rp *remotePeer) supervise(conf m.PeerConfig) { encBuf: make([]byte, bufferSize), } - base.pingTimer.Stop() - base.timeoutTimer.Stop() - var ( - curState peerState = base + curState peerState = newStateNoPeer(base) nextState peerState ) @@ -52,10 +50,6 @@ func (rp *remotePeer) supervise(conf m.PeerConfig) { nextState = curState.OnSynAck(pkt.RemoteAddr, p) case ackPacket: nextState = curState.OnAck(pkt.RemoteAddr, p) - case pingPacket: - nextState = curState.OnPing(pkt.RemoteAddr, p) - case pongPacket: - nextState = curState.OnPong(pkt.RemoteAddr, p) default: // Unknown packet type. }