149 lines
2.9 KiB
Go
149 lines
2.9 KiB
Go
package peer
|
|
|
|
import (
|
|
"net/netip"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"git.crumpington.com/lib/go/ratelimiter"
|
|
)
|
|
|
|
// supervisor owns the staged routing table and fans control messages out to
// the per-peer state machines. The staged table is mutated only under lock;
// other goroutines observe it via immutable snapshots published through the
// shared atomic pointer (see publish).
type supervisor struct {
	// writeToUDPAddrPort transmits an encrypted packet to a remote address.
	writeToUDPAddrPort func([]byte, netip.AddrPort) (int, error)

	// staged is the supervisor's private working copy of the routing table;
	// guarded by lock.
	staged routingTable
	// shared publishes read-only snapshots of staged to other goroutines.
	shared *atomic.Pointer[routingTable]

	// peers holds one state-machine supervisor per possible remote IP (0-255).
	peers [256]*peerSuper

	// lock guards staged and the scratch buffers below.
	lock sync.Mutex

	// buf1 and buf2 are scratch buffers reused by send; guarded by lock.
	buf1 []byte
	buf2 []byte
}
|
|
|
|
func newSupervisor(
|
|
writeToUDPAddrPort func([]byte, netip.AddrPort) (int, error),
|
|
rt *atomic.Pointer[routingTable],
|
|
privKey []byte,
|
|
) *supervisor {
|
|
|
|
routes := rt.Load()
|
|
|
|
s := &supervisor{
|
|
writeToUDPAddrPort: writeToUDPAddrPort,
|
|
staged: *routes,
|
|
shared: rt,
|
|
buf1: newBuf(),
|
|
buf2: newBuf(),
|
|
}
|
|
|
|
pubAddrs := newPubAddrStore(routes.LocalAddr)
|
|
|
|
for i := range s.peers {
|
|
state := &pState{
|
|
publish: s.publish,
|
|
sendControlPacket: s.send,
|
|
pingTimer: time.NewTicker(timeoutInterval),
|
|
localIP: routes.LocalIP,
|
|
remoteIP: byte(i),
|
|
privKey: privKey,
|
|
localAddr: routes.LocalAddr,
|
|
pubAddrs: pubAddrs,
|
|
staged: routes.Peers[i],
|
|
limiter: ratelimiter.New(ratelimiter.Config{
|
|
FillPeriod: 20 * time.Millisecond,
|
|
MaxWaitCount: 1,
|
|
}),
|
|
}
|
|
s.peers[i] = newPeerSuper(state, state.pingTimer)
|
|
}
|
|
|
|
return s
|
|
}
|
|
|
|
func (s *supervisor) Start() {
|
|
for i := range s.peers {
|
|
go s.peers[i].Run()
|
|
}
|
|
}
|
|
|
|
// HandleControlMsg routes an inbound control message to the state machine
// for destIP. Delivery is best-effort: the peer silently drops the message
// when its queue is full (see peerSuper.HandleControlMsg).
func (s *supervisor) HandleControlMsg(destIP byte, msg any) {
	s.peers[destIP].HandleControlMsg(msg)
}
|
|
|
|
func (s *supervisor) send(peer remotePeer, pkt marshaller) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
enc := peer.EncryptControlPacket(pkt, s.buf1, s.buf2)
|
|
if peer.Direct {
|
|
s.writeToUDPAddrPort(enc, peer.DirectAddr)
|
|
return
|
|
}
|
|
|
|
relay, ok := s.staged.GetRelay()
|
|
if !ok {
|
|
return
|
|
}
|
|
|
|
enc = relay.EncryptDataPacket(peer.IP, enc, s.buf1)
|
|
s.writeToUDPAddrPort(enc, relay.DirectAddr)
|
|
}
|
|
|
|
func (s *supervisor) publish(rp remotePeer) {
|
|
s.lock.Lock()
|
|
defer s.lock.Unlock()
|
|
|
|
s.staged.Peers[rp.IP] = rp
|
|
s.ensureRelay()
|
|
copy := s.staged
|
|
s.shared.Store(©)
|
|
}
|
|
|
|
func (s *supervisor) ensureRelay() {
|
|
if _, ok := s.staged.GetRelay(); ok {
|
|
return
|
|
}
|
|
|
|
// TODO: Random selection? Something else?
|
|
for _, peer := range s.staged.Peers {
|
|
if peer.Up && peer.Direct && peer.Relay {
|
|
s.staged.RelayIP = peer.IP
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
|
|
// peerSuper runs a single peer's state machine on its own goroutine,
// feeding it timer ticks and inbound control messages.
type peerSuper struct {
	// messages carries inbound control messages; bounded, excess is dropped
	// (see HandleControlMsg).
	messages chan any
	// state is the current state-machine state; owned exclusively by the
	// Run goroutine.
	state peerState
	// pingTimer drives periodic pingTimerMsg delivery to the state machine.
	pingTimer *time.Ticker
}
|
|
|
|
func newPeerSuper(state *pState, pingTimer *time.Ticker) *peerSuper {
|
|
return &peerSuper{
|
|
messages: make(chan any, 8),
|
|
state: initPeerState(state, nil),
|
|
pingTimer: pingTimer,
|
|
}
|
|
}
|
|
|
|
// HandleControlMsg enqueues msg for the Run loop without blocking the
// caller. When the queue is full the message is silently dropped;
// presumably the periodic ping timer recovers from lost control messages —
// NOTE(review): confirm the protocol tolerates drops here.
func (s *peerSuper) HandleControlMsg(msg any) {
	select {
	case s.messages <- msg:
	default:
		// Queue full: drop rather than block.
	}
}
|
|
|
|
// Run drives the state machine forever, delivering ping-timer ticks and
// queued control messages in arrival order. Each delivery may transition
// the state (OnMsg returns the next state). It never returns; there is no
// stop signal — NOTE(review): consider a ctx/quit channel if clean
// shutdown is ever required.
func (s *peerSuper) Run() {
	for {
		select {
		case <-s.pingTimer.C:
			s.state = s.state.OnMsg(pingTimerMsg{})
		case raw := <-s.messages:
			s.state = s.state.OnMsg(raw)
		}
	}
}
|