This commit is contained in:
jdl 2024-12-13 21:30:06 +01:00
parent d9233af4c9
commit 6fcd8637a3
16 changed files with 191 additions and 364 deletions

View File

@ -2,15 +2,16 @@
## Roadmap ## Roadmap
* Peer: router: create process for managing the routing table * Use default port 456
* Peer: router: track mediators, enable / disable ... * Remove signing key from hub
* Hub: track peer last-seen timestamp (?) * Peer: UDP hole-punching
* Peer: local peer discovery - part of RoutingProcessor * Peer: local peer discovery - part of RoutingProcessor
* Peer: update hub w/ latest port on startup * Peer: update hub w/ latest port on startup
## Learnings ## Learnings
* Encryption / decryption is 20x faster than signing/opening. * Encryption / decryption is 20x faster than signing/opening.
* Allowing out-of order packets is massively important for throughput with TCP
## Principles ## Principles
@ -111,11 +112,3 @@ TimeoutStopSec=24
[Install] [Install]
WantedBy=default.target WantedBy=default.target
``` ```
---
## Sub-packets
If we make our MTU large, like 8k, our computations become more efficient.
We can send packets with header like:

View File

@ -14,12 +14,9 @@ const (
type connData struct { type connData struct {
// Shared data. // Shared data.
routes [256]*atomic.Pointer[route] routes [MAX_IP]*atomic.Pointer[route]
route *atomic.Pointer[route] route *atomic.Pointer[route]
// Local data.
mediatorIP byte
// Peer data. // Peer data.
server bool // Never changes. server bool // Never changes.
peerIP byte // Never changes. peerIP byte // Never changes.
@ -29,12 +26,13 @@ type connData struct {
encSharedKey []byte // From hub + private key. encSharedKey []byte // From hub + private key.
publicAddr netip.AddrPort // From hub. publicAddr netip.AddrPort // From hub.
// Connection establishment and maintenance.
pingTimer *time.Timer pingTimer *time.Timer
timeoutTimer *time.Timer timeoutTimer *time.Timer
// Routing data. // Routing data.
addr netip.AddrPort addr netip.AddrPort
viaIP byte useMediator bool
up bool up bool
// For sending. // For sending.
@ -47,10 +45,9 @@ func (d *connData) Route() *route {
PeerIP: d.peerIP, PeerIP: d.peerIP,
Up: d.up, Up: d.up,
Mediator: d.peer.Mediator, Mediator: d.peer.Mediator,
SignPubKey: d.peer.SignPubKey,
EncSharedKey: d.encSharedKey, EncSharedKey: d.encSharedKey,
Addr: d.addr, Addr: d.addr,
ViaIP: d.viaIP, useMediator: d.useMediator,
} }
} }
@ -61,14 +58,14 @@ func (d *connData) HandlePeerUpdate(state connState, update peerUpdate) connStat
if d.peer == nil && update.Peer == nil { if d.peer == nil && update.Peer == nil {
return state return state
} }
return newConnStateFromPeer(update, d) return newStateFromPeerUpdate(update, d)
} }
func (d *connData) HandleSendPing() { func (d *connData) HandleSendPing() {
route := d.route.Load() route := d.route.Load()
req := Ping{SentAt: time.Now().UnixMilli()} req := Ping{SentAt: time.Now().UnixMilli()}
req.Marshal(d.buf[:PING_SIZE]) req.Marshal(d.buf[:PING_SIZE])
d.sender.send(PACKET_TYPE_PING, d.buf[:PING_SIZE], route) d.sender.send(PACKET_TYPE_PING, d.buf[:PING_SIZE], route, nil)
d.pingTimer.Reset(pingInterval) d.pingTimer.Reset(pingInterval)
} }
@ -79,5 +76,5 @@ func (d *connData) sendPong(w wrapper[Ping]) {
RecvdAt: time.Now().UnixMilli(), RecvdAt: time.Now().UnixMilli(),
} }
pong.Marshal(d.buf[:PONG_SIZE]) pong.Marshal(d.buf[:PONG_SIZE])
d.sender.send(PACKET_TYPE_PONG, d.buf[:PONG_SIZE], route) d.sender.send(PACKET_TYPE_PONG, d.buf[:PONG_SIZE], route, nil)
} }

View File

@ -21,7 +21,7 @@ type connHandler struct {
func newConnHandler( func newConnHandler(
server bool, server bool,
peerIP byte, peerIP byte,
routes [256]*atomic.Pointer[route], routes [MAX_IP]*atomic.Pointer[route],
encPrivKey []byte, encPrivKey []byte,
sender *safeConnSender, sender *safeConnSender,
) *connHandler { ) *connHandler {
@ -65,9 +65,6 @@ func (h *connHandler) mainLoop() {
for { for {
select { select {
case ip := <-h.mediatorUpdates:
state = state.HandleMediatorUpdate(ip)
case update := <-h.peerUpdates: case update := <-h.peerUpdates:
state = data.HandlePeerUpdate(state, update) state = data.HandlePeerUpdate(state, update)
@ -92,13 +89,6 @@ func (h *connHandler) mainLoop() {
} }
} }
func (c *connHandler) UpdateMediator(ip byte) {
select {
case c.mediatorUpdates <- ip:
default:
}
}
func (c *connHandler) HandlePing(w wrapper[Ping]) { func (c *connHandler) HandlePing(w wrapper[Ping]) {
select { select {
case c.pings <- w: case c.pings <- w:

View File

@ -15,10 +15,9 @@ type connSender struct {
encrypted []byte encrypted []byte
nonceBuf []byte nonceBuf []byte
counter uint64 counter uint64
signingKey []byte
} }
func newConnSender(conn *net.UDPConn, srcIP, streamID byte, signingPrivKey []byte) *connSender { func newConnSender(conn *net.UDPConn, srcIP, streamID byte) *connSender {
return &connSender{ return &connSender{
conn: conn, conn: conn,
sourceIP: srcIP, sourceIP: srcIP,
@ -26,34 +25,42 @@ func newConnSender(conn *net.UDPConn, srcIP, streamID byte, signingPrivKey []byt
encrypted: make([]byte, BUFFER_SIZE), encrypted: make([]byte, BUFFER_SIZE),
nonceBuf: make([]byte, NONCE_SIZE), nonceBuf: make([]byte, NONCE_SIZE),
counter: uint64(fasttime.Now()) << 30, // Ensure counter is always increasing. counter: uint64(fasttime.Now()) << 30, // Ensure counter is always increasing.
signingKey: signingPrivKey,
} }
} }
func (cs *connSender) send(packetType byte, packet []byte, route *route) { func (cs *connSender) send(packetType byte, packet []byte, dstRoute, viaRoute *route) {
if dstRoute.useMediator && viaRoute == nil {
log.Printf("Dropping forwarded packet: no mediator.")
return
}
cs.counter++ cs.counter++
nonce := Nonce{ nonce := Nonce{
Timestamp: fasttime.Now(), Timestamp: fasttime.Now(),
Counter: cs.counter, Counter: cs.counter,
SourceIP: cs.sourceIP, SourceIP: cs.sourceIP,
ViaIP: route.ViaIP, DestIP: dstRoute.PeerIP,
DestIP: route.PeerIP,
StreamID: cs.streamID, StreamID: cs.streamID,
PacketType: packetType, PacketType: packetType,
} }
nonce.Marshal(cs.nonceBuf) if dstRoute.useMediator {
nonce.ViaIP = viaRoute.PeerIP
encrypted := encryptPacket(route.EncSharedKey, cs.nonceBuf, packet, cs.encrypted)
var toSend []byte
if route.ViaIP != 0 {
toSend = signPacket(cs.signingKey, encrypted, packet)
} else {
toSend = encrypted
} }
if _, err := cs.conn.WriteToUDPAddrPort(toSend, route.Addr); err != nil { nonce.Marshal(cs.nonceBuf)
addr := dstRoute.Addr
encrypted := encryptPacket(dstRoute.EncSharedKey, cs.nonceBuf, packet, cs.encrypted)
if viaRoute != nil {
packet, encrypted = encrypted, packet
encrypted = encryptPacket(viaRoute.EncSharedKey, cs.nonceBuf, packet, encrypted)
addr = viaRoute.Addr
}
if _, err := cs.conn.WriteToUDPAddrPort(encrypted, addr); err != nil {
log.Fatalf("Failed to write UDP packet: %v\n%s", err, debug.Stack()) log.Fatalf("Failed to write UDP packet: %v\n%s", err, debug.Stack())
} }
} }
@ -69,8 +76,8 @@ func newSafeConnSender(sender *connSender) *safeConnSender {
return &safeConnSender{sender: sender} return &safeConnSender{sender: sender}
} }
func (s *safeConnSender) send(packetType byte, packet []byte, route *route) { func (s *safeConnSender) send(packetType byte, packet []byte, route, viaRoute *route) {
s.lock.Lock() s.lock.Lock()
defer s.lock.Unlock() defer s.lock.Unlock()
s.sender.send(packetType, packet, route) s.sender.send(packetType, packet, route, viaRoute)
} }

View File

@ -11,32 +11,31 @@ func logState(s connState, msg string, args ...any) {
log.Printf("["+s.Name()+"] "+msg, args...) log.Printf("["+s.Name()+"] "+msg, args...)
} }
// ----------------------------------------------------------------------------
// The connection state corresponds to what we're connected TO. // The connection state corresponds to what we're connected TO.
type connState interface { type connState interface {
Name() string Name() string
HandleMediatorUpdate(ip byte) connState //HandleConnReq(wrapper[ConnReq]) connState
HandlePing(wrapper[Ping]) connState HandlePing(wrapper[Ping]) connState
HandlePong(wrapper[Pong]) connState HandlePong(wrapper[Pong]) connState
HandleTimeout() connState HandleTimeout() connState
} }
// Helper function. // Helper functions.
func newConnStateFromPeer(update peerUpdate, data *connData) connState { func newStateFromPeerUpdate(update peerUpdate, data *connData) connState {
peer := update.Peer if update.Peer != nil {
return newStateFromPeer(update.Peer, data)
if peer == nil { }
return newConnNull(data) return newConnNull(data)
} }
func newStateFromPeer(peer *m.Peer, data *connData) connState {
if _, isPublic := netip.AddrFromSlice(peer.PublicIP); isPublic { if _, isPublic := netip.AddrFromSlice(peer.PublicIP); isPublic {
return newStateServerDown(data, peer) return newStateServerDown(data, peer)
} else if data.server { } else if data.server {
return newStateClientDown(data, peer) return newStateClientDown(data, peer)
} else { } else {
return newStateMediatedDown(data, peer) return newStateMediated(data, peer)
} }
} }
@ -56,7 +55,7 @@ func newConnNull(data *connData) connState {
c.pingTimer.Stop() c.pingTimer.Stop()
c.timeoutTimer.Stop() c.timeoutTimer.Stop()
c.addr = c.publicAddr c.addr = c.publicAddr
c.viaIP = 0 c.useMediator = false
c.up = false c.up = false
c.route.Store(nil) c.route.Store(nil)
return c return c
@ -66,8 +65,8 @@ func (c connNull) Name() string {
return "NoPeer" return "NoPeer"
} }
func (c connNull) HandleMediatorUpdate(ip byte) connState { func (c connNull) HandleConnReq(w wrapper[ConnReq]) connState {
c.mediatorIP = ip logState(c, "Ignoring conn request.")
return c return c
} }
@ -102,10 +101,10 @@ func newStateServerDown(data *connData, peer *m.Peer) connState {
c.peer = peer c.peer = peer
c.encSharedKey = computeSharedKey(peer.EncPubKey, c.encPrivKey) c.encSharedKey = computeSharedKey(peer.EncPubKey, c.encPrivKey)
c.publicAddr = pubAddr c.publicAddr = pubAddr
c.pingTimer.Reset(time.Millisecond) // Ping right away to bring up. c.pingTimer.Reset(time.Second) // Ping right away to bring up.
c.timeoutTimer.Stop() // No timeouts yet. c.timeoutTimer.Stop() // No timeouts yet.
c.addr = c.publicAddr c.addr = c.publicAddr
c.viaIP = 0 c.useMediator = false
c.up = false c.up = false
c.route.Store(c.Route()) c.route.Store(c.Route())
@ -116,9 +115,9 @@ func (c stateServerDown) Name() string {
return "Server:DOWN" return "Server:DOWN"
} }
func (c stateServerDown) HandleMediatorUpdate(ip byte) connState { func (c stateServerDown) HandleConnReq(w wrapper[ConnReq]) connState {
// Server connection doesn't use a mediator. // Send ConnResp.
c.mediatorIP = ip // TODO
return c return c
} }
@ -149,7 +148,7 @@ func newStateServerUp(data *connData, w wrapper[Pong]) connState {
c.pingTimer.Reset(pingInterval) c.pingTimer.Reset(pingInterval)
c.timeoutTimer.Reset(timeoutInterval) c.timeoutTimer.Reset(timeoutInterval)
c.addr = w.SrcAddr c.addr = w.SrcAddr
c.viaIP = 0 c.useMediator = false
c.up = true c.up = true
c.route.Store(c.Route()) c.route.Store(c.Route())
return c return c
@ -159,12 +158,6 @@ func (c stateServerUp) Name() string {
return "Server:UP" return "Server:UP"
} }
func (c stateServerUp) HandleMediatorUpdate(ip byte) connState {
// Server connection doesn't use a mediator.
c.mediatorIP = ip
return c
}
func (c stateServerUp) HandlePing(w wrapper[Ping]) connState { func (c stateServerUp) HandlePing(w wrapper[Ping]) connState {
logState(c, "Ignoring ping.") logState(c, "Ignoring ping.")
return c return c
@ -176,7 +169,7 @@ func (c stateServerUp) HandlePong(w wrapper[Pong]) connState {
} }
func (c stateServerUp) HandleTimeout() connState { func (c stateServerUp) HandleTimeout() connState {
return newStateServerDown(c.connData, c.peer) return newStateFromPeer(c.peer, c.connData)
} }
//////////////////////// ////////////////////////
@ -197,7 +190,7 @@ func newStateClientDown(data *connData, peer *m.Peer) connState {
c.encPrivKey = data.encPrivKey c.encPrivKey = data.encPrivKey
c.encSharedKey = computeSharedKey(peer.EncPubKey, c.encPrivKey) c.encSharedKey = computeSharedKey(peer.EncPubKey, c.encPrivKey)
c.addr = c.publicAddr c.addr = c.publicAddr
c.viaIP = 0 c.useMediator = false
c.up = false c.up = false
c.route.Store(c.Route()) c.route.Store(c.Route())
@ -211,13 +204,8 @@ func (c stateClientDown) Name() string {
return "Client:DOWN" return "Client:DOWN"
} }
func (c stateClientDown) HandleMediatorUpdate(ip byte) connState {
// Client connection doesn't use a mediator.
c.mediatorIP = ip
return c
}
func (c stateClientDown) HandlePing(w wrapper[Ping]) connState { func (c stateClientDown) HandlePing(w wrapper[Ping]) connState {
log.Printf("Got ping...")
next := newStateClientUp(c.connData, w) next := newStateClientUp(c.connData, w)
c.sendPong(w) // Have to send after transitionsing so route is ok. c.sendPong(w) // Have to send after transitionsing so route is ok.
return next return next
@ -244,7 +232,7 @@ type stateClientUp struct {
func newStateClientUp(data *connData, w wrapper[Ping]) connState { func newStateClientUp(data *connData, w wrapper[Ping]) connState {
c := stateClientUp{data} c := stateClientUp{data}
c.addr = w.SrcAddr c.addr = w.SrcAddr
c.viaIP = 0 c.useMediator = false
c.up = true c.up = true
c.route.Store(c.Route()) c.route.Store(c.Route())
@ -257,12 +245,6 @@ func (c stateClientUp) Name() string {
return "Client:UP" return "Client:UP"
} }
func (c stateClientUp) HandleMediatorUpdate(ip byte) connState {
// Client connection doesn't use a mediator.
c.mediatorIP = ip
return c
}
func (c stateClientUp) HandlePing(w wrapper[Ping]) connState { func (c stateClientUp) HandlePing(w wrapper[Ping]) connState {
// The connection is from a client. If the client's address changes, we // The connection is from a client. If the client's address changes, we
// should follow that change. // should follow that change.
@ -281,112 +263,51 @@ func (c stateClientUp) HandlePong(w wrapper[Pong]) connState {
} }
func (c stateClientUp) HandleTimeout() connState { func (c stateClientUp) HandleTimeout() connState {
return newStateClientDown(c.connData, c.peer) return newStateFromPeer(c.peer, c.connData)
} }
////////////////////////// //////////////
// Unconnected Mediator // // Mediated //
////////////////////////// //////////////
type stateMediatedDown struct { type stateMediated struct {
*connData *connData
} }
func newStateMediatedDown(data *connData, peer *m.Peer) connState { func newStateMediated(data *connData, peer *m.Peer) connState {
addr, _ := netip.AddrFromSlice(peer.PublicIP) addr, _ := netip.AddrFromSlice(peer.PublicIP)
pubAddr := netip.AddrPortFrom(addr, peer.Port) pubAddr := netip.AddrPortFrom(addr, peer.Port)
c := stateMediatedDown{data} c := stateMediated{data}
c.peer = peer c.peer = peer
c.publicAddr = pubAddr c.publicAddr = pubAddr
c.encPrivKey = data.encPrivKey c.encPrivKey = data.encPrivKey
c.encSharedKey = computeSharedKey(peer.EncPubKey, c.encPrivKey) c.encSharedKey = computeSharedKey(peer.EncPubKey, c.encPrivKey)
c.addr = c.publicAddr c.addr = c.publicAddr
c.viaIP = 0 c.useMediator = true
c.up = false c.up = true
c.route.Store(c.Route()) c.route.Store(c.Route())
c.pingTimer.Stop() // No pings for mediators. c.pingTimer.Stop() // No pings for mediators.
c.timeoutTimer.Stop() // No timeouts yet. c.timeoutTimer.Stop() // No timeouts yet.
// If we have a mediator route, we can connect.
if mRoute := c.routes[c.mediatorIP].Load(); mRoute != nil {
return newStateMediatedUp(data, mRoute)
}
return c return c
} }
func (c stateMediatedDown) Name() string { func (c stateMediated) Name() string {
return "Mediated:DOWN" return "Mediated:UP"
} }
func (c stateMediatedDown) HandleMediatorUpdate(ip byte) connState { func (c stateMediated) HandlePing(w wrapper[Ping]) connState {
c.mediatorIP = ip
if mRoute := c.routes[c.mediatorIP].Load(); mRoute != nil {
return newStateMediatedUp(c.connData, mRoute)
}
return c
}
func (c stateMediatedDown) HandlePing(w wrapper[Ping]) connState {
logState(c, "Ignorning ping.") logState(c, "Ignorning ping.")
return c return c
} }
func (c stateMediatedDown) HandlePong(w wrapper[Pong]) connState { func (c stateMediated) HandlePong(w wrapper[Pong]) connState {
logState(c, "Ignorning pong.") logState(c, "Ignorning pong.")
return c return c
} }
func (c stateMediatedDown) HandleTimeout() connState { func (c stateMediated) HandleTimeout() connState {
logState(c, "Unexpected timeout.") logState(c, "Unexpected timeout.")
return c return c
} }
////////////////////////
// Connected Mediator //
////////////////////////
type stateMediatedUp struct {
*connData
}
func newStateMediatedUp(data *connData, route *route) connState {
c := stateMediatedUp{data}
c.addr = route.Addr
c.viaIP = route.PeerIP
c.up = true
c.route.Store(c.Route())
// No pings for mediated routes.
c.pingTimer.Stop()
c.timeoutTimer.Stop()
return c
}
func (c stateMediatedUp) Name() string {
return "Mediated:UP"
}
func (c stateMediatedUp) HandleMediatorUpdate(ip byte) connState {
c.mediatorIP = ip
if mRoute := c.routes[c.mediatorIP].Load(); mRoute != nil {
return newStateMediatedUp(c.connData, mRoute)
}
return newStateMediatedDown(c.connData, c.peer)
}
func (c stateMediatedUp) HandlePing(w wrapper[Ping]) connState {
logState(c, "Ignoring ping.")
return c
}
func (c stateMediatedUp) HandlePong(w wrapper[Pong]) connState {
logState(c, "Ignoring pong.")
return c
}
func (c stateMediatedUp) HandleTimeout() connState {
return newStateMediatedDown(c.connData, c.peer)
}

View File

@ -2,7 +2,6 @@ package peer
import ( import (
"golang.org/x/crypto/nacl/box" "golang.org/x/crypto/nacl/box"
"golang.org/x/crypto/nacl/sign"
) )
func encryptPacket(sharedKey, nonce, packet, out []byte) []byte { func encryptPacket(sharedKey, nonce, packet, out []byte) []byte {
@ -16,15 +15,6 @@ func decryptPacket(sharedKey, packet, out []byte) (decrypted []byte, ok bool) {
return decrypted, ok return decrypted, ok
} }
// Signed packet should be encrypted with the encryptPacket function first.
func signPacket(privKey, packet, out []byte) []byte {
return sign.Sign(out[:0], packet, (*[64]byte)(privKey))
}
func openPacket(pubKey, packet, out []byte) (encPacket []byte, ok bool) {
return sign.Open(out[:0], packet, (*[32]byte)(pubKey))
}
func computeSharedKey(peerPubKey, privKey []byte) []byte { func computeSharedKey(peerPubKey, privKey []byte) []byte {
shared := [32]byte{} shared := [32]byte{}
box.Precompute(&shared, (*[32]byte)(peerPubKey), (*[32]byte)(privKey)) box.Precompute(&shared, (*[32]byte)(peerPubKey), (*[32]byte)(privKey))

View File

@ -6,7 +6,6 @@ import (
"testing" "testing"
"golang.org/x/crypto/nacl/box" "golang.org/x/crypto/nacl/box"
"golang.org/x/crypto/nacl/sign"
) )
func TestEncryptDecryptPacket(t *testing.T) { func TestEncryptDecryptPacket(t *testing.T) {
@ -105,60 +104,3 @@ func BenchmarkDecryptPacket(b *testing.B) {
decrypted, _ = decryptPacket(sharedDecKey[:], encrypted, decrypted) decrypted, _ = decryptPacket(sharedDecKey[:], encrypted, decrypted)
} }
} }
func BenchmarkSignPacket(b *testing.B) {
_, privKey1, err := sign.GenerateKey(rand.Reader)
if err != nil {
b.Fatal(err)
}
original := make([]byte, 8192)
rand.Read(original)
out := make([]byte, 9000)
b.ResetTimer()
for i := 0; i < b.N; i++ {
signPacket(privKey1[:], original, out)
}
}
func TestSignOpenPacket(t *testing.T) {
pubKey, privKey, err := sign.GenerateKey(rand.Reader)
if err != nil {
t.Fatal(err)
}
packet := make([]byte, MTU)
rand.Read(packet)
signedPacket := signPacket(privKey[:], packet, make([]byte, BUFFER_SIZE))
encPacket, ok := openPacket(pubKey[:], signedPacket, make([]byte, BUFFER_SIZE))
if !ok {
t.Fatal(ok)
}
if !bytes.Equal(encPacket, packet) {
t.Fatal("not equal")
}
}
func BenchmarkOpenPacket(b *testing.B) {
pubKey, privKey, err := sign.GenerateKey(rand.Reader)
if err != nil {
b.Fatal(err)
}
packet := make([]byte, MTU)
rand.Read(packet)
signedPacket := signPacket(privKey[:], packet, make([]byte, 9000))
out := make([]byte, BUFFER_SIZE)
b.ResetTimer()
for i := 0; i < b.N; i++ {
out, _ = openPacket(pubKey[:], signedPacket, out)
}
}

17
peer/duplist.go Normal file
View File

@ -0,0 +1,17 @@
package peer
type dupList struct {
items [64]uint64
index int
}
func (l *dupList) isDuplicate(in uint64) bool {
for _, i := range l.items {
if i == in {
return true
}
}
l.items[l.index] = in
l.index = (l.index + 1) % 64
return false
}

View File

@ -1,22 +1,14 @@
package peer package peer
const ( const (
MAX_IP = 65
DEFAULT_PORT = 515 DEFAULT_PORT = 515
NONCE_SIZE = 24 NONCE_SIZE = 24
KEY_SIZE = 32 KEY_SIZE = 32
SIG_SIZE = 64 SIG_SIZE = 64
MTU = 1376 MTU = 1436
BUFFER_SIZE = 2048 // Definitely big enough. BUFFER_SIZE = 1536 // Definitely big enough.
STREAM_DATA = 0 STREAM_DATA = 0
STREAM_ROUTING = 1 // Routing queries and responses. STREAM_ROUTING = 1 // Routing queries and responses.
// Basic packet types
PACKET_TYPE_DATA = 0
PACKET_TYPE_PING = 1
PACKET_TYPE_PONG = 2
// Packet sizes.
PING_SIZE = 8
PONG_SIZE = 16
) )

View File

@ -14,10 +14,11 @@ func (peer *Peer) ifReader() {
}() }()
var ( var (
sender = newConnSender(peer.conn, peer.ip, STREAM_DATA, peer.signPrivKey) sender = newConnSender(peer.conn, peer.ip, STREAM_DATA)
n int n int
destIP byte destIP byte
router = peer.router router = peer.router
viaRoute *route
route *route route *route
iface = peer.iface iface = peer.iface
err error err error
@ -54,6 +55,16 @@ func (peer *Peer) ifReader() {
continue continue
} }
sender.send(PACKET_TYPE_DATA, packet, route) if route.useMediator {
viaRoute = router.GetMediator()
if viaRoute == nil || !viaRoute.Up {
log.Printf("Dropping packet due to no mediator: %d", destIP)
continue
}
} else {
viaRoute = nil
}
sender.send(PACKET_TYPE_DATA, packet, route, viaRoute)
} }
} }

View File

@ -16,6 +16,7 @@ func (peer *Peer) netReader() {
}() }()
var ( var (
dupList = &dupList{}
n int n int
srcAddr netip.AddrPort srcAddr netip.AddrPort
nonce Nonce nonce Nonce
@ -56,8 +57,9 @@ NEXT_PACKET:
goto NEXT_PACKET goto NEXT_PACKET
} }
if nonce.Counter <= counters[nonce.StreamID][nonce.SourceIP] { if dupList.isDuplicate(nonce.Counter) {
log.Printf("Dropping packet with bad counter: -%d", counters[nonce.StreamID][nonce.SourceIP]-nonce.Counter) //if nonce.Counter+64 <= counters[nonce.StreamID][nonce.SourceIP] {
log.Printf("Dropping packet with bad counter: %d (-%d) - %v", nonce.Counter, counters[nonce.StreamID][nonce.SourceIP]-nonce.Counter, srcAddr)
goto NEXT_PACKET goto NEXT_PACKET
} }
@ -67,26 +69,28 @@ NEXT_PACKET:
goto NEXT_PACKET goto NEXT_PACKET
} }
switch ip {
case nonce.DestIP:
goto DECRYPT
case nonce.ViaIP:
goto VALIDATE_SIGNATURE
default:
log.Printf("Bad packet: %+v", nonce)
goto NEXT_PACKET
}
DECRYPT:
decrypted, ok = decryptPacket(route.EncSharedKey, packet, decrypted) decrypted, ok = decryptPacket(route.EncSharedKey, packet, decrypted)
if !ok { if !ok {
log.Printf("Failed to decrypt packet: %v", nonce) log.Printf("Failed to decrypt packet: %v", nonce)
goto NEXT_PACKET goto NEXT_PACKET
} }
// Only updated after verification. // Only updated after we've decrypted.
if nonce.Counter > counters[nonce.StreamID][nonce.SourceIP] {
counters[nonce.StreamID][nonce.SourceIP] = nonce.Counter counters[nonce.StreamID][nonce.SourceIP] = nonce.Counter
}
switch ip {
case nonce.DestIP:
goto PROCESS_LOCAL
case nonce.ViaIP:
goto FORWARD
default:
log.Printf("Invalid nonce: %+v", nonce)
goto NEXT_PACKET
}
PROCESS_LOCAL:
switch nonce.StreamID { switch nonce.StreamID {
case STREAM_DATA: case STREAM_DATA:
@ -112,16 +116,7 @@ WRITE_ROUTING_PACKET:
goto NEXT_PACKET goto NEXT_PACKET
VALIDATE_SIGNATURE: FORWARD:
decrypted, ok = openPacket(route.SignPubKey, packet, decrypted)
if !ok {
log.Printf("Failed to open signed packet: %v", nonce)
goto NEXT_PACKET
}
// Only updated after verification.
counters[nonce.StreamID][nonce.SourceIP] = nonce.Counter
route = peer.router.GetRoute(nonce.DestIP) route = peer.router.GetRoute(nonce.DestIP)
if route == nil || !route.Up { if route == nil || !route.Up {
@ -130,7 +125,7 @@ VALIDATE_SIGNATURE:
} }
// We don't forward twice. // We don't forward twice.
if route.ViaIP != 0 { if route.useMediator {
log.Printf("Dropping double-forward packet: %v", nonce) log.Printf("Dropping double-forward packet: %v", nonce)
goto NEXT_PACKET goto NEXT_PACKET
} }

View File

@ -15,8 +15,6 @@ type Peer struct {
isMediator bool isMediator bool
encPubKey []byte encPubKey []byte
encPrivKey []byte encPrivKey []byte
signPubKey []byte
signPrivKey []byte
conn *net.UDPConn conn *net.UDPConn
iface io.ReadWriteCloser iface io.ReadWriteCloser
@ -36,8 +34,6 @@ func NewPeer(netName, listenIP string, port uint16) (*Peer, error) {
apiKey: conf.APIKey, apiKey: conf.APIKey,
encPubKey: conf.EncPubKey, encPubKey: conf.EncPubKey,
encPrivKey: conf.EncPrivKey, encPrivKey: conf.EncPrivKey,
signPubKey: conf.SignPubKey,
signPrivKey: conf.SignPrivKey,
} }
_, peer.isServer = netip.AddrFromSlice(conf.PublicIP) _, peer.isServer = netip.AddrFromSlice(conf.PublicIP)

View File

@ -1,6 +1,7 @@
package peer package peer
import ( import (
"log"
"math/rand" "math/rand"
"time" "time"
) )
@ -31,15 +32,11 @@ func (r *Router) manageMediator() {
} }
if len(mediators) == 0 { if len(mediators) == 0 {
ip = 0 r.mediatorIP.Store(nil)
} else { } else {
ip = mediators[rand.Intn(len(mediators))].PeerIP ip = mediators[rand.Intn(len(mediators))].PeerIP
} log.Printf("Got mediator IP: %d", ip)
r.mediatorIP.Store(&ip)
for _, conn := range r.conns {
if conn != nil {
conn.UpdateMediator(ip)
}
} }
} }
} }

View File

@ -55,9 +55,9 @@ func (r *Router) _pollHub(client *http.Client, req *http.Request) {
return return
} }
for i, peer := range state.Peers { for i := range r.conns {
if r.conns[i] != nil { if r.conns[i] != nil {
r.conns[i].UpdatePeer(peerUpdate{PeerIP: byte(i), Peer: peer}) r.conns[i].UpdatePeer(peerUpdate{PeerIP: byte(i), Peer: state.Peers[i]})
} }
} }
} }

View File

@ -2,7 +2,6 @@ package peer
import ( import (
"net/netip" "net/netip"
"unsafe"
"vppn/m" "vppn/m"
) )
@ -15,10 +14,9 @@ type route struct {
PeerIP byte PeerIP byte
Up bool Up bool
Mediator bool Mediator bool
SignPubKey []byte
EncSharedKey []byte // Shared key for encoding / decoding packets. EncSharedKey []byte // Shared key for encoding / decoding packets.
Addr netip.AddrPort // Address to send to. Addr netip.AddrPort
ViaIP byte // If != 0, this is a forwarding address. useMediator bool
} }
type peerUpdate struct { type peerUpdate struct {
@ -41,34 +39,3 @@ func newWrapper[T any](srcAddr netip.AddrPort, nonce Nonce) wrapper[T] {
Nonce: nonce, Nonce: nonce,
} }
} }
// ----------------------------------------------------------------------------
type Ping struct {
SentAt int64 // unix milli
}
func (p *Ping) Parse(buf []byte) {
p.SentAt = *(*int64)(unsafe.Pointer(&buf[0]))
}
func (p Ping) Marshal(buf []byte) {
*(*int64)(unsafe.Pointer(&buf[0])) = p.SentAt
}
// ----------------------------------------------------------------------------
type Pong struct {
SentAt int64 // unix mili
RecvdAt int64 // unix mili
}
func (p *Pong) Parse(buf []byte) {
p.SentAt = *(*int64)(unsafe.Pointer(&buf[0]))
p.RecvdAt = *(*int64)(unsafe.Pointer(&buf[8]))
}
func (p *Pong) Marshal(buf []byte) {
*(*int64)(unsafe.Pointer(&buf[0])) = p.SentAt
*(*int64)(unsafe.Pointer(&buf[8])) = p.RecvdAt
}

View File

@ -12,12 +12,17 @@ type Router struct {
conf m.PeerConfig conf m.PeerConfig
// Routes used by the peer. // Routes used by the peer.
conns [256]*connHandler conns [MAX_IP]*connHandler
routes [256]*atomic.Pointer[route] routes [MAX_IP]*atomic.Pointer[route]
addrs [MAX_IP]*atomic.Pointer[netip.AddrPort]
mediatorIP *atomic.Pointer[byte]
} }
func NewRouter(conf m.PeerConfig, conn *net.UDPConn) *Router { func NewRouter(conf m.PeerConfig, conn *net.UDPConn) *Router {
r := &Router{conf: conf} r := &Router{
conf: conf,
mediatorIP: &atomic.Pointer[byte]{},
}
for i := range r.routes { for i := range r.routes {
r.routes[i] = &atomic.Pointer[route]{} r.routes[i] = &atomic.Pointer[route]{}
@ -25,7 +30,7 @@ func NewRouter(conf m.PeerConfig, conn *net.UDPConn) *Router {
_, isServer := netip.AddrFromSlice(conf.PublicIP) _, isServer := netip.AddrFromSlice(conf.PublicIP)
sender := newConnSender(conn, conf.PeerIP, STREAM_ROUTING, conf.SignPrivKey) sender := newConnSender(conn, conf.PeerIP, STREAM_ROUTING)
for i := range r.conns { for i := range r.conns {
if byte(i) != conf.PeerIP { if byte(i) != conf.PeerIP {
@ -54,6 +59,13 @@ func (rm *Router) GetRoute(ip byte) *route {
return rm.routes[ip].Load() return rm.routes[ip].Load()
} }
func (rm *Router) GetMediator() *route {
if ip := rm.mediatorIP.Load(); ip != nil {
return rm.GetRoute(*ip)
}
return nil
}
func (r *Router) HandlePacket(src netip.AddrPort, nonce Nonce, data []byte) { func (r *Router) HandlePacket(src netip.AddrPort, nonce Nonce, data []byte) {
if nonce.SourceIP == r.conf.PeerIP { if nonce.SourceIP == r.conf.PeerIP {
log.Printf("Packet to self...") log.Printf("Packet to self...")