package peer import ( "net/netip" "sync" "sync/atomic" "time" "git.crumpington.com/lib/go/ratelimiter" ) type supervisor struct { writeToUDPAddrPort func([]byte, netip.AddrPort) (int, error) staged routingTable shared *atomic.Pointer[routingTable] peers [256]*peerSuper lock sync.Mutex buf1 []byte buf2 []byte } func newSupervisor( writeToUDPAddrPort func([]byte, netip.AddrPort) (int, error), rt *atomic.Pointer[routingTable], privKey []byte, ) *supervisor { routes := rt.Load() s := &supervisor{ writeToUDPAddrPort: writeToUDPAddrPort, staged: *routes, shared: rt, buf1: newBuf(), buf2: newBuf(), } pubAddrs := newPubAddrStore(routes.LocalAddr) for i := range s.peers { state := &peerData{ publish: s.publish, sendControlPacket: s.send, pingTimer: time.NewTicker(timeoutInterval), localIP: routes.LocalIP, remoteIP: byte(i), privKey: privKey, localAddr: routes.LocalAddr, pubAddrs: pubAddrs, staged: routes.Peers[i], limiter: ratelimiter.New(ratelimiter.Config{ FillPeriod: 20 * time.Millisecond, MaxWaitCount: 1, }), } s.peers[i] = newPeerSuper(state, state.pingTimer) } return s } func (s *supervisor) Start() { for i := range s.peers { go s.peers[i].Run() } } func (s *supervisor) HandleControlMsg(destIP byte, msg any) { s.peers[destIP].HandleControlMsg(msg) } func (s *supervisor) send(peer remotePeer, pkt marshaller) { s.lock.Lock() defer s.lock.Unlock() enc := peer.EncryptControlPacket(pkt, s.buf1, s.buf2) if peer.Direct { s.writeToUDPAddrPort(enc, peer.DirectAddr) return } relay, ok := s.staged.GetRelay() if !ok { return } enc = relay.EncryptDataPacket(peer.IP, enc, s.buf1) s.writeToUDPAddrPort(enc, relay.DirectAddr) } func (s *supervisor) publish(rp remotePeer) { s.lock.Lock() defer s.lock.Unlock() s.staged.Peers[rp.IP] = rp s.ensureRelay() copy := s.staged s.shared.Store(©) } func (s *supervisor) ensureRelay() { if _, ok := s.staged.GetRelay(); ok { return } // TODO: Random selection? Something else? for _, peer := range s.staged.Peers { if peer.Up && peer.Direct && peer.Relay { s.staged.RelayIP = peer.IP return } } } // ---------------------------------------------------------------------------- type peerSuper struct { messages chan any state peerState pingTimer *time.Ticker } func newPeerSuper(state *peerData, pingTimer *time.Ticker) *peerSuper { return &peerSuper{ messages: make(chan any, 8), state: initPeerState(state, nil), pingTimer: pingTimer, } } func (s *peerSuper) HandleControlMsg(msg any) { select { case s.messages <- msg: default: } } func (s *peerSuper) Run() { for { select { case <-s.pingTimer.C: s.state = s.state.OnMsg(pingTimerMsg{}) case raw := <-s.messages: s.state = s.state.OnMsg(raw) } } }