vppn/peer/supervisor.go
2025-02-10 19:11:30 +01:00

104 lines
2.2 KiB
Go

package peer
import (
"log"
"sync/atomic"
"time"
"git.crumpington.com/lib/go/ratelimiter"
)
// ----------------------------------------------------------------------------
type Supervisor struct {
messages chan any // Incoming control messages.
peers [256]PeerState
pubAddrs *pubAddrStore
rt *atomic.Pointer[RoutingTable]
staged RoutingTable
}
func NewSupervisor(
sendControl func(RemotePeer, Marshaller),
privKey []byte,
rt *atomic.Pointer[RoutingTable],
) *Supervisor {
s := &Supervisor{
messages: make(chan any, 1024),
pubAddrs: newPubAddrStore(rt.Load().LocalAddr),
rt: rt,
}
routes := rt.Load()
for i := range s.peers {
state := &State{
publish: s.Publish,
sendControlPacket: sendControl,
localIP: routes.LocalIP,
remoteIP: byte(i),
privKey: privKey,
localAddr: routes.LocalAddr,
pubAddrs: s.pubAddrs,
staged: routes.Peers[i],
limiter: ratelimiter.New(ratelimiter.Config{
FillPeriod: 20 * time.Millisecond,
MaxWaitCount: 1,
}),
}
s.peers[i] = state.OnPeerUpdate(nil)
}
return s
}
func (s *Supervisor) HandleControlMsg(msg any) {
select {
case s.messages <- msg:
default:
}
}
func (s *Supervisor) Run() {
for raw := range s.messages {
switch msg := raw.(type) {
case peerUpdateMsg:
s.peers[msg.PeerIP] = s.peers[msg.PeerIP].OnPeerUpdate(msg.Peer)
case controlMsg[PacketSyn]:
if newState := s.peers[msg.SrcIP].OnSyn(msg); newState != nil {
s.peers[msg.SrcIP] = newState
}
case controlMsg[PacketAck]:
s.peers[msg.SrcIP].OnAck(msg)
case controlMsg[PacketProbe]:
if newState := s.peers[msg.SrcIP].OnProbe(msg); newState != nil {
s.peers[msg.SrcIP] = newState
}
case controlMsg[PacketLocalDiscovery]:
s.peers[msg.SrcIP].OnLocalDiscovery(msg)
case pingTimerMsg:
s.pubAddrs.Clean()
for i := range s.peers {
if newState := s.peers[i].OnPingTimer(); newState != nil {
s.peers[i] = newState
}
}
default:
log.Printf("WARNING: unknown message type: %+v", msg)
}
}
}
func (s *Supervisor) Publish(rp RemotePeer) {
s.staged.Peers[rp.IP] = rp
rt := s.staged // Copy.
s.rt.Store(&rt)
}