diff --git a/README.md b/README.md index c6cc0e1..4567196 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,5 @@ # vppn: Virtual Potentially Private Network -## TODO - -* Add `-force-init` argument to `node` main? - ## Hub Server Configuration ``` @@ -33,7 +29,6 @@ WorkingDirectory=/home/user/ ExecStart=/home/user/hub -listen :https -root-dir=/home/user Restart=always RestartSec=8 -TimeoutStopSec=24 [Install] WantedBy=default.target @@ -70,7 +65,6 @@ WorkingDirectory=/home/user/ ExecStart=/home/user/vppn -name vppn -hub-address https://my.hub -api-key 1234567890 Restart=always RestartSec=8 -TimeoutStopSec=24 [Install] WantedBy=default.target diff --git a/node/README.md b/node/README.md new file mode 100644 index 0000000..30b77c4 --- /dev/null +++ b/node/README.md @@ -0,0 +1,17 @@ +# VPPN Peer Code + +## Refactoring for Testability + +* [ ] connWriter + * [ ] Separate send/relay calls +* [x] mcWriter +* [x] ifWriter +* [ ] ifReader +* [ ] connReader +* [ ] mcReader +* [ ] hubPoller +* [ ] supervisor + +## Updates + +* [ ] Send timing info w/ syn/ack packets diff --git a/node/conn.go b/node/conn.go index 2a1e762..e000557 100644 --- a/node/conn.go +++ b/node/conn.go @@ -1,50 +1,3 @@ package node -import ( - "io" - "log" - "net" - "net/netip" - "sync" -) - // ---------------------------------------------------------------------------- - -type connWriter struct { - lock sync.Mutex - conn *net.UDPConn -} - -func newConnWriter(conn *net.UDPConn) *connWriter { - return &connWriter{conn: conn} -} - -func (w *connWriter) WriteTo(packet []byte, addr netip.AddrPort) { - // Even though a conn is safe for concurrent use, it turns out that a mutex - // in Go is more fair when there's contention. Without this lock, control - // packets may fail to be sent in a timely manner causing timeouts. - w.lock.Lock() - if _, err := w.conn.WriteToUDPAddrPort(packet, addr); err != nil { - log.Printf("Failed to write to UDP port: %v", err) - } - w.lock.Unlock() -} - -// ---------------------------------------------------------------------------- - -type ifWriter struct { - lock sync.Mutex - iface io.ReadWriteCloser -} - -func newIFWriter(iface io.ReadWriteCloser) *ifWriter { - return &ifWriter{iface: iface} -} - -func (w *ifWriter) Write(packet []byte) { - w.lock.Lock() - if _, err := w.iface.Write(packet); err != nil { - log.Fatalf("Failed to write to interface: %v", err) - } - w.lock.Unlock() -} diff --git a/node/connwriter.go b/node/connwriter.go new file mode 100644 index 0000000..597b886 --- /dev/null +++ b/node/connwriter.go @@ -0,0 +1,146 @@ +package node + +import ( + "log" + "net/netip" + "sync" + "sync/atomic" + "time" +) + +// ---------------------------------------------------------------------------- + +type peerRoute struct { + IP byte + Up bool // True if data can be sent on the route. + Relay bool // True if the peer is a relay. + Direct bool // True if this is a direct connection. + PubSignKey []byte + ControlCipher *controlCipher + DataCipher *dataCipher + RemoteAddr netip.AddrPort // Remote address if directly connected. +} + +// ---------------------------------------------------------------------------- + +type udpAddrPortWriter interface { + WriteToUDPAddrPort([]byte, netip.AddrPort) (int, error) +} + +type marshaller interface { + Marshal([]byte) []byte +} + +// ---------------------------------------------------------------------------- + +type connWriter struct { + localIP byte + conn udpAddrPortWriter + + // For sending control packets. + cBuf1 []byte + cBuf2 []byte + + // For sending data packets. + dBuf1 []byte + dBuf2 []byte + + counters [256]uint64 + + // Lock around for sending on UDP Conn. + wLock sync.Mutex +} + +func newConnWriter(conn udpAddrPortWriter, localIP byte) *connWriter { + w := &connWriter{ + localIP: localIP, + conn: conn, + cBuf1: make([]byte, bufferSize), + cBuf2: make([]byte, bufferSize), + dBuf1: make([]byte, bufferSize), + dBuf2: make([]byte, bufferSize), + } + for i := range w.counters { + w.counters[i] = uint64(time.Now().Unix()<<30 + 1) + } + return w +} + +// Not safe for concurrent use. Should only be called by supervisor. +func (w *connWriter) SendControlPacket(pkt marshaller, route *peerRoute) { + buf := pkt.Marshal(w.cBuf1) + h := header{ + StreamID: controlStreamID, + Counter: atomic.AddUint64(&w.counters[route.IP], 1), + SourceIP: w.localIP, + DestIP: route.IP, + } + buf = route.ControlCipher.Encrypt(h, buf, w.cBuf2) + w.writeTo(buf, route.RemoteAddr) +} + +func (w *connWriter) RelayControlPacket(pkt marshaller, route, relay *peerRoute) { + buf := pkt.Marshal(w.cBuf1) + h := header{ + StreamID: controlStreamID, + Counter: atomic.AddUint64(&w.counters[route.IP], 1), + SourceIP: w.localIP, + DestIP: route.IP, + } + buf = route.ControlCipher.Encrypt(h, buf, w.cBuf2) + w.relayPacket(buf, w.cBuf1, route, relay) +} + +// Not safe for concurrent use. Should only be called by ifReader. +func (w *connWriter) SendDataPacket(pkt []byte, route, relay *peerRoute) { + h := header{ + StreamID: dataStreamID, + Counter: atomic.AddUint64(&w.counters[route.IP], 1), + SourceIP: w.localIP, + DestIP: route.IP, + } + + enc := route.DataCipher.Encrypt(h, pkt, w.dBuf1) + + if route.Direct { + w.writeTo(enc, route.RemoteAddr) + return + } + + w.relayPacket(enc, w.dBuf2, route, relay) +} + +// TODO: RelayDataPacket + +// Safe for concurrent use. Should only be called by connReader. +// +// This function will send pkt to the peer directly. This is used when a peer +// is acting as a relay and is forwarding already encrypted data for another +// peer. +func (w *connWriter) SendEncryptedDataPacket(pkt []byte, route *peerRoute) { + w.writeTo(pkt, route.RemoteAddr) +} + +func (w *connWriter) relayPacket(data, buf []byte, route, relay *peerRoute) { + if relay == nil || !relay.Up { + return + } + + h := header{ + StreamID: dataStreamID, + Counter: atomic.AddUint64(&w.counters[relay.IP], 1), + SourceIP: w.localIP, + DestIP: route.IP, + } + + enc := relay.DataCipher.Encrypt(h, data, buf) + w.writeTo(enc, relay.RemoteAddr) +} + +func (w *connWriter) writeTo(packet []byte, addr netip.AddrPort) { + w.wLock.Lock() + if _, err := w.conn.WriteToUDPAddrPort(packet, addr); err != nil { + log.Printf("Failed to write to UDP port: %v", err) + } + w.wLock.Unlock() +} diff --git a/node/connwriter_test.go b/node/connwriter_test.go new file mode 100644 index 0000000..595d5b7 --- /dev/null +++ b/node/connwriter_test.go @@ -0,0 +1,291 @@ +package node + +import ( + "bytes" + "net/netip" + "testing" +) + +// ---------------------------------------------------------------------------- + +type testUDPPacket struct { + Addr netip.AddrPort + Data []byte +} + +type testUDPAddrPortWriter struct { + written []testUDPPacket +} + +func (w *testUDPAddrPortWriter) WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (int, error) { + w.written = append(w.written, testUDPPacket{ + Addr: addr, + Data: bytes.Clone(b), + }) + return len(b), nil +} + +func (w *testUDPAddrPortWriter) Written() []testUDPPacket { + out := w.written + w.written = []testUDPPacket{} + return out +} + +// ---------------------------------------------------------------------------- + +type testPacket string + +func (p testPacket) Marshal(b []byte) []byte { + b = b[:len(p)] + copy(b, []byte(p)) + return b +} + +// ---------------------------------------------------------------------------- + +func testConnWriter_getTestRoutes() (local, remote, relayLocal, relayRemote *peerRoute) { + localKeys := generateKeys() + remoteKeys := generateKeys() + + local = &peerRoute{ + IP: 2, + Up: true, + Relay: false, + PubSignKey: remoteKeys.PubSignKey, + ControlCipher: newControlCipher(localKeys.PrivKey, remoteKeys.PubKey), + DataCipher: newDataCipher(), + RemoteAddr: netip.AddrPortFrom(netip.AddrFrom4([4]byte{1, 1, 1, 2}), 100), + } + + remote = &peerRoute{ + IP: 1, + Up: true, + Relay: false, + PubSignKey: localKeys.PubSignKey, + ControlCipher: newControlCipher(remoteKeys.PrivKey, localKeys.PubKey), + DataCipher: local.DataCipher, + RemoteAddr: netip.AddrPortFrom(netip.AddrFrom4([4]byte{1, 1, 1, 1}), 100), + } + + rLocalKeys := generateKeys() + rRemoteKeys := generateKeys() + + relayLocal = &peerRoute{ + IP: 3, + Up: true, + Relay: true, + Direct: true, + PubSignKey: rRemoteKeys.PubSignKey, + ControlCipher: newControlCipher(rLocalKeys.PrivKey, rRemoteKeys.PubKey), + DataCipher: newDataCipher(), + RemoteAddr: netip.AddrPortFrom(netip.AddrFrom4([4]byte{1, 1, 1, 3}), 100), + } + + relayRemote = &peerRoute{ + IP: 1, + Up: true, + Relay: false, + Direct: true, + PubSignKey: rLocalKeys.PubSignKey, + ControlCipher: newControlCipher(rRemoteKeys.PrivKey, rLocalKeys.PubKey), + DataCipher: relayLocal.DataCipher, + RemoteAddr: netip.AddrPortFrom(netip.AddrFrom4([4]byte{1, 1, 1, 1}), 100), + } + + return +} + +// ---------------------------------------------------------------------------- + +// Testing if we can send a control packet directly to the remote route. +func TestConnWriter_SendControlPacket_direct(t *testing.T) { + route, rRoute, _, _ := testConnWriter_getTestRoutes() + route.Direct = true + + writer := &testUDPAddrPortWriter{} + w := newConnWriter(writer, rRoute.IP) + in := testPacket("hello world!") + + w.SendControlPacket(in, route) + out := writer.Written() + if len(out) != 1 { + t.Fatal(out) + } + + if out[0].Addr != route.RemoteAddr { + t.Fatal(out[0]) + } + + dec, ok := rRoute.ControlCipher.Decrypt(out[0].Data, make([]byte, 1024)) + if !ok { + t.Fatal(ok) + } + if string(dec) != string(in) { + t.Fatal(dec) + } +} + +// Testing if we can relay a packet via an intermediary. +func TestConnWriter_SendControlPacket_relay(t *testing.T) { + route, rRoute, relay, rRelay := testConnWriter_getTestRoutes() + + writer := &testUDPAddrPortWriter{} + w := newConnWriter(writer, rRoute.IP) + in := testPacket("hello world!") + + w.RelayControlPacket(in, route, relay) + + out := writer.Written() + if len(out) != 1 { + t.Fatal(out) + } + + if out[0].Addr != relay.RemoteAddr { + t.Fatal(out[0]) + } + + dec, ok := rRelay.DataCipher.Decrypt(out[0].Data, make([]byte, 1024)) + if !ok { + t.Fatal(ok) + } + + dec2, ok := rRoute.ControlCipher.Decrypt(dec, make([]byte, 1024)) + if !ok { + t.Fatal(ok) + } + + if string(dec2) != string(in) { + t.Fatal(dec2) + } +} + +// Testing that a nil relay doesn't cause an issue. +func TestConnWriter_SendControlPacket_relay_relayNil(t *testing.T) { + route, rRoute, _, _ := testConnWriter_getTestRoutes() + + writer := &testUDPAddrPortWriter{} + w := newConnWriter(writer, rRoute.IP) + in := testPacket("hello world!") + + w.RelayControlPacket(in, route, nil) + + out := writer.Written() + if len(out) != 0 { + t.Fatal(out) + } + +} + +// Testing that we don't send anything if the relay isn't up. +func TestConnWriter_SendControlPacket_relay_relayNotUp(t *testing.T) { + route, rRoute, relay, _ := testConnWriter_getTestRoutes() + relay.Up = false + + writer := &testUDPAddrPortWriter{} + w := newConnWriter(writer, rRoute.IP) + in := testPacket("hello world!") + + w.RelayControlPacket(in, route, relay) + + out := writer.Written() + if len(out) != 0 { + t.Fatal(out) + } +} + +// Testing that we can send a data packet directly to a remote route. +func TestConnWriter_SendDataPacket_direct(t *testing.T) { + route, rRoute, _, _ := testConnWriter_getTestRoutes() + route.Direct = true + + writer := &testUDPAddrPortWriter{} + w := newConnWriter(writer, rRoute.IP) + + in := []byte("hello world!") + w.SendDataPacket(in, route, nil) + + out := writer.Written() + if len(out) != 1 { + t.Fatal(out) + } + + if out[0].Addr != route.RemoteAddr { + t.Fatal(out[0]) + } + + dec, ok := rRoute.DataCipher.Decrypt(out[0].Data, make([]byte, 1024)) + if !ok { + t.Fatal(ok) + } + + if !bytes.Equal(dec, in) { + t.Fatal(dec) + } +} + +// Testing that we can relay a data packet via a relay. +func TestConnWriter_SendDataPacket_relay(t *testing.T) { + route, rRoute, relay, rRelay := testConnWriter_getTestRoutes() + + writer := &testUDPAddrPortWriter{} + w := newConnWriter(writer, rRoute.IP) + in := []byte("Hello world!") + + w.SendDataPacket(in, route, relay) + + out := writer.Written() + if len(out) != 1 { + t.Fatal(out) + } + + if out[0].Addr != relay.RemoteAddr { + t.Fatal(out[0]) + } + + dec, ok := rRelay.DataCipher.Decrypt(out[0].Data, make([]byte, 1024)) + if !ok { + t.Fatal(ok) + } + + dec2, ok := rRoute.DataCipher.Decrypt(dec, make([]byte, 1024)) + if !ok { + t.Fatal(ok) + } + + if !bytes.Equal(dec2, in) { + t.Fatal(dec2) + } +} + +// Testing that we don't attempt to relay if the relay is nil. +func TestConnWriter_SendDataPacket_relay_relayNil(t *testing.T) { + route, rRoute, _, _ := testConnWriter_getTestRoutes() + + writer := &testUDPAddrPortWriter{} + w := newConnWriter(writer, rRoute.IP) + in := []byte("Hello world!") + + w.SendDataPacket(in, route, nil) + + out := writer.Written() + if len(out) != 0 { + t.Fatal(out) + } +} + +// Testing that we don't attempt to relay if the relay isn't up. +func TestConnWriter_SendDataPacket_relay_relayNotUp(t *testing.T) { + route, rRoute, relay, _ := testConnWriter_getTestRoutes() + relay.Up = false + + writer := &testUDPAddrPortWriter{} + w := newConnWriter(writer, rRoute.IP) + in := []byte("Hello world!") + + w.SendDataPacket(in, route, relay) + + out := writer.Written() + if len(out) != 0 { + t.Fatal(out) + } +} diff --git a/node/crypto.go b/node/crypto.go new file mode 100644 index 0000000..c24aaad --- /dev/null +++ b/node/crypto.go @@ -0,0 +1,30 @@ +package node + +import ( + "crypto/rand" + "log" + + "golang.org/x/crypto/nacl/box" + "golang.org/x/crypto/nacl/sign" +) + +type cryptoKeys struct { + PubKey []byte + PrivKey []byte + PubSignKey []byte + PrivSignKey []byte +} + +func generateKeys() cryptoKeys { + pubKey, privKey, err := box.GenerateKey(rand.Reader) + if err != nil { + log.Fatalf("Failed to generate encryption keys: %v", err) + } + + pubSignKey, privSignKey, err := sign.GenerateKey(rand.Reader) + if err != nil { + log.Fatalf("Failed to generate signing keys: %v", err) + } + + return cryptoKeys{pubKey[:], privKey[:], pubSignKey[:], privSignKey[:]} +} diff --git a/node/data-flow.dot b/node/data-flow.dot new file mode 100644 index 0000000..45b6f05 --- /dev/null +++ b/node/data-flow.dot @@ -0,0 +1,14 @@ +digraph d { + ifReader -> connWriter; + connReader -> ifWriter; + connReader -> connWriter; + connReader -> supervisor; + mcReader -> supervisor; + supervisor -> connWriter; + supervisor -> mcWriter; + hubPoller -> supervisor; + + connWriter [shape="box"]; + mcWriter [shape="box"]; + ifWriter [shape="box"]; +} \ No newline at end of file diff --git a/node/globalfuncs.go b/node/globalfuncs.go index f32ec0b..2d13f57 100644 --- a/node/globalfuncs.go +++ b/node/globalfuncs.go @@ -1,65 +1,8 @@ package node -import ( - "sync/atomic" -) - func getRelayRoute() *peerRoute { if ip := relayIP.Load(); ip != nil { return routingTable[*ip].Load() } return nil } - -func _sendControlPacket(pkt interface{ Marshal([]byte) []byte }, route peerRoute, buf1, buf2 []byte) { - buf := pkt.Marshal(buf2) - h := header{ - StreamID: controlStreamID, - Counter: atomic.AddUint64(&sendCounters[route.IP], 1), - SourceIP: localIP, - DestIP: route.IP, - } - buf = route.ControlCipher.Encrypt(h, buf, buf1) - - if route.Direct { - _conn.WriteTo(buf, route.RemoteAddr) - return - } - - _relayPacket(route.IP, buf, buf2) -} - -func _sendDataPacket(route *peerRoute, pkt, buf1, buf2 []byte) { - h := header{ - StreamID: dataStreamID, - Counter: atomic.AddUint64(&sendCounters[route.IP], 1), - SourceIP: localIP, - DestIP: route.IP, - } - - enc := route.DataCipher.Encrypt(h, pkt, buf1) - - if route.Direct { - _conn.WriteTo(enc, route.RemoteAddr) - return - } - - _relayPacket(route.IP, enc, buf2) -} - -func _relayPacket(destIP byte, data, buf []byte) { - relayRoute := getRelayRoute() - if relayRoute == nil || !relayRoute.Up || !relayRoute.Relay { - return - } - - h := header{ - StreamID: dataStreamID, - Counter: atomic.AddUint64(&sendCounters[relayRoute.IP], 1), - SourceIP: localIP, - DestIP: destIP, - } - - enc := relayRoute.DataCipher.Encrypt(h, data, buf) - _conn.WriteTo(enc, relayRoute.RemoteAddr) -} diff --git a/node/globals.go b/node/globals.go index b72acc4..8538c4a 100644 --- a/node/globals.go +++ b/node/globals.go @@ -5,7 +5,6 @@ import ( "net/netip" "net/url" "sync/atomic" - "time" ) const ( @@ -17,21 +16,9 @@ const ( signOverhead = 64 ) -var ( - multicastIP = netip.AddrFrom4([4]byte{224, 0, 0, 157}) - multicastAddr = net.UDPAddrFromAddrPort(netip.AddrPortFrom(multicastIP, 4560)) -) - -type peerRoute struct { - IP byte - Up bool // True if data can be sent on the route. - Relay bool // True if the peer is a relay. - Direct bool // True if this is a direct connection. - PubSignKey []byte - ControlCipher *controlCipher - DataCipher *dataCipher - RemoteAddr netip.AddrPort // Remote address if directly connected. -} +var multicastAddr = net.UDPAddrFromAddrPort(netip.AddrPortFrom( + netip.AddrFrom4([4]byte{224, 0, 0, 157}), + 4560)) var ( hubURL *url.URL @@ -45,20 +32,7 @@ var ( privKey []byte privSignKey []byte - // Shared interface for writing. - _iface *ifWriter - - // Shared connection for writing. - _conn *connWriter - - // Counters for sending to each peer. - sendCounters [256]uint64 = func() (out [256]uint64) { - for i := range out { - out[i] = uint64(time.Now().Unix()<<30 + 1) - } - return - }() - + // TODO: Doesn't need to be global. // Duplicate checkers for incoming packets. dupChecks [256]*dupCheck = func() (out [256]*dupCheck) { for i := range out { @@ -67,9 +41,11 @@ var ( return }() + // TODO: Doesn't need to be global . // Messages for the supervisor. messages = make(chan any, 1024) + // TODO: Doesn't need to be global . // Global routing table. routingTable [256]*atomic.Pointer[peerRoute] = func() (out [256]*atomic.Pointer[peerRoute]) { for i := range out { @@ -82,5 +58,6 @@ var ( // Managed by the relayManager. relayIP = &atomic.Pointer[byte]{} + // TODO: Only used by supervisor: can make local there. publicAddrs = newPubAddrStore() ) diff --git a/node/ifreader.go b/node/ifreader.go new file mode 100644 index 0000000..a0e7a54 --- /dev/null +++ b/node/ifreader.go @@ -0,0 +1,102 @@ +package node + +import ( + "io" + "log" + "sync/atomic" +) + +type ifReader struct { + iface io.Reader + routes [256]*atomic.Pointer[peerRoute] + relay *atomic.Pointer[peerRoute] + sendDataPacket func(pkt []byte, route *peerRoute) + relayDataPacket func(pkt []byte, route, relay *peerRoute) +} + +func newIFReader( + iface io.Reader, + routes [256]*atomic.Pointer[peerRoute], + relay *atomic.Pointer[peerRoute], + sendDataPacket func(pkt []byte, route *peerRoute), + relayDackPacket func(pkt []byte, route, relay *peerRoute), +) *ifReader { + return &ifReader{ + iface: iface, + routes: routes, + relay: relay, + sendDataPacket: sendDataPacket, + } +} + +func (r *ifReader) Run() { + var ( + packet = make([]byte, bufferSize) + remoteIP byte + ok bool + ) + + for { + packet = r.readNextPacket(packet) + if remoteIP, ok = r.parsePacket(packet); ok { + r.sendPacket(packet, remoteIP) + } + } +} + +func (r *ifReader) sendPacket(pkt []byte, remoteIP byte) { + route := r.routes[remoteIP].Load() + if !route.Up { + log.Printf("Route not connected: %d", remoteIP) + return + } + + // Direct path => early return. + if route.Direct { + r.sendDataPacket(pkt, route) + return + } + + if relay := r.relay.Load(); relay != nil { + r.relayDataPacket(pkt, route, relay) + } +} + +// Get next packet, returning packet, and destination ip. +func (r *ifReader) readNextPacket(buf []byte) []byte { + n, err := r.iface.Read(buf[:cap(buf)]) + if err != nil { + log.Fatalf("Failed to read from interface: %v", err) + } + + return buf[:n] +} + +func (r *ifReader) parsePacket(buf []byte) (byte, bool) { + n := len(buf) + if n == 0 { + return 0, false + } + + version := buf[0] >> 4 + + switch version { + case 4: + if n < 20 { + log.Printf("Short IPv4 packet: %d", len(buf)) + return 0, false + } + return buf[19], true + + case 6: + if len(buf) < 40 { + log.Printf("Short IPv6 packet: %d", len(buf)) + return 0, false + } + return buf[39], true + + default: + log.Printf("Invalid IP packet version: %v", version) + return 0, false + } +} diff --git a/node/ifreader_test.go b/node/ifreader_test.go new file mode 100644 index 0000000..8f173f4 --- /dev/null +++ b/node/ifreader_test.go @@ -0,0 +1,117 @@ +package node + +import ( + "bytes" + "net" + "sync/atomic" + "testing" +) + +// Test that we parse IPv4 packets correctly. +func TestIFReader_parsePacket_ipv4(t *testing.T) { + r := newIFReader(nil, [256]*atomic.Pointer[peerRoute]{}, nil, nil, nil) + + pkt := make([]byte, 1234) + pkt[0] = 4 << 4 + pkt[19] = 128 + + if ip, ok := r.parsePacket(pkt); !ok || ip != 128 { + t.Fatal(ip, ok) + } +} + +// Test that we parse IPv6 packets correctly. +func TestIFReader_parsePacket_ipv6(t *testing.T) { + r := newIFReader(nil, [256]*atomic.Pointer[peerRoute]{}, nil, nil, nil) + + pkt := make([]byte, 1234) + pkt[0] = 6 << 4 + pkt[39] = 42 + + if ip, ok := r.parsePacket(pkt); !ok || ip != 42 { + t.Fatal(ip, ok) + } +} + +// Test that empty packets work as expected. +func TestIFReader_parsePacket_emptyPacket(t *testing.T) { + r := newIFReader(nil, [256]*atomic.Pointer[peerRoute]{}, nil, nil, nil) + + pkt := make([]byte, 0) + if ip, ok := r.parsePacket(pkt); ok { + t.Fatal(ip, ok) + } +} + +// Test that invalid IP versions fail. +func TestIFReader_parsePacket_invalidIPVersion(t *testing.T) { + r := newIFReader(nil, [256]*atomic.Pointer[peerRoute]{}, nil, nil, nil) + + for i := byte(1); i < 16; i++ { + if i == 4 || i == 6 { + continue + } + pkt := make([]byte, 1234) + pkt[0] = i << 4 + + if ip, ok := r.parsePacket(pkt); ok { + t.Fatal(i, ip, ok) + } + } +} + +// Test that short IPv4 packets fail. +func TestIFReader_parsePacket_shortIPv4(t *testing.T) { + r := newIFReader(nil, [256]*atomic.Pointer[peerRoute]{}, nil, nil, nil) + + pkt := make([]byte, 19) + pkt[0] = 4 << 4 + + if ip, ok := r.parsePacket(pkt); ok { + t.Fatal(ip, ok) + } +} + +// Test that short IPv6 packets fail. +func TestIFReader_parsePacket_shortIPv6(t *testing.T) { + r := newIFReader(nil, [256]*atomic.Pointer[peerRoute]{}, nil, nil, nil) + + pkt := make([]byte, 39) + pkt[0] = 6 << 4 + + if ip, ok := r.parsePacket(pkt); ok { + t.Fatal(ip, ok) + } +} + +// Test that we can read a packet. +func TestIFReader_readNextpacket(t *testing.T) { + in, out := net.Pipe() + r := newIFReader(out, [256]*atomic.Pointer[peerRoute]{}, nil, nil, nil) + defer in.Close() + defer out.Close() + + go in.Write([]byte("hello world!")) + + pkt := r.readNextPacket(make([]byte, bufferSize)) + if !bytes.Equal(pkt, []byte("hello world!")) { + t.Fatalf("%s", pkt) + } +} + +// Testing that we can send a packet directly. +func TestIFReader_sendPacket_direct(t *testing.T) { + // TODO +} + +// Testing that we don't send a packet if route isn't up. +func TestIFReader_sendPacket_directNotUp(t *testing.T) { + // TODO +} + +// Testing that we can send a packet via a relay. +func TestIFReader_sendPacket_relayed(t *testing.T) { + // TODO +} + +// Testing that we don't try to send on a nil relay IP. diff --git a/node/ifwriter.go b/node/ifwriter.go new file mode 100644 index 0000000..adb74e3 --- /dev/null +++ b/node/ifwriter.go @@ -0,0 +1,5 @@ +package node + +import "io" + +type ifWriter io.Writer diff --git a/node/localdiscovery_test.go b/node/localdiscovery_test.go index 7f4eaa3..b00b29d 100644 --- a/node/localdiscovery_test.go +++ b/node/localdiscovery_test.go @@ -20,7 +20,7 @@ func TestLocalDiscoveryPacketSigning(t *testing.T) { privSignKey = privSigKey[:] route := routingTable[localIP].Load() route.IP = byte(localIP) - route.PubSignKey = pubSignKey[0:32] + route.PubSignKey = pubSignKey[:] routingTable[localIP].Store(route) out := buildLocalDiscoveryPacket(buf1, buf2) diff --git a/node/main.go b/node/main.go index 4e59cf7..8e53cb4 100644 --- a/node/main.go +++ b/node/main.go @@ -143,10 +143,6 @@ func main() { conn.SetReadBuffer(1024 * 1024 * 8) conn.SetWriteBuffer(1024 * 1024 * 8) - // Intialize globals. - _iface = newIFWriter(iface) - _conn = newConnWriter(conn) - localIP = config.PeerIP ip, ok := netip.AddrFromSlice(config.PublicIP) @@ -169,17 +165,19 @@ func main() { } }() - go startPeerSuper() + sender := newPacketSender(conn) + + go startPeerSuper(routingTable, messages, sender) go newHubPoller().Run() - go readFromConn(conn) + go readFromConn(conn, iface, sender) - readFromIFace(iface) + readFromIFace(iface, sender) } // ---------------------------------------------------------------------------- -func readFromConn(conn *net.UDPConn) { +func readFromConn(conn *net.UDPConn, iface io.ReadWriteCloser, sender dataPacketSender) { defer panicHandler() @@ -213,7 +211,7 @@ func readFromConn(conn *net.UDPConn) { handleControlPacket(remoteAddr, h, data, decBuf) case dataStreamID: - handleDataPacket(h, data, decBuf) + handleDataPacket(h, data, decBuf, iface, sender) default: log.Printf("Unknown stream ID: %d", h.StreamID) @@ -263,7 +261,7 @@ func handleControlPacket(addr netip.AddrPort, h header, data, decBuf []byte) { } -func handleDataPacket(h header, data []byte, decBuf []byte) { +func handleDataPacket(h header, data []byte, decBuf []byte, iface ifWriter, sender dataPacketSender) { route := routingTable[h.SourceIP].Load() if !route.Up { log.Printf("Not connected (recv).") @@ -282,7 +280,9 @@ func handleDataPacket(h header, data []byte, decBuf []byte) { } if h.DestIP == localIP { - _iface.Write(dec) + if _, err := iface.Write(dec); err != nil { + log.Fatalf("Failed to write to interface: %v", err) + } return } @@ -292,16 +292,14 @@ func handleDataPacket(h header, data []byte, decBuf []byte) { return } - _conn.WriteTo(dec, destRoute.RemoteAddr) + sender.SendEncryptedDataPacket(dec, destRoute.RemoteAddr) } // ---------------------------------------------------------------------------- -func readFromIFace(iface io.ReadWriteCloser) { +func readFromIFace(iface io.ReadWriteCloser, sender dataPacketSender) { var ( packet = make([]byte, bufferSize) - buf1 = make([]byte, bufferSize) - buf2 = make([]byte, bufferSize) remoteIP byte err error ) @@ -318,6 +316,6 @@ func readFromIFace(iface io.ReadWriteCloser) { continue } - _sendDataPacket(route, packet, buf1, buf2) + sender.SendDataPacket(packet, *route) } } diff --git a/node/main_test.go b/node/main_test.go new file mode 100644 index 0000000..bf077a2 --- /dev/null +++ b/node/main_test.go @@ -0,0 +1,37 @@ +package node + +import ( + "crypto/rand" + "log" + + "golang.org/x/crypto/nacl/box" + "golang.org/x/crypto/nacl/sign" +) + +type testPeer struct { + IP byte + PubKey []byte + PrivKey []byte + PubSignKey []byte + PrivSignKey []byte +} + +func newTestPeer(ip byte) testPeer { + encPubKey, encPrivKey, err := box.GenerateKey(rand.Reader) + if err != nil { + log.Fatalf("Failed to generate encryption keys: %v", err) + } + + signPubKey, signPrivKey, err := sign.GenerateKey(rand.Reader) + if err != nil { + log.Fatalf("Failed to generate signing keys: %v", err) + } + + return testPeer{ + IP: ip, + PubKey: encPubKey[:], + PrivKey: encPrivKey[:], + PubSignKey: signPubKey[:], + PrivSignKey: signPrivKey[:], + } +} diff --git a/node/mcwriter.go b/node/mcwriter.go new file mode 100644 index 0000000..99e5b58 --- /dev/null +++ b/node/mcwriter.go @@ -0,0 +1,62 @@ +package node + +import ( + "log" + "net" + + "golang.org/x/crypto/nacl/sign" +) + +// ---------------------------------------------------------------------------- + +type udpWriter interface { + WriteToUDP([]byte, *net.UDPAddr) (int, error) +} + +// ---------------------------------------------------------------------------- + +func createLocalDiscoveryPacket(localIP byte, signingKey []byte) []byte { + h := header{ + SourceIP: localIP, + DestIP: 255, + } + buf := make([]byte, headerSize) + h.Marshal(buf) + out := make([]byte, headerSize+signOverhead) + return sign.Sign(out[:0], buf, (*[64]byte)(signingKey)) +} + +func headerFromLocalDiscoveryPacket(pkt []byte) (h header, ok bool) { + if len(pkt) != headerSize+signOverhead { + return + } + + h.Parse(pkt[signOverhead:]) + ok = true + return +} + +func verifyLocalDiscoveryPacket(pkt, buf []byte, pubSignKey []byte) bool { + _, ok := sign.Open(buf[:0], pkt, (*[32]byte)(pubSignKey)) + return ok +} + +// ---------------------------------------------------------------------------- + +type mcWriter struct { + conn udpWriter + discoveryPacket []byte +} + +func newMCWriter(conn udpWriter, localIP byte, signingKey []byte) *mcWriter { + return &mcWriter{ + conn: conn, + discoveryPacket: createLocalDiscoveryPacket(localIP, signingKey), + } +} + +func (w *mcWriter) SendLocalDiscovery() { + if _, err := w.conn.WriteToUDP(w.discoveryPacket, multicastAddr); err != nil { + log.Printf("Failed to write multicast UDP packet: %v", err) + } +} diff --git a/node/mcwriter_test.go b/node/mcwriter_test.go new file mode 100644 index 0000000..d182239 --- /dev/null +++ b/node/mcwriter_test.go @@ -0,0 +1,102 @@ +package node + +import ( + "bytes" + "net" + "testing" +) + +// ---------------------------------------------------------------------------- + +// Testing that we can create and verify a local discovery packet. +func TestVerifyLocalDiscoveryPacket_valid(t *testing.T) { + keys := generateKeys() + + created := createLocalDiscoveryPacket(55, keys.PrivSignKey) + + header, ok := headerFromLocalDiscoveryPacket(created) + if !ok { + t.Fatal(ok) + } + if header.SourceIP != 55 || header.DestIP != 255 { + t.Fatal(header) + } + + if !verifyLocalDiscoveryPacket(created, make([]byte, 1024), keys.PubSignKey) { + t.Fatal("Not valid") + } +} + +// Testing that we don't try to parse short packets. +func TestVerifyLocalDiscoveryPacket_tooShort(t *testing.T) { + keys := generateKeys() + + created := createLocalDiscoveryPacket(55, keys.PrivSignKey) + + _, ok := headerFromLocalDiscoveryPacket(created[:len(created)-1]) + if ok { + t.Fatal(ok) + } +} + +// Testing that modifying a packet makes it invalid. +func TestVerifyLocalDiscoveryPacket_invalid(t *testing.T) { + keys := generateKeys() + + created := createLocalDiscoveryPacket(55, keys.PrivSignKey) + buf := make([]byte, 1024) + for i := range created { + modified := bytes.Clone(created) + modified[i]++ + if verifyLocalDiscoveryPacket(modified, buf, keys.PubSignKey) { + t.Fatal("Verification should have failed.") + } + } +} + +// ---------------------------------------------------------------------------- + +type testUDPWriter struct { + written [][]byte +} + +func (w *testUDPWriter) WriteToUDP(b []byte, addr *net.UDPAddr) (int, error) { + w.written = append(w.written, bytes.Clone(b)) + return len(b), nil +} + +func (w *testUDPWriter) Written() [][]byte { + out := w.written + w.written = [][]byte{} + return out +} + +// ---------------------------------------------------------------------------- + +// Testing that the mcWriter sends local discovery packets as expected. +func TestMCWriter_SendLocalDiscovery(t *testing.T) { + keys := generateKeys() + writer := &testUDPWriter{} + + mcw := newMCWriter(writer, 42, keys.PrivSignKey) + mcw.SendLocalDiscovery() + + out := writer.Written() + if len(out) != 1 { + t.Fatal(out) + } + + pkt := out[0] + + header, ok := headerFromLocalDiscoveryPacket(pkt) + if !ok { + t.Fatal(ok) + } + if header.SourceIP != 42 || header.DestIP != 255 { + t.Fatal(header) + } + + if !verifyLocalDiscoveryPacket(pkt, make([]byte, 1024), keys.PubSignKey) { + t.Fatal("Verification should succeed.") + } +} diff --git a/node/messages.go b/node/messages.go index 76d86d4..64ca5fe 100644 --- a/node/messages.go +++ b/node/messages.go @@ -10,7 +10,8 @@ import ( type controlMsg[T any] struct { SrcIP byte SrcAddr netip.AddrPort - Packet T + // TODO: RecvdAt int64 // Unixmilli. + Packet T } func parseControlMsg(srcIP byte, srcAddr netip.AddrPort, buf []byte) (any, error) { @@ -55,5 +56,3 @@ type peerUpdateMsg struct { // ---------------------------------------------------------------------------- type pingTimerMsg struct{} - -// ---------------------------------------------------------------------------- diff --git a/node/packets.go b/node/packets.go index 14d7377..f3aa523 100644 --- a/node/packets.go +++ b/node/packets.go @@ -21,7 +21,8 @@ const ( // ---------------------------------------------------------------------------- type synPacket struct { - TraceID uint64 // TraceID to match response w/ request. + TraceID uint64 // TraceID to match response w/ request. + // TODO: SentAt int64 // Unixmilli. SharedKey [32]byte // Our shared key. Direct bool PossibleAddrs [8]netip.AddrPort // Possible public addresses of the sender. diff --git a/node/packets_test.go b/node/packets_test.go index 254bcc7..2b4023a 100644 --- a/node/packets_test.go +++ b/node/packets_test.go @@ -1,41 +1 @@ package node - -import ( - "crypto/rand" - "net/netip" - "reflect" - "testing" -) - -func TestPacketSyn(t *testing.T) { - in := synPacket{ - TraceID: newTraceID(), - FromAddr: netip.AddrPortFrom(netip.AddrFrom4([4]byte{4, 5, 6, 7}), 22), - } - rand.Read(in.SharedKey[:]) - - out, err := parseSynPacket(in.Marshal(make([]byte, bufferSize))) - if err != nil { - t.Fatal(err) - } - - if !reflect.DeepEqual(in, out) { - t.Fatal("\n", in, "\n", out) - } -} - -func TestPacketSynAck(t *testing.T) { - in := ackPacket{ - TraceID: newTraceID(), - FromAddr: netip.AddrPortFrom(netip.AddrFrom4([4]byte{4, 5, 6, 7}), 22), - } - - out, err := parseAckPacket(in.Marshal(make([]byte, bufferSize))) - if err != nil { - t.Fatal(err) - } - - if !reflect.DeepEqual(in, out) { - t.Fatal("\n", in, "\n", out) - } -} diff --git a/node/packetsender.go b/node/packetsender.go new file mode 100644 index 0000000..07e083a --- /dev/null +++ b/node/packetsender.go @@ -0,0 +1,127 @@ +package node + +import ( + "log" + "net" + "net/netip" + "sync" + "sync/atomic" + "time" +) + +type controlPacketSender interface { + SendControlPacket(pkt marshaller, route peerRoute) +} + +type dataPacketSender interface { + SendDataPacket(pkt []byte, route peerRoute) + SendEncryptedDataPacket(pkt []byte, addr netip.AddrPort) +} + +// ---------------------------------------------------------------------------- + +type packetSender struct { + conn *net.UDPConn + + // For sending control packets. + cLock sync.Mutex + cBuf1 []byte + cBuf2 []byte + + // For sending data packets. + dBuf1 []byte + dBuf2 []byte + + counters [256]uint64 + + // Lock around for sending on UDP Conn. + wLock sync.Mutex +} + +func newPacketSender(conn *net.UDPConn) *packetSender { + ps := &packetSender{ + conn: conn, + cBuf1: make([]byte, bufferSize), + cBuf2: make([]byte, bufferSize), + dBuf1: make([]byte, bufferSize), + dBuf2: make([]byte, bufferSize), + } + for i := range ps.counters { + ps.counters[i] = uint64(time.Now().Unix()<<30 + 1) + } + return ps +} + +// Safe for concurrent use. +func (sender *packetSender) SendControlPacket(pkt marshaller, route peerRoute) { + sender.cLock.Lock() + defer sender.cLock.Unlock() + + buf := pkt.Marshal(sender.cBuf1) + h := header{ + StreamID: controlStreamID, + Counter: atomic.AddUint64(&sender.counters[route.IP], 1), + SourceIP: localIP, + DestIP: route.IP, + } + buf = route.ControlCipher.Encrypt(h, buf, sender.cBuf2) + + if route.Direct { + sender.writeTo(buf, route.RemoteAddr) + return + } + + sender.relayPacket(route.IP, buf, sender.cBuf1) +} + +// Not safe for concurrent use. +func (sender *packetSender) SendDataPacket(pkt []byte, route peerRoute) { + h := header{ + StreamID: dataStreamID, + Counter: atomic.AddUint64(&sender.counters[route.IP], 1), + SourceIP: localIP, + DestIP: route.IP, + } + + enc := route.DataCipher.Encrypt(h, pkt, sender.dBuf1) + + if route.Direct { + sender.writeTo(enc, route.RemoteAddr) + return + } + + sender.relayPacket(route.IP, enc, sender.dBuf2) +} + +func (sender *packetSender) SendEncryptedDataPacket(pkt []byte, addr netip.AddrPort) { + sender.writeTo(pkt, addr) +} + +func (sender *packetSender) relayPacket(destIP byte, data, buf []byte) { + ip := relayIP.Load() + if ip == nil { + return + } + relayRoute := routingTable[*ip].Load() + if relayRoute == nil || !relayRoute.Up || !relayRoute.Relay { + return + } + + h := header{ + StreamID: dataStreamID, + Counter: atomic.AddUint64(&sender.counters[relayRoute.IP], 1), + SourceIP: localIP, + DestIP: destIP, + } + + enc := relayRoute.DataCipher.Encrypt(h, data, buf) + sender.writeTo(enc, relayRoute.RemoteAddr) +} + +func (sender *packetSender) writeTo(packet []byte, addr netip.AddrPort) { + sender.wLock.Lock() + if _, err := sender.conn.WriteToUDPAddrPort(packet, addr); err != nil { + log.Printf("Failed to write to UDP port: %v", err) + } + sender.wLock.Unlock() +} diff --git a/node/relaymanager.go b/node/relaymanager.go index 5c44ea8..a333ce1 100644 --- a/node/relaymanager.go +++ b/node/relaymanager.go @@ -6,6 +6,7 @@ import ( "time" ) +// TODO: Make part of main loop on ping timer func relayManager() { time.Sleep(2 * time.Second) updateRelayRoute() diff --git a/node/shared.go b/node/shared.go new file mode 100644 index 0000000..dbdb6ee --- /dev/null +++ b/node/shared.go @@ -0,0 +1,59 @@ +package node + +import ( + "net/netip" + "sync/atomic" +) + +type sharedState struct { + // Immutable: + HubAddress string + APIKey string + NetName string + LocalIP byte + LocalPub bool + LocalAddr netip.AddrPort + PrivKey []byte + PrivSignKey []byte + + // Mutable: + Routes [256]*atomic.Pointer[peerRoute] + RelayIP *atomic.Pointer[byte] + + // Messages for supervisor main loop. + Messages chan any +} + +func newSharedState( + netName, + hubAddress, + apiKey string, + conf localConfig, +) ( + ss sharedState, +) { + ss.HubAddress = hubAddress + + ss.APIKey = apiKey + ss.NetName = netName + ss.LocalIP = conf.PeerIP + + ip, ok := netip.AddrFromSlice(conf.PublicIP) + if ok { + ss.LocalPub = true + ss.LocalAddr = netip.AddrPortFrom(ip, conf.Port) + } + + ss.PrivKey = conf.PrivKey + ss.PrivSignKey = conf.PrivSignKey + + for i := range ss.Routes { + ss.Routes[i] = &atomic.Pointer[peerRoute]{} + ss.Routes[i].Store(&peerRoute{}) + } + + ss.RelayIP = &atomic.Pointer[byte]{} + + ss.Messages = make(chan any, 1024) + return +} diff --git a/node/shared_test.go b/node/shared_test.go new file mode 100644 index 0000000..4009e7d --- /dev/null +++ b/node/shared_test.go @@ -0,0 +1,16 @@ +package node + +import "vppn/m" + +// TODO: +var sharedStateForTesting = func() sharedState { + ss := newSharedState( + "testNet", + "http://localhost:39499", + "123", + localConfig{ + PeerConfig: m.PeerConfig{}, + }) + + return ss +} diff --git a/node/supervisor.go b/node/supervisor.go index 6b5e96a..726d47f 100644 --- a/node/supervisor.go +++ b/node/supervisor.go @@ -19,14 +19,17 @@ const ( // ---------------------------------------------------------------------------- -func startPeerSuper() { +func startPeerSuper( + routingTable [256]*atomic.Pointer[peerRoute], + messages chan any, + sender controlPacketSender, +) { peers := [256]peerState{} for i := range peers { data := &peerStateData{ + sender: sender, published: routingTable[i], remoteIP: byte(i), - buf1: make([]byte, bufferSize), - buf2: make([]byte, bufferSize), limiter: ratelimiter.New(ratelimiter.Config{ FillPeriod: 20 * time.Millisecond, MaxWaitCount: 1, @@ -34,10 +37,10 @@ func startPeerSuper() { } peers[i] = data.OnPeerUpdate(nil) } - go runPeerSuper(peers) + go runPeerSuper(peers, messages) } -func runPeerSuper(peers [256]peerState) { +func runPeerSuper(peers [256]peerState, messages chan any) { for raw := range messages { switch msg := raw.(type) { @@ -84,6 +87,8 @@ type peerState interface { // ---------------------------------------------------------------------------- type peerStateData struct { + sender controlPacketSender + // The purpose of this state machine is to manage this published data. published *atomic.Pointer[peerRoute] staged peerRoute // Local copy of shared data. See publish(). @@ -95,10 +100,6 @@ type peerStateData struct { peer *m.Peer remotePub bool - // Buffers for sending control packets. - buf1 []byte - buf2 []byte - // For logging. Set per-state. client bool @@ -129,7 +130,7 @@ func (s *peerStateData) _sendControlPacket(pkt interface{ Marshal([]byte) []byte s.logf("Not sending control packet: rate limited.") // Shouldn't happen. return } - _sendControlPacket(pkt, route, s.buf1, s.buf2) + s.sender.SendControlPacket(pkt, route) } // ---------------------------------------------------------------------------- @@ -175,8 +176,9 @@ func (s *peerStateData) OnPeerUpdate(peer *m.Peer) peerState { s.peer = peer s.staged = peerRoute{ - IP: s.remoteIP, - PubSignKey: peer.PubSignKey, + IP: s.remoteIP, + PubSignKey: peer.PubSignKey, + // TODO: privKey global. ControlCipher: newControlCipher(privKey, peer.PubKey), DataCipher: newDataCipher(), } @@ -192,6 +194,7 @@ func (s *peerStateData) OnPeerUpdate(peer *m.Peer) peerState { } if s.remotePub == localPub { + // TODO: localIP is global if localIP < s.remoteIP { return enterStateServer(s) } @@ -349,6 +352,7 @@ func (s *stateClient) OnAck(msg controlMsg[ackPacket]) { } // Store possible public address if we're not a public node. + // TODO: localPub is global, publicAddrs is global. if !localPub && s.remotePub { publicAddrs.Store(msg.Packet.ToAddr) }