wip working - modifying
This commit is contained in:
parent
08f11ce82b
commit
51d7b5f086
@ -14,9 +14,8 @@ const (
|
||||
packetTypeSyn = iota + 1
|
||||
packetTypeSynAck
|
||||
packetTypeAck
|
||||
packetTypePing
|
||||
packetTypePong
|
||||
packetTypeRelayed
|
||||
packetTypeAddrReq
|
||||
packetTypeAddrResp
|
||||
)
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
@ -35,6 +34,10 @@ func (p *controlPacket) ParsePayload(buf []byte) (err error) {
|
||||
p.Payload, err = parseSynAckPacket(buf)
|
||||
case packetTypeAck:
|
||||
p.Payload, err = parseAckPacket(buf)
|
||||
case packetTypeAddrReq:
|
||||
p.Payload, err = parseAddrReqPacket(buf)
|
||||
case packetTypeAddrResp:
|
||||
p.Payload, err = parseAddrRespPacket(buf)
|
||||
default:
|
||||
return errUnknownPacketType
|
||||
}
|
||||
@ -70,21 +73,66 @@ func parseSynPacket(buf []byte) (p synPacket, err error) {
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
type synAckPacket struct {
|
||||
TraceID uint64
|
||||
}
|
||||
|
||||
func newSynAckPacket(traceID uint64) synAckPacket {
|
||||
return synAckPacket{traceID}
|
||||
TraceID uint64
|
||||
RecvAddr netip.AddrPort
|
||||
}
|
||||
|
||||
func (p synAckPacket) Marshal(buf []byte) []byte {
|
||||
return newBinWriter(buf).
|
||||
Byte(packetTypeSynAck).
|
||||
Uint64(p.TraceID).
|
||||
AddrPort(p.RecvAddr).
|
||||
Build()
|
||||
}
|
||||
|
||||
func parseSynAckPacket(buf []byte) (p synAckPacket, err error) {
|
||||
err = newBinReader(buf[1:]).
|
||||
Uint64(&p.TraceID).
|
||||
AddrPort(&p.RecvAddr).
|
||||
Error()
|
||||
return
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
type ackPacket struct {
|
||||
TraceID uint64
|
||||
SendAddr netip.AddrPort // Address of the sender.
|
||||
RecvAddr netip.AddrPort // Address of the recipient as seen by sender.
|
||||
}
|
||||
|
||||
func (p ackPacket) Marshal(buf []byte) []byte {
|
||||
return newBinWriter(buf).
|
||||
Byte(packetTypeAck).
|
||||
Uint64(p.TraceID).
|
||||
AddrPort(p.SendAddr).
|
||||
AddrPort(p.RecvAddr).
|
||||
Build()
|
||||
}
|
||||
|
||||
func parseAckPacket(buf []byte) (p ackPacket, err error) {
|
||||
err = newBinReader(buf[1:]).
|
||||
Uint64(&p.TraceID).
|
||||
AddrPort(&p.SendAddr).
|
||||
AddrPort(&p.RecvAddr).
|
||||
Error()
|
||||
return
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
type addrReqPacket struct {
|
||||
TraceID uint64
|
||||
}
|
||||
|
||||
func (p addrReqPacket) Marshal(buf []byte) []byte {
|
||||
return newBinWriter(buf).
|
||||
Byte(packetTypeAddrReq).
|
||||
Uint64(p.TraceID).
|
||||
Build()
|
||||
}
|
||||
|
||||
func parseAddrReqPacket(buf []byte) (p addrReqPacket, err error) {
|
||||
err = newBinReader(buf[1:]).
|
||||
Uint64(&p.TraceID).
|
||||
Error()
|
||||
@ -93,20 +141,23 @@ func parseSynAckPacket(buf []byte) (p synAckPacket, err error) {
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
type ackPacket struct {
|
||||
type addrRespPacket struct {
|
||||
TraceID uint64
|
||||
Addr netip.AddrPort
|
||||
}
|
||||
|
||||
func (p ackPacket) Marshal(buf []byte) []byte {
|
||||
func (p addrRespPacket) Marshal(buf []byte) []byte {
|
||||
return newBinWriter(buf).
|
||||
Byte(packetTypeAck).
|
||||
Byte(packetTypeAddrResp).
|
||||
Uint64(p.TraceID).
|
||||
AddrPort(p.Addr).
|
||||
Build()
|
||||
}
|
||||
|
||||
func parseAckPacket(buf []byte) (p ackPacket, err error) {
|
||||
func parseAddrRespPacket(buf []byte) (p addrRespPacket, err error) {
|
||||
err = newBinReader(buf[1:]).
|
||||
Uint64(&p.TraceID).
|
||||
AddrPort(&p.Addr).
|
||||
Error()
|
||||
return
|
||||
}
|
||||
|
@ -23,7 +23,7 @@ func (s *peerSuper) _peerUpdate(peer *m.Peer) stateFunc {
|
||||
defer s.publish()
|
||||
|
||||
s.peer = peer
|
||||
s.staged = peerRoutingData{}
|
||||
s.staged = peerRouteInfo{}
|
||||
|
||||
if s.peer == nil {
|
||||
return s.noPeer
|
||||
@ -77,7 +77,10 @@ func (s *peerSuper) serverAccept() stateFunc {
|
||||
s.staged.dataCipher = newDataCipherFromKey(syn.SharedKey)
|
||||
s.staged.relayIP = syn.RelayIP
|
||||
s.publish()
|
||||
s.sendControlPacket(newSynAckPacket(p.TraceID))
|
||||
s.sendControlPacket(synAckPacket{
|
||||
TraceID: syn.TraceID,
|
||||
RecvAddr: pkt.RemoteAddr,
|
||||
})
|
||||
|
||||
case ackPacket:
|
||||
if p.TraceID != syn.TraceID {
|
||||
@ -120,7 +123,7 @@ func (s *peerSuper) _serverConnected(traceID uint64) stateFunc {
|
||||
return s.serverAccept
|
||||
}
|
||||
|
||||
s.sendControlPacket(ackPacket{TraceID: traceID})
|
||||
s.sendControlPacket(ackPacket{TraceID: traceID, RecvAddr: pkt.RemoteAddr})
|
||||
timeoutTimer.Reset(timeoutInterval)
|
||||
}
|
||||
|
||||
@ -218,8 +221,8 @@ func (s *peerSuper) clientDial() stateFunc {
|
||||
if p.TraceID != syn.TraceID {
|
||||
continue // Hmm...
|
||||
}
|
||||
s.sendControlPacket(ackPacket{TraceID: syn.TraceID})
|
||||
return s.clientConnected(syn.TraceID)
|
||||
s.sendControlPacket(ackPacket{TraceID: syn.TraceID, RecvAddr: pkt.RemoteAddr})
|
||||
return s.clientConnected(p)
|
||||
}
|
||||
|
||||
case <-timeout.C:
|
||||
@ -230,13 +233,14 @@ func (s *peerSuper) clientDial() stateFunc {
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
func (s *peerSuper) clientConnected(traceID uint64) stateFunc {
|
||||
func (s *peerSuper) clientConnected(p synAckPacket) stateFunc {
|
||||
s.logf("STATE: client-connected")
|
||||
s.staged.up = true
|
||||
s.staged.localAddr = p.RecvAddr
|
||||
s.publish()
|
||||
|
||||
return func() stateFunc {
|
||||
return s._clientConnected(traceID)
|
||||
return s._clientConnected(p.TraceID)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -9,8 +9,8 @@ import (
|
||||
|
||||
type peerSuper struct {
|
||||
// The purpose of this state machine is to manage this published data.
|
||||
published *atomic.Pointer[peerRoutingData]
|
||||
staged peerRoutingData // Local copy of shared data. See publish().
|
||||
published *atomic.Pointer[peerRouteInfo]
|
||||
staged peerRouteInfo // Local copy of shared data. See publish().
|
||||
|
||||
// The other remote peers.
|
||||
peers *remotePeers
|
||||
@ -78,3 +78,18 @@ func (s *peerSuper) sendControlPacket(pkt interface{ Marshal([]byte) []byte }) {
|
||||
s.conn.WriteTo(buf, s.staged.remoteAddr)
|
||||
}
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
func (s *peerSuper) sendControlPacketDirect(pkt interface{ Marshal([]byte) []byte }) {
|
||||
buf := pkt.Marshal(s.buf)
|
||||
h := header{
|
||||
StreamID: controlStreamID,
|
||||
Counter: atomic.AddUint64(s.counter, 1),
|
||||
SourceIP: s.localIP,
|
||||
DestIP: s.remoteIP,
|
||||
}
|
||||
|
||||
buf = s.staged.controlCipher.Encrypt(h, buf, s.encBuf)
|
||||
s.conn.WriteTo(buf, s.staged.remoteAddr)
|
||||
}
|
||||
|
@ -16,7 +16,7 @@ func (rp *remotePeer) supervise(conf m.PeerConfig) {
|
||||
defer panicHandler()
|
||||
|
||||
super := &peerSuper{
|
||||
published: rp.published,
|
||||
published: rp.route,
|
||||
peers: rp.peers,
|
||||
localIP: rp.localIP,
|
||||
localPub: addrIsValid(conf.PublicIP),
|
||||
|
31
node/peer.go
31
node/peer.go
@ -11,13 +11,16 @@ import (
|
||||
|
||||
type remotePeers [256]*remotePeer
|
||||
|
||||
type peerRoutingData struct {
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
type peerRouteInfo struct {
|
||||
up bool
|
||||
relay bool
|
||||
controlCipher *controlCipher
|
||||
dataCipher *dataCipher
|
||||
remoteAddr netip.AddrPort
|
||||
relayIP byte // Non-zero if we should relay.
|
||||
localAddr netip.AddrPort // Local address as seen by the remote.
|
||||
relayIP byte // Non-zero if we should relay.
|
||||
}
|
||||
|
||||
type remotePeer struct {
|
||||
@ -28,8 +31,8 @@ type remotePeer struct {
|
||||
conn *connWriter
|
||||
|
||||
// Shared state.
|
||||
peers *remotePeers
|
||||
published *atomic.Pointer[peerRoutingData]
|
||||
peers *remotePeers
|
||||
route *atomic.Pointer[peerRouteInfo]
|
||||
|
||||
// Only used in HandlePacket / Not synchronized.
|
||||
dupCheck *dupCheck
|
||||
@ -57,7 +60,7 @@ func newRemotePeer(conf m.PeerConfig, remoteIP byte, iface *ifWriter, conn *conn
|
||||
iface: iface,
|
||||
conn: conn,
|
||||
peers: peers,
|
||||
published: &atomic.Pointer[peerRoutingData]{},
|
||||
route: &atomic.Pointer[peerRouteInfo]{},
|
||||
dupCheck: newDupCheck(0),
|
||||
decryptBuf: make([]byte, bufferSize),
|
||||
encryptBuf: make([]byte, bufferSize),
|
||||
@ -66,8 +69,8 @@ func newRemotePeer(conf m.PeerConfig, remoteIP byte, iface *ifWriter, conn *conn
|
||||
controlPackets: make(chan controlPacket, 512),
|
||||
}
|
||||
|
||||
pd := peerRoutingData{}
|
||||
rp.published.Store(&pd)
|
||||
pd := peerRouteInfo{}
|
||||
rp.route.Store(&pd)
|
||||
|
||||
//go newPeerSuper(rp).Run()
|
||||
go rp.supervise(conf)
|
||||
@ -111,7 +114,7 @@ func (rp *remotePeer) HandlePacket(addr netip.AddrPort, h header, data []byte) {
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
func (rp *remotePeer) handleControlPacket(addr netip.AddrPort, h header, data []byte) {
|
||||
routingData := rp.published.Load()
|
||||
routingData := rp.route.Load()
|
||||
if routingData.controlCipher == nil {
|
||||
rp.logf("Not connected (control).")
|
||||
return
|
||||
@ -158,7 +161,7 @@ func (rp *remotePeer) handleControlPacket(addr netip.AddrPort, h header, data []
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
func (rp *remotePeer) handleDataPacket(data []byte) {
|
||||
routingData := rp.published.Load()
|
||||
routingData := rp.route.Load()
|
||||
if routingData.dataCipher == nil {
|
||||
rp.logf("Not connected (recv).")
|
||||
return
|
||||
@ -176,7 +179,7 @@ func (rp *remotePeer) handleDataPacket(data []byte) {
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
func (rp *remotePeer) handleRelayPacket(h header, data []byte) {
|
||||
routingData := rp.published.Load()
|
||||
routingData := rp.route.Load()
|
||||
if routingData.dataCipher == nil {
|
||||
rp.logf("Not connected (recv).")
|
||||
return
|
||||
@ -201,7 +204,7 @@ func (rp *remotePeer) SendData(data []byte) {
|
||||
}
|
||||
|
||||
func (rp *remotePeer) HandleInterfacePacket(data []byte) {
|
||||
routingData := rp.published.Load()
|
||||
routingData := rp.route.Load()
|
||||
|
||||
if routingData.dataCipher == nil {
|
||||
rp.logf("Not connected (handle interface).")
|
||||
@ -227,7 +230,7 @@ func (rp *remotePeer) HandleInterfacePacket(data []byte) {
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
func (rp *remotePeer) CanRelay() bool {
|
||||
data := rp.published.Load()
|
||||
data := rp.route.Load()
|
||||
return data.relay && data.up
|
||||
}
|
||||
|
||||
@ -240,7 +243,7 @@ func (rp *remotePeer) RelayTo(destIP byte, data []byte) {
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
func (rp *remotePeer) encryptAndSend(streamID byte, destIP byte, data []byte) {
|
||||
routingData := rp.published.Load()
|
||||
routingData := rp.route.Load()
|
||||
if routingData.dataCipher == nil || routingData.remoteAddr == zeroAddrPort {
|
||||
rp.logf("Not connected (encrypt and send).")
|
||||
return
|
||||
@ -262,7 +265,7 @@ func (rp *remotePeer) encryptAndSend(streamID byte, destIP byte, data []byte) {
|
||||
// SendAsIs is used when forwarding already-encrypted data from one peer to
|
||||
// another.
|
||||
func (rp *remotePeer) SendAsIs(data []byte) {
|
||||
routingData := rp.published.Load()
|
||||
routingData := rp.route.Load()
|
||||
if routingData.remoteAddr == zeroAddrPort {
|
||||
rp.logf("Not connected (send direct).")
|
||||
return
|
||||
|
Loading…
x
Reference in New Issue
Block a user