vppn/peer/peersuper.go
2025-02-25 18:14:39 +01:00

149 lines
3.0 KiB
Go

package peer
import (
"net/netip"
"sync"
"sync/atomic"
"time"
"git.crumpington.com/lib/go/ratelimiter"
)
type supervisor struct {
writeToUDPAddrPort func([]byte, netip.AddrPort) (int, error)
staged routingTable
shared *atomic.Pointer[routingTable]
peers [256]*peerSuper
lock sync.Mutex
buf1 []byte
buf2 []byte
}
func newSupervisor(
writeToUDPAddrPort func([]byte, netip.AddrPort) (int, error),
rt *atomic.Pointer[routingTable],
privKey []byte,
) *supervisor {
routes := rt.Load()
s := &supervisor{
writeToUDPAddrPort: writeToUDPAddrPort,
staged: *routes,
shared: rt,
buf1: newBuf(),
buf2: newBuf(),
}
pubAddrs := newPubAddrStore(routes.LocalAddr)
for i := range s.peers {
state := &peerData{
publish: s.publish,
sendControlPacket: s.send,
pingTimer: time.NewTicker(timeoutInterval),
localIP: routes.LocalIP,
remoteIP: byte(i),
privKey: privKey,
localAddr: routes.LocalAddr,
pubAddrs: pubAddrs,
staged: routes.Peers[i],
limiter: ratelimiter.New(ratelimiter.Config{
FillPeriod: 20 * time.Millisecond,
MaxWaitCount: 1,
}),
}
s.peers[i] = newPeerSuper(state, state.pingTimer)
}
return s
}
func (s *supervisor) Start() {
for i := range s.peers {
go s.peers[i].Run()
}
}
func (s *supervisor) HandleControlMsg(destIP byte, msg any) {
s.peers[destIP].HandleControlMsg(msg)
}
func (s *supervisor) send(peer remotePeer, pkt marshaller) {
s.lock.Lock()
defer s.lock.Unlock()
enc := peer.EncryptControlPacket(pkt, s.buf1, s.buf2)
if peer.Direct {
s.writeToUDPAddrPort(enc, peer.DirectAddr)
return
}
relay, ok := s.staged.GetRelay()
if !ok {
return
}
enc = relay.EncryptDataPacket(peer.IP, enc, s.buf1)
s.writeToUDPAddrPort(enc, relay.DirectAddr)
}
func (s *supervisor) publish(rp remotePeer) {
s.lock.Lock()
defer s.lock.Unlock()
s.staged.Peers[rp.IP] = rp
s.ensureRelay()
copy := s.staged
s.shared.Store(&copy)
}
func (s *supervisor) ensureRelay() {
if _, ok := s.staged.GetRelay(); ok {
return
}
// TODO: Random selection? Something else?
for _, peer := range s.staged.Peers {
if peer.Up && peer.Direct && peer.Relay {
s.staged.RelayIP = peer.IP
return
}
}
}
// ----------------------------------------------------------------------------
type peerSuper struct {
messages chan any
state peerState
pingTimer *time.Ticker
}
func newPeerSuper(state *peerData, pingTimer *time.Ticker) *peerSuper {
return &peerSuper{
messages: make(chan any, 8),
state: initPeerState(state, nil),
pingTimer: pingTimer,
}
}
func (s *peerSuper) HandleControlMsg(msg any) {
select {
case s.messages <- msg:
default:
}
}
func (s *peerSuper) Run() {
for {
select {
case <-s.pingTimer.C:
s.state = s.state.OnMsg(pingTimerMsg{})
case raw := <-s.messages:
s.state = s.state.OnMsg(raw)
}
}
}