package peer import ( "log" "sync/atomic" "time" "git.crumpington.com/lib/go/ratelimiter" ) // ---------------------------------------------------------------------------- type Supervisor struct { messages chan any // Incoming control messages. peers [256]PeerState pubAddrs *pubAddrStore rt *atomic.Pointer[RoutingTable] staged RoutingTable } func NewSupervisor( sendControl func(RemotePeer, Marshaller), privKey []byte, rt *atomic.Pointer[RoutingTable], ) *Supervisor { s := &Supervisor{ messages: make(chan any, 1024), pubAddrs: newPubAddrStore(rt.Load().LocalAddr), rt: rt, } routes := rt.Load() for i := range s.peers { state := &State{ publish: s.Publish, sendControlPacket: sendControl, localIP: routes.LocalIP, remoteIP: byte(i), privKey: privKey, localAddr: routes.LocalAddr, pubAddrs: s.pubAddrs, staged: routes.Peers[i], limiter: ratelimiter.New(ratelimiter.Config{ FillPeriod: 20 * time.Millisecond, MaxWaitCount: 1, }), } s.peers[i] = state.OnPeerUpdate(nil) } return s } func (s *Supervisor) HandleControlMsg(msg any) { select { case s.messages <- msg: default: } } func (s *Supervisor) Run() { for raw := range s.messages { switch msg := raw.(type) { case peerUpdateMsg: s.peers[msg.PeerIP] = s.peers[msg.PeerIP].OnPeerUpdate(msg.Peer) case controlMsg[PacketSyn]: if newState := s.peers[msg.SrcIP].OnSyn(msg); newState != nil { s.peers[msg.SrcIP] = newState } case controlMsg[PacketAck]: s.peers[msg.SrcIP].OnAck(msg) case controlMsg[PacketProbe]: if newState := s.peers[msg.SrcIP].OnProbe(msg); newState != nil { s.peers[msg.SrcIP] = newState } case controlMsg[PacketLocalDiscovery]: s.peers[msg.SrcIP].OnLocalDiscovery(msg) case pingTimerMsg: s.pubAddrs.Clean() for i := range s.peers { if newState := s.peers[i].OnPingTimer(); newState != nil { s.peers[i] = newState } } default: log.Printf("WARNING: unknown message type: %+v", msg) } } } func (s *Supervisor) Publish(rp RemotePeer) { s.staged.Peers[rp.IP] = rp rt := s.staged // Copy. s.rt.Store(&rt) }