This commit is contained in:
jdl 2024-12-12 21:11:17 +01:00
parent 03ff1aac80
commit fdf0066fc2
35 changed files with 1138 additions and 331 deletions

View File

@ -4,8 +4,9 @@
* Peer: router: create process for managing the routing table
* Peer: router: track mediators, enable / disable ...
* Hub: track peer last-seen timestamp
* Hub: track peer last-seen timestamp (?)
* Peer: local peer discovery - part of RoutingProcessor
* Peer: update hub w/ latest port on startup
## Principles
@ -13,24 +14,21 @@
* Simple setup: via setup link from the hub.
* Each peer has full network state replicated from the hub.
## Design
## Routing
* Append nonce to end of packet
* Then it's readable whether signed or unsiged
* Types of packets to send:
* standard: encrypt and send
* Forward via: encrypt, sign and send
* Forward to: send
* Type of packeting read from interface:
* Forward to: check signature
* Forwarded, standard
* Routing is different for public vs non-public peers
* Public: routes are initialized via incoming ping requests
* NonPub: routes are initialized via incoming ping responses
Incoming from net:
* Data for iface
* Packet for forward
* Packet for routingHandler
* Incoming from iface:
* Data for peer
A non-public peer needs to maintain connections with every public peer.
* Sending:
* Public: send to address
* Non-public: send to a mediator
* Pings:
* Servers don't need to ping
* Clients need to ping all public and local peers to keep connections open
## Hub Server Configuration
@ -84,3 +82,28 @@ journalctl -f -u hub -n 100
```
Sign-in and configure.
## Peer Configuration
Install the binary somewhere, for example `~/bin/vppn`.
Create systemd file in `/etc/systemd/system/vppn.service`.
```
Description=vppn
Requires=network.target
[Service]
AmbientCapabilities=CAP_NET_BIND_SERVICE CAP_NET_ADMIN
Type=simple
User=user
WorkingDirectory=/home/user/
ExecStart=/home/user/vppn -name vppn
Restart=always
RestartSec=8
TimeoutStopSec=24
[Install]
WantedBy=default.target
```

View File

@ -2,3 +2,4 @@
go build
sudo setcap cap_net_admin+iep vppn
sudo setcap cap_net_bind_service+iep vppn

View File

@ -144,7 +144,7 @@ func (a *API) Session_SignIn(s *Session, pwd string) error {
type PeerCreateArgs struct {
Name string
IP []byte
PublicIP []byte
Port uint16
Mediator bool
}
@ -209,9 +209,10 @@ func (a *API) Peer_Create(creationCode string) (*m.PeerConfig, error) {
peer := &Peer{
PeerIP: peerIP,
Version: idgen.NextID(0),
APIKey: idgen.NewToken(),
Name: args.Name,
IP: args.IP,
PublicIP: args.PublicIP,
Port: args.Port,
Mediator: args.Mediator,
EncPubKey: encPubKey[:],
@ -229,7 +230,7 @@ func (a *API) Peer_Create(creationCode string) (*m.PeerConfig, error) {
HubAddress: conf.HubAddress,
APIKey: peer.APIKey,
Network: conf.VPNNetwork,
IP: peer.IP,
PublicIP: peer.PublicIP,
Port: peer.Port,
Mediator: peer.Mediator,
EncPubKey: encPubKey[:],
@ -240,6 +241,10 @@ func (a *API) Peer_Create(creationCode string) (*m.PeerConfig, error) {
}
func (a *API) Peer_Update(p *Peer) error {
a.lock.Lock()
defer a.lock.Unlock()
p.Version = idgen.NextID(0)
return db.Peer_Update(a.db, p)
}

View File

@ -308,16 +308,17 @@ func Session_List(
type Peer struct {
PeerIP byte
Version int64
APIKey string
Name string
IP []byte
PublicIP []byte
Port uint16
Mediator bool
EncPubKey []byte
SignPubKey []byte
}
const Peer_SelectQuery = "SELECT PeerIP,APIKey,Name,IP,Port,Mediator,EncPubKey,SignPubKey FROM peers"
const Peer_SelectQuery = "SELECT PeerIP,Version,APIKey,Name,PublicIP,Port,Mediator,EncPubKey,SignPubKey FROM peers"
func Peer_Insert(
tx TX,
@ -328,7 +329,7 @@ func Peer_Insert(
return err
}
_, err = tx.Exec("INSERT INTO peers(PeerIP,APIKey,Name,IP,Port,Mediator,EncPubKey,SignPubKey) VALUES(?,?,?,?,?,?,?,?)", row.PeerIP, row.APIKey, row.Name, row.IP, row.Port, row.Mediator, row.EncPubKey, row.SignPubKey)
_, err = tx.Exec("INSERT INTO peers(PeerIP,Version,APIKey,Name,PublicIP,Port,Mediator,EncPubKey,SignPubKey) VALUES(?,?,?,?,?,?,?,?,?)", row.PeerIP, row.Version, row.APIKey, row.Name, row.PublicIP, row.Port, row.Mediator, row.EncPubKey, row.SignPubKey)
return err
}
@ -341,7 +342,7 @@ func Peer_Update(
return err
}
result, err := tx.Exec("UPDATE peers SET Name=?,IP=?,Port=?,Mediator=? WHERE PeerIP=?", row.Name, row.IP, row.Port, row.Mediator, row.PeerIP)
result, err := tx.Exec("UPDATE peers SET Version=?,Name=?,PublicIP=?,Port=?,Mediator=? WHERE PeerIP=?", row.Version, row.Name, row.PublicIP, row.Port, row.Mediator, row.PeerIP)
if err != nil {
return err
}
@ -369,7 +370,7 @@ func Peer_UpdateFull(
return err
}
result, err := tx.Exec("UPDATE peers SET APIKey=?,Name=?,IP=?,Port=?,Mediator=?,EncPubKey=?,SignPubKey=? WHERE PeerIP=?", row.APIKey, row.Name, row.IP, row.Port, row.Mediator, row.EncPubKey, row.SignPubKey, row.PeerIP)
result, err := tx.Exec("UPDATE peers SET Version=?,APIKey=?,Name=?,PublicIP=?,Port=?,Mediator=?,EncPubKey=?,SignPubKey=? WHERE PeerIP=?", row.Version, row.APIKey, row.Name, row.PublicIP, row.Port, row.Mediator, row.EncPubKey, row.SignPubKey, row.PeerIP)
if err != nil {
return err
}
@ -419,8 +420,8 @@ func Peer_Get(
err error,
) {
row = &Peer{}
r := tx.QueryRow("SELECT PeerIP,APIKey,Name,IP,Port,Mediator,EncPubKey,SignPubKey FROM peers WHERE PeerIP=?", PeerIP)
err = r.Scan(&row.PeerIP, &row.APIKey, &row.Name, &row.IP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey)
r := tx.QueryRow("SELECT PeerIP,Version,APIKey,Name,PublicIP,Port,Mediator,EncPubKey,SignPubKey FROM peers WHERE PeerIP=?", PeerIP)
err = r.Scan(&row.PeerIP, &row.Version, &row.APIKey, &row.Name, &row.PublicIP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey)
return
}
@ -434,7 +435,7 @@ func Peer_GetWhere(
) {
row = &Peer{}
r := tx.QueryRow(query, args...)
err = r.Scan(&row.PeerIP, &row.APIKey, &row.Name, &row.IP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey)
err = r.Scan(&row.PeerIP, &row.Version, &row.APIKey, &row.Name, &row.PublicIP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey)
return
}
@ -454,7 +455,7 @@ func Peer_Iterate(
defer rows.Close()
for rows.Next() {
row := &Peer{}
err := rows.Scan(&row.PeerIP, &row.APIKey, &row.Name, &row.IP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey)
err := rows.Scan(&row.PeerIP, &row.Version, &row.APIKey, &row.Name, &row.PublicIP, &row.Port, &row.Mediator, &row.EncPubKey, &row.SignPubKey)
if !yield(row, err) {
return
}

View File

@ -44,10 +44,10 @@ func Session_Validate(s *Session) error {
func Peer_Sanitize(p *Peer) {
p.Name = strings.TrimSpace(p.Name)
if len(p.IP) != 0 {
addr, ok := netip.AddrFromSlice(p.IP)
if len(p.PublicIP) != 0 {
addr, ok := netip.AddrFromSlice(p.PublicIP)
if ok && addr.Is4() {
p.IP = addr.AsSlice()
p.PublicIP = addr.AsSlice()
}
}
if p.Port == 0 {
@ -56,8 +56,8 @@ func Peer_Sanitize(p *Peer) {
}
func Peer_Validate(p *Peer) error {
if len(p.IP) > 0 {
_, ok := netip.AddrFromSlice(p.IP)
if len(p.PublicIP) > 0 {
_, ok := netip.AddrFromSlice(p.PublicIP)
if !ok {
return ErrInvalidIP
}

View File

@ -15,9 +15,10 @@ TABLE sessions OF Session NoUpdate (
TABLE peers OF Peer (
PeerIP byte PK,
Version int64,
APIKey string NoUpdate,
Name string,
IP []byte,
PublicIP []byte,
Port uint16,
Mediator bool,
EncPubKey []byte NoUpdate,

View File

@ -17,9 +17,10 @@ CREATE INDEX sessions_last_seen_index ON sessions(LastSeenAt);
CREATE TABLE peers (
PeerIP INTEGER NOT NULL PRIMARY KEY, -- Final byte.
Version INTEGER NOT NULL,
APIKey TEXT NOT NULL UNIQUE,
Name TEXT NOT NULL UNIQUE, -- For humans.
IP BLOB NOT NULL,
PublicIP BLOB NOT NULL,
Port INTEGER NOT NULL,
Mediator INTEGER NOT NULL DEFAULT 0, -- Boolean if peer will forward packets. Must also have public address.
EncPubKey BLOB NOT NULL,

View File

@ -64,3 +64,18 @@ func (app *App) handleSignedIn(pattern string, fn handlerFunc) {
return fn(s, w, r)
})
}
type peerHandlerFunc func(w http.ResponseWriter, r *http.Request) error
func (app *App) handlePeer(pattern string, fn peerHandlerFunc) {
wrapped := func(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
if err := fn(w, r); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
app.mux.HandleFunc(pattern,
webutil.WithLogging(
wrapped))
}

View File

@ -165,7 +165,7 @@ func (a *App) _adminPeerCreateSubmit(s *api.Session, w http.ResponseWriter, r *h
args := api.PeerCreateArgs{}
err := webutil.NewFormScanner(r.Form).
Scan("Name", &args.Name).
Scan("IP", &ipStr).
Scan("PublicIP", &ipStr).
Scan("Port", &args.Port).
Scan("Mediator", &args.Mediator).
Error()
@ -173,7 +173,7 @@ func (a *App) _adminPeerCreateSubmit(s *api.Session, w http.ResponseWriter, r *h
return err
}
if args.IP, err = stringToIP(ipStr); err != nil {
if args.PublicIP, err = stringToIP(ipStr); err != nil {
return err
}
@ -247,7 +247,7 @@ func (a *App) _adminPeerEditSubmit(s *api.Session, w http.ResponseWriter, r *htt
}
err = webutil.NewFormScanner(r.Form).
Scan("Name", &peer.Name).
Scan("IP", &ipStr).
Scan("PublicIP", &ipStr).
Scan("Port", &peer.Port).
Scan("Mediator", &peer.Mediator).
Error()
@ -255,7 +255,7 @@ func (a *App) _adminPeerEditSubmit(s *api.Session, w http.ResponseWriter, r *htt
return err
}
if peer.IP, err = stringToIP(ipStr); err != nil {
if peer.PublicIP, err = stringToIP(ipStr); err != nil {
return err
}
@ -298,7 +298,7 @@ func (a *App) _adminPeerDeleteSubmit(s *api.Session, w http.ResponseWriter, r *h
return a.redirect(w, r, "/admin/peer/list/")
}
func (a *App) _peerCreate(s *api.Session, w http.ResponseWriter, r *http.Request) error {
func (a *App) _peerCreate(w http.ResponseWriter, r *http.Request) error {
code := r.FormValue("Code")
conf, err := a.api.Peer_Create(code)
if err != nil {
@ -308,7 +308,7 @@ func (a *App) _peerCreate(s *api.Session, w http.ResponseWriter, r *http.Request
return a.sendJSON(w, conf)
}
func (a *App) _peerFetchState(s *api.Session, w http.ResponseWriter, r *http.Request) error {
func (a *App) _peerFetchState(w http.ResponseWriter, r *http.Request) error {
_, apiKey, ok := r.BasicAuth()
if !ok {
log.Printf("1")
@ -333,15 +333,16 @@ func (a *App) _peerFetchState(s *api.Session, w http.ResponseWriter, r *http.Req
HubAddress: conf.HubAddress,
Network: conf.VPNNetwork,
PeerIP: peer.PeerIP,
IP: peer.IP,
PublicIP: peer.PublicIP,
Port: peer.Port,
}
for _, p := range peers {
state.Peers[p.PeerIP] = &m.Peer{
PeerIP: p.PeerIP,
Version: p.Version,
Name: p.Name,
IP: p.IP,
PublicIP: p.PublicIP,
Port: p.Port,
Mediator: p.Mediator,
EncPubKey: p.EncPubKey,

View File

@ -26,6 +26,6 @@ func (a *App) registerRoutes() {
a.handleSignedIn("GET /admin/peer/delete/", a._adminPeerDelete)
a.handleSignedIn("POST /admin/peer/delete/", a._adminPeerDeleteSubmit)
a.handleNotSignedIn("GET /peer/create/", a._peerCreate)
a.handleNotSignedIn("GET /peer/fetch-state/", a._peerFetchState)
a.handlePeer("GET /peer/create/", a._peerCreate)
a.handlePeer("GET /peer/fetch-state/", a._peerFetchState)
}

View File

@ -8,8 +8,8 @@
<input type="text" name="Name">
</p>
<p>
<label>IP</label><br>
<input type="text" name="IP">
<label>Public IP</label><br>
<input type="text" name="PublicIP">
</p>
<p>
<label>Port</label><br>

View File

@ -13,8 +13,8 @@
<input type="text" value="{{.Name}}" disabled>
</p>
<p>
<label>IP</label><br>
<input type="text" value="{{ipToString .IP}}" disabled>
<label>Public IP</label><br>
<input type="text" value="{{ipToString .PublicIP}}" disabled>
</p>
<p>
<label>Port</label><br>

View File

@ -13,8 +13,8 @@
<input type="text" name="Name" value="{{.Name}}">
</p>
<p>
<label>IP</label><br>
<input type="text" name="IP" value="{{ipToString .IP}}">
<label>Public IP</label><br>
<input type="text" name="PublicIP" value="{{ipToString .PublicIP}}">
</p>
<p>
<label>Port</label><br>

View File

@ -11,7 +11,7 @@
<tr>
<th>PeerIP</th>
<th>Name</th>
<th>IP</th>
<th>Public IP</th>
<th>Port</th>
<th>Mediator</th>
</tr>
@ -25,7 +25,7 @@
</a>
</td>
<td>{{.Name}}</td>
<td>{{ipToString .IP}}</td>
<td>{{ipToString .PublicIP}}</td>
<td>{{.Port}}</td>
<td>{{if .Mediator}}T{{else}}F{{end}}</td>
</tr>

View File

@ -10,7 +10,7 @@
<table class="def-list">
<tr><td>Peer IP</td><td>{{.PeerIP}}</td></tr>
<tr><td>Name</td><td>{{.Name}}</td></tr>
<tr><td>IP</td><td>{{ipToString .IP}}</td></tr>
<tr><td>Public IP</td><td>{{ipToString .PublicIP}}</td></tr>
<tr><td>Port</td><td>{{.Port}}</td></tr>
<tr><td>Mediator</td><td>{{if .Mediator}}T{{else}}F{{end}}</td></tr>
<tr><td>API Key</td><td>{{.APIKey}}</td></tr>

View File

@ -6,7 +6,7 @@ type PeerConfig struct {
HubAddress string
Network []byte
APIKey string
IP []byte
PublicIP []byte
Port uint16
Mediator bool
EncPubKey []byte
@ -17,8 +17,9 @@ type PeerConfig struct {
type Peer struct {
PeerIP byte
Version int64
Name string
IP []byte
PublicIP []byte
Port uint16
Mediator bool
EncPubKey []byte
@ -29,10 +30,10 @@ type NetworkState struct {
HubAddress string
// The requester's data:
Network []byte
PeerIP byte
IP []byte
Port uint16
Network []byte
PeerIP byte
PublicIP []byte
Port uint16
// All peer data.
Peers [256]*Peer

9
peer/conn-states.dot Normal file
View File

@ -0,0 +1,9 @@
digraph d {
init -> null;
init -> unconnectedServer;
init -> unconnectedClient;
init -> unconnectedMediated;
unconnectedServer -> connectedServer;
unconnectedClient -> connectedClient;
unconnectedMediated -> connectedMediated;
}

83
peer/conndata.go Normal file
View File

@ -0,0 +1,83 @@
package peer
import (
"net/netip"
"sync/atomic"
"time"
"vppn/m"
)
const (
pingInterval = time.Second * 8
timeoutInterval = 32 * time.Second
)
type connData struct {
// Shared data.
routes [256]*atomic.Pointer[route]
route *atomic.Pointer[route]
// Local data.
mediatorIP byte
// Peer data.
server bool // Never changes.
peerIP byte // Never changes.
encPrivKey []byte // Never changes.
peer *m.Peer // From hub.
encSharedKey []byte // From hub + private key.
publicAddr netip.AddrPort // From hub.
pingTimer *time.Timer
timeoutTimer *time.Timer
// Routing data.
addr netip.AddrPort
viaIP byte
up bool
// For sending.
buf []byte
sender *safeConnSender
}
func (d *connData) Route() *route {
return &route{
PeerIP: d.peerIP,
Up: d.up,
Mediator: d.peer.Mediator,
SignPubKey: d.peer.SignPubKey,
EncSharedKey: d.encSharedKey,
Addr: d.addr,
ViaIP: d.viaIP,
}
}
func (d *connData) HandlePeerUpdate(state connState, update peerUpdate) connState {
if d.peer != nil && update.Peer != nil && d.peer.Version == update.Peer.Version {
return state
}
if d.peer == nil && update.Peer == nil {
return state
}
return newConnStateFromPeer(update, d)
}
func (d *connData) HandleSendPing() {
route := d.route.Load()
req := Ping{SentAt: time.Now().UnixMilli()}
req.Marshal(d.buf[:PING_SIZE])
d.sender.send(PACKET_TYPE_PING, d.buf[:PING_SIZE], route)
d.pingTimer.Reset(pingInterval)
}
func (d *connData) sendPong(w wrapper[Ping]) {
route := d.route.Load()
pong := Pong{
SentAt: w.T.SentAt,
RecvdAt: time.Now().UnixMilli(),
}
pong.Marshal(d.buf[:PONG_SIZE])
d.sender.send(PACKET_TYPE_PONG, d.buf[:PONG_SIZE], route)
}

121
peer/connhandler.go Normal file
View File

@ -0,0 +1,121 @@
package peer
import (
"fmt"
"log"
"runtime/debug"
"sync/atomic"
"time"
)
type connHandler struct {
// Communication.
mediatorUpdates chan byte
peerUpdates chan peerUpdate
pings chan wrapper[Ping]
pongs chan wrapper[Pong]
data *connData
}
func newConnHandler(
server bool,
peerIP byte,
routes [256]*atomic.Pointer[route],
encPrivKey []byte,
sender *safeConnSender,
) *connHandler {
d := &connData{
server: server,
pingTimer: time.NewTimer(pingInterval),
timeoutTimer: time.NewTimer(timeoutInterval),
routes: routes,
route: routes[peerIP],
peerIP: peerIP,
encPrivKey: encPrivKey,
buf: make([]byte, BUFFER_SIZE),
sender: sender,
}
h := &connHandler{
mediatorUpdates: make(chan byte, 1),
peerUpdates: make(chan peerUpdate, 1),
pings: make(chan wrapper[Ping], 1),
pongs: make(chan wrapper[Pong], 1),
data: d,
}
go h.mainLoop()
return h
}
func (h *connHandler) mainLoop() {
defer func() {
if r := recover(); r != nil {
fmt.Println("stacktrace from panic: \n" + string(debug.Stack()))
}
}()
var (
data = h.data
state = newConnNull(data)
name = state.Name()
)
for {
select {
case ip := <-h.mediatorUpdates:
state = state.HandleMediatorUpdate(ip)
case update := <-h.peerUpdates:
state = data.HandlePeerUpdate(state, update)
case w := <-h.pings:
state = state.HandlePing(w)
case w := <-h.pongs:
state = state.HandlePong(w)
case <-data.pingTimer.C:
state.HandleSendPing()
case <-data.timeoutTimer.C:
log.Printf("[%s] Connection timeout.", state.Name())
state = state.HandleTimeout()
}
if state.Name() != name {
log.Printf("[%03d] STATE: %s --> %s", data.peerIP, name, state.Name())
name = state.Name()
}
}
}
func (c *connHandler) UpdateMediator(ip byte) {
select {
case c.mediatorUpdates <- ip:
default:
}
}
func (c *connHandler) HandlePing(w wrapper[Ping]) {
select {
case c.pings <- w:
default:
}
}
func (c *connHandler) HandlePong(w wrapper[Pong]) {
select {
case c.pongs <- w:
default:
}
}
func (c *connHandler) UpdatePeer(update peerUpdate) {
select {
case c.peerUpdates <- update:
default:
}
}

83
peer/connsender.go Normal file
View File

@ -0,0 +1,83 @@
package peer
import (
"log"
"net"
"runtime/debug"
"sync"
"vppn/fasttime"
)
type connSender struct {
conn *net.UDPConn
sourceIP byte
streamID byte
encrypted []byte
nonceBuf []byte
counterTS uint64
counter uint64
signingKey []byte
}
func newConnSender(conn *net.UDPConn, srcIP, streamID byte, signingPrivKey []byte) *connSender {
return &connSender{
conn: conn,
sourceIP: srcIP,
streamID: streamID,
encrypted: make([]byte, BUFFER_SIZE),
nonceBuf: make([]byte, NONCE_SIZE),
signingKey: signingPrivKey,
}
}
func (cs *connSender) send(packetType byte, packet []byte, route *route) {
now := uint64(fasttime.Now())
if cs.counterTS < now {
cs.counterTS = now
cs.counter = now << 30
}
cs.counter++
nonce := Nonce{
Counter: cs.counter,
SourceIP: cs.sourceIP,
ViaIP: route.ViaIP,
DestIP: route.PeerIP,
StreamID: cs.streamID,
PacketType: packetType,
}
nonce.Marshal(cs.nonceBuf)
encrypted := encryptPacket(route.EncSharedKey, cs.nonceBuf, packet, cs.encrypted)
var toSend []byte
if route.ViaIP != 0 {
toSend = signPacket(cs.signingKey, encrypted, packet)
} else {
toSend = encrypted
}
log.Printf("Sending to %v: %+v", route.Addr, nonce)
if _, err := cs.conn.WriteToUDPAddrPort(toSend, route.Addr); err != nil {
log.Fatalf("Failed to write UDP packet: %v\n%s", err, debug.Stack())
}
}
// ----------------------------------------------------------------------------
type safeConnSender struct {
lock sync.Mutex
sender *connSender
}
func newSafeConnSender(sender *connSender) *safeConnSender {
return &safeConnSender{sender: sender}
}
func (s *safeConnSender) send(packetType byte, packet []byte, route *route) {
s.lock.Lock()
defer s.lock.Unlock()
s.sender.send(packetType, packet, route)
}

393
peer/connstate.go Normal file
View File

@ -0,0 +1,393 @@
package peer
import (
"log"
"net/netip"
"time"
"vppn/m"
)
func logState(s connState, msg string, args ...any) {
log.Printf("["+s.Name()+"] "+msg, args...)
}
// ----------------------------------------------------------------------------
// The connection state corresponds to what we're connected TO.
type connState interface {
Name() string
HandleMediatorUpdate(ip byte) connState
HandleSendPing()
HandlePing(wrapper[Ping]) connState
HandlePong(wrapper[Pong]) connState
HandleTimeout() connState
}
// Helper function.
func newConnStateFromPeer(update peerUpdate, data *connData) connState {
peer := update.Peer
if peer == nil {
return newConnNull(data)
}
if _, isPublic := netip.AddrFromSlice(peer.PublicIP); isPublic {
return newConnUnconnectedServer(data, peer)
} else if data.server {
return newConnUnconnectedClient(data, peer)
} else {
return newConnUnconnectedMediator(data, peer)
}
}
/////////////////////
// Null Connection //
/////////////////////
type connNull struct {
*connData
}
func newConnNull(data *connData) connState {
c := connNull{data}
c.peer = nil
c.encSharedKey = nil
c.publicAddr = netip.AddrPort{}
c.pingTimer.Stop()
c.timeoutTimer.Stop()
c.addr = c.publicAddr
c.viaIP = 0
c.up = false
c.route.Store(nil)
return c
}
func (c connNull) Name() string {
return "NoPeer"
}
func (c connNull) HandleMediatorUpdate(ip byte) connState {
c.mediatorIP = ip
return c
}
func (c connNull) HandlePing(w wrapper[Ping]) connState {
logState(c, "Ignoring ping.")
return c
}
func (c connNull) HandlePong(w wrapper[Pong]) connState {
logState(c, "Ignoring pong.")
return c
}
func (c connNull) HandleTimeout() connState {
logState(c, "Unexpected timeout.")
return c
}
////////////////////////
// Unconnected Server //
////////////////////////
type connUnconnectedServer struct {
*connData
}
func newConnUnconnectedServer(data *connData, peer *m.Peer) connState {
addr, _ := netip.AddrFromSlice(peer.PublicIP)
pubAddr := netip.AddrPortFrom(addr, peer.Port)
c := connUnconnectedServer{data}
c.peer = peer
c.encSharedKey = computeSharedKey(peer.EncPubKey, c.encPrivKey)
c.publicAddr = pubAddr
c.pingTimer.Reset(time.Millisecond) // Ping right away to bring up.
c.timeoutTimer.Stop() // No timeouts yet.
c.addr = c.publicAddr
c.viaIP = 0
c.up = false
c.route.Store(c.Route())
return c
}
func (c connUnconnectedServer) Name() string {
return "ServerUnconnected"
}
func (c connUnconnectedServer) HandleMediatorUpdate(ip byte) connState {
// Server connection doesn't use a mediator.
c.mediatorIP = ip
return c
}
func (c connUnconnectedServer) HandlePing(w wrapper[Ping]) connState {
logState(c, "Ignoring ping.")
return c
}
func (c connUnconnectedServer) HandlePong(w wrapper[Pong]) connState {
return newConnConnectedServer(c.connData, w)
}
func (c connUnconnectedServer) HandleTimeout() connState {
logState(c, "Unexpected timeout.")
return c
}
//////////////////////
// Connected Server //
//////////////////////
type connConnectedServer struct {
*connData
}
func newConnConnectedServer(data *connData, w wrapper[Pong]) connState {
c := connConnectedServer{data}
c.pingTimer.Reset(pingInterval)
c.timeoutTimer.Reset(timeoutInterval)
c.addr = w.SrcAddr
c.viaIP = 0
c.up = true
c.route.Store(c.Route())
return c
}
func (c connConnectedServer) Name() string {
return "ServerConnected"
}
func (c connConnectedServer) HandleMediatorUpdate(ip byte) connState {
// Server connection doesn't use a mediator.
c.mediatorIP = ip
return c
}
func (c connConnectedServer) HandlePing(w wrapper[Ping]) connState {
logState(c, "Ignoring ping.")
return c
}
func (c connConnectedServer) HandlePong(w wrapper[Pong]) connState {
c.timeoutTimer.Reset(timeoutInterval)
return c
}
func (c connConnectedServer) HandleTimeout() connState {
return newConnUnconnectedServer(c.connData, c.peer)
}
////////////////////////
// Unconnected Client //
////////////////////////
type connUnconnectedClient struct {
*connData
}
func newConnUnconnectedClient(data *connData, peer *m.Peer) connState {
addr, _ := netip.AddrFromSlice(peer.PublicIP)
pubAddr := netip.AddrPortFrom(addr, peer.Port)
c := connUnconnectedClient{data}
c.peer = peer
c.publicAddr = pubAddr
c.encPrivKey = data.encPrivKey
c.encSharedKey = computeSharedKey(peer.EncPubKey, c.encPrivKey)
c.addr = c.publicAddr
c.viaIP = 0
c.up = false
c.route.Store(c.Route())
c.pingTimer.Stop() // Conncection is from client => pings incoming.
c.timeoutTimer.Stop() // No timeouts yet.
return c
}
func (c connUnconnectedClient) Name() string {
return "ClientUnconnected"
}
func (c connUnconnectedClient) HandleMediatorUpdate(ip byte) connState {
// Client connection doesn't use a mediator.
c.mediatorIP = ip
return c
}
func (c connUnconnectedClient) HandlePing(w wrapper[Ping]) connState {
next := newConnConnectedClient(c.connData, w)
c.sendPong(w) // Have to send after transitionsing so route is ok.
return next
}
func (c connUnconnectedClient) HandlePong(w wrapper[Pong]) connState {
logState(c, "Ignorning pong.")
return c
}
func (c connUnconnectedClient) HandleTimeout() connState {
logState(c, "Unexpected timeout.")
return c
}
//////////////////////
// Connected Client //
//////////////////////
type connConnectedClient struct {
*connData
}
func newConnConnectedClient(data *connData, w wrapper[Ping]) connState {
c := connConnectedClient{data}
c.addr = w.SrcAddr
c.viaIP = 0
c.up = true
c.route.Store(c.Route())
c.pingTimer.Stop() // Conncection is from client => pings incoming.
c.timeoutTimer.Reset(timeoutInterval)
return c
}
func (c connConnectedClient) Name() string {
return "ClientConnected"
}
func (c connConnectedClient) HandleMediatorUpdate(ip byte) connState {
// Client connection doesn't use a mediator.
c.mediatorIP = ip
return c
}
func (c connConnectedClient) HandlePing(w wrapper[Ping]) connState {
// The connection is from a client. If the client's address changes, we
// should follow that change.
if c.addr != w.SrcAddr {
c.addr = w.SrcAddr
c.route.Store(c.Route())
}
c.sendPong(w)
c.timeoutTimer.Reset(timeoutInterval)
return c
}
func (c connConnectedClient) HandlePong(w wrapper[Pong]) connState {
logState(c, "Ignoring pong.")
return c
}
func (c connConnectedClient) HandleTimeout() connState {
return newConnUnconnectedClient(c.connData, c.peer)
}
//////////////////////////
// Unconnected Mediator //
//////////////////////////
type connUnconnectedMediator struct {
*connData
}
func newConnUnconnectedMediator(data *connData, peer *m.Peer) connState {
addr, _ := netip.AddrFromSlice(peer.PublicIP)
pubAddr := netip.AddrPortFrom(addr, peer.Port)
c := connUnconnectedMediator{data}
c.peer = peer
c.publicAddr = pubAddr
c.encPrivKey = data.encPrivKey
c.encSharedKey = computeSharedKey(peer.EncPubKey, c.encPrivKey)
c.addr = c.publicAddr
c.viaIP = 0
c.up = false
c.route.Store(c.Route())
c.pingTimer.Stop() // No pings for mediators.
c.timeoutTimer.Stop() // No timeouts yet.
// If we have a mediator route, we can connect.
if mRoute := c.routes[c.mediatorIP].Load(); mRoute != nil {
return newConnConnectedMediator(data, mRoute)
}
return c
}
func (c connUnconnectedMediator) Name() string {
return "MediatorUnconnected"
}
func (c connUnconnectedMediator) HandleMediatorUpdate(ip byte) connState {
c.mediatorIP = ip
if mRoute := c.routes[c.mediatorIP].Load(); mRoute != nil {
return newConnConnectedMediator(c.connData, mRoute)
}
return c
}
func (c connUnconnectedMediator) HandlePing(w wrapper[Ping]) connState {
logState(c, "Ignorning ping.")
return c
}
func (c connUnconnectedMediator) HandlePong(w wrapper[Pong]) connState {
logState(c, "Ignorning pong.")
return c
}
func (c connUnconnectedMediator) HandleTimeout() connState {
logState(c, "Unexpected timeout.")
return c
}
////////////////////////
// Connected Mediator //
////////////////////////
type connConnectedMediator struct {
*connData
}
func newConnConnectedMediator(data *connData, route *route) connState {
c := connConnectedMediator{data}
c.addr = route.Addr
c.viaIP = route.PeerIP
c.up = true
c.route.Store(c.Route())
// No pings for mediated routes.
c.pingTimer.Stop()
c.timeoutTimer.Stop()
return c
}
func (c connConnectedMediator) Name() string {
return "MediatorConnected"
}
func (c connConnectedMediator) HandleMediatorUpdate(ip byte) connState {
c.mediatorIP = ip
if mRoute := c.routes[c.mediatorIP].Load(); mRoute != nil {
return newConnConnectedMediator(c.connData, mRoute)
}
return newConnUnconnectedMediator(c.connData, c.peer)
}
func (c connConnectedMediator) HandlePing(w wrapper[Ping]) connState {
logState(c, "Ignoring ping.")
return c
}
func (c connConnectedMediator) HandlePong(w wrapper[Pong]) connState {
logState(c, "Ignoring pong.")
return c
}
func (c connConnectedMediator) HandleTimeout() connState {
return newConnUnconnectedMediator(c.connData, c.peer)
}

View File

@ -13,4 +13,10 @@ const (
// Basic packet types
PACKET_TYPE_DATA = 0
PACKET_TYPE_PING = 1
PACKET_TYPE_PONG = 2
// Packet sizes.
PING_SIZE = 8
PONG_SIZE = 16
)

View File

@ -10,8 +10,6 @@ import (
"os"
"runtime/debug"
"vppn/m"
_ "net/http/pprof"
)
func Main() {
@ -21,11 +19,6 @@ func Main() {
}
}()
go func() {
log.Printf("Serving on localhost:6060...")
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
var (
netName string
initURL string

View File

@ -2,7 +2,16 @@ package peer
import "unsafe"
func ParseNonceBytes(nb []byte, nonce *Nonce) {
type Nonce struct {
Counter uint64
SourceIP byte
ViaIP byte
DestIP byte
StreamID byte // The stream, see STREAM_* constants
PacketType byte // The packet type. See PACKET_* constants.
}
func (nonce *Nonce) Parse(nb []byte) {
nonce.Counter = *(*uint64)(unsafe.Pointer(&nb[0]))
nonce.SourceIP = nb[8]
nonce.ViaIP = nb[9]
@ -11,14 +20,13 @@ func ParseNonceBytes(nb []byte, nonce *Nonce) {
nonce.PacketType = nb[12]
}
func MarshalNonce(nonce Nonce, buf []byte) {
func (nonce Nonce) Marshal(buf []byte) {
*(*uint64)(unsafe.Pointer(&buf[0])) = nonce.Counter
buf[8] = nonce.SourceIP
buf[9] = nonce.ViaIP
buf[10] = nonce.DestIP
buf[11] = nonce.StreamID
buf[12] = nonce.PacketType
clear(buf[13:])
}
func CounterTimestamp(counter uint64) int64 {

View File

@ -15,10 +15,10 @@ func TestMarshalParseNonce(t *testing.T) {
}
buf := make([]byte, NONCE_SIZE)
MarshalNonce(nIn, buf)
nIn.Marshal(buf)
nOut := Nonce{}
ParseNonceBytes(buf, &nOut)
nOut.Parse(buf)
if nIn != nOut {
t.Fatal(nIn, nOut)
}

View File

@ -3,10 +3,7 @@ package peer
import (
"fmt"
"log"
"net"
"net/netip"
"runtime/debug"
"vppn/fasttime"
)
func (peer *Peer) ifReader() {
@ -17,36 +14,16 @@ func (peer *Peer) ifReader() {
}()
var (
sender = newConnSender(peer.conn, peer.ip, STREAM_DATA, peer.signPrivKey)
n int
destIP byte
router = peer.router
route *route
iface = peer.iface
nonce = Nonce{
SourceIP: peer.ip,
PacketType: PACKET_TYPE_DATA,
StreamID: STREAM_DATA,
}
err error
now uint64
counterTS uint64
counter uint64
packet = make([]byte, BUFFER_SIZE)
encrypted = make([]byte, BUFFER_SIZE)
nonceBuf = make([]byte, NONCE_SIZE)
toSend []byte
signingKey = peer.signPrivKey
reqPool = make(chan udpWriteReq, 1024)
writeChan = make(chan udpWriteReq, 1024)
err error
packet = make([]byte, BUFFER_SIZE)
)
for range cap(reqPool) {
reqPool <- udpWriteReq{Packet: make([]byte, BUFFER_SIZE)}
}
go udpWriter(writeChan, peer.conn, reqPool)
for {
n, err = iface.Read(packet[:BUFFER_SIZE])
if err != nil {
@ -61,54 +38,13 @@ func (peer *Peer) ifReader() {
packet = packet[:n]
destIP = packet[19]
route = router.GetRoute(destIP)
if route == nil {
if route == nil || !route.Up {
log.Printf("Dropping packet for non-existent IP: %d", destIP)
continue
}
now = uint64(fasttime.Now())
if counterTS < now {
counterTS = now
counter = now << 30
}
counter++
nonce.Counter = counter
nonce.ViaIP = route.ViaIP
nonce.DestIP = destIP
MarshalNonce(nonce, nonceBuf)
encrypted = encryptPacket(route.EncSharedKey, nonceBuf, packet, encrypted)
if route.ViaIP != 0 {
toSend = signPacket(signingKey, encrypted, packet)
} else {
toSend = encrypted
}
req := <-reqPool
req.Addr = route.Addr
req.Packet = req.Packet[:len(toSend)]
copy(req.Packet, toSend)
writeChan <- req
}
}
type udpWriteReq struct {
Addr netip.AddrPort
Packet []byte
}
func udpWriter(in chan udpWriteReq, conn *net.UDPConn, reqPool chan udpWriteReq) {
var err error
for req := range in {
if _, err = conn.WriteToUDPAddrPort(req.Packet, req.Addr); err != nil {
log.Fatalf("Failed to write UDP packet: %v", err)
}
reqPool <- req
sender.send(PACKET_TYPE_DATA, packet, route)
}
}

View File

@ -2,8 +2,8 @@ package peer
import (
"fmt"
"io"
"log"
"net/netip"
"runtime/debug"
"vppn/fasttime"
)
@ -16,35 +16,26 @@ func (peer *Peer) netReader() {
}()
var (
n int
//srcAddr *net.UDPAddr
n int
srcAddr netip.AddrPort
nonce Nonce
packet = make([]byte, BUFFER_SIZE)
decrypted = make([]byte, BUFFER_SIZE)
toWrite []byte
route *route
ok bool
err error
conn = peer.conn
ip = peer.ip
counters = [2][256]uint64{} // Counter by stream and IP.
ifaceChan = make(chan []byte, 1024)
reqPool = make(chan []byte, 1024)
)
for range cap(reqPool) {
reqPool <- make([]byte, BUFFER_SIZE)
}
go ifWriter(ifaceChan, peer.iface, reqPool)
NEXT_PACKET:
n, _, err = conn.ReadFromUDPAddrPort(packet[:BUFFER_SIZE])
n, srcAddr, err = conn.ReadFromUDPAddrPort(packet[:BUFFER_SIZE])
if err != nil {
log.Fatalf("Failed to read UDP packet: %v", err)
}
srcAddr = netip.AddrPortFrom(srcAddr.Addr().Unmap(), srcAddr.Port())
if n < NONCE_SIZE {
log.Printf("Dropping short UDP packet: %d", n)
@ -52,8 +43,7 @@ NEXT_PACKET:
}
packet = packet[:n]
ParseNonceBytes(packet[n-NONCE_SIZE:], &nonce)
nonce.Parse(packet[n-NONCE_SIZE:])
// Drop after 8 seconds.
if CounterTimestamp(nonce.Counter) < fasttime.Now()-8 {
@ -62,13 +52,13 @@ NEXT_PACKET:
}
if nonce.StreamID > 1 {
log.Printf("Dropping invalid stream ID: %v", nonce)
log.Printf("Dropping invalid stream ID: %+v", nonce)
goto NEXT_PACKET
}
// Check source counter.
if nonce.Counter <= counters[nonce.StreamID][nonce.SourceIP] {
log.Printf("Dropping packet with bad counter: %v", nonce)
log.Printf("Dropping packet with bad counter: %+v", nonce)
goto NEXT_PACKET
}
@ -76,7 +66,7 @@ NEXT_PACKET:
route = peer.router.GetRoute(nonce.SourceIP)
if route == nil {
log.Printf("Dropping packet without route: %v", nonce)
log.Printf("Dropping packet without route: %+v", nonce)
goto NEXT_PACKET
}
@ -86,7 +76,7 @@ NEXT_PACKET:
case nonce.ViaIP:
goto VALIDATE_SIGNATURE
default:
log.Printf("Bad packet: %v", nonce)
log.Printf("Bad packet: %+v", nonce)
goto NEXT_PACKET
}
@ -110,49 +100,41 @@ DECRYPT:
WRITE_IFACE_DATA:
//toWrite = toWrite[:len(decrypted)]
//copy(toWrite, decrypted)
toWrite = <-reqPool
ifaceChan <- append(toWrite[:0], decrypted...)
if _, err = peer.iface.Write(decrypted); err != nil {
log.Fatalf("Failed to write to interface: %v", err)
}
goto NEXT_PACKET
WRITE_ROUTING_PACKET:
//peer.processRoutingPacket(decrypted)
//peer.routeManager.ProcessPacket(decrypted)
peer.router.HandlePacket(srcAddr, nonce, decrypted)
goto NEXT_PACKET
VALIDATE_SIGNATURE:
// We don't forward twice.
if route.Mediator {
log.Printf("Dropping double-forward packet: %v", nonce)
goto NEXT_PACKET
}
decrypted, ok = openPacket(route.SignPubKey, packet, decrypted)
if !ok {
log.Printf("Failed to open signed packet: %v", nonce)
goto NEXT_PACKET
}
route = peer.router.GetRoute(nonce.DestIP)
if route == nil || !route.Up {
log.Printf("Dropping mediated packet, route not available: %v", nonce)
goto NEXT_PACKET
}
// We don't forward twice.
if route.ViaIP != 0 {
log.Printf("Dropping double-forward packet: %v", nonce)
goto NEXT_PACKET
}
if _, err = conn.WriteToUDPAddrPort(decrypted, route.Addr); err != nil {
log.Fatalf("Failed to forward packet: %v", err)
}
goto NEXT_PACKET
}
func ifWriter(in chan []byte, iface io.ReadWriteCloser, out chan []byte) {
var err error
for packet := range in {
if _, err = iface.Write(packet); err != nil {
log.Printf("Size: %d", len(packet))
log.Fatalf("Failed to write to interface: %v", err)
}
out <- packet
}
}

View File

@ -4,14 +4,15 @@ import (
"io"
"log"
"net"
"net/netip"
)
type Peer struct {
// Immutable data.
ip byte // Last byte of IPv4 address.
hubAddr string
apiKey string
ip byte // Last byte of IPv4 address.
hubAddr string
apiKey string
isServer bool
isMediator bool
encPubKey []byte
encPrivKey []byte
signPubKey []byte
@ -31,6 +32,7 @@ func NewPeer(netName, listenIP string, port uint16) (*Peer, error) {
peer := &Peer{
ip: conf.PeerIP,
hubAddr: conf.HubAddress,
isMediator: conf.Mediator,
apiKey: conf.APIKey,
encPubKey: conf.EncPubKey,
encPrivKey: conf.EncPrivKey,
@ -38,16 +40,16 @@ func NewPeer(netName, listenIP string, port uint16) (*Peer, error) {
signPrivKey: conf.SignPrivKey,
}
peer.router = NewRouter(conf)
_, peer.isServer = netip.AddrFromSlice(conf.PublicIP)
port = determinePort(conf.Port, port)
conn, err := openUDPConn(listenIP, port)
peer.conn, err = openUDPConn(listenIP, port)
if err != nil {
return nil, err
}
peer.conn = conn
peer.router = NewRouter(conf, peer.conn)
peer.iface, err = openInterface(conf.Network, conf.PeerIP, netName)
if err != nil {

View File

@ -0,0 +1,45 @@
package peer
import (
"math/rand"
"time"
)
func (r *Router) manageMediator() {
var (
ip = byte(0)
mediators = make([]*route, 0, 32)
)
for range time.Tick(8 * time.Second) {
// If the current mediator is up, keep it.
route := r.routes[ip].Load()
if route != nil && route.Up {
continue
}
// If the current mediator is up, keep it.
mediators = mediators[:0]
for i := range r.routes {
route := r.routes[i].Load()
if route == nil || !route.Mediator {
continue
}
mediators = append(mediators, route)
}
if len(mediators) == 0 {
ip = 0
} else {
ip = mediators[rand.Intn(len(mediators))].PeerIP
}
for _, conn := range r.conns {
if conn != nil {
conn.UpdateMediator(ip)
}
}
}
}

View File

@ -1,18 +0,0 @@
package peer
type RoutingPingReq struct {
PeerIP byte
Type byte // 0 => local, 1 => direct, 2 => Via
Addr []byte
Port int
SentAt int64 // unix milli
}
type RoutingPingResp struct {
PeerIP byte
Type byte // 0 => local, 1 => direct, 2 => Via
Addr []byte
Port int
SentAt int64
RecvdAt int64
}

37
peer/router-ping_test.go Normal file
View File

@ -0,0 +1,37 @@
package peer
import (
"reflect"
"testing"
)
func TestMarshalParsePingReq(t *testing.T) {
in := Ping{
SentAt: 4553252,
}
buf := make([]byte, PING_SIZE)
in.Marshal(buf)
out := Ping{}
out.Parse(buf)
if !reflect.DeepEqual(in, out) {
t.Fatal(in, out)
}
}
func TestMarshalParsePingResp(t *testing.T) {
in := Pong{
SentAt: 4553252,
RecvdAt: 4553253,
}
buf := make([]byte, PONG_SIZE)
in.Marshal(buf)
out := Pong{}
out.Parse(buf)
if !reflect.DeepEqual(in, out) {
t.Fatal(in, out)
}
}

63
peer/router-pollhub.go Normal file
View File

@ -0,0 +1,63 @@
package peer
import (
"encoding/json"
"io"
"log"
"net/http"
"net/url"
"time"
"vppn/m"
)
func (r *Router) pollHub() {
u, err := url.Parse(r.conf.HubAddress)
if err != nil {
log.Fatalf("Failed to parse hub address %s: %v", r.conf.HubAddress, err)
}
u.Path = "/peer/fetch-state/"
client := &http.Client{Timeout: 8 * time.Second}
req := &http.Request{
Method: http.MethodGet,
URL: u,
Header: http.Header{},
}
req.SetBasicAuth("", r.conf.APIKey)
// TODO: Before we start polling, load state from the file system.
r._pollHub(client, req)
for range time.Tick(32 * time.Second) {
r._pollHub(client, req)
}
}
func (r *Router) _pollHub(client *http.Client, req *http.Request) {
var state m.NetworkState
log.Printf("Fetching peer state from %s...", r.conf.HubAddress)
resp, err := client.Do(req)
if err != nil {
log.Printf("Failed to fetch peer state: %v", err)
return
}
body, err := io.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
log.Printf("Failed to read body from hub: %v", err)
return
}
if err := json.Unmarshal(body, &state); err != nil {
log.Printf("Failed to unmarshal response from hub: %v", err)
return
}
for i, peer := range state.Peers {
if r.conns[i] != nil {
r.conns[i].UpdatePeer(peerUpdate{PeerIP: byte(i), Peer: peer})
}
}
}

74
peer/router-types.go Normal file
View File

@ -0,0 +1,74 @@
package peer
import (
"net/netip"
"unsafe"
"vppn/m"
)
// ----------------------------------------------------------------------------
// A route is used by a peer to send or receive packets. A peer won't send
// packets on a route that isn't up, but will process incoming packets on
// that route.
type route struct {
PeerIP byte
Up bool
Mediator bool
SignPubKey []byte
EncSharedKey []byte // Shared key for encoding / decoding packets.
Addr netip.AddrPort // Address to send to.
ViaIP byte // If != 0, this is a forwarding address.
}
type peerUpdate struct {
PeerIP byte
*m.Peer // nil => delete.
}
// ----------------------------------------------------------------------------
// Wrapper for routing packets.
type wrapper[T any] struct {
T T
Nonce Nonce
SrcAddr netip.AddrPort
}
func newWrapper[T any](srcAddr netip.AddrPort, nonce Nonce) wrapper[T] {
return wrapper[T]{
SrcAddr: srcAddr,
Nonce: nonce,
}
}
// ----------------------------------------------------------------------------
type Ping struct {
SentAt int64 // unix milli
}
func (p *Ping) Parse(buf []byte) {
p.SentAt = *(*int64)(unsafe.Pointer(&buf[0]))
}
func (p Ping) Marshal(buf []byte) {
*(*int64)(unsafe.Pointer(&buf[0])) = p.SentAt
}
// ----------------------------------------------------------------------------
type Pong struct {
SentAt int64 // unix mili
RecvdAt int64 // unix mili
}
func (p *Pong) Parse(buf []byte) {
p.SentAt = *(*int64)(unsafe.Pointer(&buf[0]))
p.RecvdAt = *(*int64)(unsafe.Pointer(&buf[8]))
}
func (p *Pong) Marshal(buf []byte) {
*(*int64)(unsafe.Pointer(&buf[0])) = p.SentAt
*(*int64)(unsafe.Pointer(&buf[8])) = p.RecvdAt
}

View File

@ -1,123 +1,86 @@
package peer
import (
"encoding/json"
"io"
"log"
"net/http"
"net"
"net/netip"
"net/url"
"sync/atomic"
"time"
"vppn/m"
)
var zeroAddrPort netip.AddrPort
type routeInfo struct {
Up bool
Route route
}
type Router struct {
conf m.PeerConfig
routes [256]*atomic.Pointer[routeInfo]
conf m.PeerConfig
// Routes used by the peer.
conns [256]*connHandler
routes [256]*atomic.Pointer[route]
}
func NewRouter(conf m.PeerConfig) *Router {
rm := &Router{
conf: conf,
func NewRouter(conf m.PeerConfig, conn *net.UDPConn) *Router {
r := &Router{conf: conf}
for i := range r.routes {
r.routes[i] = &atomic.Pointer[route]{}
}
for i := range rm.routes {
rm.routes[i] = &atomic.Pointer[routeInfo]{}
rm.routes[i].Store(&routeInfo{})
_, isServer := netip.AddrFromSlice(conf.PublicIP)
sender := newConnSender(conn, conf.PeerIP, STREAM_ROUTING, conf.SignPrivKey)
for i := range r.conns {
if byte(i) != conf.PeerIP {
r.conns[i] = newConnHandler(
isServer,
byte(i),
r.routes,
conf.EncPrivKey,
newSafeConnSender(sender))
}
}
go rm.pollHub()
return rm
go r.pollHub()
if !isServer {
go r.manageMediator()
}
return r
}
// ----------------------------------------------------------------------------
// Peer Methods
// ----------------------------------------------------------------------------
func (rm *Router) GetRoute(ip byte) *route {
if route := rm.routes[ip].Load(); route != nil && route.Up {
return &route.Route
}
return nil
return rm.routes[ip].Load()
}
func (rm *Router) pollHub() {
u, err := url.Parse(rm.conf.HubAddress)
if err != nil {
log.Fatalf("Failed to parse hub address %s: %v", rm.conf.HubAddress, err)
}
u.Path = "/peer/fetch-state/"
client := &http.Client{Timeout: 8 * time.Second}
req := &http.Request{
Method: http.MethodGet,
URL: u,
Header: http.Header{},
}
req.SetBasicAuth("", rm.conf.APIKey)
rm._pollHub(client, req)
for range time.Tick(time.Minute) {
rm._pollHub(client, req)
}
}
func (rm *Router) _pollHub(client *http.Client, req *http.Request) {
var state m.NetworkState
log.Printf("Fetching peer state from %s...", rm.conf.HubAddress)
resp, err := client.Do(req)
if err != nil {
log.Printf("Failed to fetch peer state: %v", err)
return
}
body, err := io.ReadAll(resp.Body)
_ = resp.Body.Close()
if err != nil {
log.Printf("Failed to read body: %v", err)
func (r *Router) HandlePacket(src netip.AddrPort, nonce Nonce, data []byte) {
if nonce.SourceIP == r.conf.PeerIP {
log.Printf("Packet to self...")
return
}
if err := json.Unmarshal(body, &state); err != nil {
log.Printf("Failed to unmarshal response from hub: %v", err)
return
}
for i, peer := range state.Peers {
if peer == nil {
continue
switch nonce.PacketType {
case PACKET_TYPE_PING:
if len(data) < PING_SIZE {
log.Printf("Short ping request: %d", len(data))
return
}
route := rm.routes[i].Load()
rm.routes[i].Store(rm.updateRoute(route, peer))
w := newWrapper[Ping](src, nonce)
w.T.Parse(data)
r.conns[nonce.SourceIP].HandlePing(w)
case PACKET_TYPE_PONG:
if len(data) < PONG_SIZE {
log.Printf("Short ping response: %d", len(data))
return
}
w := newWrapper[Pong](src, nonce)
w.T.Parse(data)
r.conns[nonce.SourceIP].HandlePong(w)
default:
log.Printf("Unknown routing packet type: %d", nonce.PacketType)
}
}
func (rm *Router) updateRoute(routePtr *routeInfo, peer *m.Peer) *routeInfo {
if peer == nil {
return &routeInfo{}
}
route := *routePtr
addr, ok := netip.AddrFromSlice(peer.IP)
if !ok {
return &routeInfo{}
}
route.Up = true
route.Route.Addr = netip.AddrPortFrom(addr, peer.Port)
route.Route.Mediator = peer.Mediator
route.Route.ViaIP = 0
if len(route.Route.SignPubKey) == 0 {
route.Route.SignPubKey = peer.SignPubKey
route.Route.EncSharedKey = computeSharedKey(peer.EncPubKey, rm.conf.EncPrivKey)
}
return &route
}

View File

@ -1,22 +0,0 @@
package peer
import (
"net/netip"
)
type route struct {
Mediator bool // Route is via a mediator.
SignPubKey []byte
EncSharedKey []byte // Shared key for encoding / decoding packets.
Addr netip.AddrPort // Address to send to.
ViaIP byte // If != 0, this is a forwarding address.
}
type Nonce struct {
Counter uint64
SourceIP byte
ViaIP byte
DestIP byte
StreamID byte // The stream, see STREAM_* constants
PacketType byte // The packet type. See PACKET_* constants.
}