package node import ( "encoding/json" "io" "log" "net/http" "net/netip" "net/url" "sync/atomic" "time" "vppn/m" ) type peer struct { Up bool // No data will be sent to peers that are down. Mediator bool Mediated bool IP byte Addr *netip.AddrPort // If we have direct connection, otherwise use mediator. SharedKey []byte } // ---------------------------------------------------------------------------- type routingTable struct { table [256]*atomic.Pointer[peer] mediator *atomic.Pointer[peer] } func newRoutingTable() *routingTable { r := routingTable{ mediator: &atomic.Pointer[peer]{}, } for i := range r.table { r.table[i] = &atomic.Pointer[peer]{} } return &r } func (r *routingTable) Get(ip byte) *peer { return r.table[ip].Load() } func (r *routingTable) Set(ip byte, p *peer) { r.table[ip].Store(p) } // ---------------------------------------------------------------------------- type router struct { *routingTable netName string peerSupers [256]*peerSupervisor } func newRouter(netName string, conf m.PeerConfig, routingData *routingTable, w *connWriter) *router { r := &router{ netName: netName, routingTable: routingData, } for i := range r.peerSupers { r.peerSupers[i] = newPeerSupervisor( conf, byte(i), w, r.routingTable) } go r.selectMediator() go r.pollHub(conf) return r } // ---------------------------------------------------------------------------- func (r *router) HandlePacket(sourceIP byte, remoteAddr netip.AddrPort, data []byte) { p := routingPacket{} if err := p.Parse(data); err != nil { log.Printf("Dropping malformed routing packet: %v", err) return } w := routingPacketWrapper{ routingPacket: p, Addr: remoteAddr, } r.peerSupers[sourceIP].HandlePacket(w) } // ---------------------------------------------------------------------------- func (r *router) pollHub(conf m.PeerConfig) { defer panicHandler() u, err := url.Parse(conf.HubAddress) if err != nil { log.Fatalf("Failed to parse hub address %s: %v", conf.HubAddress, err) } u.Path = "/peer/fetch-state/" client := &http.Client{Timeout: 8 * time.Second} req := &http.Request{ Method: http.MethodGet, URL: u, Header: http.Header{}, } req.SetBasicAuth("", conf.APIKey) state, err := loadNetworkState(r.netName) if err != nil { log.Printf("Failed to load network state: %v", err) log.Printf("Polling hub...") r._pollHub(conf, client, req) } else { r.applyNetworkState(conf, state) } for range time.Tick(64 * time.Second) { r._pollHub(conf, client, req) } } func (r *router) _pollHub(conf m.PeerConfig, client *http.Client, req *http.Request) { var state m.NetworkState log.Printf("Fetching peer state from %s...", conf.HubAddress) resp, err := client.Do(req) if err != nil { log.Printf("Failed to fetch peer state: %v", err) return } body, err := io.ReadAll(resp.Body) _ = resp.Body.Close() if err != nil { log.Printf("Failed to read body from hub: %v", err) return } if err := json.Unmarshal(body, &state); err != nil { log.Printf("Failed to unmarshal response from hub: %v", err) return } r.applyNetworkState(conf, state) if err := storeNetworkState(r.netName, state); err != nil { log.Printf("Failed to store network state: %v", err) } } func (r *router) applyNetworkState(conf m.PeerConfig, state m.NetworkState) { for i := range state.Peers { if i != int(conf.PeerIP) { r.peerSupers[i].HandlePeerUpdate(state.Peers[i]) } } } // ---------------------------------------------------------------------------- func (r *router) selectMediator() { for range time.Tick(8 * time.Second) { current := r.mediator.Load() if current != nil && current.Up { continue } for i := range r.table { peer := r.table[i].Load() if peer != nil && peer.Up && peer.Mediator { log.Printf("Got mediator: %v", *peer) r.mediator.Store(peer) return } } r.mediator.Store(nil) } }