package peer

import (
	"encoding/json"
	"io"
	"log"
	"net/http"
	"net/netip"
	"net/url"
	"time"

	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
	"vppn/m"
)

// hubPollInterval is the delay between successive hub polls in Run.
const hubPollInterval = 64 * time.Second

// HubPoller periodically fetches the network state from the hub, diffs it
// against the peers it has already announced, and emits the resulting adds
// and removes on its channels. It also keeps an on-disk copy of the last
// fetched state so a restart can be primed without the hub.
type HubPoller struct {
	selfVPNIP netip.Addr
	vpnNet    netip.Prefix
	hubURL    string
	apiKey    string
	statePath string // where the network state cache is persisted
	addCh     chan<- m.Peer
	removeCh  chan<- wgtypes.Key
	known     map[wgtypes.Key]struct{} // pubKeys currently configured

	// savePending is set when a poll changed the peer set and stays set until
	// the state has been written to statePath successfully. Without it a
	// failed write would never be retried, because the following poll sees no
	// diff against known.
	savePending bool
}

// NewHubPoller builds a HubPoller that polls the hub at hubURL.
//
// hubURL is parsed and its path is replaced with "/peer/fetch-state/"; a parse
// failure is returned as the error. The poller starts with an empty set of
// known peers and does nothing until Run is called.
func NewHubPoller(
	selfVPNIP netip.Addr,
	vpnNet netip.Prefix,
	hubURL, apiKey string,
	statePath string,
	addCh chan<- m.Peer,
	removeCh chan<- wgtypes.Key,
) (*HubPoller, error) {
	u, err := url.Parse(hubURL)
	if err != nil {
		return nil, err
	}
	u.Path = "/peer/fetch-state/"

	return &HubPoller{
		selfVPNIP: selfVPNIP,
		vpnNet:    vpnNet,
		hubURL:    u.String(),
		apiKey:    apiKey,
		statePath: statePath,
		addCh:     addCh,
		removeCh:  removeCh,
		known:     make(map[wgtypes.Key]struct{}),
	}, nil
}

// Run primes the poller from the on-disk cache, polls the hub once
// immediately, and then polls again every hubPollInterval. It never returns.
func (hp *HubPoller) Run() {
	// Prime from the on-disk cache before reaching the hub, so the peer
	// configures WireGuard from its last known state even if the hub is down.
	// known starts empty, so this emits every cached peer as an add; the first
	// real poll then emits only deltas (adds for new peers, removes for gone).
	if state, err := loadNetworkState(hp.statePath); err == nil {
		hp.apply(state)
	}

	hp.poll()
	for range time.Tick(hubPollInterval) {
		hp.poll()
	}
}

// poll performs one fetch of the network state from the hub, applies it, and
// persists it when needed. Every failure is logged and ends the attempt; the
// next tick simply tries again.
func (hp *HubPoller) poll() {
	req, err := http.NewRequest(http.MethodGet, hp.hubURL, nil)
	if err != nil {
		log.Printf("[HubPoller] build request: %v", err)
		return
	}
	// The API key travels as the basic-auth password with an empty user name.
	req.SetBasicAuth("", hp.apiKey)

	client := &http.Client{Timeout: 32 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		log.Printf("[HubPoller] fetch: %v", err)
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		log.Printf("[HubPoller] unexpected status %d", resp.StatusCode)
		return
	}

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		log.Printf("[HubPoller] read body: %v", err)
		return
	}

	var state m.NetworkState
	if err := json.Unmarshal(body, &state); err != nil {
		log.Printf("[HubPoller] unmarshal: %v", err)
		return
	}

	// Persist only when the state actually changed, to avoid needless writes
	// on every poll. The pending flag survives a failed write so the cache is
	// retried on the next successful poll rather than left stale.
	if hp.apply(state) {
		hp.savePending = true
	}
	if !hp.savePending {
		return
	}
	if err := saveNetworkState(hp.statePath, state); err != nil {
		log.Printf("[HubPoller] save state: %v", err)
		return
	}
	hp.savePending = false
}

// apply diffs state against the set of known peers, emitting an add for each
// newly-seen peer and a remove for each that disappeared. It returns true if
// anything changed. A peer's config is immutable under a stable WG key (the hub
// has no peer-edit path), so a key already in known needs no re-emit.
func (hp *HubPoller) apply(state m.NetworkState) (changed bool) { seen := make(map[wgtypes.Key]struct{}, len(hp.known)) netAddr := hp.vpnNet.Addr().As4() for _, p := range state.Peers { if p.WGPubKey == (wgtypes.Key{}) { continue } octets := netAddr octets[3] = p.PeerIP vpnIP := netip.AddrFrom4(octets) if vpnIP == hp.selfVPNIP { continue } seen[p.WGPubKey] = struct{}{} if _, ok := hp.known[p.WGPubKey]; ok { continue } hp.known[p.WGPubKey] = struct{}{} hp.addCh <- p changed = true } for key := range hp.known { if _, ok := seen[key]; !ok { delete(hp.known, key) hp.removeCh <- key changed = true } } return changed }