From f87c2e59b4203bb1104eca53956a70c4e7e2c7d8 Mon Sep 17 00:00:00 2001 From: jdl Date: Fri, 20 Dec 2024 16:13:59 +0100 Subject: [PATCH] wip: cleaning --- node/cipher.go | 6 - node/conn.go | 205 +------------------------- node/crypto.go | 50 ------- node/crypto_test.go | 137 ------------------ node/header.go | 34 +---- node/interface.go | 2 +- node/main.go | 2 +- node/node.go | 1 - node/peer.go | 4 +- node/peerstate.go | 1 - node/peersupervisor.go | 319 ----------------------------------------- node/router.go | 189 ------------------------ node/routingpacket.go | 33 ----- node/tmp-server.go | 185 ------------------------ 14 files changed, 10 insertions(+), 1158 deletions(-) delete mode 100644 node/cipher.go delete mode 100644 node/crypto.go delete mode 100644 node/crypto_test.go delete mode 100644 node/node.go delete mode 100644 node/peerstate.go delete mode 100644 node/peersupervisor.go delete mode 100644 node/routingpacket.go delete mode 100644 node/tmp-server.go diff --git a/node/cipher.go b/node/cipher.go deleted file mode 100644 index cb7accd..0000000 --- a/node/cipher.go +++ /dev/null @@ -1,6 +0,0 @@ -package node - -type packetCipher interface { - Encrypt(h xHeader, data, out []byte) []byte - Decrypt(encrypted, out []byte) (data []byte, ok bool) -} diff --git a/node/conn.go b/node/conn.go index 7f7e4e3..344d8d5 100644 --- a/node/conn.go +++ b/node/conn.go @@ -6,22 +6,20 @@ import ( "net" "net/netip" "sync" - "sync/atomic" - "vppn/fasttime" ) // ---------------------------------------------------------------------------- -type connWriter2 struct { +type connWriter struct { lock sync.Mutex conn *net.UDPConn } -func newConnWriter2(conn *net.UDPConn) *connWriter2 { - return &connWriter2{conn: conn} +func newConnWriter(conn *net.UDPConn) *connWriter { + return &connWriter{conn: conn} } -func (w *connWriter2) WriteTo(packet []byte, addr netip.AddrPort) { +func (w *connWriter) WriteTo(packet []byte, addr netip.AddrPort) { w.lock.Lock() if _, err := w.conn.WriteToUDPAddrPort(packet, addr); err != nil { log.Fatalf("Failed to write to UDP port: %v", err) @@ -47,198 +45,3 @@ func (w *ifWriter) Write(packet []byte) { } w.lock.Unlock() } - -// ---------------------------------------------------------------------------- - -// TODO: Delete below?? - -type connWriter struct { - *net.UDPConn - lock sync.Mutex - localIP byte - buf []byte - buf2 []byte - counters [256]uint64 - routing *routingTable -} - -func newConnWriter(conn *net.UDPConn, localIP byte, routing *routingTable) *connWriter { - w := &connWriter{ - UDPConn: conn, - localIP: localIP, - buf: make([]byte, bufferSize), - buf2: make([]byte, bufferSize), - routing: routing, - } - - for i := range w.counters { - w.counters[i] = uint64(fasttime.Now() << 30) - } - - return w -} - -/* - func (w *connWriter) SendRouting(remoteIP byte, data []byte) { - dstPeer := w.routing.Get(remoteIP) - if dstPeer == nil { - log.Printf("No peer: %d", remoteIP) - return - } - - var viaPeer *peer - - if dstPeer.Addr == zeroAddrPort { - viaPeer = w.routing.Mediator() - if viaPeer == nil { - log.Printf("No mediator: %d", remoteIP) - return - } - } - - w.sendRouting(dstPeer, viaPeer, data) - } -*/ - -func (w *connWriter) SendData(remoteIP byte, data []byte) { - // TODO -} - -// TODO: deprecated -func (w *connWriter) WriteTo(remoteIP, stream byte, data []byte) { - dstPeer := w.routing.Get(remoteIP) - if dstPeer == nil { - log.Printf("No peer: %d", remoteIP) - return - } - - if stream == streamData && !dstPeer.Up { - log.Printf("Peer down: %d", remoteIP) - return - } - - var viaPeer *peer - if dstPeer.Mediated { - viaPeer = w.routing.mediator.Load() - if viaPeer == nil || viaPeer.Addr == zeroAddrPort { - log.Printf("Mediator not connected") - return - } - } else if dstPeer.Addr == zeroAddrPort { - log.Printf("Peer doesn't have address: %d", remoteIP) - return - } - - w.WriteToPeer(dstPeer, viaPeer, stream, data) -} - -// TODO: deprecated -func (w *connWriter) WriteToPeer(dstPeer, viaPeer *peer, stream byte, data []byte) { - w.lock.Lock() - - addr := dstPeer.Addr - - h := header{ - Counter: atomic.AddUint64(&w.counters[dstPeer.IP], 1), - SourceIP: w.localIP, - DestIP: dstPeer.IP, - Stream: stream, - } - - buf := encryptPacketAsym(&h, dstPeer.SharedKey, data, w.buf) - - if viaPeer != nil { - h := header{ - Counter: atomic.AddUint64(&w.counters[viaPeer.IP], 1), - SourceIP: w.localIP, - DestIP: dstPeer.IP, - Forward: 1, - Stream: stream, - } - - buf = encryptPacketAsym(&h, viaPeer.SharedKey, buf, w.buf2) - addr = viaPeer.Addr - } - - if _, err := w.WriteToUDPAddrPort(buf, addr); err != nil { - log.Fatalf("Failed to write to UDP port: %v", err) - } - w.lock.Unlock() -} - -// TODO: deprecated -func (w *connWriter) Forward(dstIP byte, packet []byte) { - dstPeer := w.routing.Get(dstIP) - if dstPeer == nil || dstPeer.Addr == zeroAddrPort { - log.Printf("No peer: %d", dstIP) - return - } - - if _, err := w.WriteToUDPAddrPort(packet, dstPeer.Addr); err != nil { - log.Fatalf("Failed to write to UDP port: %v", err) - } -} - -// ---------------------------------------------------------------------------- - -type connReader struct { - *net.UDPConn - localIP byte - dupChecks [256]*dupCheck - routing *routingTable - buf []byte -} - -func newConnReader(conn *net.UDPConn, localIP byte, routing *routingTable) *connReader { - r := &connReader{ - UDPConn: conn, - localIP: localIP, - routing: routing, - buf: make([]byte, bufferSize), - } - for i := range r.dupChecks { - r.dupChecks[i] = newDupCheck(0) - } - return r -} - -func (r *connReader) Read(buf []byte) (remoteAddr netip.AddrPort, h header, data []byte) { - var ( - n int - err error - ) - - for { - n, remoteAddr, err = r.ReadFromUDPAddrPort(buf[:bufferSize]) - if err != nil { - log.Fatalf("Failed to read from UDP port: %v", err) - } - - data = buf[:n] - - if n < headerSize { - continue // Packet it soo short. - } - - h.Parse(data) - - peer := r.routing.Get(h.SourceIP) - if peer == nil { - continue - } - - out, ok := decryptPacketAsym(peer.SharedKey, data, r.buf) - if !ok { - continue - } - - out, data = data, out - - if r.dupChecks[h.SourceIP].IsDup(h.Counter) { - log.Printf("Duplicate: %d", h.Counter) - continue - } - - return - } -} diff --git a/node/crypto.go b/node/crypto.go deleted file mode 100644 index 0f7710f..0000000 --- a/node/crypto.go +++ /dev/null @@ -1,50 +0,0 @@ -package node - -import ( - "sync" - "vppn/fasttime" - - "golang.org/x/crypto/nacl/box" -) - -// Encrypting the packet will also set the header's DataSize field. -func encryptPacketAsym(h *header, sharedKey, data, out []byte) []byte { - out = out[:headerSize] - h.Marshal(out) - b := box.SealAfterPrecomputation(out[headerSize:headerSize], data, (*[24]byte)(out[:headerSize]), (*[32]byte)(sharedKey)) - return out[:len(b)+headerSize] -} - -func decryptPacketAsym(sharedKey, packetAndHeader, out []byte) (decrypted []byte, ok bool) { - return box.OpenAfterPrecomputation( - out[:0], - packetAndHeader[headerSize:], - (*[24]byte)(packetAndHeader[:headerSize]), - (*[32]byte)(sharedKey)) -} - -func computeSharedKey(peerPubKey, privKey []byte) []byte { - shared := [32]byte{} - box.Precompute(&shared, (*[32]byte)(peerPubKey), (*[32]byte)(privKey)) - return shared[:] -} - -var ( - traceIDLock sync.Mutex - traceIDTime uint64 - traceIDCounter uint64 -) - -func newTraceID() (id uint64) { - traceIDLock.Lock() - defer traceIDLock.Unlock() - - now := uint64(fasttime.Now()) - if traceIDTime < now { - traceIDTime = now - traceIDCounter = 0 - } - traceIDCounter++ - - return traceIDTime<<30 + traceIDCounter -} diff --git a/node/crypto_test.go b/node/crypto_test.go deleted file mode 100644 index 76f408f..0000000 --- a/node/crypto_test.go +++ /dev/null @@ -1,137 +0,0 @@ -package node - -import ( - "bytes" - "crypto/rand" - "reflect" - "testing" - - "golang.org/x/crypto/nacl/box" -) - -func TestEncryptDecryptAsym(t *testing.T) { - pubKey1, privKey1, err := box.GenerateKey(rand.Reader) - if err != nil { - t.Fatal(err) - } - - pubKey2, privKey2, err := box.GenerateKey(rand.Reader) - if err != nil { - t.Fatal(err) - } - - sharedEncKey := [32]byte{} - box.Precompute(&sharedEncKey, pubKey2, privKey1) - - sharedDecKey := [32]byte{} - box.Precompute(&sharedDecKey, pubKey1, privKey2) - - original := make([]byte, if_mtu-64) - rand.Read(original) - - h := header{ - Counter: 2893749238, - SourceIP: 5, - DestIP: 12, - Forward: 1, - Stream: 1, - } - - encrypted := make([]byte, bufferSize) - encrypted = encryptPacketAsym(&h, sharedEncKey[:], original, encrypted) - - decrypted := make([]byte, bufferSize) - var ok bool - decrypted, ok = decryptPacketAsym(sharedDecKey[:], encrypted, decrypted) - if !ok { - t.Fatal(ok) - } - - var h2 header - h2.Parse(encrypted) - - if !reflect.DeepEqual(h, h2) { - t.Fatal(h, h2) - } - - if !bytes.Equal(original, decrypted) { - t.Fatal("mismatch") - } -} - -func BenchmarkEncryptAsym(b *testing.B) { - _, privKey1, err := box.GenerateKey(rand.Reader) - if err != nil { - b.Fatal(err) - } - - pubKey2, _, err := box.GenerateKey(rand.Reader) - if err != nil { - b.Fatal(err) - } - - sharedEncKey := [32]byte{} - box.Precompute(&sharedEncKey, pubKey2, privKey1) - - original := make([]byte, if_mtu) - rand.Read(original) - - nonce := make([]byte, headerSize) - rand.Read(nonce) - - encrypted := make([]byte, bufferSize) - - h := header{ - Counter: 2893749238, - SourceIP: 5, - DestIP: 12, - Forward: 1, - Stream: 1, - } - - for i := 0; i < b.N; i++ { - encrypted = encryptPacketAsym(&h, sharedEncKey[:], original, encrypted) - } -} - -func BenchmarkDecryptAsym(b *testing.B) { - pubKey1, privKey1, err := box.GenerateKey(rand.Reader) - if err != nil { - b.Fatal(err) - } - - pubKey2, privKey2, err := box.GenerateKey(rand.Reader) - if err != nil { - b.Fatal(err) - } - - sharedEncKey := [32]byte{} - box.Precompute(&sharedEncKey, pubKey2, privKey1) - - sharedDecKey := [32]byte{} - box.Precompute(&sharedDecKey, pubKey1, privKey2) - - original := make([]byte, if_mtu) - rand.Read(original) - - nonce := make([]byte, headerSize) - rand.Read(nonce) - - h := header{ - Counter: 2893749238, - SourceIP: 5, - DestIP: 12, - Forward: 1, - Stream: 1, - } - - encrypted := encryptPacketAsym(&h, sharedEncKey[:], original, make([]byte, bufferSize)) - decrypted := make([]byte, bufferSize) - var ok bool - for i := 0; i < b.N; i++ { - decrypted, ok = decryptPacketAsym(sharedDecKey[:], encrypted, decrypted) - if !ok { - panic(ok) - } - } -} diff --git a/node/header.go b/node/header.go index d2eb142..f2e300f 100644 --- a/node/header.go +++ b/node/header.go @@ -5,6 +5,8 @@ import "unsafe" // ---------------------------------------------------------------------------- const ( + headerSize = 12 + controlStreamID = 2 controlHeaderSize = 24 controlCipherOverhead = 16 @@ -37,35 +39,3 @@ func (h *xHeader) Marshal(buf []byte) { buf[10] = h.DestIP buf[11] = 0 } - -// ---------------------------------------------------------------------------- -// TODO: Remove this code. -const ( - headerSize = 24 - streamData = 1 - streamControl = 2 -) - -type header struct { - Counter uint64 // Init with fasttime.Now() << 30 to ensure monotonic. - SourceIP byte - DestIP byte - Forward byte - Stream byte // See stream* constants. -} - -func (hdr *header) Parse(nb []byte) { - hdr.Counter = *(*uint64)(unsafe.Pointer(&nb[0])) - hdr.SourceIP = nb[8] - hdr.DestIP = nb[9] - hdr.Forward = nb[10] - hdr.Stream = nb[11] -} - -func (hdr header) Marshal(buf []byte) { - *(*uint64)(unsafe.Pointer(&buf[0])) = hdr.Counter - buf[8] = hdr.SourceIP - buf[9] = hdr.DestIP - buf[10] = hdr.Forward - buf[11] = hdr.Stream -} diff --git a/node/interface.go b/node/interface.go index c5edf3e..e066b2b 100644 --- a/node/interface.go +++ b/node/interface.go @@ -51,7 +51,7 @@ func readNextPacket(iface io.ReadWriteCloser, buf []byte) ([]byte, byte, error) } const ( - if_mtu = 1200 + if_mtu = 1350 if_queue_len = 2048 ) diff --git a/node/main.go b/node/main.go index f5c9bc7..00d7f9c 100644 --- a/node/main.go +++ b/node/main.go @@ -102,7 +102,7 @@ func main(netName, listenIP string, port uint16) { log.Fatalf("Failed to open UDP port: %v", err) } - connWriter := newConnWriter2(conn) + connWriter := newConnWriter(conn) ifWriter := newIFWriter(iface) peers := remotePeers{} diff --git a/node/node.go b/node/node.go deleted file mode 100644 index 2b4023a..0000000 --- a/node/node.go +++ /dev/null @@ -1 +0,0 @@ -package node diff --git a/node/peer.go b/node/peer.go index 19cddfd..ab7ca77 100644 --- a/node/peer.go +++ b/node/peer.go @@ -24,7 +24,7 @@ type remotePeer struct { privKey []byte localPublic bool // True if local node is public. iface *ifWriter - conn *connWriter2 + conn *connWriter // Shared state. shared *atomic.Pointer[peerData] @@ -47,7 +47,7 @@ type remotePeer struct { controlPackets chan controlPacket } -func newRemotePeer(conf m.PeerConfig, remoteIP byte, iface *ifWriter, conn *connWriter2) *remotePeer { +func newRemotePeer(conf m.PeerConfig, remoteIP byte, iface *ifWriter, conn *connWriter) *remotePeer { rp := &remotePeer{ localIP: conf.PeerIP, remoteIP: remoteIP, diff --git a/node/peerstate.go b/node/peerstate.go deleted file mode 100644 index 2b4023a..0000000 --- a/node/peerstate.go +++ /dev/null @@ -1 +0,0 @@ -package node diff --git a/node/peersupervisor.go b/node/peersupervisor.go deleted file mode 100644 index 90763b4..0000000 --- a/node/peersupervisor.go +++ /dev/null @@ -1,319 +0,0 @@ -package node - -import ( - "fmt" - "log" - "net/netip" - "time" - "vppn/m" -) - -type routingPacketWrapper struct { - routingPacket - Addr netip.AddrPort // Source. -} - -type peerSupervisor struct { - // Constants: - localIP byte - localPublic bool - remoteIP byte - privKey []byte - - // Shared data: - w *connWriter - table *routingTable - - packets chan routingPacketWrapper - peerUpdates chan *m.Peer - - // Peer-related items. - version int64 // Ony accessed in HandlePeerUpdate. - peer *m.Peer - remoteAddrPort netip.AddrPort - mediated bool - sharedKey []byte - - // Used by our state functions. - pingTimer *time.Timer - timeoutTimer *time.Timer - buf []byte -} - -// ---------------------------------------------------------------------------- - -func newPeerSupervisor( - conf m.PeerConfig, - remoteIP byte, - w *connWriter, - table *routingTable, -) *peerSupervisor { - s := &peerSupervisor{ - localIP: conf.PeerIP, - remoteIP: remoteIP, - privKey: conf.EncPrivKey, - w: w, - table: table, - packets: make(chan routingPacketWrapper, 256), - peerUpdates: make(chan *m.Peer, 1), - pingTimer: time.NewTimer(pingInterval), - timeoutTimer: time.NewTimer(timeoutInterval), - buf: make([]byte, bufferSize), - } - - _, s.localPublic = netip.AddrFromSlice(conf.PublicIP) - - go s.mainLoop() - return s -} - -func (s *peerSupervisor) logf(msg string, args ...any) { - msg = fmt.Sprintf("[%03d] ", s.remoteIP) + msg - log.Printf(msg, args...) -} - -// ---------------------------------------------------------------------------- - -func (s *peerSupervisor) mainLoop() { - defer panicHandler() - state := s.stateInit - for { - state = state() - } -} - -// ---------------------------------------------------------------------------- - -func (s *peerSupervisor) HandlePeerUpdate(p *m.Peer) { - if p != nil { - if p.Version == s.version { - return - } - s.version = p.Version - } else { - s.version = 0 - } - - s.peerUpdates <- p -} - -func (s *peerSupervisor) HandlePacket(w routingPacketWrapper) { - select { - case s.packets <- w: - default: - // Drop - } -} - -// ---------------------------------------------------------------------------- - -func (s *peerSupervisor) stateInit() stateFunc { - if s.peer == nil { - return s.stateDisconnected - } - - addr, ok := netip.AddrFromSlice(s.peer.PublicIP) - if ok { - addrPort := netip.AddrPortFrom(addr, s.peer.Port) - s.remoteAddrPort = addrPort - } else { - s.remoteAddrPort = zeroAddrPort - } - s.sharedKey = computeSharedKey(s.peer.EncPubKey, s.privKey) - - return s.stateSelectRole() -} - -// ---------------------------------------------------------------------------- - -func (s *peerSupervisor) stateDisconnected() stateFunc { - s.clearRoutingTable() - - for { - select { - case <-s.packets: - // Drop - case s.peer = <-s.peerUpdates: - return s.stateInit - } - } -} - -// ---------------------------------------------------------------------------- - -func (s *peerSupervisor) stateSelectRole() stateFunc { - s.logf("STATE: SelectRole") - s.updateRoutingTable(false) - - if s.remoteAddrPort != zeroAddrPort { - s.mediated = false - - // If both remote and local are public, one side acts as client, and one - // side as server. - if s.localPublic && s.localIP < s.peer.PeerIP { - return s.stateAccept - } - return s.stateDial - } - - // We're public, remote is not => can only wait for connection - if s.localPublic { - s.mediated = false - return s.stateAccept - } - - // Both non-public => need to use mediator. - return s.stateMediated -} - -// ---------------------------------------------------------------------------- - -func (s *peerSupervisor) stateAccept() stateFunc { - s.logf("STATE: Accept") - - for { - - select { - case pkt := <-s.packets: - switch pkt.Type { - - case packetTypePing: - s.remoteAddrPort = pkt.Addr - s.updateRoutingTable(true) - s.sendPong(pkt.TraceID) - return s.stateConnected - - default: - // Still waiting for ping... - } - - case s.peer = <-s.peerUpdates: - return s.stateInit - } - } -} - -// ---------------------------------------------------------------------------- - -func (s *peerSupervisor) stateDial() stateFunc { - s.logf("STATE: Dial") - s.updateRoutingTable(false) - - s.sendPing() - - for { - select { - case pkt := <-s.packets: - - switch pkt.Type { - - case packetTypePong: - s.updateRoutingTable(true) - return s.stateConnected - - default: - // Ignore - } - - case <-s.pingTimer.C: - s.sendPing() - - case s.peer = <-s.peerUpdates: - return s.stateInit - } - } -} - -// ---------------------------------------------------------------------------- - -func (s *peerSupervisor) stateConnected() stateFunc { - s.logf("STATE: Connected") - - s.timeoutTimer.Reset(timeoutInterval) - - for { - select { - - case <-s.pingTimer.C: - s.sendPing() - - case <-s.timeoutTimer.C: - s.logf("Timeout") - return s.stateInit - - case pkt := <-s.packets: - switch pkt.Type { - case packetTypePing: - s.sendPong(pkt.TraceID) - - // Server should always follow remote port. - if s.localPublic { - if pkt.Addr != s.remoteAddrPort { - s.remoteAddrPort = pkt.Addr - s.updateRoutingTable(true) - } - } - - case packetTypePong: - s.timeoutTimer.Reset(timeoutInterval) - - default: - // Drop packet. - } - - case s.peer = <-s.peerUpdates: - s.logf("New peer: %v", s.peer) - return s.stateInit - } - } -} - -// ---------------------------------------------------------------------------- - -func (s *peerSupervisor) stateMediated() stateFunc { - s.logf("STATE: Mediated") - s.mediated = true - s.updateRoutingTable(true) - - for { - select { - case <-s.packets: - // Drop. - case s.peer = <-s.peerUpdates: - s.logf("New peer: %v", s.peer) - return s.stateInit - } - } -} - -// ---------------------------------------------------------------------------- - -func (s *peerSupervisor) clearRoutingTable() { - s.table.Set(s.remoteIP, nil) -} - -func (s *peerSupervisor) updateRoutingTable(up bool) { - s.table.Set(s.remoteIP, &peer{ - Up: up, - Mediator: s.peer.Mediator, - Mediated: s.mediated, - IP: s.remoteIP, - Addr: s.remoteAddrPort, - SharedKey: s.sharedKey, - }) -} - -// ---------------------------------------------------------------------------- - -func (s *peerSupervisor) sendPing() uint64 { - traceID := newTraceID() - pkt := newRoutingPacket(packetTypePing, traceID) - s.w.WriteTo(s.peer.PeerIP, streamControl, pkt.Marshal(s.buf)) - s.pingTimer.Reset(pingInterval) - return traceID -} - -func (s *peerSupervisor) sendPong(traceID uint64) { - pkt := newRoutingPacket(packetTypePong, traceID) - s.w.WriteTo(s.peer.PeerIP, streamControl, pkt.Marshal(s.buf)) -} diff --git a/node/router.go b/node/router.go index 0e74d14..116b4d0 100644 --- a/node/router.go +++ b/node/router.go @@ -1,196 +1,7 @@ package node import ( - "encoding/json" - "io" - "log" - "net/http" "net/netip" - "net/url" - "sync/atomic" - "time" - "vppn/m" ) var zeroAddrPort = netip.AddrPort{} - -type peer struct { - IP byte // The VPN IP. - Up bool // No data will be sent to peers that are down. - Addr netip.AddrPort // If we have direct connection, otherwise use mediator. - Mediator bool // True if the peer will mediate. - RoutingCipher controlCipher - DataCipher dataCipher - - // TODO: Deprecated below. - Mediated bool - SharedKey []byte -} - -// ---------------------------------------------------------------------------- - -type routingTable struct { - table [256]*atomic.Pointer[peer] - mediator *atomic.Pointer[peer] -} - -func newRoutingTable() *routingTable { - r := routingTable{ - mediator: &atomic.Pointer[peer]{}, - } - - for i := range r.table { - r.table[i] = &atomic.Pointer[peer]{} - } - - return &r -} - -func (r *routingTable) Get(ip byte) *peer { - return r.table[ip].Load() -} - -func (r *routingTable) Set(ip byte, p *peer) { - r.table[ip].Store(p) -} - -func (r *routingTable) Mediator() *peer { - return r.mediator.Load() -} - -// ---------------------------------------------------------------------------- - -type router struct { - *routingTable - netName string - peerSupers [256]*peerSupervisor -} - -func newRouter(netName string, conf m.PeerConfig, routingData *routingTable, w *connWriter) *router { - r := &router{ - netName: netName, - routingTable: routingData, - } - - for i := range r.peerSupers { - r.peerSupers[i] = newPeerSupervisor( - conf, - byte(i), - w, - r.routingTable) - } - - go r.selectMediator() - go r.pollHub(conf) - - return r -} - -// ---------------------------------------------------------------------------- - -func (r *router) HandlePacket(sourceIP byte, remoteAddr netip.AddrPort, data []byte) { - p := routingPacket{} - if err := p.Parse(data); err != nil { - log.Printf("Dropping malformed routing packet: %v", err) - return - } - - w := routingPacketWrapper{ - routingPacket: p, - Addr: remoteAddr, - } - - r.peerSupers[sourceIP].HandlePacket(w) -} - -// ---------------------------------------------------------------------------- - -func (r *router) pollHub(conf m.PeerConfig) { - defer panicHandler() - - u, err := url.Parse(conf.HubAddress) - if err != nil { - log.Fatalf("Failed to parse hub address %s: %v", conf.HubAddress, err) - } - u.Path = "/peer/fetch-state/" - - client := &http.Client{Timeout: 8 * time.Second} - - req := &http.Request{ - Method: http.MethodGet, - URL: u, - Header: http.Header{}, - } - req.SetBasicAuth("", conf.APIKey) - - state, err := loadNetworkState(r.netName) - if err != nil { - log.Printf("Failed to load network state: %v", err) - log.Printf("Polling hub...") - r._pollHub(conf, client, req) - } else { - r.applyNetworkState(conf, state) - } - - for range time.Tick(64 * time.Second) { - r._pollHub(conf, client, req) - } -} - -func (r *router) _pollHub(conf m.PeerConfig, client *http.Client, req *http.Request) { - var state m.NetworkState - - log.Printf("Fetching peer state from %s...", conf.HubAddress) - resp, err := client.Do(req) - if err != nil { - log.Printf("Failed to fetch peer state: %v", err) - return - } - body, err := io.ReadAll(resp.Body) - _ = resp.Body.Close() - if err != nil { - log.Printf("Failed to read body from hub: %v", err) - return - } - - if err := json.Unmarshal(body, &state); err != nil { - log.Printf("Failed to unmarshal response from hub: %v", err) - return - } - - r.applyNetworkState(conf, state) - - if err := storeNetworkState(r.netName, state); err != nil { - log.Printf("Failed to store network state: %v", err) - } -} - -func (r *router) applyNetworkState(conf m.PeerConfig, state m.NetworkState) { - for i := range state.Peers { - if i != int(conf.PeerIP) { - r.peerSupers[i].HandlePeerUpdate(state.Peers[i]) - } - } -} - -// ---------------------------------------------------------------------------- - -func (r *router) selectMediator() { - for range time.Tick(8 * time.Second) { - current := r.mediator.Load() - if current != nil && current.Up { - continue - } - - for i := range r.table { - peer := r.table[i].Load() - if peer != nil && peer.Up && peer.Mediator { - log.Printf("Got mediator: %v", *peer) - r.mediator.Store(peer) - return - } - } - - r.mediator.Store(nil) - } -} diff --git a/node/routingpacket.go b/node/routingpacket.go deleted file mode 100644 index 1b5aed1..0000000 --- a/node/routingpacket.go +++ /dev/null @@ -1,33 +0,0 @@ -package node - -import ( - "unsafe" -) - -type routingPacket struct { - Type byte // One of the packetType* constants. - TraceID uint64 // For matching requests and responses. -} - -func newRoutingPacket(reqType byte, traceID uint64) routingPacket { - return routingPacket{ - Type: reqType, - TraceID: traceID, - } -} - -func (p routingPacket) Marshal(buf []byte) []byte { - buf = buf[:32] // Reserve 32 bytes just in case we need to add anything. - buf[0] = p.Type - *(*uint64)(unsafe.Pointer(&buf[1])) = uint64(p.TraceID) - return buf -} - -func (p *routingPacket) Parse(buf []byte) error { - if len(buf) != 32 { - return errMalformedPacket - } - p.Type = buf[0] - p.TraceID = *(*uint64)(unsafe.Pointer(&buf[1])) - return nil -} diff --git a/node/tmp-server.go b/node/tmp-server.go deleted file mode 100644 index 179a8a4..0000000 --- a/node/tmp-server.go +++ /dev/null @@ -1,185 +0,0 @@ -package node - -/* -var ( - network = []byte{10, 1, 1, 0} - serverIP = byte(1) - clientIP = byte(2) - port = uint16(5151) - netName = "testnet" - pubKey1 = []byte{0x43, 0xde, 0xd4, 0xb2, 0x1d, 0x71, 0x58, 0x9a, 0x96, 0x3a, 0x23, 0xfc, 0x2, 0xe, 0xfa, 0x42, 0x3, 0x94, 0xbc, 0xf8, 0x25, 0xf, 0x54, 0xcc, 0x98, 0x42, 0x8b, 0xe5, 0x27, 0x86, 0x49, 0x33} - privKey1 = []byte{0xae, 0x4d, 0xc5, 0xaa, 0xc9, 0xbc, 0x65, 0x41, 0x55, 0xb, 0x61, 0x52, 0xc4, 0x6c, 0xce, 0x2f, 0x1b, 0xf5, 0xb3, 0xbf, 0xb5, 0x54, 0x61, 0x7c, 0x26, 0x2e, 0xba, 0x5a, 0x19, 0xe2, 0x9c, 0xe0} - pubKey2 = []byte{0x8c, 0xfe, 0x12, 0xd9, 0x2d, 0x37, 0x5, 0x43, 0xab, 0x70, 0x59, 0x20, 0x3d, 0x82, 0x93, 0x9b, 0xb3, 0xaa, 0x35, 0x23, 0xc1, 0xb4, 0x4, 0x1f, 0x92, 0x97, 0x6f, 0xfd, 0x55, 0x17, 0x5a, 0x4b} - privKey2 = []byte{0xd9, 0xe1, 0xc6, 0x64, 0x3e, 0x29, 0x29, 0x78, 0x81, 0x53, 0xc2, 0x31, 0xd9, 0x34, 0x5b, 0x41, 0xf5, 0x80, 0xb0, 0x27, 0x9f, 0x65, 0x85, 0xd4, 0x78, 0xd5, 0x9, 0x2, 0xca, 0x56, 0x42, 0x80} -) - -func must(err error) { - if err != nil { - panic(err) - } -} - -type TmpNode struct { - network []byte - localIP byte - router *router - port uint16 - netName string - iface io.ReadWriteCloser - pubKey []byte - privKey []byte - w *connWriter - r *connReader -} - -// ---------------------------------------------------------------------------- - -func NewTmpNodeServer() *TmpNode { - n := &TmpNode{ - localIP: serverIP, - network: network, - router: &router{table: newPeerRepo()}, - port: port, - netName: netName, - pubKey: pubKey1, - privKey: privKey1, - } - - var err error - n.iface, err = openInterface(n.network, n.localIP, n.netName) - must(err) - - myAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", n.port)) - must(err) - - conn, err := net.ListenUDP("udp", myAddr) - must(err) - - n.w = newConnWriter(conn, n.localIP, n.router) - n.r = newConnReader(conn, n.localIP, n.router) - - n.router.table.Set(clientIP, &peer{ - IP: clientIP, - SharedKey: computeSharedKey(pubKey2, n.privKey), - }) - - return n -} - -// ---------------------------------------------------------------------------- - -func NewTmpNodeClient(srvAddrStr string) *TmpNode { - n := &TmpNode{ - localIP: clientIP, - network: network, - router: &router{table: newPeerRepo()}, - port: port, - netName: netName, - pubKey: pubKey2, - privKey: privKey2, - } - - var err error - n.iface, err = openInterface(n.network, n.localIP, n.netName) - must(err) - - myAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", n.port)) - must(err) - - conn, err := net.ListenUDP("udp", myAddr) - must(err) - - n.w = newConnWriter(conn, n.localIP, n.router) - n.r = newConnReader(conn, n.localIP, n.router) - - serverAddr, err := netip.ParseAddrPort(fmt.Sprintf("%s:%d", srvAddrStr, port)) - must(err) - - n.router.table.Set(serverIP, &peer{ - IP: serverIP, - Addr: &serverAddr, - SharedKey: computeSharedKey(pubKey1, n.privKey), - }) - - return n -} - -// ---------------------------------------------------------------------------- - -func (n *TmpNode) RunServer() { - defer func() { - if r := recover(); r != nil { - fmt.Printf("%v", r) - debug.PrintStack() - } - }() - - // Get remoteAddr from a packet. - buf := make([]byte, bufferSize) - remoteAddr, h, _, err := n.r.Read(buf) - must(err) - log.Printf("Got remote addr: %d -> %v", h.SourceIP, remoteAddr) - must(err) - - n.router.table.Set(h.SourceIP, &peer{ - IP: h.SourceIP, - Addr: &remoteAddr, - SharedKey: computeSharedKey(pubKey2, n.privKey), - }) - - go n.readFromIFace() - n.readFromConn() -} - -// ---------------------------------------------------------------------------- - -func (n *TmpNode) RunClient() { - defer func() { - if r := recover(); r != nil { - fmt.Printf("%v\n", r) - debug.PrintStack() - } - }() - - log.Printf("Sending to server...") - must(n.w.WriteTo(serverIP, 1, []byte{1, 2, 3, 4, 5, 6, 7, 8})) - - go n.readFromIFace() - n.readFromConn() -} - -func (n *TmpNode) readFromIFace() { - var ( - buf = make([]byte, bufferSize) - packet []byte - remoteIP byte - err error - ) - - for { - packet, remoteIP, err = readNextPacket(n.iface, buf) - must(err) - must(n.w.WriteTo(remoteIP, 1, packet)) - } -} - -func (node *TmpNode) readFromConn() { - var ( - buf = make([]byte, bufferSize) - packet []byte - err error - ) - - for { - _, _, packet, err = node.r.Read(buf) - must(err) - // We assume that we're only receiving packets from one source. - - _, err = node.iface.Write(packet) - if err != nil { - log.Printf("Got error: %v", err) - } - //must(err) - } -} -*/