This commit is contained in:
jdl 2025-01-22 14:09:43 +01:00
parent a0b5058544
commit d79902a83b
25 changed files with 1168 additions and 213 deletions

View File

@ -1,9 +1,5 @@
# vppn: Virtual Potentially Private Network # vppn: Virtual Potentially Private Network
## TODO
* Add `-force-init` argument to `node` main?
## Hub Server Configuration ## Hub Server Configuration
``` ```
@ -33,7 +29,6 @@ WorkingDirectory=/home/user/
ExecStart=/home/user/hub -listen <addr>:https -root-dir=/home/user ExecStart=/home/user/hub -listen <addr>:https -root-dir=/home/user
Restart=always Restart=always
RestartSec=8 RestartSec=8
TimeoutStopSec=24
[Install] [Install]
WantedBy=default.target WantedBy=default.target
@ -70,7 +65,6 @@ WorkingDirectory=/home/user/
ExecStart=/home/user/vppn -name vppn -hub-address https://my.hub -api-key 1234567890 ExecStart=/home/user/vppn -name vppn -hub-address https://my.hub -api-key 1234567890
Restart=always Restart=always
RestartSec=8 RestartSec=8
TimeoutStopSec=24
[Install] [Install]
WantedBy=default.target WantedBy=default.target

17
node/README.md Normal file
View File

@ -0,0 +1,17 @@
# VPPN Peer Code
## Refactoring for Testability
* [ ] connWriter
* [ ] Separate send/relay calls
* [x] mcWriter
* [x] ifWriter
* [ ] ifReader
* [ ] connReader
* [ ] mcReader
* [ ] hubPoller
* [ ] supervisor
## Updates
* [ ] Send timing info w/ syn/ack packets

View File

@ -1,50 +1,3 @@
package node package node
import (
"io"
"log"
"net"
"net/netip"
"sync"
)
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
type connWriter struct {
lock sync.Mutex
conn *net.UDPConn
}
func newConnWriter(conn *net.UDPConn) *connWriter {
return &connWriter{conn: conn}
}
func (w *connWriter) WriteTo(packet []byte, addr netip.AddrPort) {
// Even though a conn is safe for concurrent use, it turns out that a mutex
// in Go is more fair when there's contention. Without this lock, control
// packets may fail to be sent in a timely manner causing timeouts.
w.lock.Lock()
if _, err := w.conn.WriteToUDPAddrPort(packet, addr); err != nil {
log.Printf("Failed to write to UDP port: %v", err)
}
w.lock.Unlock()
}
// ----------------------------------------------------------------------------
type ifWriter struct {
lock sync.Mutex
iface io.ReadWriteCloser
}
func newIFWriter(iface io.ReadWriteCloser) *ifWriter {
return &ifWriter{iface: iface}
}
func (w *ifWriter) Write(packet []byte) {
w.lock.Lock()
if _, err := w.iface.Write(packet); err != nil {
log.Fatalf("Failed to write to interface: %v", err)
}
w.lock.Unlock()
}

146
node/connwriter.go Normal file
View File

@ -0,0 +1,146 @@
package node
import (
"log"
"net/netip"
"sync"
"sync/atomic"
"time"
)
// ----------------------------------------------------------------------------
type peerRoute struct {
IP byte
Up bool // True if data can be sent on the route.
Relay bool // True if the peer is a relay.
Direct bool // True if this is a direct connection.
PubSignKey []byte
ControlCipher *controlCipher
DataCipher *dataCipher
RemoteAddr netip.AddrPort // Remote address if directly connected.
}
// ----------------------------------------------------------------------------
type udpAddrPortWriter interface {
WriteToUDPAddrPort([]byte, netip.AddrPort) (int, error)
}
type marshaller interface {
Marshal([]byte) []byte
}
// ----------------------------------------------------------------------------
type connWriter struct {
localIP byte
conn udpAddrPortWriter
// For sending control packets.
cBuf1 []byte
cBuf2 []byte
// For sending data packets.
dBuf1 []byte
dBuf2 []byte
counters [256]uint64
// Lock around for sending on UDP Conn.
wLock sync.Mutex
}
func newConnWriter(conn udpAddrPortWriter, localIP byte) *connWriter {
w := &connWriter{
localIP: localIP,
conn: conn,
cBuf1: make([]byte, bufferSize),
cBuf2: make([]byte, bufferSize),
dBuf1: make([]byte, bufferSize),
dBuf2: make([]byte, bufferSize),
}
for i := range w.counters {
w.counters[i] = uint64(time.Now().Unix()<<30 + 1)
}
return w
}
// Not safe for concurrent use. Should only be called by supervisor.
func (w *connWriter) SendControlPacket(pkt marshaller, route *peerRoute) {
buf := pkt.Marshal(w.cBuf1)
h := header{
StreamID: controlStreamID,
Counter: atomic.AddUint64(&w.counters[route.IP], 1),
SourceIP: w.localIP,
DestIP: route.IP,
}
buf = route.ControlCipher.Encrypt(h, buf, w.cBuf2)
w.writeTo(buf, route.RemoteAddr)
}
func (w *connWriter) RelayControlPacket(pkt marshaller, route, relay *peerRoute) {
buf := pkt.Marshal(w.cBuf1)
h := header{
StreamID: controlStreamID,
Counter: atomic.AddUint64(&w.counters[route.IP], 1),
SourceIP: w.localIP,
DestIP: route.IP,
}
buf = route.ControlCipher.Encrypt(h, buf, w.cBuf2)
w.relayPacket(buf, w.cBuf1, route, relay)
}
// Not safe for concurrent use. Should only be called by ifReader.
func (w *connWriter) SendDataPacket(pkt []byte, route, relay *peerRoute) {
h := header{
StreamID: dataStreamID,
Counter: atomic.AddUint64(&w.counters[route.IP], 1),
SourceIP: w.localIP,
DestIP: route.IP,
}
enc := route.DataCipher.Encrypt(h, pkt, w.dBuf1)
if route.Direct {
w.writeTo(enc, route.RemoteAddr)
return
}
w.relayPacket(enc, w.dBuf2, route, relay)
}
// TODO: RelayDataPacket
// Safe for concurrent use. Should only be called by connReader.
//
// This function will send pkt to the peer directly. This is used when a peer
// is acting as a relay and is forwarding already encrypted data for another
// peer.
func (w *connWriter) SendEncryptedDataPacket(pkt []byte, route *peerRoute) {
w.writeTo(pkt, route.RemoteAddr)
}
func (w *connWriter) relayPacket(data, buf []byte, route, relay *peerRoute) {
if relay == nil || !relay.Up {
return
}
h := header{
StreamID: dataStreamID,
Counter: atomic.AddUint64(&w.counters[relay.IP], 1),
SourceIP: w.localIP,
DestIP: route.IP,
}
enc := relay.DataCipher.Encrypt(h, data, buf)
w.writeTo(enc, relay.RemoteAddr)
}
func (w *connWriter) writeTo(packet []byte, addr netip.AddrPort) {
w.wLock.Lock()
if _, err := w.conn.WriteToUDPAddrPort(packet, addr); err != nil {
log.Printf("Failed to write to UDP port: %v", err)
}
w.wLock.Unlock()
}

291
node/connwriter_test.go Normal file
View File

@ -0,0 +1,291 @@
package node
import (
"bytes"
"net/netip"
"testing"
)
// ----------------------------------------------------------------------------
type testUDPPacket struct {
Addr netip.AddrPort
Data []byte
}
type testUDPAddrPortWriter struct {
written []testUDPPacket
}
func (w *testUDPAddrPortWriter) WriteToUDPAddrPort(b []byte, addr netip.AddrPort) (int, error) {
w.written = append(w.written, testUDPPacket{
Addr: addr,
Data: bytes.Clone(b),
})
return len(b), nil
}
func (w *testUDPAddrPortWriter) Written() []testUDPPacket {
out := w.written
w.written = []testUDPPacket{}
return out
}
// ----------------------------------------------------------------------------
type testPacket string
func (p testPacket) Marshal(b []byte) []byte {
b = b[:len(p)]
copy(b, []byte(p))
return b
}
// ----------------------------------------------------------------------------
func testConnWriter_getTestRoutes() (local, remote, relayLocal, relayRemote *peerRoute) {
localKeys := generateKeys()
remoteKeys := generateKeys()
local = &peerRoute{
IP: 2,
Up: true,
Relay: false,
PubSignKey: remoteKeys.PubSignKey,
ControlCipher: newControlCipher(localKeys.PrivKey, remoteKeys.PubKey),
DataCipher: newDataCipher(),
RemoteAddr: netip.AddrPortFrom(netip.AddrFrom4([4]byte{1, 1, 1, 2}), 100),
}
remote = &peerRoute{
IP: 1,
Up: true,
Relay: false,
PubSignKey: localKeys.PubSignKey,
ControlCipher: newControlCipher(remoteKeys.PrivKey, localKeys.PubKey),
DataCipher: local.DataCipher,
RemoteAddr: netip.AddrPortFrom(netip.AddrFrom4([4]byte{1, 1, 1, 1}), 100),
}
rLocalKeys := generateKeys()
rRemoteKeys := generateKeys()
relayLocal = &peerRoute{
IP: 3,
Up: true,
Relay: true,
Direct: true,
PubSignKey: rRemoteKeys.PubSignKey,
ControlCipher: newControlCipher(rLocalKeys.PrivKey, rRemoteKeys.PubKey),
DataCipher: newDataCipher(),
RemoteAddr: netip.AddrPortFrom(netip.AddrFrom4([4]byte{1, 1, 1, 3}), 100),
}
relayRemote = &peerRoute{
IP: 1,
Up: true,
Relay: false,
Direct: true,
PubSignKey: rLocalKeys.PubSignKey,
ControlCipher: newControlCipher(rRemoteKeys.PrivKey, rLocalKeys.PubKey),
DataCipher: relayLocal.DataCipher,
RemoteAddr: netip.AddrPortFrom(netip.AddrFrom4([4]byte{1, 1, 1, 1}), 100),
}
return
}
// ----------------------------------------------------------------------------
// Testing if we can send a control packet directly to the remote route.
func TestConnWriter_SendControlPacket_direct(t *testing.T) {
route, rRoute, _, _ := testConnWriter_getTestRoutes()
route.Direct = true
writer := &testUDPAddrPortWriter{}
w := newConnWriter(writer, rRoute.IP)
in := testPacket("hello world!")
w.SendControlPacket(in, route)
out := writer.Written()
if len(out) != 1 {
t.Fatal(out)
}
if out[0].Addr != route.RemoteAddr {
t.Fatal(out[0])
}
dec, ok := rRoute.ControlCipher.Decrypt(out[0].Data, make([]byte, 1024))
if !ok {
t.Fatal(ok)
}
if string(dec) != string(in) {
t.Fatal(dec)
}
}
// Testing if we can relay a packet via an intermediary.
func TestConnWriter_SendControlPacket_relay(t *testing.T) {
route, rRoute, relay, rRelay := testConnWriter_getTestRoutes()
writer := &testUDPAddrPortWriter{}
w := newConnWriter(writer, rRoute.IP)
in := testPacket("hello world!")
w.RelayControlPacket(in, route, relay)
out := writer.Written()
if len(out) != 1 {
t.Fatal(out)
}
if out[0].Addr != relay.RemoteAddr {
t.Fatal(out[0])
}
dec, ok := rRelay.DataCipher.Decrypt(out[0].Data, make([]byte, 1024))
if !ok {
t.Fatal(ok)
}
dec2, ok := rRoute.ControlCipher.Decrypt(dec, make([]byte, 1024))
if !ok {
t.Fatal(ok)
}
if string(dec2) != string(in) {
t.Fatal(dec2)
}
}
// Testing that a nil relay doesn't cause an issue.
func TestConnWriter_SendControlPacket_relay_relayNil(t *testing.T) {
route, rRoute, _, _ := testConnWriter_getTestRoutes()
writer := &testUDPAddrPortWriter{}
w := newConnWriter(writer, rRoute.IP)
in := testPacket("hello world!")
w.RelayControlPacket(in, route, nil)
out := writer.Written()
if len(out) != 0 {
t.Fatal(out)
}
}
// Testing that we don't send anything if the relay isn't up.
func TestConnWriter_SendControlPacket_relay_relayNotUp(t *testing.T) {
route, rRoute, relay, _ := testConnWriter_getTestRoutes()
relay.Up = false
writer := &testUDPAddrPortWriter{}
w := newConnWriter(writer, rRoute.IP)
in := testPacket("hello world!")
w.RelayControlPacket(in, route, relay)
out := writer.Written()
if len(out) != 0 {
t.Fatal(out)
}
}
// Testing that we can send a data packet directly to a remote route.
func TestConnWriter_SendDataPacket_direct(t *testing.T) {
route, rRoute, _, _ := testConnWriter_getTestRoutes()
route.Direct = true
writer := &testUDPAddrPortWriter{}
w := newConnWriter(writer, rRoute.IP)
in := []byte("hello world!")
w.SendDataPacket(in, route, nil)
out := writer.Written()
if len(out) != 1 {
t.Fatal(out)
}
if out[0].Addr != route.RemoteAddr {
t.Fatal(out[0])
}
dec, ok := rRoute.DataCipher.Decrypt(out[0].Data, make([]byte, 1024))
if !ok {
t.Fatal(ok)
}
if !bytes.Equal(dec, in) {
t.Fatal(dec)
}
}
// Testing that we can relay a data packet via a relay.
func TestConnWriter_SendDataPacket_relay(t *testing.T) {
route, rRoute, relay, rRelay := testConnWriter_getTestRoutes()
writer := &testUDPAddrPortWriter{}
w := newConnWriter(writer, rRoute.IP)
in := []byte("Hello world!")
w.SendDataPacket(in, route, relay)
out := writer.Written()
if len(out) != 1 {
t.Fatal(out)
}
if out[0].Addr != relay.RemoteAddr {
t.Fatal(out[0])
}
dec, ok := rRelay.DataCipher.Decrypt(out[0].Data, make([]byte, 1024))
if !ok {
t.Fatal(ok)
}
dec2, ok := rRoute.DataCipher.Decrypt(dec, make([]byte, 1024))
if !ok {
t.Fatal(ok)
}
if !bytes.Equal(dec2, in) {
t.Fatal(dec2)
}
}
// Testing that we don't attempt to relay if the relay is nil.
func TestConnWriter_SendDataPacket_relay_relayNil(t *testing.T) {
route, rRoute, _, _ := testConnWriter_getTestRoutes()
writer := &testUDPAddrPortWriter{}
w := newConnWriter(writer, rRoute.IP)
in := []byte("Hello world!")
w.SendDataPacket(in, route, nil)
out := writer.Written()
if len(out) != 0 {
t.Fatal(out)
}
}
// Testing that we don't attempt to relay if the relay isn't up.
func TestConnWriter_SendDataPacket_relay_relayNotUp(t *testing.T) {
route, rRoute, relay, _ := testConnWriter_getTestRoutes()
relay.Up = false
writer := &testUDPAddrPortWriter{}
w := newConnWriter(writer, rRoute.IP)
in := []byte("Hello world!")
w.SendDataPacket(in, route, relay)
out := writer.Written()
if len(out) != 0 {
t.Fatal(out)
}
}

30
node/crypto.go Normal file
View File

@ -0,0 +1,30 @@
package node
import (
"crypto/rand"
"log"
"golang.org/x/crypto/nacl/box"
"golang.org/x/crypto/nacl/sign"
)
type cryptoKeys struct {
PubKey []byte
PrivKey []byte
PubSignKey []byte
PrivSignKey []byte
}
func generateKeys() cryptoKeys {
pubKey, privKey, err := box.GenerateKey(rand.Reader)
if err != nil {
log.Fatalf("Failed to generate encryption keys: %v", err)
}
pubSignKey, privSignKey, err := sign.GenerateKey(rand.Reader)
if err != nil {
log.Fatalf("Failed to generate signing keys: %v", err)
}
return cryptoKeys{pubKey[:], privKey[:], pubSignKey[:], privSignKey[:]}
}

14
node/data-flow.dot Normal file
View File

@ -0,0 +1,14 @@
digraph d {
ifReader -> connWriter;
connReader -> ifWriter;
connReader -> connWriter;
connReader -> supervisor;
mcReader -> supervisor;
supervisor -> connWriter;
supervisor -> mcWriter;
hubPoller -> supervisor;
connWriter [shape="box"];
mcWriter [shape="box"];
ifWriter [shape="box"];
}

View File

@ -1,65 +1,8 @@
package node package node
import (
"sync/atomic"
)
func getRelayRoute() *peerRoute { func getRelayRoute() *peerRoute {
if ip := relayIP.Load(); ip != nil { if ip := relayIP.Load(); ip != nil {
return routingTable[*ip].Load() return routingTable[*ip].Load()
} }
return nil return nil
} }
func _sendControlPacket(pkt interface{ Marshal([]byte) []byte }, route peerRoute, buf1, buf2 []byte) {
buf := pkt.Marshal(buf2)
h := header{
StreamID: controlStreamID,
Counter: atomic.AddUint64(&sendCounters[route.IP], 1),
SourceIP: localIP,
DestIP: route.IP,
}
buf = route.ControlCipher.Encrypt(h, buf, buf1)
if route.Direct {
_conn.WriteTo(buf, route.RemoteAddr)
return
}
_relayPacket(route.IP, buf, buf2)
}
func _sendDataPacket(route *peerRoute, pkt, buf1, buf2 []byte) {
h := header{
StreamID: dataStreamID,
Counter: atomic.AddUint64(&sendCounters[route.IP], 1),
SourceIP: localIP,
DestIP: route.IP,
}
enc := route.DataCipher.Encrypt(h, pkt, buf1)
if route.Direct {
_conn.WriteTo(enc, route.RemoteAddr)
return
}
_relayPacket(route.IP, enc, buf2)
}
func _relayPacket(destIP byte, data, buf []byte) {
relayRoute := getRelayRoute()
if relayRoute == nil || !relayRoute.Up || !relayRoute.Relay {
return
}
h := header{
StreamID: dataStreamID,
Counter: atomic.AddUint64(&sendCounters[relayRoute.IP], 1),
SourceIP: localIP,
DestIP: destIP,
}
enc := relayRoute.DataCipher.Encrypt(h, data, buf)
_conn.WriteTo(enc, relayRoute.RemoteAddr)
}

View File

@ -5,7 +5,6 @@ import (
"net/netip" "net/netip"
"net/url" "net/url"
"sync/atomic" "sync/atomic"
"time"
) )
const ( const (
@ -17,21 +16,9 @@ const (
signOverhead = 64 signOverhead = 64
) )
var ( var multicastAddr = net.UDPAddrFromAddrPort(netip.AddrPortFrom(
multicastIP = netip.AddrFrom4([4]byte{224, 0, 0, 157}) netip.AddrFrom4([4]byte{224, 0, 0, 157}),
multicastAddr = net.UDPAddrFromAddrPort(netip.AddrPortFrom(multicastIP, 4560)) 4560))
)
type peerRoute struct {
IP byte
Up bool // True if data can be sent on the route.
Relay bool // True if the peer is a relay.
Direct bool // True if this is a direct connection.
PubSignKey []byte
ControlCipher *controlCipher
DataCipher *dataCipher
RemoteAddr netip.AddrPort // Remote address if directly connected.
}
var ( var (
hubURL *url.URL hubURL *url.URL
@ -45,20 +32,7 @@ var (
privKey []byte privKey []byte
privSignKey []byte privSignKey []byte
// Shared interface for writing. // TODO: Doesn't need to be global.
_iface *ifWriter
// Shared connection for writing.
_conn *connWriter
// Counters for sending to each peer.
sendCounters [256]uint64 = func() (out [256]uint64) {
for i := range out {
out[i] = uint64(time.Now().Unix()<<30 + 1)
}
return
}()
// Duplicate checkers for incoming packets. // Duplicate checkers for incoming packets.
dupChecks [256]*dupCheck = func() (out [256]*dupCheck) { dupChecks [256]*dupCheck = func() (out [256]*dupCheck) {
for i := range out { for i := range out {
@ -67,9 +41,11 @@ var (
return return
}() }()
// TODO: Doesn't need to be global .
// Messages for the supervisor. // Messages for the supervisor.
messages = make(chan any, 1024) messages = make(chan any, 1024)
// TODO: Doesn't need to be global .
// Global routing table. // Global routing table.
routingTable [256]*atomic.Pointer[peerRoute] = func() (out [256]*atomic.Pointer[peerRoute]) { routingTable [256]*atomic.Pointer[peerRoute] = func() (out [256]*atomic.Pointer[peerRoute]) {
for i := range out { for i := range out {
@ -82,5 +58,6 @@ var (
// Managed by the relayManager. // Managed by the relayManager.
relayIP = &atomic.Pointer[byte]{} relayIP = &atomic.Pointer[byte]{}
// TODO: Only used by supervisor: can make local there.
publicAddrs = newPubAddrStore() publicAddrs = newPubAddrStore()
) )

102
node/ifreader.go Normal file
View File

@ -0,0 +1,102 @@
package node
import (
"io"
"log"
"sync/atomic"
)
type ifReader struct {
iface io.Reader
routes [256]*atomic.Pointer[peerRoute]
relay *atomic.Pointer[peerRoute]
sendDataPacket func(pkt []byte, route *peerRoute)
relayDataPacket func(pkt []byte, route, relay *peerRoute)
}
func newIFReader(
iface io.Reader,
routes [256]*atomic.Pointer[peerRoute],
relay *atomic.Pointer[peerRoute],
sendDataPacket func(pkt []byte, route *peerRoute),
relayDackPacket func(pkt []byte, route, relay *peerRoute),
) *ifReader {
return &ifReader{
iface: iface,
routes: routes,
relay: relay,
sendDataPacket: sendDataPacket,
}
}
func (r *ifReader) Run() {
var (
packet = make([]byte, bufferSize)
remoteIP byte
ok bool
)
for {
packet = r.readNextPacket(packet)
if remoteIP, ok = r.parsePacket(packet); ok {
r.sendPacket(packet, remoteIP)
}
}
}
func (r *ifReader) sendPacket(pkt []byte, remoteIP byte) {
route := r.routes[remoteIP].Load()
if !route.Up {
log.Printf("Route not connected: %d", remoteIP)
return
}
// Direct path => early return.
if route.Direct {
r.sendDataPacket(pkt, route)
return
}
if relay := r.relay.Load(); relay != nil {
r.relayDataPacket(pkt, route, relay)
}
}
// Get next packet, returning packet, and destination ip.
func (r *ifReader) readNextPacket(buf []byte) []byte {
n, err := r.iface.Read(buf[:cap(buf)])
if err != nil {
log.Fatalf("Failed to read from interface: %v", err)
}
return buf[:n]
}
func (r *ifReader) parsePacket(buf []byte) (byte, bool) {
n := len(buf)
if n == 0 {
return 0, false
}
version := buf[0] >> 4
switch version {
case 4:
if n < 20 {
log.Printf("Short IPv4 packet: %d", len(buf))
return 0, false
}
return buf[19], true
case 6:
if len(buf) < 40 {
log.Printf("Short IPv6 packet: %d", len(buf))
return 0, false
}
return buf[39], true
default:
log.Printf("Invalid IP packet version: %v", version)
return 0, false
}
}

117
node/ifreader_test.go Normal file
View File

@ -0,0 +1,117 @@
package node
import (
"bytes"
"net"
"sync/atomic"
"testing"
)
// Test that we parse IPv4 packets correctly.
func TestIFReader_parsePacket_ipv4(t *testing.T) {
r := newIFReader(nil, [256]*atomic.Pointer[peerRoute]{}, nil, nil, nil)
pkt := make([]byte, 1234)
pkt[0] = 4 << 4
pkt[19] = 128
if ip, ok := r.parsePacket(pkt); !ok || ip != 128 {
t.Fatal(ip, ok)
}
}
// Test that we parse IPv6 packets correctly.
func TestIFReader_parsePacket_ipv6(t *testing.T) {
r := newIFReader(nil, [256]*atomic.Pointer[peerRoute]{}, nil, nil, nil)
pkt := make([]byte, 1234)
pkt[0] = 6 << 4
pkt[39] = 42
if ip, ok := r.parsePacket(pkt); !ok || ip != 42 {
t.Fatal(ip, ok)
}
}
// Test that empty packets work as expected.
func TestIFReader_parsePacket_emptyPacket(t *testing.T) {
r := newIFReader(nil, [256]*atomic.Pointer[peerRoute]{}, nil, nil, nil)
pkt := make([]byte, 0)
if ip, ok := r.parsePacket(pkt); ok {
t.Fatal(ip, ok)
}
}
// Test that invalid IP versions fail.
func TestIFReader_parsePacket_invalidIPVersion(t *testing.T) {
r := newIFReader(nil, [256]*atomic.Pointer[peerRoute]{}, nil, nil, nil)
for i := byte(1); i < 16; i++ {
if i == 4 || i == 6 {
continue
}
pkt := make([]byte, 1234)
pkt[0] = i << 4
if ip, ok := r.parsePacket(pkt); ok {
t.Fatal(i, ip, ok)
}
}
}
// Test that short IPv4 packets fail.
func TestIFReader_parsePacket_shortIPv4(t *testing.T) {
r := newIFReader(nil, [256]*atomic.Pointer[peerRoute]{}, nil, nil, nil)
pkt := make([]byte, 19)
pkt[0] = 4 << 4
if ip, ok := r.parsePacket(pkt); ok {
t.Fatal(ip, ok)
}
}
// Test that short IPv6 packets fail.
func TestIFReader_parsePacket_shortIPv6(t *testing.T) {
r := newIFReader(nil, [256]*atomic.Pointer[peerRoute]{}, nil, nil, nil)
pkt := make([]byte, 39)
pkt[0] = 6 << 4
if ip, ok := r.parsePacket(pkt); ok {
t.Fatal(ip, ok)
}
}
// Test that we can read a packet.
func TestIFReader_readNextpacket(t *testing.T) {
in, out := net.Pipe()
r := newIFReader(out, [256]*atomic.Pointer[peerRoute]{}, nil, nil, nil)
defer in.Close()
defer out.Close()
go in.Write([]byte("hello world!"))
pkt := r.readNextPacket(make([]byte, bufferSize))
if !bytes.Equal(pkt, []byte("hello world!")) {
t.Fatalf("%s", pkt)
}
}
// Testing that we can send a packet directly.
func TestIFReader_sendPacket_direct(t *testing.T) {
// TODO
}
// Testing that we don't send a packet if route isn't up.
func TestIFReader_sendPacket_directNotUp(t *testing.T) {
// TODO
}
// Testing that we can send a packet via a relay.
func TestIFReader_sendPacket_relayed(t *testing.T) {
// TODO
}
// Testing that we don't try to send on a nil relay IP.

5
node/ifwriter.go Normal file
View File

@ -0,0 +1,5 @@
package node
import "io"
type ifWriter io.Writer

View File

@ -20,7 +20,7 @@ func TestLocalDiscoveryPacketSigning(t *testing.T) {
privSignKey = privSigKey[:] privSignKey = privSigKey[:]
route := routingTable[localIP].Load() route := routingTable[localIP].Load()
route.IP = byte(localIP) route.IP = byte(localIP)
route.PubSignKey = pubSignKey[0:32] route.PubSignKey = pubSignKey[:]
routingTable[localIP].Store(route) routingTable[localIP].Store(route)
out := buildLocalDiscoveryPacket(buf1, buf2) out := buildLocalDiscoveryPacket(buf1, buf2)

View File

@ -143,10 +143,6 @@ func main() {
conn.SetReadBuffer(1024 * 1024 * 8) conn.SetReadBuffer(1024 * 1024 * 8)
conn.SetWriteBuffer(1024 * 1024 * 8) conn.SetWriteBuffer(1024 * 1024 * 8)
// Intialize globals.
_iface = newIFWriter(iface)
_conn = newConnWriter(conn)
localIP = config.PeerIP localIP = config.PeerIP
ip, ok := netip.AddrFromSlice(config.PublicIP) ip, ok := netip.AddrFromSlice(config.PublicIP)
@ -169,17 +165,19 @@ func main() {
} }
}() }()
go startPeerSuper() sender := newPacketSender(conn)
go startPeerSuper(routingTable, messages, sender)
go newHubPoller().Run() go newHubPoller().Run()
go readFromConn(conn) go readFromConn(conn, iface, sender)
readFromIFace(iface) readFromIFace(iface, sender)
} }
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
func readFromConn(conn *net.UDPConn) { func readFromConn(conn *net.UDPConn, iface io.ReadWriteCloser, sender dataPacketSender) {
defer panicHandler() defer panicHandler()
@ -213,7 +211,7 @@ func readFromConn(conn *net.UDPConn) {
handleControlPacket(remoteAddr, h, data, decBuf) handleControlPacket(remoteAddr, h, data, decBuf)
case dataStreamID: case dataStreamID:
handleDataPacket(h, data, decBuf) handleDataPacket(h, data, decBuf, iface, sender)
default: default:
log.Printf("Unknown stream ID: %d", h.StreamID) log.Printf("Unknown stream ID: %d", h.StreamID)
@ -263,7 +261,7 @@ func handleControlPacket(addr netip.AddrPort, h header, data, decBuf []byte) {
} }
func handleDataPacket(h header, data []byte, decBuf []byte) { func handleDataPacket(h header, data []byte, decBuf []byte, iface ifWriter, sender dataPacketSender) {
route := routingTable[h.SourceIP].Load() route := routingTable[h.SourceIP].Load()
if !route.Up { if !route.Up {
log.Printf("Not connected (recv).") log.Printf("Not connected (recv).")
@ -282,7 +280,9 @@ func handleDataPacket(h header, data []byte, decBuf []byte) {
} }
if h.DestIP == localIP { if h.DestIP == localIP {
_iface.Write(dec) if _, err := iface.Write(dec); err != nil {
log.Fatalf("Failed to write to interface: %v", err)
}
return return
} }
@ -292,16 +292,14 @@ func handleDataPacket(h header, data []byte, decBuf []byte) {
return return
} }
_conn.WriteTo(dec, destRoute.RemoteAddr) sender.SendEncryptedDataPacket(dec, destRoute.RemoteAddr)
} }
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
func readFromIFace(iface io.ReadWriteCloser) { func readFromIFace(iface io.ReadWriteCloser, sender dataPacketSender) {
var ( var (
packet = make([]byte, bufferSize) packet = make([]byte, bufferSize)
buf1 = make([]byte, bufferSize)
buf2 = make([]byte, bufferSize)
remoteIP byte remoteIP byte
err error err error
) )
@ -318,6 +316,6 @@ func readFromIFace(iface io.ReadWriteCloser) {
continue continue
} }
_sendDataPacket(route, packet, buf1, buf2) sender.SendDataPacket(packet, *route)
} }
} }

37
node/main_test.go Normal file
View File

@ -0,0 +1,37 @@
package node
import (
"crypto/rand"
"log"
"golang.org/x/crypto/nacl/box"
"golang.org/x/crypto/nacl/sign"
)
type testPeer struct {
IP byte
PubKey []byte
PrivKey []byte
PubSignKey []byte
PrivSignKey []byte
}
func newTestPeer(ip byte) testPeer {
encPubKey, encPrivKey, err := box.GenerateKey(rand.Reader)
if err != nil {
log.Fatalf("Failed to generate encryption keys: %v", err)
}
signPubKey, signPrivKey, err := sign.GenerateKey(rand.Reader)
if err != nil {
log.Fatalf("Failed to generate signing keys: %v", err)
}
return testPeer{
IP: ip,
PubKey: encPubKey[:],
PrivKey: encPrivKey[:],
PubSignKey: signPubKey[:],
PrivSignKey: signPrivKey[:],
}
}

62
node/mcwriter.go Normal file
View File

@ -0,0 +1,62 @@
package node
import (
"log"
"net"
"golang.org/x/crypto/nacl/sign"
)
// ----------------------------------------------------------------------------
type udpWriter interface {
WriteToUDP([]byte, *net.UDPAddr) (int, error)
}
// ----------------------------------------------------------------------------
func createLocalDiscoveryPacket(localIP byte, signingKey []byte) []byte {
h := header{
SourceIP: localIP,
DestIP: 255,
}
buf := make([]byte, headerSize)
h.Marshal(buf)
out := make([]byte, headerSize+signOverhead)
return sign.Sign(out[:0], buf, (*[64]byte)(signingKey))
}
func headerFromLocalDiscoveryPacket(pkt []byte) (h header, ok bool) {
if len(pkt) != headerSize+signOverhead {
return
}
h.Parse(pkt[signOverhead:])
ok = true
return
}
func verifyLocalDiscoveryPacket(pkt, buf []byte, pubSignKey []byte) bool {
_, ok := sign.Open(buf[:0], pkt, (*[32]byte)(pubSignKey))
return ok
}
// ----------------------------------------------------------------------------
type mcWriter struct {
conn udpWriter
discoveryPacket []byte
}
func newMCWriter(conn udpWriter, localIP byte, signingKey []byte) *mcWriter {
return &mcWriter{
conn: conn,
discoveryPacket: createLocalDiscoveryPacket(localIP, signingKey),
}
}
func (w *mcWriter) SendLocalDiscovery() {
if _, err := w.conn.WriteToUDP(w.discoveryPacket, multicastAddr); err != nil {
log.Printf("Failed to write multicast UDP packet: %v", err)
}
}

102
node/mcwriter_test.go Normal file
View File

@ -0,0 +1,102 @@
package node
import (
"bytes"
"net"
"testing"
)
// ----------------------------------------------------------------------------
// Testing that we can create and verify a local discovery packet.
func TestVerifyLocalDiscoveryPacket_valid(t *testing.T) {
keys := generateKeys()
created := createLocalDiscoveryPacket(55, keys.PrivSignKey)
header, ok := headerFromLocalDiscoveryPacket(created)
if !ok {
t.Fatal(ok)
}
if header.SourceIP != 55 || header.DestIP != 255 {
t.Fatal(header)
}
if !verifyLocalDiscoveryPacket(created, make([]byte, 1024), keys.PubSignKey) {
t.Fatal("Not valid")
}
}
// Testing that we don't try to parse short packets.
func TestVerifyLocalDiscoveryPacket_tooShort(t *testing.T) {
keys := generateKeys()
created := createLocalDiscoveryPacket(55, keys.PrivSignKey)
_, ok := headerFromLocalDiscoveryPacket(created[:len(created)-1])
if ok {
t.Fatal(ok)
}
}
// Testing that modifying a packet makes it invalid.
func TestVerifyLocalDiscoveryPacket_invalid(t *testing.T) {
keys := generateKeys()
created := createLocalDiscoveryPacket(55, keys.PrivSignKey)
buf := make([]byte, 1024)
for i := range created {
modified := bytes.Clone(created)
modified[i]++
if verifyLocalDiscoveryPacket(modified, buf, keys.PubSignKey) {
t.Fatal("Verification should have failed.")
}
}
}
// ----------------------------------------------------------------------------
type testUDPWriter struct {
written [][]byte
}
func (w *testUDPWriter) WriteToUDP(b []byte, addr *net.UDPAddr) (int, error) {
w.written = append(w.written, bytes.Clone(b))
return len(b), nil
}
func (w *testUDPWriter) Written() [][]byte {
out := w.written
w.written = [][]byte{}
return out
}
// ----------------------------------------------------------------------------
// Testing that the mcWriter sends local discovery packets as expected.
func TestMCWriter_SendLocalDiscovery(t *testing.T) {
keys := generateKeys()
writer := &testUDPWriter{}
mcw := newMCWriter(writer, 42, keys.PrivSignKey)
mcw.SendLocalDiscovery()
out := writer.Written()
if len(out) != 1 {
t.Fatal(out)
}
pkt := out[0]
header, ok := headerFromLocalDiscoveryPacket(pkt)
if !ok {
t.Fatal(ok)
}
if header.SourceIP != 42 || header.DestIP != 255 {
t.Fatal(header)
}
if !verifyLocalDiscoveryPacket(pkt, make([]byte, 1024), keys.PubSignKey) {
t.Fatal("Verification should succeed.")
}
}

View File

@ -10,7 +10,8 @@ import (
type controlMsg[T any] struct { type controlMsg[T any] struct {
SrcIP byte SrcIP byte
SrcAddr netip.AddrPort SrcAddr netip.AddrPort
Packet T // TODO: RecvdAt int64 // Unixmilli.
Packet T
} }
func parseControlMsg(srcIP byte, srcAddr netip.AddrPort, buf []byte) (any, error) { func parseControlMsg(srcIP byte, srcAddr netip.AddrPort, buf []byte) (any, error) {
@ -55,5 +56,3 @@ type peerUpdateMsg struct {
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
type pingTimerMsg struct{} type pingTimerMsg struct{}
// ----------------------------------------------------------------------------

View File

@ -21,7 +21,8 @@ const (
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
type synPacket struct { type synPacket struct {
TraceID uint64 // TraceID to match response w/ request. TraceID uint64 // TraceID to match response w/ request.
// TODO: SentAt int64 // Unixmilli.
SharedKey [32]byte // Our shared key. SharedKey [32]byte // Our shared key.
Direct bool Direct bool
PossibleAddrs [8]netip.AddrPort // Possible public addresses of the sender. PossibleAddrs [8]netip.AddrPort // Possible public addresses of the sender.

View File

@ -1,41 +1 @@
package node package node
import (
"crypto/rand"
"net/netip"
"reflect"
"testing"
)
func TestPacketSyn(t *testing.T) {
in := synPacket{
TraceID: newTraceID(),
FromAddr: netip.AddrPortFrom(netip.AddrFrom4([4]byte{4, 5, 6, 7}), 22),
}
rand.Read(in.SharedKey[:])
out, err := parseSynPacket(in.Marshal(make([]byte, bufferSize)))
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(in, out) {
t.Fatal("\n", in, "\n", out)
}
}
func TestPacketSynAck(t *testing.T) {
in := ackPacket{
TraceID: newTraceID(),
FromAddr: netip.AddrPortFrom(netip.AddrFrom4([4]byte{4, 5, 6, 7}), 22),
}
out, err := parseAckPacket(in.Marshal(make([]byte, bufferSize)))
if err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(in, out) {
t.Fatal("\n", in, "\n", out)
}
}

127
node/packetsender.go Normal file
View File

@ -0,0 +1,127 @@
package node
import (
"log"
"net"
"net/netip"
"sync"
"sync/atomic"
"time"
)
type controlPacketSender interface {
SendControlPacket(pkt marshaller, route peerRoute)
}
type dataPacketSender interface {
SendDataPacket(pkt []byte, route peerRoute)
SendEncryptedDataPacket(pkt []byte, addr netip.AddrPort)
}
// ----------------------------------------------------------------------------
type packetSender struct {
conn *net.UDPConn
// For sending control packets.
cLock sync.Mutex
cBuf1 []byte
cBuf2 []byte
// For sending data packets.
dBuf1 []byte
dBuf2 []byte
counters [256]uint64
// Lock around for sending on UDP Conn.
wLock sync.Mutex
}
func newPacketSender(conn *net.UDPConn) *packetSender {
ps := &packetSender{
conn: conn,
cBuf1: make([]byte, bufferSize),
cBuf2: make([]byte, bufferSize),
dBuf1: make([]byte, bufferSize),
dBuf2: make([]byte, bufferSize),
}
for i := range ps.counters {
ps.counters[i] = uint64(time.Now().Unix()<<30 + 1)
}
return ps
}
// Safe for concurrent use.
func (sender *packetSender) SendControlPacket(pkt marshaller, route peerRoute) {
sender.cLock.Lock()
defer sender.cLock.Unlock()
buf := pkt.Marshal(sender.cBuf1)
h := header{
StreamID: controlStreamID,
Counter: atomic.AddUint64(&sender.counters[route.IP], 1),
SourceIP: localIP,
DestIP: route.IP,
}
buf = route.ControlCipher.Encrypt(h, buf, sender.cBuf2)
if route.Direct {
sender.writeTo(buf, route.RemoteAddr)
return
}
sender.relayPacket(route.IP, buf, sender.cBuf1)
}
// Not safe for concurrent use.
func (sender *packetSender) SendDataPacket(pkt []byte, route peerRoute) {
h := header{
StreamID: dataStreamID,
Counter: atomic.AddUint64(&sender.counters[route.IP], 1),
SourceIP: localIP,
DestIP: route.IP,
}
enc := route.DataCipher.Encrypt(h, pkt, sender.dBuf1)
if route.Direct {
sender.writeTo(enc, route.RemoteAddr)
return
}
sender.relayPacket(route.IP, enc, sender.dBuf2)
}
func (sender *packetSender) SendEncryptedDataPacket(pkt []byte, addr netip.AddrPort) {
sender.writeTo(pkt, addr)
}
func (sender *packetSender) relayPacket(destIP byte, data, buf []byte) {
ip := relayIP.Load()
if ip == nil {
return
}
relayRoute := routingTable[*ip].Load()
if relayRoute == nil || !relayRoute.Up || !relayRoute.Relay {
return
}
h := header{
StreamID: dataStreamID,
Counter: atomic.AddUint64(&sender.counters[relayRoute.IP], 1),
SourceIP: localIP,
DestIP: destIP,
}
enc := relayRoute.DataCipher.Encrypt(h, data, buf)
sender.writeTo(enc, relayRoute.RemoteAddr)
}
func (sender *packetSender) writeTo(packet []byte, addr netip.AddrPort) {
sender.wLock.Lock()
if _, err := sender.conn.WriteToUDPAddrPort(packet, addr); err != nil {
log.Printf("Failed to write to UDP port: %v", err)
}
sender.wLock.Unlock()
}

View File

@ -6,6 +6,7 @@ import (
"time" "time"
) )
// TODO: Make part of main loop on ping timer
func relayManager() { func relayManager() {
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
updateRelayRoute() updateRelayRoute()

59
node/shared.go Normal file
View File

@ -0,0 +1,59 @@
package node
import (
"net/netip"
"sync/atomic"
)
type sharedState struct {
// Immutable:
HubAddress string
APIKey string
NetName string
LocalIP byte
LocalPub bool
LocalAddr netip.AddrPort
PrivKey []byte
PrivSignKey []byte
// Mutable:
Routes [256]*atomic.Pointer[peerRoute]
RelayIP *atomic.Pointer[byte]
// Messages for supervisor main loop.
Messages chan any
}
func newSharedState(
netName,
hubAddress,
apiKey string,
conf localConfig,
) (
ss sharedState,
) {
ss.HubAddress = hubAddress
ss.APIKey = apiKey
ss.NetName = netName
ss.LocalIP = conf.PeerIP
ip, ok := netip.AddrFromSlice(conf.PublicIP)
if ok {
ss.LocalPub = true
ss.LocalAddr = netip.AddrPortFrom(ip, conf.Port)
}
ss.PrivKey = conf.PrivKey
ss.PrivSignKey = conf.PrivSignKey
for i := range ss.Routes {
ss.Routes[i] = &atomic.Pointer[peerRoute]{}
ss.Routes[i].Store(&peerRoute{})
}
ss.RelayIP = &atomic.Pointer[byte]{}
ss.Messages = make(chan any, 1024)
return
}

16
node/shared_test.go Normal file
View File

@ -0,0 +1,16 @@
package node
import "vppn/m"
// TODO:
var sharedStateForTesting = func() sharedState {
ss := newSharedState(
"testNet",
"http://localhost:39499",
"123",
localConfig{
PeerConfig: m.PeerConfig{},
})
return ss
}

View File

@ -19,14 +19,17 @@ const (
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
func startPeerSuper() { func startPeerSuper(
routingTable [256]*atomic.Pointer[peerRoute],
messages chan any,
sender controlPacketSender,
) {
peers := [256]peerState{} peers := [256]peerState{}
for i := range peers { for i := range peers {
data := &peerStateData{ data := &peerStateData{
sender: sender,
published: routingTable[i], published: routingTable[i],
remoteIP: byte(i), remoteIP: byte(i),
buf1: make([]byte, bufferSize),
buf2: make([]byte, bufferSize),
limiter: ratelimiter.New(ratelimiter.Config{ limiter: ratelimiter.New(ratelimiter.Config{
FillPeriod: 20 * time.Millisecond, FillPeriod: 20 * time.Millisecond,
MaxWaitCount: 1, MaxWaitCount: 1,
@ -34,10 +37,10 @@ func startPeerSuper() {
} }
peers[i] = data.OnPeerUpdate(nil) peers[i] = data.OnPeerUpdate(nil)
} }
go runPeerSuper(peers) go runPeerSuper(peers, messages)
} }
func runPeerSuper(peers [256]peerState) { func runPeerSuper(peers [256]peerState, messages chan any) {
for raw := range messages { for raw := range messages {
switch msg := raw.(type) { switch msg := raw.(type) {
@ -84,6 +87,8 @@ type peerState interface {
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
type peerStateData struct { type peerStateData struct {
sender controlPacketSender
// The purpose of this state machine is to manage this published data. // The purpose of this state machine is to manage this published data.
published *atomic.Pointer[peerRoute] published *atomic.Pointer[peerRoute]
staged peerRoute // Local copy of shared data. See publish(). staged peerRoute // Local copy of shared data. See publish().
@ -95,10 +100,6 @@ type peerStateData struct {
peer *m.Peer peer *m.Peer
remotePub bool remotePub bool
// Buffers for sending control packets.
buf1 []byte
buf2 []byte
// For logging. Set per-state. // For logging. Set per-state.
client bool client bool
@ -129,7 +130,7 @@ func (s *peerStateData) _sendControlPacket(pkt interface{ Marshal([]byte) []byte
s.logf("Not sending control packet: rate limited.") // Shouldn't happen. s.logf("Not sending control packet: rate limited.") // Shouldn't happen.
return return
} }
_sendControlPacket(pkt, route, s.buf1, s.buf2) s.sender.SendControlPacket(pkt, route)
} }
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
@ -175,8 +176,9 @@ func (s *peerStateData) OnPeerUpdate(peer *m.Peer) peerState {
s.peer = peer s.peer = peer
s.staged = peerRoute{ s.staged = peerRoute{
IP: s.remoteIP, IP: s.remoteIP,
PubSignKey: peer.PubSignKey, PubSignKey: peer.PubSignKey,
// TODO: privKey global.
ControlCipher: newControlCipher(privKey, peer.PubKey), ControlCipher: newControlCipher(privKey, peer.PubKey),
DataCipher: newDataCipher(), DataCipher: newDataCipher(),
} }
@ -192,6 +194,7 @@ func (s *peerStateData) OnPeerUpdate(peer *m.Peer) peerState {
} }
if s.remotePub == localPub { if s.remotePub == localPub {
// TODO: localIP is global
if localIP < s.remoteIP { if localIP < s.remoteIP {
return enterStateServer(s) return enterStateServer(s)
} }
@ -349,6 +352,7 @@ func (s *stateClient) OnAck(msg controlMsg[ackPacket]) {
} }
// Store possible public address if we're not a public node. // Store possible public address if we're not a public node.
// TODO: localPub is global, publicAddrs is global.
if !localPub && s.remotePub { if !localPub && s.remotePub {
publicAddrs.Store(msg.Packet.ToAddr) publicAddrs.Store(msg.Packet.ToAddr)
} }