diff --git a/README.md b/README.md index d642881..b7a7612 100644 --- a/README.md +++ b/README.md @@ -4,8 +4,9 @@ * Peer: router: create process for managing the routing table * Peer: router: track mediators, enable / disable ... -* Hub: track peer last-seen timestamp +* Hub: track peer last-seen timestamp (?) * Peer: local peer discovery - part of RoutingProcessor +* Peer: update hub w/ latest port on startup ## Principles @@ -13,24 +14,21 @@ * Simple setup: via setup link from the hub. * Each peer has full network state replicated from the hub. -## Design +## Routing -* Append nonce to end of packet - * Then it's readable whether signed or unsiged -* Types of packets to send: - * standard: encrypt and send - * Forward via: encrypt, sign and send - * Forward to: send -* Type of packeting read from interface: - * Forward to: check signature - * Forwarded, standard +* Routing is different for public vs non-public peers + * Public: routes are initialized via incoming ping requests + * NonPub: routes are initialized via incoming ping responses -Incoming from net: - * Data for iface - * Packet for forward - * Packet for routingHandler -* Incoming from iface: - * Data for peer +A non-public peer needs to maintain connections with every public peer. + +* Sending: + * Public: send to address + * Non-public: send to a mediator + +* Pings: + * Servers don't need to ping + * Clients need to ping all public and local peers to keep connections open ## Hub Server Configuration @@ -84,3 +82,28 @@ journalctl -f -u hub -n 100 ``` Sign-in and configure. + +## Peer Configuration + +Install the binary somewhere, for example `~/bin/vppn`. + +Create systemd file in `/etc/systemd/system/vppn.service`. + + +``` +Description=vppn +Requires=network.target + +[Service] +AmbientCapabilities=CAP_NET_BIND_SERVICE CAP_NET_ADMIN +Type=simple +User=user +WorkingDirectory=/home/user/ +ExecStart=/home/user/vppn -name vppn +Restart=always +RestartSec=8 +TimeoutStopSec=24 + +[Install] +WantedBy=default.target +``` diff --git a/cmd/vppn/build.sh b/cmd/vppn/build.sh index ff74f39..96a5e25 100755 --- a/cmd/vppn/build.sh +++ b/cmd/vppn/build.sh @@ -2,3 +2,4 @@ go build sudo setcap cap_net_admin+iep vppn +sudo setcap cap_net_bind_service+iep vppn diff --git a/hub/api/api.go b/hub/api/api.go index 18b00f3..053c574 100644 --- a/hub/api/api.go +++ b/hub/api/api.go @@ -144,7 +144,7 @@ func (a *API) Session_SignIn(s *Session, pwd string) error { type PeerCreateArgs struct { Name string - IP []byte + PublicIP []byte Port uint16 Mediator bool } @@ -209,9 +209,10 @@ func (a *API) Peer_Create(creationCode string) (*m.PeerConfig, error) { peer := &Peer{ PeerIP: peerIP, + Version: idgen.NextID(0), APIKey: idgen.NewToken(), Name: args.Name, - IP: args.IP, + PublicIP: args.PublicIP, Port: args.Port, Mediator: args.Mediator, EncPubKey: encPubKey[:], @@ -229,7 +230,7 @@ func (a *API) Peer_Create(creationCode string) (*m.PeerConfig, error) { HubAddress: conf.HubAddress, APIKey: peer.APIKey, Network: conf.VPNNetwork, - IP: peer.IP, + PublicIP: peer.PublicIP, Port: peer.Port, Mediator: peer.Mediator, EncPubKey: encPubKey[:], @@ -240,6 +241,10 @@ func (a *API) Peer_Create(creationCode string) (*m.PeerConfig, error) { } func (a *API) Peer_Update(p *Peer) error { + a.lock.Lock() + defer a.lock.Unlock() + + p.Version = idgen.NextID(0) return db.Peer_Update(a.db, p) } diff --git a/hub/api/db/generated.go b/hub/api/db/generated.go index 3981fe1..a23498d 100644 --- a/hub/api/db/generated.go +++ b/hub/api/db/generated.go @@ -308,16 +308,17 @@ func Session_List( type Peer struct { PeerIP byte + Version int64 APIKey string Name string - IP []byte + PublicIP []byte Port uint16 Mediator bool EncPubKey []byte SignPubKey []byte } -const Peer_SelectQuery = "SELECT PeerIP,APIKey,Name,IP,Port,Mediator,EncPubKey,SignPubKey FROM peers" +const Peer_SelectQuery = "SELECT PeerIP,Version,APIKey,Name,PublicIP,Port,Mediator,EncPubKey,SignPubKey FROM peers" func Peer_Insert( tx TX, @@ -328,7 +329,7 @@ func Peer_Insert( return err } - _, err = tx.Exec("INSERT INTO peers(PeerIP,APIKey,Name,IP,Port,Mediator,EncPubKey,SignPubKey) VALUES(?,?,?,?,?,?,?,?)", row.PeerIP, row.APIKey, row.Name, row.IP, row.Port, row.Mediator, row.EncPubKey, row.SignPubKey) + _, err = tx.Exec("INSERT INTO peers(PeerIP,Version,APIKey,Name,PublicIP,Port,Mediator,EncPubKey,SignPubKey) VALUES(?,?,?,?,?,?,?,?,?)", row.PeerIP, row.Version, row.APIKey, row.Name, row.PublicIP, row.Port, row.Mediator, row.EncPubKey, row.SignPubKey) return err } @@ -341,7 +342,7 @@ func Peer_Update( return err } - result, err := tx.Exec("UPDATE peers SET Name=?,IP=?,Port=?,Mediator=? WHERE PeerIP=?", row.Name, row.IP, row.Port, row.Mediator, row.PeerIP) + result, err := tx.Exec("UPDATE peers SET Version=?,Name=?,PublicIP=?,Port=?,Mediator=? WHERE PeerIP=?", row.Version, row.Name, row.PublicIP, row.Port, row.Mediator, row.PeerIP) if err != nil { return err } @@ -369,7 +370,7 @@ func Peer_UpdateFull( return err } - result, err := tx.Exec("UPDATE peers SET APIKey=?,Name=?,IP=?,Port=?,Mediator=?,EncPubKey=?,SignPubKey=? WHERE PeerIP=?", row.APIKey, row.Name, row.IP, row.Port, row.Mediator, row.EncPubKey, row.SignPubKey, row.PeerIP) + result, err := tx.Exec("UPDATE peers SET Version=?,APIKey=?,Name=?,PublicIP=?,Port=?,Mediator=?,EncPubKey=?,SignPubKey=? WHERE PeerIP=?", row.Version, row.APIKey, row.Name, row.PublicIP, row.Port, row.Mediator, row.EncPubKey, row.SignPubKey, row.PeerIP) if err != nil { return err } @@ -419,8 +420,8 @@ func Peer_Get( err error, ) { row = &Peer{} - r := tx.QueryRow("SELECT PeerIP,APIKey,Name,IP,Port,Mediator,EncPubKey,SignPubKey FROM peers WHERE PeerIP=?", PeerIP) - err = r.Scan(&row.PeerIP, &row.APIKey, &row.Name, &row.IP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey) + r := tx.QueryRow("SELECT PeerIP,Version,APIKey,Name,PublicIP,Port,Mediator,EncPubKey,SignPubKey FROM peers WHERE PeerIP=?", PeerIP) + err = r.Scan(&row.PeerIP, &row.Version, &row.APIKey, &row.Name, &row.PublicIP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey) return } @@ -434,7 +435,7 @@ func Peer_GetWhere( ) { row = &Peer{} r := tx.QueryRow(query, args...) - err = r.Scan(&row.PeerIP, &row.APIKey, &row.Name, &row.IP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey) + err = r.Scan(&row.PeerIP, &row.Version, &row.APIKey, &row.Name, &row.PublicIP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey) return } @@ -454,7 +455,7 @@ func Peer_Iterate( defer rows.Close() for rows.Next() { row := &Peer{} - err := rows.Scan(&row.PeerIP, &row.APIKey, &row.Name, &row.IP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey) + err := rows.Scan(&row.PeerIP, &row.Version, &row.APIKey, &row.Name, &row.PublicIP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey) if !yield(row, err) { return } diff --git a/hub/api/db/sanitize-validate.go b/hub/api/db/sanitize-validate.go index b737866..b4ed8ff 100644 --- a/hub/api/db/sanitize-validate.go +++ b/hub/api/db/sanitize-validate.go @@ -44,10 +44,10 @@ func Session_Validate(s *Session) error { func Peer_Sanitize(p *Peer) { p.Name = strings.TrimSpace(p.Name) - if len(p.IP) != 0 { - addr, ok := netip.AddrFromSlice(p.IP) + if len(p.PublicIP) != 0 { + addr, ok := netip.AddrFromSlice(p.PublicIP) if ok && addr.Is4() { - p.IP = addr.AsSlice() + p.PublicIP = addr.AsSlice() } } if p.Port == 0 { @@ -56,8 +56,8 @@ func Peer_Sanitize(p *Peer) { } func Peer_Validate(p *Peer) error { - if len(p.IP) > 0 { - _, ok := netip.AddrFromSlice(p.IP) + if len(p.PublicIP) > 0 { + _, ok := netip.AddrFromSlice(p.PublicIP) if !ok { return ErrInvalidIP } diff --git a/hub/api/db/tables.defs b/hub/api/db/tables.defs index ddc51b5..c9e35e2 100644 --- a/hub/api/db/tables.defs +++ b/hub/api/db/tables.defs @@ -15,9 +15,10 @@ TABLE sessions OF Session NoUpdate ( TABLE peers OF Peer ( PeerIP byte PK, + Version int64, APIKey string NoUpdate, Name string, - IP []byte, + PublicIP []byte, Port uint16, Mediator bool, EncPubKey []byte NoUpdate, diff --git a/hub/api/migrations/2024-11-30-init.sql b/hub/api/migrations/2024-11-30-init.sql index f61f0fb..eb5da37 100644 --- a/hub/api/migrations/2024-11-30-init.sql +++ b/hub/api/migrations/2024-11-30-init.sql @@ -17,9 +17,10 @@ CREATE INDEX sessions_last_seen_index ON sessions(LastSeenAt); CREATE TABLE peers ( PeerIP INTEGER NOT NULL PRIMARY KEY, -- Final byte. + Version INTEGER NOT NULL, APIKey TEXT NOT NULL UNIQUE, Name TEXT NOT NULL UNIQUE, -- For humans. - IP BLOB NOT NULL, + PublicIP BLOB NOT NULL, Port INTEGER NOT NULL, Mediator INTEGER NOT NULL DEFAULT 0, -- Boolean if peer will forward packets. Must also have public address. EncPubKey BLOB NOT NULL, diff --git a/hub/handler.go b/hub/handler.go index c551ef4..ffaf6fc 100644 --- a/hub/handler.go +++ b/hub/handler.go @@ -64,3 +64,18 @@ func (app *App) handleSignedIn(pattern string, fn handlerFunc) { return fn(s, w, r) }) } + +type peerHandlerFunc func(w http.ResponseWriter, r *http.Request) error + +func (app *App) handlePeer(pattern string, fn peerHandlerFunc) { + wrapped := func(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + if err := fn(w, r); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + } + + app.mux.HandleFunc(pattern, + webutil.WithLogging( + wrapped)) +} diff --git a/hub/handlers.go b/hub/handlers.go index e2e8e70..f24aaaa 100644 --- a/hub/handlers.go +++ b/hub/handlers.go @@ -165,7 +165,7 @@ func (a *App) _adminPeerCreateSubmit(s *api.Session, w http.ResponseWriter, r *h args := api.PeerCreateArgs{} err := webutil.NewFormScanner(r.Form). Scan("Name", &args.Name). - Scan("IP", &ipStr). + Scan("PublicIP", &ipStr). Scan("Port", &args.Port). Scan("Mediator", &args.Mediator). Error() @@ -173,7 +173,7 @@ func (a *App) _adminPeerCreateSubmit(s *api.Session, w http.ResponseWriter, r *h return err } - if args.IP, err = stringToIP(ipStr); err != nil { + if args.PublicIP, err = stringToIP(ipStr); err != nil { return err } @@ -247,7 +247,7 @@ func (a *App) _adminPeerEditSubmit(s *api.Session, w http.ResponseWriter, r *htt } err = webutil.NewFormScanner(r.Form). Scan("Name", &peer.Name). - Scan("IP", &ipStr). + Scan("PublicIP", &ipStr). Scan("Port", &peer.Port). Scan("Mediator", &peer.Mediator). Error() @@ -255,7 +255,7 @@ func (a *App) _adminPeerEditSubmit(s *api.Session, w http.ResponseWriter, r *htt return err } - if peer.IP, err = stringToIP(ipStr); err != nil { + if peer.PublicIP, err = stringToIP(ipStr); err != nil { return err } @@ -298,7 +298,7 @@ func (a *App) _adminPeerDeleteSubmit(s *api.Session, w http.ResponseWriter, r *h return a.redirect(w, r, "/admin/peer/list/") } -func (a *App) _peerCreate(s *api.Session, w http.ResponseWriter, r *http.Request) error { +func (a *App) _peerCreate(w http.ResponseWriter, r *http.Request) error { code := r.FormValue("Code") conf, err := a.api.Peer_Create(code) if err != nil { @@ -308,7 +308,7 @@ func (a *App) _peerCreate(s *api.Session, w http.ResponseWriter, r *http.Request return a.sendJSON(w, conf) } -func (a *App) _peerFetchState(s *api.Session, w http.ResponseWriter, r *http.Request) error { +func (a *App) _peerFetchState(w http.ResponseWriter, r *http.Request) error { _, apiKey, ok := r.BasicAuth() if !ok { log.Printf("1") @@ -333,15 +333,16 @@ func (a *App) _peerFetchState(s *api.Session, w http.ResponseWriter, r *http.Req HubAddress: conf.HubAddress, Network: conf.VPNNetwork, PeerIP: peer.PeerIP, - IP: peer.IP, + PublicIP: peer.PublicIP, Port: peer.Port, } for _, p := range peers { state.Peers[p.PeerIP] = &m.Peer{ PeerIP: p.PeerIP, + Version: p.Version, Name: p.Name, - IP: p.IP, + PublicIP: p.PublicIP, Port: p.Port, Mediator: p.Mediator, EncPubKey: p.EncPubKey, diff --git a/hub/routes.go b/hub/routes.go index fe026d1..0fa47f2 100644 --- a/hub/routes.go +++ b/hub/routes.go @@ -26,6 +26,6 @@ func (a *App) registerRoutes() { a.handleSignedIn("GET /admin/peer/delete/", a._adminPeerDelete) a.handleSignedIn("POST /admin/peer/delete/", a._adminPeerDeleteSubmit) - a.handleNotSignedIn("GET /peer/create/", a._peerCreate) - a.handleNotSignedIn("GET /peer/fetch-state/", a._peerFetchState) + a.handlePeer("GET /peer/create/", a._peerCreate) + a.handlePeer("GET /peer/fetch-state/", a._peerFetchState) } diff --git a/hub/templates/admin-peer-create.html b/hub/templates/admin-peer-create.html index 2eb6c70..f2f0c39 100644 --- a/hub/templates/admin-peer-create.html +++ b/hub/templates/admin-peer-create.html @@ -8,8 +8,8 @@

-
- +
+


diff --git a/hub/templates/admin-peer-delete.html b/hub/templates/admin-peer-delete.html index 4e0618b..a330eb8 100644 --- a/hub/templates/admin-peer-delete.html +++ b/hub/templates/admin-peer-delete.html @@ -13,8 +13,8 @@

-
- +
+


diff --git a/hub/templates/admin-peer-edit.html b/hub/templates/admin-peer-edit.html index 3e04184..c6081b1 100644 --- a/hub/templates/admin-peer-edit.html +++ b/hub/templates/admin-peer-edit.html @@ -13,8 +13,8 @@

-
- +
+


diff --git a/hub/templates/admin-peer-list.html b/hub/templates/admin-peer-list.html index 4f09f1a..4acadc7 100644 --- a/hub/templates/admin-peer-list.html +++ b/hub/templates/admin-peer-list.html @@ -11,7 +11,7 @@ PeerIP Name - IP + Public IP Port Mediator @@ -25,7 +25,7 @@ {{.Name}} - {{ipToString .IP}} + {{ipToString .PublicIP}} {{.Port}} {{if .Mediator}}T{{else}}F{{end}} diff --git a/hub/templates/admin-peer-view.html b/hub/templates/admin-peer-view.html index 09f254b..89ff754 100644 --- a/hub/templates/admin-peer-view.html +++ b/hub/templates/admin-peer-view.html @@ -10,7 +10,7 @@ - + diff --git a/m/models.go b/m/models.go index d6d3103..29c39f9 100644 --- a/m/models.go +++ b/m/models.go @@ -6,7 +6,7 @@ type PeerConfig struct { HubAddress string Network []byte APIKey string - IP []byte + PublicIP []byte Port uint16 Mediator bool EncPubKey []byte @@ -17,8 +17,9 @@ type PeerConfig struct { type Peer struct { PeerIP byte + Version int64 Name string - IP []byte + PublicIP []byte Port uint16 Mediator bool EncPubKey []byte @@ -29,10 +30,10 @@ type NetworkState struct { HubAddress string // The requester's data: - Network []byte - PeerIP byte - IP []byte - Port uint16 + Network []byte + PeerIP byte + PublicIP []byte + Port uint16 // All peer data. Peers [256]*Peer diff --git a/peer/conn-states.dot b/peer/conn-states.dot new file mode 100644 index 0000000..3c8c5f7 --- /dev/null +++ b/peer/conn-states.dot @@ -0,0 +1,9 @@ +digraph d { + init -> null; + init -> unconnectedServer; + init -> unconnectedClient; + init -> unconnectedMediated; + unconnectedServer -> connectedServer; + unconnectedClient -> connectedClient; + unconnectedMediated -> connectedMediated; +} \ No newline at end of file diff --git a/peer/conndata.go b/peer/conndata.go new file mode 100644 index 0000000..dbbc29c --- /dev/null +++ b/peer/conndata.go @@ -0,0 +1,83 @@ +package peer + +import ( + "net/netip" + "sync/atomic" + "time" + "vppn/m" +) + +const ( + pingInterval = time.Second * 8 + timeoutInterval = 32 * time.Second +) + +type connData struct { + // Shared data. + routes [256]*atomic.Pointer[route] + route *atomic.Pointer[route] + + // Local data. + mediatorIP byte + + // Peer data. + server bool // Never changes. + peerIP byte // Never changes. + encPrivKey []byte // Never changes. + + peer *m.Peer // From hub. + encSharedKey []byte // From hub + private key. + publicAddr netip.AddrPort // From hub. + + pingTimer *time.Timer + timeoutTimer *time.Timer + + // Routing data. + addr netip.AddrPort + viaIP byte + up bool + + // For sending. + buf []byte + sender *safeConnSender +} + +func (d *connData) Route() *route { + return &route{ + PeerIP: d.peerIP, + Up: d.up, + Mediator: d.peer.Mediator, + SignPubKey: d.peer.SignPubKey, + EncSharedKey: d.encSharedKey, + Addr: d.addr, + ViaIP: d.viaIP, + } +} + +func (d *connData) HandlePeerUpdate(state connState, update peerUpdate) connState { + if d.peer != nil && update.Peer != nil && d.peer.Version == update.Peer.Version { + return state + } + if d.peer == nil && update.Peer == nil { + return state + } + return newConnStateFromPeer(update, d) +} + +func (d *connData) HandleSendPing() { + route := d.route.Load() + req := Ping{SentAt: time.Now().UnixMilli()} + req.Marshal(d.buf[:PING_SIZE]) + d.sender.send(PACKET_TYPE_PING, d.buf[:PING_SIZE], route) + d.pingTimer.Reset(pingInterval) +} + +func (d *connData) sendPong(w wrapper[Ping]) { + route := d.route.Load() + pong := Pong{ + SentAt: w.T.SentAt, + RecvdAt: time.Now().UnixMilli(), + } + pong.Marshal(d.buf[:PONG_SIZE]) + d.sender.send(PACKET_TYPE_PONG, d.buf[:PONG_SIZE], route) +} diff --git a/peer/connhandler.go b/peer/connhandler.go new file mode 100644 index 0000000..4eb2749 --- /dev/null +++ b/peer/connhandler.go @@ -0,0 +1,121 @@ +package peer + +import ( + "fmt" + "log" + "runtime/debug" + "sync/atomic" + "time" +) + +type connHandler struct { + // Communication. + mediatorUpdates chan byte + peerUpdates chan peerUpdate + pings chan wrapper[Ping] + pongs chan wrapper[Pong] + + data *connData +} + +func newConnHandler( + server bool, + peerIP byte, + routes [256]*atomic.Pointer[route], + encPrivKey []byte, + sender *safeConnSender, +) *connHandler { + d := &connData{ + server: server, + pingTimer: time.NewTimer(pingInterval), + timeoutTimer: time.NewTimer(timeoutInterval), + routes: routes, + route: routes[peerIP], + peerIP: peerIP, + encPrivKey: encPrivKey, + buf: make([]byte, BUFFER_SIZE), + sender: sender, + } + + h := &connHandler{ + mediatorUpdates: make(chan byte, 1), + peerUpdates: make(chan peerUpdate, 1), + pings: make(chan wrapper[Ping], 1), + pongs: make(chan wrapper[Pong], 1), + data: d, + } + + go h.mainLoop() + return h +} + +func (h *connHandler) mainLoop() { + defer func() { + if r := recover(); r != nil { + fmt.Println("stacktrace from panic: \n" + string(debug.Stack())) + } + }() + + var ( + data = h.data + state = newConnNull(data) + name = state.Name() + ) + + for { + select { + + case ip := <-h.mediatorUpdates: + state = state.HandleMediatorUpdate(ip) + + case update := <-h.peerUpdates: + state = data.HandlePeerUpdate(state, update) + + case w := <-h.pings: + state = state.HandlePing(w) + + case w := <-h.pongs: + state = state.HandlePong(w) + + case <-data.pingTimer.C: + state.HandleSendPing() + + case <-data.timeoutTimer.C: + log.Printf("[%s] Connection timeout.", state.Name()) + state = state.HandleTimeout() + } + + if state.Name() != name { + log.Printf("[%03d] STATE: %s --> %s", data.peerIP, name, state.Name()) + name = state.Name() + } + } +} + +func (c *connHandler) UpdateMediator(ip byte) { + select { + case c.mediatorUpdates <- ip: + default: + } +} + +func (c *connHandler) HandlePing(w wrapper[Ping]) { + select { + case c.pings <- w: + default: + } +} + +func (c *connHandler) HandlePong(w wrapper[Pong]) { + select { + case c.pongs <- w: + default: + } +} + +func (c *connHandler) UpdatePeer(update peerUpdate) { + select { + case c.peerUpdates <- update: + default: + } +} diff --git a/peer/connsender.go b/peer/connsender.go new file mode 100644 index 0000000..1970d08 --- /dev/null +++ b/peer/connsender.go @@ -0,0 +1,83 @@ +package peer + +import ( + "log" + "net" + "runtime/debug" + "sync" + "vppn/fasttime" +) + +type connSender struct { + conn *net.UDPConn + sourceIP byte + streamID byte + encrypted []byte + nonceBuf []byte + counterTS uint64 + counter uint64 + signingKey []byte +} + +func newConnSender(conn *net.UDPConn, srcIP, streamID byte, signingPrivKey []byte) *connSender { + return &connSender{ + conn: conn, + sourceIP: srcIP, + streamID: streamID, + encrypted: make([]byte, BUFFER_SIZE), + nonceBuf: make([]byte, NONCE_SIZE), + signingKey: signingPrivKey, + } +} + +func (cs *connSender) send(packetType byte, packet []byte, route *route) { + now := uint64(fasttime.Now()) + if cs.counterTS < now { + cs.counterTS = now + cs.counter = now << 30 + } + + cs.counter++ + + nonce := Nonce{ + Counter: cs.counter, + SourceIP: cs.sourceIP, + ViaIP: route.ViaIP, + DestIP: route.PeerIP, + StreamID: cs.streamID, + PacketType: packetType, + } + + nonce.Marshal(cs.nonceBuf) + + encrypted := encryptPacket(route.EncSharedKey, cs.nonceBuf, packet, cs.encrypted) + + var toSend []byte + if route.ViaIP != 0 { + toSend = signPacket(cs.signingKey, encrypted, packet) + } else { + toSend = encrypted + } + + log.Printf("Sending to %v: %+v", route.Addr, nonce) + if _, err := cs.conn.WriteToUDPAddrPort(toSend, route.Addr); err != nil { + log.Fatalf("Failed to write UDP packet: %v\n%s", err, debug.Stack()) + } +} + +// ---------------------------------------------------------------------------- + +type safeConnSender struct { + lock sync.Mutex + sender *connSender +} + +func newSafeConnSender(sender *connSender) *safeConnSender { + return &safeConnSender{sender: sender} +} + +func (s *safeConnSender) send(packetType byte, packet []byte, route *route) { + s.lock.Lock() + defer s.lock.Unlock() + s.sender.send(packetType, packet, route) +} diff --git a/peer/connstate.go b/peer/connstate.go new file mode 100644 index 0000000..441532f --- /dev/null +++ b/peer/connstate.go @@ -0,0 +1,393 @@ +package peer + +import ( + "log" + "net/netip" + "time" + "vppn/m" +) + +func logState(s connState, msg string, args ...any) { + log.Printf("["+s.Name()+"] "+msg, args...) +} + +// ---------------------------------------------------------------------------- + +// The connection state corresponds to what we're connected TO. +type connState interface { + Name() string + HandleMediatorUpdate(ip byte) connState + HandleSendPing() + HandlePing(wrapper[Ping]) connState + HandlePong(wrapper[Pong]) connState + HandleTimeout() connState +} + +// Helper function. + +func newConnStateFromPeer(update peerUpdate, data *connData) connState { + peer := update.Peer + + if peer == nil { + return newConnNull(data) + } + + if _, isPublic := netip.AddrFromSlice(peer.PublicIP); isPublic { + return newConnUnconnectedServer(data, peer) + } else if data.server { + return newConnUnconnectedClient(data, peer) + } else { + return newConnUnconnectedMediator(data, peer) + } +} + +///////////////////// +// Null Connection // +///////////////////// + +type connNull struct { + *connData +} + +func newConnNull(data *connData) connState { + c := connNull{data} + c.peer = nil + c.encSharedKey = nil + c.publicAddr = netip.AddrPort{} + c.pingTimer.Stop() + c.timeoutTimer.Stop() + c.addr = c.publicAddr + c.viaIP = 0 + c.up = false + c.route.Store(nil) + return c +} + +func (c connNull) Name() string { + return "NoPeer" +} + +func (c connNull) HandleMediatorUpdate(ip byte) connState { + c.mediatorIP = ip + return c +} + +func (c connNull) HandlePing(w wrapper[Ping]) connState { + logState(c, "Ignoring ping.") + return c +} + +func (c connNull) HandlePong(w wrapper[Pong]) connState { + logState(c, "Ignoring pong.") + return c +} + +func (c connNull) HandleTimeout() connState { + logState(c, "Unexpected timeout.") + return c +} + +//////////////////////// +// Unconnected Server // +//////////////////////// + +type connUnconnectedServer struct { + *connData +} + +func newConnUnconnectedServer(data *connData, peer *m.Peer) connState { + addr, _ := netip.AddrFromSlice(peer.PublicIP) + pubAddr := netip.AddrPortFrom(addr, peer.Port) + + c := connUnconnectedServer{data} + c.peer = peer + c.encSharedKey = computeSharedKey(peer.EncPubKey, c.encPrivKey) + c.publicAddr = pubAddr + c.pingTimer.Reset(time.Millisecond) // Ping right away to bring up. + c.timeoutTimer.Stop() // No timeouts yet. + c.addr = c.publicAddr + c.viaIP = 0 + c.up = false + c.route.Store(c.Route()) + + return c +} + +func (c connUnconnectedServer) Name() string { + return "ServerUnconnected" +} + +func (c connUnconnectedServer) HandleMediatorUpdate(ip byte) connState { + // Server connection doesn't use a mediator. + c.mediatorIP = ip + return c +} + +func (c connUnconnectedServer) HandlePing(w wrapper[Ping]) connState { + logState(c, "Ignoring ping.") + return c +} + +func (c connUnconnectedServer) HandlePong(w wrapper[Pong]) connState { + return newConnConnectedServer(c.connData, w) +} + +func (c connUnconnectedServer) HandleTimeout() connState { + logState(c, "Unexpected timeout.") + return c +} + +////////////////////// +// Connected Server // +////////////////////// + +type connConnectedServer struct { + *connData +} + +func newConnConnectedServer(data *connData, w wrapper[Pong]) connState { + c := connConnectedServer{data} + c.pingTimer.Reset(pingInterval) + c.timeoutTimer.Reset(timeoutInterval) + c.addr = w.SrcAddr + c.viaIP = 0 + c.up = true + c.route.Store(c.Route()) + return c +} + +func (c connConnectedServer) Name() string { + return "ServerConnected" +} + +func (c connConnectedServer) HandleMediatorUpdate(ip byte) connState { + // Server connection doesn't use a mediator. + c.mediatorIP = ip + return c +} + +func (c connConnectedServer) HandlePing(w wrapper[Ping]) connState { + logState(c, "Ignoring ping.") + return c +} + +func (c connConnectedServer) HandlePong(w wrapper[Pong]) connState { + c.timeoutTimer.Reset(timeoutInterval) + return c +} + +func (c connConnectedServer) HandleTimeout() connState { + return newConnUnconnectedServer(c.connData, c.peer) +} + +//////////////////////// +// Unconnected Client // +//////////////////////// + +type connUnconnectedClient struct { + *connData +} + +func newConnUnconnectedClient(data *connData, peer *m.Peer) connState { + addr, _ := netip.AddrFromSlice(peer.PublicIP) + pubAddr := netip.AddrPortFrom(addr, peer.Port) + + c := connUnconnectedClient{data} + c.peer = peer + c.publicAddr = pubAddr + c.encPrivKey = data.encPrivKey + c.encSharedKey = computeSharedKey(peer.EncPubKey, c.encPrivKey) + c.addr = c.publicAddr + c.viaIP = 0 + c.up = false + c.route.Store(c.Route()) + + c.pingTimer.Stop() // Conncection is from client => pings incoming. + c.timeoutTimer.Stop() // No timeouts yet. + + return c +} + +func (c connUnconnectedClient) Name() string { + return "ClientUnconnected" +} + +func (c connUnconnectedClient) HandleMediatorUpdate(ip byte) connState { + // Client connection doesn't use a mediator. + c.mediatorIP = ip + return c +} + +func (c connUnconnectedClient) HandlePing(w wrapper[Ping]) connState { + next := newConnConnectedClient(c.connData, w) + c.sendPong(w) // Have to send after transitionsing so route is ok. + return next +} + +func (c connUnconnectedClient) HandlePong(w wrapper[Pong]) connState { + logState(c, "Ignorning pong.") + return c +} + +func (c connUnconnectedClient) HandleTimeout() connState { + logState(c, "Unexpected timeout.") + return c +} + +////////////////////// +// Connected Client // +////////////////////// + +type connConnectedClient struct { + *connData +} + +func newConnConnectedClient(data *connData, w wrapper[Ping]) connState { + c := connConnectedClient{data} + c.addr = w.SrcAddr + c.viaIP = 0 + c.up = true + c.route.Store(c.Route()) + + c.pingTimer.Stop() // Conncection is from client => pings incoming. + c.timeoutTimer.Reset(timeoutInterval) + return c +} + +func (c connConnectedClient) Name() string { + return "ClientConnected" +} + +func (c connConnectedClient) HandleMediatorUpdate(ip byte) connState { + // Client connection doesn't use a mediator. + c.mediatorIP = ip + return c +} + +func (c connConnectedClient) HandlePing(w wrapper[Ping]) connState { + // The connection is from a client. If the client's address changes, we + // should follow that change. + if c.addr != w.SrcAddr { + c.addr = w.SrcAddr + c.route.Store(c.Route()) + } + c.sendPong(w) + c.timeoutTimer.Reset(timeoutInterval) + return c +} + +func (c connConnectedClient) HandlePong(w wrapper[Pong]) connState { + logState(c, "Ignoring pong.") + return c +} + +func (c connConnectedClient) HandleTimeout() connState { + return newConnUnconnectedClient(c.connData, c.peer) +} + +////////////////////////// +// Unconnected Mediator // +////////////////////////// + +type connUnconnectedMediator struct { + *connData +} + +func newConnUnconnectedMediator(data *connData, peer *m.Peer) connState { + addr, _ := netip.AddrFromSlice(peer.PublicIP) + pubAddr := netip.AddrPortFrom(addr, peer.Port) + + c := connUnconnectedMediator{data} + c.peer = peer + c.publicAddr = pubAddr + c.encPrivKey = data.encPrivKey + c.encSharedKey = computeSharedKey(peer.EncPubKey, c.encPrivKey) + c.addr = c.publicAddr + c.viaIP = 0 + c.up = false + c.route.Store(c.Route()) + + c.pingTimer.Stop() // No pings for mediators. + c.timeoutTimer.Stop() // No timeouts yet. + + // If we have a mediator route, we can connect. + if mRoute := c.routes[c.mediatorIP].Load(); mRoute != nil { + return newConnConnectedMediator(data, mRoute) + } + + return c +} + +func (c connUnconnectedMediator) Name() string { + return "MediatorUnconnected" +} + +func (c connUnconnectedMediator) HandleMediatorUpdate(ip byte) connState { + c.mediatorIP = ip + if mRoute := c.routes[c.mediatorIP].Load(); mRoute != nil { + return newConnConnectedMediator(c.connData, mRoute) + } + return c +} + +func (c connUnconnectedMediator) HandlePing(w wrapper[Ping]) connState { + logState(c, "Ignorning ping.") + return c +} + +func (c connUnconnectedMediator) HandlePong(w wrapper[Pong]) connState { + logState(c, "Ignorning pong.") + return c +} + +func (c connUnconnectedMediator) HandleTimeout() connState { + logState(c, "Unexpected timeout.") + return c +} + +//////////////////////// +// Connected Mediator // +//////////////////////// + +type connConnectedMediator struct { + *connData +} + +func newConnConnectedMediator(data *connData, route *route) connState { + c := connConnectedMediator{data} + c.addr = route.Addr + c.viaIP = route.PeerIP + c.up = true + c.route.Store(c.Route()) + + // No pings for mediated routes. + c.pingTimer.Stop() + c.timeoutTimer.Stop() + return c +} + +func (c connConnectedMediator) Name() string { + return "MediatorConnected" +} + +func (c connConnectedMediator) HandleMediatorUpdate(ip byte) connState { + c.mediatorIP = ip + if mRoute := c.routes[c.mediatorIP].Load(); mRoute != nil { + return newConnConnectedMediator(c.connData, mRoute) + } + return newConnUnconnectedMediator(c.connData, c.peer) +} + +func (c connConnectedMediator) HandlePing(w wrapper[Ping]) connState { + logState(c, "Ignoring ping.") + return c +} + +func (c connConnectedMediator) HandlePong(w wrapper[Pong]) connState { + logState(c, "Ignoring pong.") + return c +} + +func (c connConnectedMediator) HandleTimeout() connState { + return newConnUnconnectedMediator(c.connData, c.peer) +} diff --git a/peer/globals.go b/peer/globals.go index 0cfebf2..fdb3286 100644 --- a/peer/globals.go +++ b/peer/globals.go @@ -13,4 +13,10 @@ const ( // Basic packet types PACKET_TYPE_DATA = 0 + PACKET_TYPE_PING = 1 + PACKET_TYPE_PONG = 2 + + // Packet sizes. + PING_SIZE = 8 + PONG_SIZE = 16 ) diff --git a/peer/main.go b/peer/main.go index 94a6f67..d766ce2 100644 --- a/peer/main.go +++ b/peer/main.go @@ -10,8 +10,6 @@ import ( "os" "runtime/debug" "vppn/m" - - _ "net/http/pprof" ) func Main() { @@ -21,11 +19,6 @@ func Main() { } }() - go func() { - log.Printf("Serving on localhost:6060...") - log.Println(http.ListenAndServe("localhost:6060", nil)) - }() - var ( netName string initURL string diff --git a/peer/nonce.go b/peer/nonce.go index 4ebabf4..daca6e5 100644 --- a/peer/nonce.go +++ b/peer/nonce.go @@ -2,7 +2,16 @@ package peer import "unsafe" -func ParseNonceBytes(nb []byte, nonce *Nonce) { +type Nonce struct { + Counter uint64 + SourceIP byte + ViaIP byte + DestIP byte + StreamID byte // The stream, see STREAM_* constants + PacketType byte // The packet type. See PACKET_* constants. +} + +func (nonce *Nonce) Parse(nb []byte) { nonce.Counter = *(*uint64)(unsafe.Pointer(&nb[0])) nonce.SourceIP = nb[8] nonce.ViaIP = nb[9] @@ -11,14 +20,13 @@ func ParseNonceBytes(nb []byte, nonce *Nonce) { nonce.PacketType = nb[12] } -func MarshalNonce(nonce Nonce, buf []byte) { +func (nonce Nonce) Marshal(buf []byte) { *(*uint64)(unsafe.Pointer(&buf[0])) = nonce.Counter buf[8] = nonce.SourceIP buf[9] = nonce.ViaIP buf[10] = nonce.DestIP buf[11] = nonce.StreamID buf[12] = nonce.PacketType - clear(buf[13:]) } func CounterTimestamp(counter uint64) int64 { diff --git a/peer/nonce_test.go b/peer/nonce_test.go index fd6f883..e962fc9 100644 --- a/peer/nonce_test.go +++ b/peer/nonce_test.go @@ -15,10 +15,10 @@ func TestMarshalParseNonce(t *testing.T) { } buf := make([]byte, NONCE_SIZE) - MarshalNonce(nIn, buf) + nIn.Marshal(buf) nOut := Nonce{} - ParseNonceBytes(buf, &nOut) + nOut.Parse(buf) if nIn != nOut { t.Fatal(nIn, nOut) } diff --git a/peer/peer-ifreader.go b/peer/peer-ifreader.go index 1a57e97..8b56b11 100644 --- a/peer/peer-ifreader.go +++ b/peer/peer-ifreader.go @@ -3,10 +3,7 @@ package peer import ( "fmt" "log" - "net" - "net/netip" "runtime/debug" - "vppn/fasttime" ) func (peer *Peer) ifReader() { @@ -17,36 +14,16 @@ func (peer *Peer) ifReader() { }() var ( + sender = newConnSender(peer.conn, peer.ip, STREAM_DATA, peer.signPrivKey) n int destIP byte router = peer.router route *route iface = peer.iface - nonce = Nonce{ - SourceIP: peer.ip, - PacketType: PACKET_TYPE_DATA, - StreamID: STREAM_DATA, - } - err error - now uint64 - counterTS uint64 - counter uint64 - packet = make([]byte, BUFFER_SIZE) - encrypted = make([]byte, BUFFER_SIZE) - nonceBuf = make([]byte, NONCE_SIZE) - toSend []byte - signingKey = peer.signPrivKey - - reqPool = make(chan udpWriteReq, 1024) - writeChan = make(chan udpWriteReq, 1024) + err error + packet = make([]byte, BUFFER_SIZE) ) - for range cap(reqPool) { - reqPool <- udpWriteReq{Packet: make([]byte, BUFFER_SIZE)} - } - - go udpWriter(writeChan, peer.conn, reqPool) - for { n, err = iface.Read(packet[:BUFFER_SIZE]) if err != nil { @@ -61,54 +38,13 @@ func (peer *Peer) ifReader() { packet = packet[:n] destIP = packet[19] + route = router.GetRoute(destIP) - if route == nil { + if route == nil || !route.Up { log.Printf("Dropping packet for non-existent IP: %d", destIP) continue } - now = uint64(fasttime.Now()) - if counterTS < now { - counterTS = now - counter = now << 30 - } - - counter++ - - nonce.Counter = counter - nonce.ViaIP = route.ViaIP - nonce.DestIP = destIP - - MarshalNonce(nonce, nonceBuf) - - encrypted = encryptPacket(route.EncSharedKey, nonceBuf, packet, encrypted) - - if route.ViaIP != 0 { - toSend = signPacket(signingKey, encrypted, packet) - } else { - toSend = encrypted - } - - req := <-reqPool - req.Addr = route.Addr - req.Packet = req.Packet[:len(toSend)] - copy(req.Packet, toSend) - - writeChan <- req - } -} - -type udpWriteReq struct { - Addr netip.AddrPort - Packet []byte -} - -func udpWriter(in chan udpWriteReq, conn *net.UDPConn, reqPool chan udpWriteReq) { - var err error - for req := range in { - if _, err = conn.WriteToUDPAddrPort(req.Packet, req.Addr); err != nil { - log.Fatalf("Failed to write UDP packet: %v", err) - } - reqPool <- req + sender.send(PACKET_TYPE_DATA, packet, route) } } diff --git a/peer/peer-netreader.go b/peer/peer-netreader.go index b8e719d..d4c44e2 100644 --- a/peer/peer-netreader.go +++ b/peer/peer-netreader.go @@ -2,8 +2,8 @@ package peer import ( "fmt" - "io" "log" + "net/netip" "runtime/debug" "vppn/fasttime" ) @@ -16,35 +16,26 @@ func (peer *Peer) netReader() { }() var ( - n int - //srcAddr *net.UDPAddr + n int + srcAddr netip.AddrPort nonce Nonce packet = make([]byte, BUFFER_SIZE) decrypted = make([]byte, BUFFER_SIZE) - toWrite []byte route *route ok bool err error conn = peer.conn ip = peer.ip counters = [2][256]uint64{} // Counter by stream and IP. - - ifaceChan = make(chan []byte, 1024) - reqPool = make(chan []byte, 1024) ) - for range cap(reqPool) { - reqPool <- make([]byte, BUFFER_SIZE) - } - - go ifWriter(ifaceChan, peer.iface, reqPool) - NEXT_PACKET: - n, _, err = conn.ReadFromUDPAddrPort(packet[:BUFFER_SIZE]) + n, srcAddr, err = conn.ReadFromUDPAddrPort(packet[:BUFFER_SIZE]) if err != nil { log.Fatalf("Failed to read UDP packet: %v", err) } + srcAddr = netip.AddrPortFrom(srcAddr.Addr().Unmap(), srcAddr.Port()) if n < NONCE_SIZE { log.Printf("Dropping short UDP packet: %d", n) @@ -52,8 +43,7 @@ NEXT_PACKET: } packet = packet[:n] - - ParseNonceBytes(packet[n-NONCE_SIZE:], &nonce) + nonce.Parse(packet[n-NONCE_SIZE:]) // Drop after 8 seconds. if CounterTimestamp(nonce.Counter) < fasttime.Now()-8 { @@ -62,13 +52,13 @@ NEXT_PACKET: } if nonce.StreamID > 1 { - log.Printf("Dropping invalid stream ID: %v", nonce) + log.Printf("Dropping invalid stream ID: %+v", nonce) goto NEXT_PACKET } // Check source counter. if nonce.Counter <= counters[nonce.StreamID][nonce.SourceIP] { - log.Printf("Dropping packet with bad counter: %v", nonce) + log.Printf("Dropping packet with bad counter: %+v", nonce) goto NEXT_PACKET } @@ -76,7 +66,7 @@ NEXT_PACKET: route = peer.router.GetRoute(nonce.SourceIP) if route == nil { - log.Printf("Dropping packet without route: %v", nonce) + log.Printf("Dropping packet without route: %+v", nonce) goto NEXT_PACKET } @@ -86,7 +76,7 @@ NEXT_PACKET: case nonce.ViaIP: goto VALIDATE_SIGNATURE default: - log.Printf("Bad packet: %v", nonce) + log.Printf("Bad packet: %+v", nonce) goto NEXT_PACKET } @@ -110,49 +100,41 @@ DECRYPT: WRITE_IFACE_DATA: - //toWrite = toWrite[:len(decrypted)] - //copy(toWrite, decrypted) - - toWrite = <-reqPool - ifaceChan <- append(toWrite[:0], decrypted...) + if _, err = peer.iface.Write(decrypted); err != nil { + log.Fatalf("Failed to write to interface: %v", err) + } goto NEXT_PACKET WRITE_ROUTING_PACKET: - //peer.processRoutingPacket(decrypted) - //peer.routeManager.ProcessPacket(decrypted) + peer.router.HandlePacket(srcAddr, nonce, decrypted) goto NEXT_PACKET VALIDATE_SIGNATURE: - // We don't forward twice. - if route.Mediator { - log.Printf("Dropping double-forward packet: %v", nonce) - goto NEXT_PACKET - } - decrypted, ok = openPacket(route.SignPubKey, packet, decrypted) if !ok { log.Printf("Failed to open signed packet: %v", nonce) goto NEXT_PACKET } + route = peer.router.GetRoute(nonce.DestIP) + if route == nil || !route.Up { + log.Printf("Dropping mediated packet, route not available: %v", nonce) + goto NEXT_PACKET + } + + // We don't forward twice. + if route.ViaIP != 0 { + log.Printf("Dropping double-forward packet: %v", nonce) + goto NEXT_PACKET + } + if _, err = conn.WriteToUDPAddrPort(decrypted, route.Addr); err != nil { log.Fatalf("Failed to forward packet: %v", err) } goto NEXT_PACKET } - -func ifWriter(in chan []byte, iface io.ReadWriteCloser, out chan []byte) { - var err error - for packet := range in { - if _, err = iface.Write(packet); err != nil { - log.Printf("Size: %d", len(packet)) - log.Fatalf("Failed to write to interface: %v", err) - } - out <- packet - } -} diff --git a/peer/peer.go b/peer/peer.go index 0b767e6..c47fcaa 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -4,14 +4,15 @@ import ( "io" "log" "net" + "net/netip" ) type Peer struct { - // Immutable data. - ip byte // Last byte of IPv4 address. - hubAddr string - apiKey string - + ip byte // Last byte of IPv4 address. + hubAddr string + apiKey string + isServer bool + isMediator bool encPubKey []byte encPrivKey []byte signPubKey []byte @@ -31,6 +32,7 @@ func NewPeer(netName, listenIP string, port uint16) (*Peer, error) { peer := &Peer{ ip: conf.PeerIP, hubAddr: conf.HubAddress, + isMediator: conf.Mediator, apiKey: conf.APIKey, encPubKey: conf.EncPubKey, encPrivKey: conf.EncPrivKey, @@ -38,16 +40,16 @@ func NewPeer(netName, listenIP string, port uint16) (*Peer, error) { signPrivKey: conf.SignPrivKey, } - peer.router = NewRouter(conf) + _, peer.isServer = netip.AddrFromSlice(conf.PublicIP) port = determinePort(conf.Port, port) - conn, err := openUDPConn(listenIP, port) + peer.conn, err = openUDPConn(listenIP, port) if err != nil { return nil, err } - peer.conn = conn + peer.router = NewRouter(conf, peer.conn) peer.iface, err = openInterface(conf.Network, conf.PeerIP, netName) if err != nil { diff --git a/peer/router-managemediator.go b/peer/router-managemediator.go new file mode 100644 index 0000000..9778508 --- /dev/null +++ b/peer/router-managemediator.go @@ -0,0 +1,45 @@ +package peer + +import ( + "math/rand" + "time" +) + +func (r *Router) manageMediator() { + var ( + ip = byte(0) + mediators = make([]*route, 0, 32) + ) + + for range time.Tick(8 * time.Second) { + // If the current mediator is up, keep it. + route := r.routes[ip].Load() + if route != nil && route.Up { + continue + } + + // If the current mediator is up, keep it. + mediators = mediators[:0] + + for i := range r.routes { + route := r.routes[i].Load() + if route == nil || !route.Mediator { + continue + } + + mediators = append(mediators, route) + } + + if len(mediators) == 0 { + ip = 0 + } else { + ip = mediators[rand.Intn(len(mediators))].PeerIP + } + + for _, conn := range r.conns { + if conn != nil { + conn.UpdateMediator(ip) + } + } + } +} diff --git a/peer/router-ping.go b/peer/router-ping.go deleted file mode 100644 index a624188..0000000 --- a/peer/router-ping.go +++ /dev/null @@ -1,18 +0,0 @@ -package peer - -type RoutingPingReq struct { - PeerIP byte - Type byte // 0 => local, 1 => direct, 2 => Via - Addr []byte - Port int - SentAt int64 // unix milli -} - -type RoutingPingResp struct { - PeerIP byte - Type byte // 0 => local, 1 => direct, 2 => Via - Addr []byte - Port int - SentAt int64 - RecvdAt int64 -} diff --git a/peer/router-ping_test.go b/peer/router-ping_test.go new file mode 100644 index 0000000..69f4480 --- /dev/null +++ b/peer/router-ping_test.go @@ -0,0 +1,37 @@ +package peer + +import ( + "reflect" + "testing" +) + +func TestMarshalParsePingReq(t *testing.T) { + in := Ping{ + SentAt: 4553252, + } + + buf := make([]byte, PING_SIZE) + in.Marshal(buf) + + out := Ping{} + out.Parse(buf) + if !reflect.DeepEqual(in, out) { + t.Fatal(in, out) + } +} + +func TestMarshalParsePingResp(t *testing.T) { + in := Pong{ + SentAt: 4553252, + RecvdAt: 4553253, + } + + buf := make([]byte, PONG_SIZE) + in.Marshal(buf) + + out := Pong{} + out.Parse(buf) + if !reflect.DeepEqual(in, out) { + t.Fatal(in, out) + } +} diff --git a/peer/router-pollhub.go b/peer/router-pollhub.go new file mode 100644 index 0000000..eb082b6 --- /dev/null +++ b/peer/router-pollhub.go @@ -0,0 +1,63 @@ +package peer + +import ( + "encoding/json" + "io" + "log" + "net/http" + "net/url" + "time" + "vppn/m" +) + +func (r *Router) pollHub() { + u, err := url.Parse(r.conf.HubAddress) + if err != nil { + log.Fatalf("Failed to parse hub address %s: %v", r.conf.HubAddress, err) + } + u.Path = "/peer/fetch-state/" + + client := &http.Client{Timeout: 8 * time.Second} + + req := &http.Request{ + Method: http.MethodGet, + URL: u, + Header: http.Header{}, + } + req.SetBasicAuth("", r.conf.APIKey) + + // TODO: Before we start polling, load state from the file system. + r._pollHub(client, req) + + for range time.Tick(32 * time.Second) { + r._pollHub(client, req) + } +} + +func (r *Router) _pollHub(client *http.Client, req *http.Request) { + var state m.NetworkState + + log.Printf("Fetching peer state from %s...", r.conf.HubAddress) + resp, err := client.Do(req) + if err != nil { + log.Printf("Failed to fetch peer state: %v", err) + return + } + body, err := io.ReadAll(resp.Body) + _ = resp.Body.Close() + if err != nil { + log.Printf("Failed to read body from hub: %v", err) + return + } + + if err := json.Unmarshal(body, &state); err != nil { + log.Printf("Failed to unmarshal response from hub: %v", err) + return + } + + for i, peer := range state.Peers { + if r.conns[i] != nil { + r.conns[i].UpdatePeer(peerUpdate{PeerIP: byte(i), Peer: peer}) + } + } +} diff --git a/peer/router-types.go b/peer/router-types.go new file mode 100644 index 0000000..20bd296 --- /dev/null +++ b/peer/router-types.go @@ -0,0 +1,74 @@ +package peer + +import ( + "net/netip" + "unsafe" + "vppn/m" +) + +// ---------------------------------------------------------------------------- + +// A route is used by a peer to send or receive packets. A peer won't send +// packets on a route that isn't up, but will process incoming packets on +// that route. +type route struct { + PeerIP byte + Up bool + Mediator bool + SignPubKey []byte + EncSharedKey []byte // Shared key for encoding / decoding packets. + Addr netip.AddrPort // Address to send to. + ViaIP byte // If != 0, this is a forwarding address. +} + +type peerUpdate struct { + PeerIP byte + *m.Peer // nil => delete. +} + +// ---------------------------------------------------------------------------- + +// Wrapper for routing packets. +type wrapper[T any] struct { + T T + Nonce Nonce + SrcAddr netip.AddrPort +} + +func newWrapper[T any](srcAddr netip.AddrPort, nonce Nonce) wrapper[T] { + return wrapper[T]{ + SrcAddr: srcAddr, + Nonce: nonce, + } +} + +// ---------------------------------------------------------------------------- + +type Ping struct { + SentAt int64 // unix milli +} + +func (p *Ping) Parse(buf []byte) { + p.SentAt = *(*int64)(unsafe.Pointer(&buf[0])) +} + +func (p Ping) Marshal(buf []byte) { + *(*int64)(unsafe.Pointer(&buf[0])) = p.SentAt +} + +// ---------------------------------------------------------------------------- + +type Pong struct { + SentAt int64 // unix mili + RecvdAt int64 // unix mili +} + +func (p *Pong) Parse(buf []byte) { + p.SentAt = *(*int64)(unsafe.Pointer(&buf[0])) + p.RecvdAt = *(*int64)(unsafe.Pointer(&buf[8])) +} + +func (p *Pong) Marshal(buf []byte) { + *(*int64)(unsafe.Pointer(&buf[0])) = p.SentAt + *(*int64)(unsafe.Pointer(&buf[8])) = p.RecvdAt +} diff --git a/peer/router.go b/peer/router.go index 507804a..60b67ac 100644 --- a/peer/router.go +++ b/peer/router.go @@ -1,123 +1,86 @@ package peer import ( - "encoding/json" - "io" "log" - "net/http" + "net" "net/netip" - "net/url" "sync/atomic" - "time" "vppn/m" ) -var zeroAddrPort netip.AddrPort - -type routeInfo struct { - Up bool - Route route -} - type Router struct { - conf m.PeerConfig - routes [256]*atomic.Pointer[routeInfo] + conf m.PeerConfig + + // Routes used by the peer. + conns [256]*connHandler + routes [256]*atomic.Pointer[route] } -func NewRouter(conf m.PeerConfig) *Router { - rm := &Router{ - conf: conf, +func NewRouter(conf m.PeerConfig, conn *net.UDPConn) *Router { + r := &Router{conf: conf} + + for i := range r.routes { + r.routes[i] = &atomic.Pointer[route]{} } - for i := range rm.routes { - rm.routes[i] = &atomic.Pointer[routeInfo]{} - rm.routes[i].Store(&routeInfo{}) + _, isServer := netip.AddrFromSlice(conf.PublicIP) + + sender := newConnSender(conn, conf.PeerIP, STREAM_ROUTING, conf.SignPrivKey) + + for i := range r.conns { + if byte(i) != conf.PeerIP { + r.conns[i] = newConnHandler( + isServer, + byte(i), + r.routes, + conf.EncPrivKey, + newSafeConnSender(sender)) + } } - go rm.pollHub() - return rm + go r.pollHub() + if !isServer { + go r.manageMediator() + } + + return r } +// ---------------------------------------------------------------------------- +// Peer Methods +// ---------------------------------------------------------------------------- + func (rm *Router) GetRoute(ip byte) *route { - if route := rm.routes[ip].Load(); route != nil && route.Up { - return &route.Route - } - return nil + return rm.routes[ip].Load() } -func (rm *Router) pollHub() { - u, err := url.Parse(rm.conf.HubAddress) - if err != nil { - log.Fatalf("Failed to parse hub address %s: %v", rm.conf.HubAddress, err) - } - u.Path = "/peer/fetch-state/" - - client := &http.Client{Timeout: 8 * time.Second} - - req := &http.Request{ - Method: http.MethodGet, - URL: u, - Header: http.Header{}, - } - req.SetBasicAuth("", rm.conf.APIKey) - - rm._pollHub(client, req) - - for range time.Tick(time.Minute) { - rm._pollHub(client, req) - } -} - -func (rm *Router) _pollHub(client *http.Client, req *http.Request) { - var state m.NetworkState - - log.Printf("Fetching peer state from %s...", rm.conf.HubAddress) - resp, err := client.Do(req) - if err != nil { - log.Printf("Failed to fetch peer state: %v", err) - return - } - body, err := io.ReadAll(resp.Body) - _ = resp.Body.Close() - if err != nil { - log.Printf("Failed to read body: %v", err) +func (r *Router) HandlePacket(src netip.AddrPort, nonce Nonce, data []byte) { + if nonce.SourceIP == r.conf.PeerIP { + log.Printf("Packet to self...") return } - if err := json.Unmarshal(body, &state); err != nil { - log.Printf("Failed to unmarshal response from hub: %v", err) - return - } - - for i, peer := range state.Peers { - if peer == nil { - continue + switch nonce.PacketType { + case PACKET_TYPE_PING: + if len(data) < PING_SIZE { + log.Printf("Short ping request: %d", len(data)) + return } - route := rm.routes[i].Load() - rm.routes[i].Store(rm.updateRoute(route, peer)) + w := newWrapper[Ping](src, nonce) + w.T.Parse(data) + + r.conns[nonce.SourceIP].HandlePing(w) + + case PACKET_TYPE_PONG: + if len(data) < PONG_SIZE { + log.Printf("Short ping response: %d", len(data)) + return + } + w := newWrapper[Pong](src, nonce) + w.T.Parse(data) + r.conns[nonce.SourceIP].HandlePong(w) + + default: + log.Printf("Unknown routing packet type: %d", nonce.PacketType) } } - -func (rm *Router) updateRoute(routePtr *routeInfo, peer *m.Peer) *routeInfo { - if peer == nil { - return &routeInfo{} - } - - route := *routePtr - - addr, ok := netip.AddrFromSlice(peer.IP) - if !ok { - return &routeInfo{} - } - - route.Up = true - route.Route.Addr = netip.AddrPortFrom(addr, peer.Port) - route.Route.Mediator = peer.Mediator - route.Route.ViaIP = 0 - if len(route.Route.SignPubKey) == 0 { - route.Route.SignPubKey = peer.SignPubKey - route.Route.EncSharedKey = computeSharedKey(peer.EncPubKey, rm.conf.EncPrivKey) - } - - return &route -} diff --git a/peer/types.go b/peer/types.go deleted file mode 100644 index 415e9d6..0000000 --- a/peer/types.go +++ /dev/null @@ -1,22 +0,0 @@ -package peer - -import ( - "net/netip" -) - -type route struct { - Mediator bool // Route is via a mediator. - SignPubKey []byte - EncSharedKey []byte // Shared key for encoding / decoding packets. - Addr netip.AddrPort // Address to send to. - ViaIP byte // If != 0, this is a forwarding address. -} - -type Nonce struct { - Counter uint64 - SourceIP byte - ViaIP byte - DestIP byte - StreamID byte // The stream, see STREAM_* constants - PacketType byte // The packet type. See PACKET_* constants. -}
Peer IP{{.PeerIP}}
Name{{.Name}}
IP{{ipToString .IP}}
Public IP{{ipToString .PublicIP}}
Port{{.Port}}
Mediator{{if .Mediator}}T{{else}}F{{end}}
API Key{{.APIKey}}