From fdf0066fc2b50360790dbde7ac09da685b3be3fe Mon Sep 17 00:00:00 2001
From: jdl
Date: Thu, 12 Dec 2024 21:11:17 +0100
Subject: [PATCH] WIP
---
README.md | 57 ++--
cmd/vppn/build.sh | 1 +
hub/api/api.go | 11 +-
hub/api/db/generated.go | 19 +-
hub/api/db/sanitize-validate.go | 10 +-
hub/api/db/tables.defs | 3 +-
hub/api/migrations/2024-11-30-init.sql | 3 +-
hub/handler.go | 15 +
hub/handlers.go | 17 +-
hub/routes.go | 4 +-
hub/templates/admin-peer-create.html | 4 +-
hub/templates/admin-peer-delete.html | 4 +-
hub/templates/admin-peer-edit.html | 4 +-
hub/templates/admin-peer-list.html | 4 +-
hub/templates/admin-peer-view.html | 2 +-
m/models.go | 13 +-
peer/conn-states.dot | 9 +
peer/conndata.go | 83 ++++++
peer/connhandler.go | 121 ++++++++
peer/connsender.go | 83 ++++++
peer/connstate.go | 393 +++++++++++++++++++++++++
peer/globals.go | 6 +
peer/main.go | 7 -
peer/nonce.go | 14 +-
peer/nonce_test.go | 4 +-
peer/peer-ifreader.go | 76 +----
peer/peer-netreader.go | 70 ++---
peer/peer.go | 18 +-
peer/router-managemediator.go | 45 +++
peer/router-ping.go | 18 --
peer/router-ping_test.go | 37 +++
peer/router-pollhub.go | 63 ++++
peer/router-types.go | 74 +++++
peer/router.go | 155 ++++------
peer/types.go | 22 --
35 files changed, 1138 insertions(+), 331 deletions(-)
create mode 100644 peer/conn-states.dot
create mode 100644 peer/conndata.go
create mode 100644 peer/connhandler.go
create mode 100644 peer/connsender.go
create mode 100644 peer/connstate.go
create mode 100644 peer/router-managemediator.go
delete mode 100644 peer/router-ping.go
create mode 100644 peer/router-ping_test.go
create mode 100644 peer/router-pollhub.go
create mode 100644 peer/router-types.go
delete mode 100644 peer/types.go
diff --git a/README.md b/README.md
index d642881..b7a7612 100644
--- a/README.md
+++ b/README.md
@@ -4,8 +4,9 @@
* Peer: router: create process for managing the routing table
* Peer: router: track mediators, enable / disable ...
-* Hub: track peer last-seen timestamp
+* Hub: track peer last-seen timestamp (?)
* Peer: local peer discovery - part of RoutingProcessor
+* Peer: update hub w/ latest port on startup
## Principles
@@ -13,24 +14,21 @@
* Simple setup: via setup link from the hub.
* Each peer has full network state replicated from the hub.
-## Design
+## Routing
-* Append nonce to end of packet
- * Then it's readable whether signed or unsiged
-* Types of packets to send:
- * standard: encrypt and send
- * Forward via: encrypt, sign and send
- * Forward to: send
-* Type of packeting read from interface:
- * Forward to: check signature
- * Forwarded, standard
+* Routing is different for public vs non-public peers
+ * Public: routes are initialized via incoming ping requests
+ * NonPub: routes are initialized via incoming ping responses
-Incoming from net:
- * Data for iface
- * Packet for forward
- * Packet for routingHandler
-* Incoming from iface:
- * Data for peer
+A non-public peer needs to maintain connections with every public peer.
+
+* Sending:
+ * Public: send to address
+ * Non-public: send to a mediator
+
+* Pings:
+ * Servers don't need to ping
+ * Clients need to ping all public and local peers to keep connections open
## Hub Server Configuration
@@ -84,3 +82,28 @@ journalctl -f -u hub -n 100
```
Sign-in and configure.
+
+## Peer Configuration
+
+Install the binary somewhere, for example `~/bin/vppn`.
+
+Create systemd file in `/etc/systemd/system/vppn.service`.
+
+
+```
+Description=vppn
+Requires=network.target
+
+[Service]
+AmbientCapabilities=CAP_NET_BIND_SERVICE CAP_NET_ADMIN
+Type=simple
+User=user
+WorkingDirectory=/home/user/
+ExecStart=/home/user/vppn -name vppn
+Restart=always
+RestartSec=8
+TimeoutStopSec=24
+
+[Install]
+WantedBy=default.target
+```
diff --git a/cmd/vppn/build.sh b/cmd/vppn/build.sh
index ff74f39..96a5e25 100755
--- a/cmd/vppn/build.sh
+++ b/cmd/vppn/build.sh
@@ -2,3 +2,4 @@
go build
sudo setcap cap_net_admin+iep vppn
+sudo setcap cap_net_bind_service+iep vppn
diff --git a/hub/api/api.go b/hub/api/api.go
index 18b00f3..053c574 100644
--- a/hub/api/api.go
+++ b/hub/api/api.go
@@ -144,7 +144,7 @@ func (a *API) Session_SignIn(s *Session, pwd string) error {
type PeerCreateArgs struct {
Name string
- IP []byte
+ PublicIP []byte
Port uint16
Mediator bool
}
@@ -209,9 +209,10 @@ func (a *API) Peer_Create(creationCode string) (*m.PeerConfig, error) {
peer := &Peer{
PeerIP: peerIP,
+ Version: idgen.NextID(0),
APIKey: idgen.NewToken(),
Name: args.Name,
- IP: args.IP,
+ PublicIP: args.PublicIP,
Port: args.Port,
Mediator: args.Mediator,
EncPubKey: encPubKey[:],
@@ -229,7 +230,7 @@ func (a *API) Peer_Create(creationCode string) (*m.PeerConfig, error) {
HubAddress: conf.HubAddress,
APIKey: peer.APIKey,
Network: conf.VPNNetwork,
- IP: peer.IP,
+ PublicIP: peer.PublicIP,
Port: peer.Port,
Mediator: peer.Mediator,
EncPubKey: encPubKey[:],
@@ -240,6 +241,10 @@ func (a *API) Peer_Create(creationCode string) (*m.PeerConfig, error) {
}
func (a *API) Peer_Update(p *Peer) error {
+ a.lock.Lock()
+ defer a.lock.Unlock()
+
+ p.Version = idgen.NextID(0)
return db.Peer_Update(a.db, p)
}
diff --git a/hub/api/db/generated.go b/hub/api/db/generated.go
index 3981fe1..a23498d 100644
--- a/hub/api/db/generated.go
+++ b/hub/api/db/generated.go
@@ -308,16 +308,17 @@ func Session_List(
type Peer struct {
PeerIP byte
+ Version int64
APIKey string
Name string
- IP []byte
+ PublicIP []byte
Port uint16
Mediator bool
EncPubKey []byte
SignPubKey []byte
}
-const Peer_SelectQuery = "SELECT PeerIP,APIKey,Name,IP,Port,Mediator,EncPubKey,SignPubKey FROM peers"
+const Peer_SelectQuery = "SELECT PeerIP,Version,APIKey,Name,PublicIP,Port,Mediator,EncPubKey,SignPubKey FROM peers"
func Peer_Insert(
tx TX,
@@ -328,7 +329,7 @@ func Peer_Insert(
return err
}
- _, err = tx.Exec("INSERT INTO peers(PeerIP,APIKey,Name,IP,Port,Mediator,EncPubKey,SignPubKey) VALUES(?,?,?,?,?,?,?,?)", row.PeerIP, row.APIKey, row.Name, row.IP, row.Port, row.Mediator, row.EncPubKey, row.SignPubKey)
+ _, err = tx.Exec("INSERT INTO peers(PeerIP,Version,APIKey,Name,PublicIP,Port,Mediator,EncPubKey,SignPubKey) VALUES(?,?,?,?,?,?,?,?,?)", row.PeerIP, row.Version, row.APIKey, row.Name, row.PublicIP, row.Port, row.Mediator, row.EncPubKey, row.SignPubKey)
return err
}
@@ -341,7 +342,7 @@ func Peer_Update(
return err
}
- result, err := tx.Exec("UPDATE peers SET Name=?,IP=?,Port=?,Mediator=? WHERE PeerIP=?", row.Name, row.IP, row.Port, row.Mediator, row.PeerIP)
+ result, err := tx.Exec("UPDATE peers SET Version=?,Name=?,PublicIP=?,Port=?,Mediator=? WHERE PeerIP=?", row.Version, row.Name, row.PublicIP, row.Port, row.Mediator, row.PeerIP)
if err != nil {
return err
}
@@ -369,7 +370,7 @@ func Peer_UpdateFull(
return err
}
- result, err := tx.Exec("UPDATE peers SET APIKey=?,Name=?,IP=?,Port=?,Mediator=?,EncPubKey=?,SignPubKey=? WHERE PeerIP=?", row.APIKey, row.Name, row.IP, row.Port, row.Mediator, row.EncPubKey, row.SignPubKey, row.PeerIP)
+ result, err := tx.Exec("UPDATE peers SET Version=?,APIKey=?,Name=?,PublicIP=?,Port=?,Mediator=?,EncPubKey=?,SignPubKey=? WHERE PeerIP=?", row.Version, row.APIKey, row.Name, row.PublicIP, row.Port, row.Mediator, row.EncPubKey, row.SignPubKey, row.PeerIP)
if err != nil {
return err
}
@@ -419,8 +420,8 @@ func Peer_Get(
err error,
) {
row = &Peer{}
- r := tx.QueryRow("SELECT PeerIP,APIKey,Name,IP,Port,Mediator,EncPubKey,SignPubKey FROM peers WHERE PeerIP=?", PeerIP)
- err = r.Scan(&row.PeerIP, &row.APIKey, &row.Name, &row.IP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey)
+ r := tx.QueryRow("SELECT PeerIP,Version,APIKey,Name,PublicIP,Port,Mediator,EncPubKey,SignPubKey FROM peers WHERE PeerIP=?", PeerIP)
+ err = r.Scan(&row.PeerIP, &row.Version, &row.APIKey, &row.Name, &row.PublicIP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey)
return
}
@@ -434,7 +435,7 @@ func Peer_GetWhere(
) {
row = &Peer{}
r := tx.QueryRow(query, args...)
- err = r.Scan(&row.PeerIP, &row.APIKey, &row.Name, &row.IP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey)
+ err = r.Scan(&row.PeerIP, &row.Version, &row.APIKey, &row.Name, &row.PublicIP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey)
return
}
@@ -454,7 +455,7 @@ func Peer_Iterate(
defer rows.Close()
for rows.Next() {
row := &Peer{}
- err := rows.Scan(&row.PeerIP, &row.APIKey, &row.Name, &row.IP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey)
+ err := rows.Scan(&row.PeerIP, &row.Version, &row.APIKey, &row.Name, &row.PublicIP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey)
if !yield(row, err) {
return
}
diff --git a/hub/api/db/sanitize-validate.go b/hub/api/db/sanitize-validate.go
index b737866..b4ed8ff 100644
--- a/hub/api/db/sanitize-validate.go
+++ b/hub/api/db/sanitize-validate.go
@@ -44,10 +44,10 @@ func Session_Validate(s *Session) error {
func Peer_Sanitize(p *Peer) {
p.Name = strings.TrimSpace(p.Name)
- if len(p.IP) != 0 {
- addr, ok := netip.AddrFromSlice(p.IP)
+ if len(p.PublicIP) != 0 {
+ addr, ok := netip.AddrFromSlice(p.PublicIP)
if ok && addr.Is4() {
- p.IP = addr.AsSlice()
+ p.PublicIP = addr.AsSlice()
}
}
if p.Port == 0 {
@@ -56,8 +56,8 @@ func Peer_Sanitize(p *Peer) {
}
func Peer_Validate(p *Peer) error {
- if len(p.IP) > 0 {
- _, ok := netip.AddrFromSlice(p.IP)
+ if len(p.PublicIP) > 0 {
+ _, ok := netip.AddrFromSlice(p.PublicIP)
if !ok {
return ErrInvalidIP
}
diff --git a/hub/api/db/tables.defs b/hub/api/db/tables.defs
index ddc51b5..c9e35e2 100644
--- a/hub/api/db/tables.defs
+++ b/hub/api/db/tables.defs
@@ -15,9 +15,10 @@ TABLE sessions OF Session NoUpdate (
TABLE peers OF Peer (
PeerIP byte PK,
+ Version int64,
APIKey string NoUpdate,
Name string,
- IP []byte,
+ PublicIP []byte,
Port uint16,
Mediator bool,
EncPubKey []byte NoUpdate,
diff --git a/hub/api/migrations/2024-11-30-init.sql b/hub/api/migrations/2024-11-30-init.sql
index f61f0fb..eb5da37 100644
--- a/hub/api/migrations/2024-11-30-init.sql
+++ b/hub/api/migrations/2024-11-30-init.sql
@@ -17,9 +17,10 @@ CREATE INDEX sessions_last_seen_index ON sessions(LastSeenAt);
CREATE TABLE peers (
PeerIP INTEGER NOT NULL PRIMARY KEY, -- Final byte.
+ Version INTEGER NOT NULL,
APIKey TEXT NOT NULL UNIQUE,
Name TEXT NOT NULL UNIQUE, -- For humans.
- IP BLOB NOT NULL,
+ PublicIP BLOB NOT NULL,
Port INTEGER NOT NULL,
Mediator INTEGER NOT NULL DEFAULT 0, -- Boolean if peer will forward packets. Must also have public address.
EncPubKey BLOB NOT NULL,
diff --git a/hub/handler.go b/hub/handler.go
index c551ef4..ffaf6fc 100644
--- a/hub/handler.go
+++ b/hub/handler.go
@@ -64,3 +64,18 @@ func (app *App) handleSignedIn(pattern string, fn handlerFunc) {
return fn(s, w, r)
})
}
+
+type peerHandlerFunc func(w http.ResponseWriter, r *http.Request) error
+
+func (app *App) handlePeer(pattern string, fn peerHandlerFunc) {
+ wrapped := func(w http.ResponseWriter, r *http.Request) {
+ r.ParseForm()
+ if err := fn(w, r); err != nil {
+ http.Error(w, err.Error(), http.StatusInternalServerError)
+ }
+ }
+
+ app.mux.HandleFunc(pattern,
+ webutil.WithLogging(
+ wrapped))
+}
diff --git a/hub/handlers.go b/hub/handlers.go
index e2e8e70..f24aaaa 100644
--- a/hub/handlers.go
+++ b/hub/handlers.go
@@ -165,7 +165,7 @@ func (a *App) _adminPeerCreateSubmit(s *api.Session, w http.ResponseWriter, r *h
args := api.PeerCreateArgs{}
err := webutil.NewFormScanner(r.Form).
Scan("Name", &args.Name).
- Scan("IP", &ipStr).
+ Scan("PublicIP", &ipStr).
Scan("Port", &args.Port).
Scan("Mediator", &args.Mediator).
Error()
@@ -173,7 +173,7 @@ func (a *App) _adminPeerCreateSubmit(s *api.Session, w http.ResponseWriter, r *h
return err
}
- if args.IP, err = stringToIP(ipStr); err != nil {
+ if args.PublicIP, err = stringToIP(ipStr); err != nil {
return err
}
@@ -247,7 +247,7 @@ func (a *App) _adminPeerEditSubmit(s *api.Session, w http.ResponseWriter, r *htt
}
err = webutil.NewFormScanner(r.Form).
Scan("Name", &peer.Name).
- Scan("IP", &ipStr).
+ Scan("PublicIP", &ipStr).
Scan("Port", &peer.Port).
Scan("Mediator", &peer.Mediator).
Error()
@@ -255,7 +255,7 @@ func (a *App) _adminPeerEditSubmit(s *api.Session, w http.ResponseWriter, r *htt
return err
}
- if peer.IP, err = stringToIP(ipStr); err != nil {
+ if peer.PublicIP, err = stringToIP(ipStr); err != nil {
return err
}
@@ -298,7 +298,7 @@ func (a *App) _adminPeerDeleteSubmit(s *api.Session, w http.ResponseWriter, r *h
return a.redirect(w, r, "/admin/peer/list/")
}
-func (a *App) _peerCreate(s *api.Session, w http.ResponseWriter, r *http.Request) error {
+func (a *App) _peerCreate(w http.ResponseWriter, r *http.Request) error {
code := r.FormValue("Code")
conf, err := a.api.Peer_Create(code)
if err != nil {
@@ -308,7 +308,7 @@ func (a *App) _peerCreate(s *api.Session, w http.ResponseWriter, r *http.Request
return a.sendJSON(w, conf)
}
-func (a *App) _peerFetchState(s *api.Session, w http.ResponseWriter, r *http.Request) error {
+func (a *App) _peerFetchState(w http.ResponseWriter, r *http.Request) error {
_, apiKey, ok := r.BasicAuth()
if !ok {
log.Printf("1")
@@ -333,15 +333,16 @@ func (a *App) _peerFetchState(s *api.Session, w http.ResponseWriter, r *http.Req
HubAddress: conf.HubAddress,
Network: conf.VPNNetwork,
PeerIP: peer.PeerIP,
- IP: peer.IP,
+ PublicIP: peer.PublicIP,
Port: peer.Port,
}
for _, p := range peers {
state.Peers[p.PeerIP] = &m.Peer{
PeerIP: p.PeerIP,
+ Version: p.Version,
Name: p.Name,
- IP: p.IP,
+ PublicIP: p.PublicIP,
Port: p.Port,
Mediator: p.Mediator,
EncPubKey: p.EncPubKey,
diff --git a/hub/routes.go b/hub/routes.go
index fe026d1..0fa47f2 100644
--- a/hub/routes.go
+++ b/hub/routes.go
@@ -26,6 +26,6 @@ func (a *App) registerRoutes() {
a.handleSignedIn("GET /admin/peer/delete/", a._adminPeerDelete)
a.handleSignedIn("POST /admin/peer/delete/", a._adminPeerDeleteSubmit)
- a.handleNotSignedIn("GET /peer/create/", a._peerCreate)
- a.handleNotSignedIn("GET /peer/fetch-state/", a._peerFetchState)
+ a.handlePeer("GET /peer/create/", a._peerCreate)
+ a.handlePeer("GET /peer/fetch-state/", a._peerFetchState)
}
diff --git a/hub/templates/admin-peer-create.html b/hub/templates/admin-peer-create.html
index 2eb6c70..f2f0c39 100644
--- a/hub/templates/admin-peer-create.html
+++ b/hub/templates/admin-peer-create.html
@@ -8,8 +8,8 @@
-
-
+
+
diff --git a/hub/templates/admin-peer-delete.html b/hub/templates/admin-peer-delete.html
index 4e0618b..a330eb8 100644
--- a/hub/templates/admin-peer-delete.html
+++ b/hub/templates/admin-peer-delete.html
@@ -13,8 +13,8 @@
-
-
+
+
diff --git a/hub/templates/admin-peer-edit.html b/hub/templates/admin-peer-edit.html
index 3e04184..c6081b1 100644
--- a/hub/templates/admin-peer-edit.html
+++ b/hub/templates/admin-peer-edit.html
@@ -13,8 +13,8 @@
-
-
+
+
diff --git a/hub/templates/admin-peer-list.html b/hub/templates/admin-peer-list.html
index 4f09f1a..4acadc7 100644
--- a/hub/templates/admin-peer-list.html
+++ b/hub/templates/admin-peer-list.html
@@ -11,7 +11,7 @@
PeerIP |
Name |
- IP |
+ Public IP |
Port |
Mediator |
@@ -25,7 +25,7 @@
{{.Name}} |
- {{ipToString .IP}} |
+ {{ipToString .PublicIP}} |
{{.Port}} |
{{if .Mediator}}T{{else}}F{{end}} |
diff --git a/hub/templates/admin-peer-view.html b/hub/templates/admin-peer-view.html
index 09f254b..89ff754 100644
--- a/hub/templates/admin-peer-view.html
+++ b/hub/templates/admin-peer-view.html
@@ -10,7 +10,7 @@
Peer IP | {{.PeerIP}} |
Name | {{.Name}} |
- IP | {{ipToString .IP}} |
+ Public IP | {{ipToString .PublicIP}} |
Port | {{.Port}} |
Mediator | {{if .Mediator}}T{{else}}F{{end}} |
API Key | {{.APIKey}} |
diff --git a/m/models.go b/m/models.go
index d6d3103..29c39f9 100644
--- a/m/models.go
+++ b/m/models.go
@@ -6,7 +6,7 @@ type PeerConfig struct {
HubAddress string
Network []byte
APIKey string
- IP []byte
+ PublicIP []byte
Port uint16
Mediator bool
EncPubKey []byte
@@ -17,8 +17,9 @@ type PeerConfig struct {
type Peer struct {
PeerIP byte
+ Version int64
Name string
- IP []byte
+ PublicIP []byte
Port uint16
Mediator bool
EncPubKey []byte
@@ -29,10 +30,10 @@ type NetworkState struct {
HubAddress string
// The requester's data:
- Network []byte
- PeerIP byte
- IP []byte
- Port uint16
+ Network []byte
+ PeerIP byte
+ PublicIP []byte
+ Port uint16
// All peer data.
Peers [256]*Peer
diff --git a/peer/conn-states.dot b/peer/conn-states.dot
new file mode 100644
index 0000000..3c8c5f7
--- /dev/null
+++ b/peer/conn-states.dot
@@ -0,0 +1,9 @@
+digraph d {
+ init -> null;
+ init -> unconnectedServer;
+ init -> unconnectedClient;
+ init -> unconnectedMediated;
+ unconnectedServer -> connectedServer;
+ unconnectedClient -> connectedClient;
+ unconnectedMediated -> connectedMediated;
+}
\ No newline at end of file
diff --git a/peer/conndata.go b/peer/conndata.go
new file mode 100644
index 0000000..dbbc29c
--- /dev/null
+++ b/peer/conndata.go
@@ -0,0 +1,83 @@
+package peer
+
+import (
+ "net/netip"
+ "sync/atomic"
+ "time"
+ "vppn/m"
+)
+
+const (
+ pingInterval = time.Second * 8
+ timeoutInterval = 32 * time.Second
+)
+
+type connData struct {
+ // Shared data.
+ routes [256]*atomic.Pointer[route]
+ route *atomic.Pointer[route]
+
+ // Local data.
+ mediatorIP byte
+
+ // Peer data.
+ server bool // Never changes.
+ peerIP byte // Never changes.
+ encPrivKey []byte // Never changes.
+
+ peer *m.Peer // From hub.
+ encSharedKey []byte // From hub + private key.
+ publicAddr netip.AddrPort // From hub.
+
+ pingTimer *time.Timer
+ timeoutTimer *time.Timer
+
+ // Routing data.
+ addr netip.AddrPort
+ viaIP byte
+ up bool
+
+ // For sending.
+ buf []byte
+ sender *safeConnSender
+}
+
+func (d *connData) Route() *route {
+ return &route{
+ PeerIP: d.peerIP,
+ Up: d.up,
+ Mediator: d.peer.Mediator,
+ SignPubKey: d.peer.SignPubKey,
+ EncSharedKey: d.encSharedKey,
+ Addr: d.addr,
+ ViaIP: d.viaIP,
+ }
+}
+
+func (d *connData) HandlePeerUpdate(state connState, update peerUpdate) connState {
+ if d.peer != nil && update.Peer != nil && d.peer.Version == update.Peer.Version {
+ return state
+ }
+ if d.peer == nil && update.Peer == nil {
+ return state
+ }
+ return newConnStateFromPeer(update, d)
+}
+
+func (d *connData) HandleSendPing() {
+ route := d.route.Load()
+ req := Ping{SentAt: time.Now().UnixMilli()}
+ req.Marshal(d.buf[:PING_SIZE])
+ d.sender.send(PACKET_TYPE_PING, d.buf[:PING_SIZE], route)
+ d.pingTimer.Reset(pingInterval)
+}
+
+func (d *connData) sendPong(w wrapper[Ping]) {
+ route := d.route.Load()
+ pong := Pong{
+ SentAt: w.T.SentAt,
+ RecvdAt: time.Now().UnixMilli(),
+ }
+ pong.Marshal(d.buf[:PONG_SIZE])
+ d.sender.send(PACKET_TYPE_PONG, d.buf[:PONG_SIZE], route)
+}
diff --git a/peer/connhandler.go b/peer/connhandler.go
new file mode 100644
index 0000000..4eb2749
--- /dev/null
+++ b/peer/connhandler.go
@@ -0,0 +1,121 @@
+package peer
+
+import (
+ "fmt"
+ "log"
+ "runtime/debug"
+ "sync/atomic"
+ "time"
+)
+
+type connHandler struct {
+ // Communication.
+ mediatorUpdates chan byte
+ peerUpdates chan peerUpdate
+ pings chan wrapper[Ping]
+ pongs chan wrapper[Pong]
+
+ data *connData
+}
+
+func newConnHandler(
+ server bool,
+ peerIP byte,
+ routes [256]*atomic.Pointer[route],
+ encPrivKey []byte,
+ sender *safeConnSender,
+) *connHandler {
+ d := &connData{
+ server: server,
+ pingTimer: time.NewTimer(pingInterval),
+ timeoutTimer: time.NewTimer(timeoutInterval),
+ routes: routes,
+ route: routes[peerIP],
+ peerIP: peerIP,
+ encPrivKey: encPrivKey,
+ buf: make([]byte, BUFFER_SIZE),
+ sender: sender,
+ }
+
+ h := &connHandler{
+ mediatorUpdates: make(chan byte, 1),
+ peerUpdates: make(chan peerUpdate, 1),
+ pings: make(chan wrapper[Ping], 1),
+ pongs: make(chan wrapper[Pong], 1),
+ data: d,
+ }
+
+ go h.mainLoop()
+ return h
+}
+
+func (h *connHandler) mainLoop() {
+ defer func() {
+ if r := recover(); r != nil {
+ fmt.Println("stacktrace from panic: \n" + string(debug.Stack()))
+ }
+ }()
+
+ var (
+ data = h.data
+ state = newConnNull(data)
+ name = state.Name()
+ )
+
+ for {
+ select {
+
+ case ip := <-h.mediatorUpdates:
+ state = state.HandleMediatorUpdate(ip)
+
+ case update := <-h.peerUpdates:
+ state = data.HandlePeerUpdate(state, update)
+
+ case w := <-h.pings:
+ state = state.HandlePing(w)
+
+ case w := <-h.pongs:
+ state = state.HandlePong(w)
+
+ case <-data.pingTimer.C:
+ state.HandleSendPing()
+
+ case <-data.timeoutTimer.C:
+ log.Printf("[%s] Connection timeout.", state.Name())
+ state = state.HandleTimeout()
+ }
+
+ if state.Name() != name {
+ log.Printf("[%03d] STATE: %s --> %s", data.peerIP, name, state.Name())
+ name = state.Name()
+ }
+ }
+}
+
+func (c *connHandler) UpdateMediator(ip byte) {
+ select {
+ case c.mediatorUpdates <- ip:
+ default:
+ }
+}
+
+func (c *connHandler) HandlePing(w wrapper[Ping]) {
+ select {
+ case c.pings <- w:
+ default:
+ }
+}
+
+func (c *connHandler) HandlePong(w wrapper[Pong]) {
+ select {
+ case c.pongs <- w:
+ default:
+ }
+}
+
+func (c *connHandler) UpdatePeer(update peerUpdate) {
+ select {
+ case c.peerUpdates <- update:
+ default:
+ }
+}
diff --git a/peer/connsender.go b/peer/connsender.go
new file mode 100644
index 0000000..1970d08
--- /dev/null
+++ b/peer/connsender.go
@@ -0,0 +1,83 @@
+package peer
+
+import (
+ "log"
+ "net"
+ "runtime/debug"
+ "sync"
+ "vppn/fasttime"
+)
+
+type connSender struct {
+ conn *net.UDPConn
+ sourceIP byte
+ streamID byte
+ encrypted []byte
+ nonceBuf []byte
+ counterTS uint64
+ counter uint64
+ signingKey []byte
+}
+
+func newConnSender(conn *net.UDPConn, srcIP, streamID byte, signingPrivKey []byte) *connSender {
+ return &connSender{
+ conn: conn,
+ sourceIP: srcIP,
+ streamID: streamID,
+ encrypted: make([]byte, BUFFER_SIZE),
+ nonceBuf: make([]byte, NONCE_SIZE),
+ signingKey: signingPrivKey,
+ }
+}
+
+func (cs *connSender) send(packetType byte, packet []byte, route *route) {
+ now := uint64(fasttime.Now())
+ if cs.counterTS < now {
+ cs.counterTS = now
+ cs.counter = now << 30
+ }
+
+ cs.counter++
+
+ nonce := Nonce{
+ Counter: cs.counter,
+ SourceIP: cs.sourceIP,
+ ViaIP: route.ViaIP,
+ DestIP: route.PeerIP,
+ StreamID: cs.streamID,
+ PacketType: packetType,
+ }
+
+ nonce.Marshal(cs.nonceBuf)
+
+ encrypted := encryptPacket(route.EncSharedKey, cs.nonceBuf, packet, cs.encrypted)
+
+ var toSend []byte
+ if route.ViaIP != 0 {
+ toSend = signPacket(cs.signingKey, encrypted, packet)
+ } else {
+ toSend = encrypted
+ }
+
+ log.Printf("Sending to %v: %+v", route.Addr, nonce)
+ if _, err := cs.conn.WriteToUDPAddrPort(toSend, route.Addr); err != nil {
+ log.Fatalf("Failed to write UDP packet: %v\n%s", err, debug.Stack())
+ }
+}
+
+// ----------------------------------------------------------------------------
+
+type safeConnSender struct {
+ lock sync.Mutex
+ sender *connSender
+}
+
+func newSafeConnSender(sender *connSender) *safeConnSender {
+ return &safeConnSender{sender: sender}
+}
+
+func (s *safeConnSender) send(packetType byte, packet []byte, route *route) {
+ s.lock.Lock()
+ defer s.lock.Unlock()
+ s.sender.send(packetType, packet, route)
+}
diff --git a/peer/connstate.go b/peer/connstate.go
new file mode 100644
index 0000000..441532f
--- /dev/null
+++ b/peer/connstate.go
@@ -0,0 +1,393 @@
+package peer
+
+import (
+ "log"
+ "net/netip"
+ "time"
+ "vppn/m"
+)
+
+func logState(s connState, msg string, args ...any) {
+ log.Printf("["+s.Name()+"] "+msg, args...)
+}
+
+// ----------------------------------------------------------------------------
+
+// The connection state corresponds to what we're connected TO.
+type connState interface {
+ Name() string
+ HandleMediatorUpdate(ip byte) connState
+ HandleSendPing()
+ HandlePing(wrapper[Ping]) connState
+ HandlePong(wrapper[Pong]) connState
+ HandleTimeout() connState
+}
+
+// Helper function.
+
+func newConnStateFromPeer(update peerUpdate, data *connData) connState {
+ peer := update.Peer
+
+ if peer == nil {
+ return newConnNull(data)
+ }
+
+ if _, isPublic := netip.AddrFromSlice(peer.PublicIP); isPublic {
+ return newConnUnconnectedServer(data, peer)
+ } else if data.server {
+ return newConnUnconnectedClient(data, peer)
+ } else {
+ return newConnUnconnectedMediator(data, peer)
+ }
+}
+
+/////////////////////
+// Null Connection //
+/////////////////////
+
+type connNull struct {
+ *connData
+}
+
+func newConnNull(data *connData) connState {
+ c := connNull{data}
+ c.peer = nil
+ c.encSharedKey = nil
+ c.publicAddr = netip.AddrPort{}
+ c.pingTimer.Stop()
+ c.timeoutTimer.Stop()
+ c.addr = c.publicAddr
+ c.viaIP = 0
+ c.up = false
+ c.route.Store(nil)
+ return c
+}
+
+func (c connNull) Name() string {
+ return "NoPeer"
+}
+
+func (c connNull) HandleMediatorUpdate(ip byte) connState {
+ c.mediatorIP = ip
+ return c
+}
+
+func (c connNull) HandlePing(w wrapper[Ping]) connState {
+ logState(c, "Ignoring ping.")
+ return c
+}
+
+func (c connNull) HandlePong(w wrapper[Pong]) connState {
+ logState(c, "Ignoring pong.")
+ return c
+}
+
+func (c connNull) HandleTimeout() connState {
+ logState(c, "Unexpected timeout.")
+ return c
+}
+
+////////////////////////
+// Unconnected Server //
+////////////////////////
+
+type connUnconnectedServer struct {
+ *connData
+}
+
+func newConnUnconnectedServer(data *connData, peer *m.Peer) connState {
+ addr, _ := netip.AddrFromSlice(peer.PublicIP)
+ pubAddr := netip.AddrPortFrom(addr, peer.Port)
+
+ c := connUnconnectedServer{data}
+ c.peer = peer
+ c.encSharedKey = computeSharedKey(peer.EncPubKey, c.encPrivKey)
+ c.publicAddr = pubAddr
+ c.pingTimer.Reset(time.Millisecond) // Ping right away to bring up.
+ c.timeoutTimer.Stop() // No timeouts yet.
+ c.addr = c.publicAddr
+ c.viaIP = 0
+ c.up = false
+ c.route.Store(c.Route())
+
+ return c
+}
+
+func (c connUnconnectedServer) Name() string {
+ return "ServerUnconnected"
+}
+
+func (c connUnconnectedServer) HandleMediatorUpdate(ip byte) connState {
+ // Server connection doesn't use a mediator.
+ c.mediatorIP = ip
+ return c
+}
+
+func (c connUnconnectedServer) HandlePing(w wrapper[Ping]) connState {
+ logState(c, "Ignoring ping.")
+ return c
+}
+
+func (c connUnconnectedServer) HandlePong(w wrapper[Pong]) connState {
+ return newConnConnectedServer(c.connData, w)
+}
+
+func (c connUnconnectedServer) HandleTimeout() connState {
+ logState(c, "Unexpected timeout.")
+ return c
+}
+
+//////////////////////
+// Connected Server //
+//////////////////////
+
+type connConnectedServer struct {
+ *connData
+}
+
+func newConnConnectedServer(data *connData, w wrapper[Pong]) connState {
+ c := connConnectedServer{data}
+ c.pingTimer.Reset(pingInterval)
+ c.timeoutTimer.Reset(timeoutInterval)
+ c.addr = w.SrcAddr
+ c.viaIP = 0
+ c.up = true
+ c.route.Store(c.Route())
+ return c
+}
+
+func (c connConnectedServer) Name() string {
+ return "ServerConnected"
+}
+
+func (c connConnectedServer) HandleMediatorUpdate(ip byte) connState {
+ // Server connection doesn't use a mediator.
+ c.mediatorIP = ip
+ return c
+}
+
+func (c connConnectedServer) HandlePing(w wrapper[Ping]) connState {
+ logState(c, "Ignoring ping.")
+ return c
+}
+
+func (c connConnectedServer) HandlePong(w wrapper[Pong]) connState {
+ c.timeoutTimer.Reset(timeoutInterval)
+ return c
+}
+
+func (c connConnectedServer) HandleTimeout() connState {
+ return newConnUnconnectedServer(c.connData, c.peer)
+}
+
+////////////////////////
+// Unconnected Client //
+////////////////////////
+
+type connUnconnectedClient struct {
+ *connData
+}
+
+func newConnUnconnectedClient(data *connData, peer *m.Peer) connState {
+ addr, _ := netip.AddrFromSlice(peer.PublicIP)
+ pubAddr := netip.AddrPortFrom(addr, peer.Port)
+
+ c := connUnconnectedClient{data}
+ c.peer = peer
+ c.publicAddr = pubAddr
+ c.encPrivKey = data.encPrivKey
+ c.encSharedKey = computeSharedKey(peer.EncPubKey, c.encPrivKey)
+ c.addr = c.publicAddr
+ c.viaIP = 0
+ c.up = false
+ c.route.Store(c.Route())
+
+ c.pingTimer.Stop() // Conncection is from client => pings incoming.
+ c.timeoutTimer.Stop() // No timeouts yet.
+
+ return c
+}
+
+func (c connUnconnectedClient) Name() string {
+ return "ClientUnconnected"
+}
+
+func (c connUnconnectedClient) HandleMediatorUpdate(ip byte) connState {
+ // Client connection doesn't use a mediator.
+ c.mediatorIP = ip
+ return c
+}
+
+func (c connUnconnectedClient) HandlePing(w wrapper[Ping]) connState {
+ next := newConnConnectedClient(c.connData, w)
+ c.sendPong(w) // Have to send after transitionsing so route is ok.
+ return next
+}
+
+func (c connUnconnectedClient) HandlePong(w wrapper[Pong]) connState {
+ logState(c, "Ignorning pong.")
+ return c
+}
+
+func (c connUnconnectedClient) HandleTimeout() connState {
+ logState(c, "Unexpected timeout.")
+ return c
+}
+
+//////////////////////
+// Connected Client //
+//////////////////////
+
+type connConnectedClient struct {
+ *connData
+}
+
+func newConnConnectedClient(data *connData, w wrapper[Ping]) connState {
+ c := connConnectedClient{data}
+ c.addr = w.SrcAddr
+ c.viaIP = 0
+ c.up = true
+ c.route.Store(c.Route())
+
+ c.pingTimer.Stop() // Conncection is from client => pings incoming.
+ c.timeoutTimer.Reset(timeoutInterval)
+ return c
+}
+
+func (c connConnectedClient) Name() string {
+ return "ClientConnected"
+}
+
+func (c connConnectedClient) HandleMediatorUpdate(ip byte) connState {
+ // Client connection doesn't use a mediator.
+ c.mediatorIP = ip
+ return c
+}
+
+func (c connConnectedClient) HandlePing(w wrapper[Ping]) connState {
+ // The connection is from a client. If the client's address changes, we
+ // should follow that change.
+ if c.addr != w.SrcAddr {
+ c.addr = w.SrcAddr
+ c.route.Store(c.Route())
+ }
+ c.sendPong(w)
+ c.timeoutTimer.Reset(timeoutInterval)
+ return c
+}
+
+func (c connConnectedClient) HandlePong(w wrapper[Pong]) connState {
+ logState(c, "Ignoring pong.")
+ return c
+}
+
+func (c connConnectedClient) HandleTimeout() connState {
+ return newConnUnconnectedClient(c.connData, c.peer)
+}
+
+//////////////////////////
+// Unconnected Mediator //
+//////////////////////////
+
+type connUnconnectedMediator struct {
+ *connData
+}
+
+func newConnUnconnectedMediator(data *connData, peer *m.Peer) connState {
+ addr, _ := netip.AddrFromSlice(peer.PublicIP)
+ pubAddr := netip.AddrPortFrom(addr, peer.Port)
+
+ c := connUnconnectedMediator{data}
+ c.peer = peer
+ c.publicAddr = pubAddr
+ c.encPrivKey = data.encPrivKey
+ c.encSharedKey = computeSharedKey(peer.EncPubKey, c.encPrivKey)
+ c.addr = c.publicAddr
+ c.viaIP = 0
+ c.up = false
+ c.route.Store(c.Route())
+
+ c.pingTimer.Stop() // No pings for mediators.
+ c.timeoutTimer.Stop() // No timeouts yet.
+
+ // If we have a mediator route, we can connect.
+ if mRoute := c.routes[c.mediatorIP].Load(); mRoute != nil {
+ return newConnConnectedMediator(data, mRoute)
+ }
+
+ return c
+}
+
+func (c connUnconnectedMediator) Name() string {
+ return "MediatorUnconnected"
+}
+
+func (c connUnconnectedMediator) HandleMediatorUpdate(ip byte) connState {
+ c.mediatorIP = ip
+ if mRoute := c.routes[c.mediatorIP].Load(); mRoute != nil {
+ return newConnConnectedMediator(c.connData, mRoute)
+ }
+ return c
+}
+
+func (c connUnconnectedMediator) HandlePing(w wrapper[Ping]) connState {
+ logState(c, "Ignorning ping.")
+ return c
+}
+
+func (c connUnconnectedMediator) HandlePong(w wrapper[Pong]) connState {
+ logState(c, "Ignorning pong.")
+ return c
+}
+
+func (c connUnconnectedMediator) HandleTimeout() connState {
+ logState(c, "Unexpected timeout.")
+ return c
+}
+
+////////////////////////
+// Connected Mediator //
+////////////////////////
+
+type connConnectedMediator struct {
+ *connData
+}
+
+func newConnConnectedMediator(data *connData, route *route) connState {
+ c := connConnectedMediator{data}
+ c.addr = route.Addr
+ c.viaIP = route.PeerIP
+ c.up = true
+ c.route.Store(c.Route())
+
+ // No pings for mediated routes.
+ c.pingTimer.Stop()
+ c.timeoutTimer.Stop()
+ return c
+}
+
+func (c connConnectedMediator) Name() string {
+ return "MediatorConnected"
+}
+
+func (c connConnectedMediator) HandleMediatorUpdate(ip byte) connState {
+ c.mediatorIP = ip
+ if mRoute := c.routes[c.mediatorIP].Load(); mRoute != nil {
+ return newConnConnectedMediator(c.connData, mRoute)
+ }
+ return newConnUnconnectedMediator(c.connData, c.peer)
+}
+
+func (c connConnectedMediator) HandlePing(w wrapper[Ping]) connState {
+ logState(c, "Ignoring ping.")
+ return c
+}
+
+func (c connConnectedMediator) HandlePong(w wrapper[Pong]) connState {
+ logState(c, "Ignoring pong.")
+ return c
+}
+
+func (c connConnectedMediator) HandleTimeout() connState {
+ return newConnUnconnectedMediator(c.connData, c.peer)
+}
diff --git a/peer/globals.go b/peer/globals.go
index 0cfebf2..fdb3286 100644
--- a/peer/globals.go
+++ b/peer/globals.go
@@ -13,4 +13,10 @@ const (
// Basic packet types
PACKET_TYPE_DATA = 0
+ PACKET_TYPE_PING = 1
+ PACKET_TYPE_PONG = 2
+
+ // Packet sizes.
+ PING_SIZE = 8
+ PONG_SIZE = 16
)
diff --git a/peer/main.go b/peer/main.go
index 94a6f67..d766ce2 100644
--- a/peer/main.go
+++ b/peer/main.go
@@ -10,8 +10,6 @@ import (
"os"
"runtime/debug"
"vppn/m"
-
- _ "net/http/pprof"
)
func Main() {
@@ -21,11 +19,6 @@ func Main() {
}
}()
- go func() {
- log.Printf("Serving on localhost:6060...")
- log.Println(http.ListenAndServe("localhost:6060", nil))
- }()
-
var (
netName string
initURL string
diff --git a/peer/nonce.go b/peer/nonce.go
index 4ebabf4..daca6e5 100644
--- a/peer/nonce.go
+++ b/peer/nonce.go
@@ -2,7 +2,16 @@ package peer
import "unsafe"
-func ParseNonceBytes(nb []byte, nonce *Nonce) {
+type Nonce struct {
+ Counter uint64
+ SourceIP byte
+ ViaIP byte
+ DestIP byte
+ StreamID byte // The stream, see STREAM_* constants
+ PacketType byte // The packet type. See PACKET_* constants.
+}
+
+func (nonce *Nonce) Parse(nb []byte) {
nonce.Counter = *(*uint64)(unsafe.Pointer(&nb[0]))
nonce.SourceIP = nb[8]
nonce.ViaIP = nb[9]
@@ -11,14 +20,13 @@ func ParseNonceBytes(nb []byte, nonce *Nonce) {
nonce.PacketType = nb[12]
}
-func MarshalNonce(nonce Nonce, buf []byte) {
+func (nonce Nonce) Marshal(buf []byte) {
*(*uint64)(unsafe.Pointer(&buf[0])) = nonce.Counter
buf[8] = nonce.SourceIP
buf[9] = nonce.ViaIP
buf[10] = nonce.DestIP
buf[11] = nonce.StreamID
buf[12] = nonce.PacketType
- clear(buf[13:])
}
func CounterTimestamp(counter uint64) int64 {
diff --git a/peer/nonce_test.go b/peer/nonce_test.go
index fd6f883..e962fc9 100644
--- a/peer/nonce_test.go
+++ b/peer/nonce_test.go
@@ -15,10 +15,10 @@ func TestMarshalParseNonce(t *testing.T) {
}
buf := make([]byte, NONCE_SIZE)
- MarshalNonce(nIn, buf)
+ nIn.Marshal(buf)
nOut := Nonce{}
- ParseNonceBytes(buf, &nOut)
+ nOut.Parse(buf)
if nIn != nOut {
t.Fatal(nIn, nOut)
}
diff --git a/peer/peer-ifreader.go b/peer/peer-ifreader.go
index 1a57e97..8b56b11 100644
--- a/peer/peer-ifreader.go
+++ b/peer/peer-ifreader.go
@@ -3,10 +3,7 @@ package peer
import (
"fmt"
"log"
- "net"
- "net/netip"
"runtime/debug"
- "vppn/fasttime"
)
func (peer *Peer) ifReader() {
@@ -17,36 +14,16 @@ func (peer *Peer) ifReader() {
}()
var (
+ sender = newConnSender(peer.conn, peer.ip, STREAM_DATA, peer.signPrivKey)
n int
destIP byte
router = peer.router
route *route
iface = peer.iface
- nonce = Nonce{
- SourceIP: peer.ip,
- PacketType: PACKET_TYPE_DATA,
- StreamID: STREAM_DATA,
- }
- err error
- now uint64
- counterTS uint64
- counter uint64
- packet = make([]byte, BUFFER_SIZE)
- encrypted = make([]byte, BUFFER_SIZE)
- nonceBuf = make([]byte, NONCE_SIZE)
- toSend []byte
- signingKey = peer.signPrivKey
-
- reqPool = make(chan udpWriteReq, 1024)
- writeChan = make(chan udpWriteReq, 1024)
+ err error
+ packet = make([]byte, BUFFER_SIZE)
)
- for range cap(reqPool) {
- reqPool <- udpWriteReq{Packet: make([]byte, BUFFER_SIZE)}
- }
-
- go udpWriter(writeChan, peer.conn, reqPool)
-
for {
n, err = iface.Read(packet[:BUFFER_SIZE])
if err != nil {
@@ -61,54 +38,13 @@ func (peer *Peer) ifReader() {
packet = packet[:n]
destIP = packet[19]
+
route = router.GetRoute(destIP)
- if route == nil {
+ if route == nil || !route.Up {
log.Printf("Dropping packet for non-existent IP: %d", destIP)
continue
}
- now = uint64(fasttime.Now())
- if counterTS < now {
- counterTS = now
- counter = now << 30
- }
-
- counter++
-
- nonce.Counter = counter
- nonce.ViaIP = route.ViaIP
- nonce.DestIP = destIP
-
- MarshalNonce(nonce, nonceBuf)
-
- encrypted = encryptPacket(route.EncSharedKey, nonceBuf, packet, encrypted)
-
- if route.ViaIP != 0 {
- toSend = signPacket(signingKey, encrypted, packet)
- } else {
- toSend = encrypted
- }
-
- req := <-reqPool
- req.Addr = route.Addr
- req.Packet = req.Packet[:len(toSend)]
- copy(req.Packet, toSend)
-
- writeChan <- req
- }
-}
-
-type udpWriteReq struct {
- Addr netip.AddrPort
- Packet []byte
-}
-
-func udpWriter(in chan udpWriteReq, conn *net.UDPConn, reqPool chan udpWriteReq) {
- var err error
- for req := range in {
- if _, err = conn.WriteToUDPAddrPort(req.Packet, req.Addr); err != nil {
- log.Fatalf("Failed to write UDP packet: %v", err)
- }
- reqPool <- req
+ sender.send(PACKET_TYPE_DATA, packet, route)
}
}
diff --git a/peer/peer-netreader.go b/peer/peer-netreader.go
index b8e719d..d4c44e2 100644
--- a/peer/peer-netreader.go
+++ b/peer/peer-netreader.go
@@ -2,8 +2,8 @@ package peer
import (
"fmt"
- "io"
"log"
+ "net/netip"
"runtime/debug"
"vppn/fasttime"
)
@@ -16,35 +16,26 @@ func (peer *Peer) netReader() {
}()
var (
- n int
- //srcAddr *net.UDPAddr
+ n int
+ srcAddr netip.AddrPort
nonce Nonce
packet = make([]byte, BUFFER_SIZE)
decrypted = make([]byte, BUFFER_SIZE)
- toWrite []byte
route *route
ok bool
err error
conn = peer.conn
ip = peer.ip
counters = [2][256]uint64{} // Counter by stream and IP.
-
- ifaceChan = make(chan []byte, 1024)
- reqPool = make(chan []byte, 1024)
)
- for range cap(reqPool) {
- reqPool <- make([]byte, BUFFER_SIZE)
- }
-
- go ifWriter(ifaceChan, peer.iface, reqPool)
-
NEXT_PACKET:
- n, _, err = conn.ReadFromUDPAddrPort(packet[:BUFFER_SIZE])
+ n, srcAddr, err = conn.ReadFromUDPAddrPort(packet[:BUFFER_SIZE])
if err != nil {
log.Fatalf("Failed to read UDP packet: %v", err)
}
+ srcAddr = netip.AddrPortFrom(srcAddr.Addr().Unmap(), srcAddr.Port())
if n < NONCE_SIZE {
log.Printf("Dropping short UDP packet: %d", n)
@@ -52,8 +43,7 @@ NEXT_PACKET:
}
packet = packet[:n]
-
- ParseNonceBytes(packet[n-NONCE_SIZE:], &nonce)
+ nonce.Parse(packet[n-NONCE_SIZE:])
// Drop after 8 seconds.
if CounterTimestamp(nonce.Counter) < fasttime.Now()-8 {
@@ -62,13 +52,13 @@ NEXT_PACKET:
}
if nonce.StreamID > 1 {
- log.Printf("Dropping invalid stream ID: %v", nonce)
+ log.Printf("Dropping invalid stream ID: %+v", nonce)
goto NEXT_PACKET
}
// Check source counter.
if nonce.Counter <= counters[nonce.StreamID][nonce.SourceIP] {
- log.Printf("Dropping packet with bad counter: %v", nonce)
+ log.Printf("Dropping packet with bad counter: %+v", nonce)
goto NEXT_PACKET
}
@@ -76,7 +66,7 @@ NEXT_PACKET:
route = peer.router.GetRoute(nonce.SourceIP)
if route == nil {
- log.Printf("Dropping packet without route: %v", nonce)
+ log.Printf("Dropping packet without route: %+v", nonce)
goto NEXT_PACKET
}
@@ -86,7 +76,7 @@ NEXT_PACKET:
case nonce.ViaIP:
goto VALIDATE_SIGNATURE
default:
- log.Printf("Bad packet: %v", nonce)
+ log.Printf("Bad packet: %+v", nonce)
goto NEXT_PACKET
}
@@ -110,49 +100,41 @@ DECRYPT:
WRITE_IFACE_DATA:
- //toWrite = toWrite[:len(decrypted)]
- //copy(toWrite, decrypted)
-
- toWrite = <-reqPool
- ifaceChan <- append(toWrite[:0], decrypted...)
+ if _, err = peer.iface.Write(decrypted); err != nil {
+ log.Fatalf("Failed to write to interface: %v", err)
+ }
goto NEXT_PACKET
WRITE_ROUTING_PACKET:
- //peer.processRoutingPacket(decrypted)
- //peer.routeManager.ProcessPacket(decrypted)
+ peer.router.HandlePacket(srcAddr, nonce, decrypted)
goto NEXT_PACKET
VALIDATE_SIGNATURE:
- // We don't forward twice.
- if route.Mediator {
- log.Printf("Dropping double-forward packet: %v", nonce)
- goto NEXT_PACKET
- }
-
decrypted, ok = openPacket(route.SignPubKey, packet, decrypted)
if !ok {
log.Printf("Failed to open signed packet: %v", nonce)
goto NEXT_PACKET
}
+ route = peer.router.GetRoute(nonce.DestIP)
+ if route == nil || !route.Up {
+ log.Printf("Dropping mediated packet, route not available: %v", nonce)
+ goto NEXT_PACKET
+ }
+
+ // We don't forward twice.
+ if route.ViaIP != 0 {
+ log.Printf("Dropping double-forward packet: %v", nonce)
+ goto NEXT_PACKET
+ }
+
if _, err = conn.WriteToUDPAddrPort(decrypted, route.Addr); err != nil {
log.Fatalf("Failed to forward packet: %v", err)
}
goto NEXT_PACKET
}
-
-func ifWriter(in chan []byte, iface io.ReadWriteCloser, out chan []byte) {
- var err error
- for packet := range in {
- if _, err = iface.Write(packet); err != nil {
- log.Printf("Size: %d", len(packet))
- log.Fatalf("Failed to write to interface: %v", err)
- }
- out <- packet
- }
-}
diff --git a/peer/peer.go b/peer/peer.go
index 0b767e6..c47fcaa 100644
--- a/peer/peer.go
+++ b/peer/peer.go
@@ -4,14 +4,15 @@ import (
"io"
"log"
"net"
+ "net/netip"
)
type Peer struct {
- // Immutable data.
- ip byte // Last byte of IPv4 address.
- hubAddr string
- apiKey string
-
+ ip byte // Last byte of IPv4 address.
+ hubAddr string
+ apiKey string
+ isServer bool
+ isMediator bool
encPubKey []byte
encPrivKey []byte
signPubKey []byte
@@ -31,6 +32,7 @@ func NewPeer(netName, listenIP string, port uint16) (*Peer, error) {
peer := &Peer{
ip: conf.PeerIP,
hubAddr: conf.HubAddress,
+ isMediator: conf.Mediator,
apiKey: conf.APIKey,
encPubKey: conf.EncPubKey,
encPrivKey: conf.EncPrivKey,
@@ -38,16 +40,16 @@ func NewPeer(netName, listenIP string, port uint16) (*Peer, error) {
signPrivKey: conf.SignPrivKey,
}
- peer.router = NewRouter(conf)
+ _, peer.isServer = netip.AddrFromSlice(conf.PublicIP)
port = determinePort(conf.Port, port)
- conn, err := openUDPConn(listenIP, port)
+ peer.conn, err = openUDPConn(listenIP, port)
if err != nil {
return nil, err
}
- peer.conn = conn
+ peer.router = NewRouter(conf, peer.conn)
peer.iface, err = openInterface(conf.Network, conf.PeerIP, netName)
if err != nil {
diff --git a/peer/router-managemediator.go b/peer/router-managemediator.go
new file mode 100644
index 0000000..9778508
--- /dev/null
+++ b/peer/router-managemediator.go
@@ -0,0 +1,45 @@
+package peer
+
+import (
+ "math/rand"
+ "time"
+)
+
+func (r *Router) manageMediator() {
+ var (
+ ip = byte(0)
+ mediators = make([]*route, 0, 32)
+ )
+
+ for range time.Tick(8 * time.Second) {
+ // If the current mediator is up, keep it.
+ route := r.routes[ip].Load()
+ if route != nil && route.Up {
+ continue
+ }
+
+ // If the current mediator is up, keep it.
+ mediators = mediators[:0]
+
+ for i := range r.routes {
+ route := r.routes[i].Load()
+ if route == nil || !route.Mediator {
+ continue
+ }
+
+ mediators = append(mediators, route)
+ }
+
+ if len(mediators) == 0 {
+ ip = 0
+ } else {
+ ip = mediators[rand.Intn(len(mediators))].PeerIP
+ }
+
+ for _, conn := range r.conns {
+ if conn != nil {
+ conn.UpdateMediator(ip)
+ }
+ }
+ }
+}
diff --git a/peer/router-ping.go b/peer/router-ping.go
deleted file mode 100644
index a624188..0000000
--- a/peer/router-ping.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package peer
-
-type RoutingPingReq struct {
- PeerIP byte
- Type byte // 0 => local, 1 => direct, 2 => Via
- Addr []byte
- Port int
- SentAt int64 // unix milli
-}
-
-type RoutingPingResp struct {
- PeerIP byte
- Type byte // 0 => local, 1 => direct, 2 => Via
- Addr []byte
- Port int
- SentAt int64
- RecvdAt int64
-}
diff --git a/peer/router-ping_test.go b/peer/router-ping_test.go
new file mode 100644
index 0000000..69f4480
--- /dev/null
+++ b/peer/router-ping_test.go
@@ -0,0 +1,37 @@
+package peer
+
+import (
+ "reflect"
+ "testing"
+)
+
+func TestMarshalParsePingReq(t *testing.T) {
+ in := Ping{
+ SentAt: 4553252,
+ }
+
+ buf := make([]byte, PING_SIZE)
+ in.Marshal(buf)
+
+ out := Ping{}
+ out.Parse(buf)
+ if !reflect.DeepEqual(in, out) {
+ t.Fatal(in, out)
+ }
+}
+
+func TestMarshalParsePingResp(t *testing.T) {
+ in := Pong{
+ SentAt: 4553252,
+ RecvdAt: 4553253,
+ }
+
+ buf := make([]byte, PONG_SIZE)
+ in.Marshal(buf)
+
+ out := Pong{}
+ out.Parse(buf)
+ if !reflect.DeepEqual(in, out) {
+ t.Fatal(in, out)
+ }
+}
diff --git a/peer/router-pollhub.go b/peer/router-pollhub.go
new file mode 100644
index 0000000..eb082b6
--- /dev/null
+++ b/peer/router-pollhub.go
@@ -0,0 +1,63 @@
+package peer
+
+import (
+ "encoding/json"
+ "io"
+ "log"
+ "net/http"
+ "net/url"
+ "time"
+ "vppn/m"
+)
+
+func (r *Router) pollHub() {
+ u, err := url.Parse(r.conf.HubAddress)
+ if err != nil {
+ log.Fatalf("Failed to parse hub address %s: %v", r.conf.HubAddress, err)
+ }
+ u.Path = "/peer/fetch-state/"
+
+ client := &http.Client{Timeout: 8 * time.Second}
+
+ req := &http.Request{
+ Method: http.MethodGet,
+ URL: u,
+ Header: http.Header{},
+ }
+ req.SetBasicAuth("", r.conf.APIKey)
+
+ // TODO: Before we start polling, load state from the file system.
+ r._pollHub(client, req)
+
+ for range time.Tick(32 * time.Second) {
+ r._pollHub(client, req)
+ }
+}
+
+func (r *Router) _pollHub(client *http.Client, req *http.Request) {
+ var state m.NetworkState
+
+ log.Printf("Fetching peer state from %s...", r.conf.HubAddress)
+ resp, err := client.Do(req)
+ if err != nil {
+ log.Printf("Failed to fetch peer state: %v", err)
+ return
+ }
+ body, err := io.ReadAll(resp.Body)
+ _ = resp.Body.Close()
+ if err != nil {
+ log.Printf("Failed to read body from hub: %v", err)
+ return
+ }
+
+ if err := json.Unmarshal(body, &state); err != nil {
+ log.Printf("Failed to unmarshal response from hub: %v", err)
+ return
+ }
+
+ for i, peer := range state.Peers {
+ if r.conns[i] != nil {
+ r.conns[i].UpdatePeer(peerUpdate{PeerIP: byte(i), Peer: peer})
+ }
+ }
+}
diff --git a/peer/router-types.go b/peer/router-types.go
new file mode 100644
index 0000000..20bd296
--- /dev/null
+++ b/peer/router-types.go
@@ -0,0 +1,74 @@
+package peer
+
+import (
+ "net/netip"
+ "unsafe"
+ "vppn/m"
+)
+
+// ----------------------------------------------------------------------------
+
+// A route is used by a peer to send or receive packets. A peer won't send
+// packets on a route that isn't up, but will process incoming packets on
+// that route.
+type route struct {
+ PeerIP byte
+ Up bool
+ Mediator bool
+ SignPubKey []byte
+ EncSharedKey []byte // Shared key for encoding / decoding packets.
+ Addr netip.AddrPort // Address to send to.
+ ViaIP byte // If != 0, this is a forwarding address.
+}
+
+type peerUpdate struct {
+ PeerIP byte
+ *m.Peer // nil => delete.
+}
+
+// ----------------------------------------------------------------------------
+
+// Wrapper for routing packets.
+type wrapper[T any] struct {
+ T T
+ Nonce Nonce
+ SrcAddr netip.AddrPort
+}
+
+func newWrapper[T any](srcAddr netip.AddrPort, nonce Nonce) wrapper[T] {
+ return wrapper[T]{
+ SrcAddr: srcAddr,
+ Nonce: nonce,
+ }
+}
+
+// ----------------------------------------------------------------------------
+
+type Ping struct {
+ SentAt int64 // unix milli
+}
+
+func (p *Ping) Parse(buf []byte) {
+ p.SentAt = *(*int64)(unsafe.Pointer(&buf[0]))
+}
+
+func (p Ping) Marshal(buf []byte) {
+ *(*int64)(unsafe.Pointer(&buf[0])) = p.SentAt
+}
+
+// ----------------------------------------------------------------------------
+
+type Pong struct {
+ SentAt int64 // unix mili
+ RecvdAt int64 // unix mili
+}
+
+func (p *Pong) Parse(buf []byte) {
+ p.SentAt = *(*int64)(unsafe.Pointer(&buf[0]))
+ p.RecvdAt = *(*int64)(unsafe.Pointer(&buf[8]))
+}
+
+func (p *Pong) Marshal(buf []byte) {
+ *(*int64)(unsafe.Pointer(&buf[0])) = p.SentAt
+ *(*int64)(unsafe.Pointer(&buf[8])) = p.RecvdAt
+}
diff --git a/peer/router.go b/peer/router.go
index 507804a..60b67ac 100644
--- a/peer/router.go
+++ b/peer/router.go
@@ -1,123 +1,86 @@
package peer
import (
- "encoding/json"
- "io"
"log"
- "net/http"
+ "net"
"net/netip"
- "net/url"
"sync/atomic"
- "time"
"vppn/m"
)
-var zeroAddrPort netip.AddrPort
-
-type routeInfo struct {
- Up bool
- Route route
-}
-
type Router struct {
- conf m.PeerConfig
- routes [256]*atomic.Pointer[routeInfo]
+ conf m.PeerConfig
+
+ // Routes used by the peer.
+ conns [256]*connHandler
+ routes [256]*atomic.Pointer[route]
}
-func NewRouter(conf m.PeerConfig) *Router {
- rm := &Router{
- conf: conf,
+func NewRouter(conf m.PeerConfig, conn *net.UDPConn) *Router {
+ r := &Router{conf: conf}
+
+ for i := range r.routes {
+ r.routes[i] = &atomic.Pointer[route]{}
}
- for i := range rm.routes {
- rm.routes[i] = &atomic.Pointer[routeInfo]{}
- rm.routes[i].Store(&routeInfo{})
+ _, isServer := netip.AddrFromSlice(conf.PublicIP)
+
+ sender := newConnSender(conn, conf.PeerIP, STREAM_ROUTING, conf.SignPrivKey)
+
+ for i := range r.conns {
+ if byte(i) != conf.PeerIP {
+ r.conns[i] = newConnHandler(
+ isServer,
+ byte(i),
+ r.routes,
+ conf.EncPrivKey,
+ newSafeConnSender(sender))
+ }
}
- go rm.pollHub()
- return rm
+ go r.pollHub()
+ if !isServer {
+ go r.manageMediator()
+ }
+
+ return r
}
+// ----------------------------------------------------------------------------
+// Peer Methods
+// ----------------------------------------------------------------------------
+
func (rm *Router) GetRoute(ip byte) *route {
- if route := rm.routes[ip].Load(); route != nil && route.Up {
- return &route.Route
- }
- return nil
+ return rm.routes[ip].Load()
}
-func (rm *Router) pollHub() {
- u, err := url.Parse(rm.conf.HubAddress)
- if err != nil {
- log.Fatalf("Failed to parse hub address %s: %v", rm.conf.HubAddress, err)
- }
- u.Path = "/peer/fetch-state/"
-
- client := &http.Client{Timeout: 8 * time.Second}
-
- req := &http.Request{
- Method: http.MethodGet,
- URL: u,
- Header: http.Header{},
- }
- req.SetBasicAuth("", rm.conf.APIKey)
-
- rm._pollHub(client, req)
-
- for range time.Tick(time.Minute) {
- rm._pollHub(client, req)
- }
-}
-
-func (rm *Router) _pollHub(client *http.Client, req *http.Request) {
- var state m.NetworkState
-
- log.Printf("Fetching peer state from %s...", rm.conf.HubAddress)
- resp, err := client.Do(req)
- if err != nil {
- log.Printf("Failed to fetch peer state: %v", err)
- return
- }
- body, err := io.ReadAll(resp.Body)
- _ = resp.Body.Close()
- if err != nil {
- log.Printf("Failed to read body: %v", err)
+func (r *Router) HandlePacket(src netip.AddrPort, nonce Nonce, data []byte) {
+ if nonce.SourceIP == r.conf.PeerIP {
+ log.Printf("Packet to self...")
return
}
- if err := json.Unmarshal(body, &state); err != nil {
- log.Printf("Failed to unmarshal response from hub: %v", err)
- return
- }
-
- for i, peer := range state.Peers {
- if peer == nil {
- continue
+ switch nonce.PacketType {
+ case PACKET_TYPE_PING:
+ if len(data) < PING_SIZE {
+ log.Printf("Short ping request: %d", len(data))
+ return
}
- route := rm.routes[i].Load()
- rm.routes[i].Store(rm.updateRoute(route, peer))
+ w := newWrapper[Ping](src, nonce)
+ w.T.Parse(data)
+
+ r.conns[nonce.SourceIP].HandlePing(w)
+
+ case PACKET_TYPE_PONG:
+ if len(data) < PONG_SIZE {
+ log.Printf("Short ping response: %d", len(data))
+ return
+ }
+ w := newWrapper[Pong](src, nonce)
+ w.T.Parse(data)
+ r.conns[nonce.SourceIP].HandlePong(w)
+
+ default:
+ log.Printf("Unknown routing packet type: %d", nonce.PacketType)
}
}
-
-func (rm *Router) updateRoute(routePtr *routeInfo, peer *m.Peer) *routeInfo {
- if peer == nil {
- return &routeInfo{}
- }
-
- route := *routePtr
-
- addr, ok := netip.AddrFromSlice(peer.IP)
- if !ok {
- return &routeInfo{}
- }
-
- route.Up = true
- route.Route.Addr = netip.AddrPortFrom(addr, peer.Port)
- route.Route.Mediator = peer.Mediator
- route.Route.ViaIP = 0
- if len(route.Route.SignPubKey) == 0 {
- route.Route.SignPubKey = peer.SignPubKey
- route.Route.EncSharedKey = computeSharedKey(peer.EncPubKey, rm.conf.EncPrivKey)
- }
-
- return &route
-}
diff --git a/peer/types.go b/peer/types.go
deleted file mode 100644
index 415e9d6..0000000
--- a/peer/types.go
+++ /dev/null
@@ -1,22 +0,0 @@
-package peer
-
-import (
- "net/netip"
-)
-
-type route struct {
- Mediator bool // Route is via a mediator.
- SignPubKey []byte
- EncSharedKey []byte // Shared key for encoding / decoding packets.
- Addr netip.AddrPort // Address to send to.
- ViaIP byte // If != 0, this is a forwarding address.
-}
-
-type Nonce struct {
- Counter uint64
- SourceIP byte
- ViaIP byte
- DestIP byte
- StreamID byte // The stream, see STREAM_* constants
- PacketType byte // The packet type. See PACKET_* constants.
-}