diff --git a/node/packets.go b/node/packets.go index 0126359..04db2a9 100644 --- a/node/packets.go +++ b/node/packets.go @@ -14,9 +14,8 @@ const ( packetTypeSyn = iota + 1 packetTypeSynAck packetTypeAck - packetTypePing - packetTypePong - packetTypeRelayed + packetTypeAddrReq + packetTypeAddrResp ) // ---------------------------------------------------------------------------- @@ -35,6 +34,10 @@ func (p *controlPacket) ParsePayload(buf []byte) (err error) { p.Payload, err = parseSynAckPacket(buf) case packetTypeAck: p.Payload, err = parseAckPacket(buf) + case packetTypeAddrReq: + p.Payload, err = parseAddrReqPacket(buf) + case packetTypeAddrResp: + p.Payload, err = parseAddrRespPacket(buf) default: return errUnknownPacketType } @@ -70,21 +73,66 @@ func parseSynPacket(buf []byte) (p synPacket, err error) { // ---------------------------------------------------------------------------- type synAckPacket struct { - TraceID uint64 -} - -func newSynAckPacket(traceID uint64) synAckPacket { - return synAckPacket{traceID} + TraceID uint64 + RecvAddr netip.AddrPort } func (p synAckPacket) Marshal(buf []byte) []byte { return newBinWriter(buf). Byte(packetTypeSynAck). Uint64(p.TraceID). + AddrPort(p.RecvAddr). Build() } func parseSynAckPacket(buf []byte) (p synAckPacket, err error) { + err = newBinReader(buf[1:]). + Uint64(&p.TraceID). + AddrPort(&p.RecvAddr). + Error() + return +} + +// ---------------------------------------------------------------------------- + +type ackPacket struct { + TraceID uint64 + SendAddr netip.AddrPort // Address of the sender. + RecvAddr netip.AddrPort // Address of the recipient as seen by sender. +} + +func (p ackPacket) Marshal(buf []byte) []byte { + return newBinWriter(buf). + Byte(packetTypeAck). + Uint64(p.TraceID). + AddrPort(p.SendAddr). + AddrPort(p.RecvAddr). + Build() +} + +func parseAckPacket(buf []byte) (p ackPacket, err error) { + err = newBinReader(buf[1:]). + Uint64(&p.TraceID). + AddrPort(&p.SendAddr). + AddrPort(&p.RecvAddr). + Error() + return +} + +// ---------------------------------------------------------------------------- + +type addrReqPacket struct { + TraceID uint64 +} + +func (p addrReqPacket) Marshal(buf []byte) []byte { + return newBinWriter(buf). + Byte(packetTypeAddrReq). + Uint64(p.TraceID). + Build() +} + +func parseAddrReqPacket(buf []byte) (p addrReqPacket, err error) { err = newBinReader(buf[1:]). Uint64(&p.TraceID). Error() @@ -93,20 +141,23 @@ func parseSynAckPacket(buf []byte) (p synAckPacket, err error) { // ---------------------------------------------------------------------------- -type ackPacket struct { +type addrRespPacket struct { TraceID uint64 + Addr netip.AddrPort } -func (p ackPacket) Marshal(buf []byte) []byte { +func (p addrRespPacket) Marshal(buf []byte) []byte { return newBinWriter(buf). - Byte(packetTypeAck). + Byte(packetTypeAddrResp). Uint64(p.TraceID). + AddrPort(p.Addr). Build() } -func parseAckPacket(buf []byte) (p ackPacket, err error) { +func parseAddrRespPacket(buf []byte) (p addrRespPacket, err error) { err = newBinReader(buf[1:]). Uint64(&p.TraceID). + AddrPort(&p.Addr). Error() return } diff --git a/node/peer-super-states.go b/node/peer-super-states.go index 6e615ae..2d888df 100644 --- a/node/peer-super-states.go +++ b/node/peer-super-states.go @@ -23,7 +23,7 @@ func (s *peerSuper) _peerUpdate(peer *m.Peer) stateFunc { defer s.publish() s.peer = peer - s.staged = peerRoutingData{} + s.staged = peerRouteInfo{} if s.peer == nil { return s.noPeer @@ -77,7 +77,10 @@ func (s *peerSuper) serverAccept() stateFunc { s.staged.dataCipher = newDataCipherFromKey(syn.SharedKey) s.staged.relayIP = syn.RelayIP s.publish() - s.sendControlPacket(newSynAckPacket(p.TraceID)) + s.sendControlPacket(synAckPacket{ + TraceID: syn.TraceID, + RecvAddr: pkt.RemoteAddr, + }) case ackPacket: if p.TraceID != syn.TraceID { @@ -120,7 +123,7 @@ func (s *peerSuper) _serverConnected(traceID uint64) stateFunc { return s.serverAccept } - s.sendControlPacket(ackPacket{TraceID: traceID}) + s.sendControlPacket(ackPacket{TraceID: traceID, RecvAddr: pkt.RemoteAddr}) timeoutTimer.Reset(timeoutInterval) } @@ -218,8 +221,8 @@ func (s *peerSuper) clientDial() stateFunc { if p.TraceID != syn.TraceID { continue // Hmm... } - s.sendControlPacket(ackPacket{TraceID: syn.TraceID}) - return s.clientConnected(syn.TraceID) + s.sendControlPacket(ackPacket{TraceID: syn.TraceID, RecvAddr: pkt.RemoteAddr}) + return s.clientConnected(p) } case <-timeout.C: @@ -230,13 +233,14 @@ func (s *peerSuper) clientDial() stateFunc { // ---------------------------------------------------------------------------- -func (s *peerSuper) clientConnected(traceID uint64) stateFunc { +func (s *peerSuper) clientConnected(p synAckPacket) stateFunc { s.logf("STATE: client-connected") s.staged.up = true + s.staged.localAddr = p.RecvAddr s.publish() return func() stateFunc { - return s._clientConnected(traceID) + return s._clientConnected(p.TraceID) } } diff --git a/node/peer-super.go b/node/peer-super.go index df1907f..f5e2436 100644 --- a/node/peer-super.go +++ b/node/peer-super.go @@ -9,8 +9,8 @@ import ( type peerSuper struct { // The purpose of this state machine is to manage this published data. - published *atomic.Pointer[peerRoutingData] - staged peerRoutingData // Local copy of shared data. See publish(). + published *atomic.Pointer[peerRouteInfo] + staged peerRouteInfo // Local copy of shared data. See publish(). // The other remote peers. peers *remotePeers @@ -78,3 +78,18 @@ func (s *peerSuper) sendControlPacket(pkt interface{ Marshal([]byte) []byte }) { s.conn.WriteTo(buf, s.staged.remoteAddr) } } + +// ---------------------------------------------------------------------------- + +func (s *peerSuper) sendControlPacketDirect(pkt interface{ Marshal([]byte) []byte }) { + buf := pkt.Marshal(s.buf) + h := header{ + StreamID: controlStreamID, + Counter: atomic.AddUint64(s.counter, 1), + SourceIP: s.localIP, + DestIP: s.remoteIP, + } + + buf = s.staged.controlCipher.Encrypt(h, buf, s.encBuf) + s.conn.WriteTo(buf, s.staged.remoteAddr) +} diff --git a/node/peer-supervisor.go b/node/peer-supervisor.go index 50401b8..dc7d2c6 100644 --- a/node/peer-supervisor.go +++ b/node/peer-supervisor.go @@ -16,7 +16,7 @@ func (rp *remotePeer) supervise(conf m.PeerConfig) { defer panicHandler() super := &peerSuper{ - published: rp.published, + published: rp.route, peers: rp.peers, localIP: rp.localIP, localPub: addrIsValid(conf.PublicIP), diff --git a/node/peer.go b/node/peer.go index 1fc3226..b829b39 100644 --- a/node/peer.go +++ b/node/peer.go @@ -11,13 +11,16 @@ import ( type remotePeers [256]*remotePeer -type peerRoutingData struct { +// ---------------------------------------------------------------------------- + +type peerRouteInfo struct { up bool relay bool controlCipher *controlCipher dataCipher *dataCipher remoteAddr netip.AddrPort - relayIP byte // Non-zero if we should relay. + localAddr netip.AddrPort // Local address as seen by the remote. + relayIP byte // Non-zero if we should relay. } type remotePeer struct { @@ -28,8 +31,8 @@ type remotePeer struct { conn *connWriter // Shared state. - peers *remotePeers - published *atomic.Pointer[peerRoutingData] + peers *remotePeers + route *atomic.Pointer[peerRouteInfo] // Only used in HandlePacket / Not synchronized. dupCheck *dupCheck @@ -57,7 +60,7 @@ func newRemotePeer(conf m.PeerConfig, remoteIP byte, iface *ifWriter, conn *conn iface: iface, conn: conn, peers: peers, - published: &atomic.Pointer[peerRoutingData]{}, + route: &atomic.Pointer[peerRouteInfo]{}, dupCheck: newDupCheck(0), decryptBuf: make([]byte, bufferSize), encryptBuf: make([]byte, bufferSize), @@ -66,8 +69,8 @@ func newRemotePeer(conf m.PeerConfig, remoteIP byte, iface *ifWriter, conn *conn controlPackets: make(chan controlPacket, 512), } - pd := peerRoutingData{} - rp.published.Store(&pd) + pd := peerRouteInfo{} + rp.route.Store(&pd) //go newPeerSuper(rp).Run() go rp.supervise(conf) @@ -111,7 +114,7 @@ func (rp *remotePeer) HandlePacket(addr netip.AddrPort, h header, data []byte) { // ---------------------------------------------------------------------------- func (rp *remotePeer) handleControlPacket(addr netip.AddrPort, h header, data []byte) { - routingData := rp.published.Load() + routingData := rp.route.Load() if routingData.controlCipher == nil { rp.logf("Not connected (control).") return @@ -158,7 +161,7 @@ func (rp *remotePeer) handleControlPacket(addr netip.AddrPort, h header, data [] // ---------------------------------------------------------------------------- func (rp *remotePeer) handleDataPacket(data []byte) { - routingData := rp.published.Load() + routingData := rp.route.Load() if routingData.dataCipher == nil { rp.logf("Not connected (recv).") return @@ -176,7 +179,7 @@ func (rp *remotePeer) handleDataPacket(data []byte) { // ---------------------------------------------------------------------------- func (rp *remotePeer) handleRelayPacket(h header, data []byte) { - routingData := rp.published.Load() + routingData := rp.route.Load() if routingData.dataCipher == nil { rp.logf("Not connected (recv).") return @@ -201,7 +204,7 @@ func (rp *remotePeer) SendData(data []byte) { } func (rp *remotePeer) HandleInterfacePacket(data []byte) { - routingData := rp.published.Load() + routingData := rp.route.Load() if routingData.dataCipher == nil { rp.logf("Not connected (handle interface).") @@ -227,7 +230,7 @@ func (rp *remotePeer) HandleInterfacePacket(data []byte) { // ---------------------------------------------------------------------------- func (rp *remotePeer) CanRelay() bool { - data := rp.published.Load() + data := rp.route.Load() return data.relay && data.up } @@ -240,7 +243,7 @@ func (rp *remotePeer) RelayTo(destIP byte, data []byte) { // ---------------------------------------------------------------------------- func (rp *remotePeer) encryptAndSend(streamID byte, destIP byte, data []byte) { - routingData := rp.published.Load() + routingData := rp.route.Load() if routingData.dataCipher == nil || routingData.remoteAddr == zeroAddrPort { rp.logf("Not connected (encrypt and send).") return @@ -262,7 +265,7 @@ func (rp *remotePeer) encryptAndSend(streamID byte, destIP byte, data []byte) { // SendAsIs is used when forwarding already-encrypted data from one peer to // another. func (rp *remotePeer) SendAsIs(data []byte) { - routingData := rp.published.Load() + routingData := rp.route.Load() if routingData.remoteAddr == zeroAddrPort { rp.logf("Not connected (send direct).") return