WIP
This commit is contained in:
350
peer/remote.go
Normal file
350
peer/remote.go
Normal file
@@ -0,0 +1,350 @@
|
||||
package peer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/netip"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
"vppn/m"
|
||||
|
||||
"git.crumpington.com/lib/go/ratelimiter"
|
||||
)
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// The remoteConfig is the shared, immutable configuration for a remote
|
||||
// peer. It's read and written atomically. See remote.config.
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
type remoteConfig struct {
|
||||
Up bool // True if peer is up and we can send data.
|
||||
Server bool // True if role is server.
|
||||
Direct bool // True if this is a direct connection.
|
||||
DirectAddr netip.AddrPort // Remote address if directly connected.
|
||||
ControlCipher *controlCipher
|
||||
DataCipher *dataCipher
|
||||
Peer *m.Peer
|
||||
}
|
||||
|
||||
// CanRelay returns true if the remote configuration is able to relay packets.
|
||||
// to other hosts.
|
||||
func (rc remoteConfig) CanRelay() bool {
|
||||
return rc.Up && rc.Direct && rc.Peer.Relay
|
||||
}
|
||||
|
||||
// A Remote represents a remote peer and contains functions for handling
|
||||
// incoming control, data, and multicast packets, peer udpates, as well as
|
||||
// sending, forwarding, and relaying packets.
|
||||
type Remote struct {
|
||||
Globals
|
||||
RemotePeerIP byte // Immutable.
|
||||
|
||||
limiter *ratelimiter.Limiter
|
||||
dupCheck *dupCheck
|
||||
sendCounter uint64 // init to startupCount << 48. Atomic access only.
|
||||
|
||||
// config should be accessed via conf() and updateConf(...) methods.
|
||||
config atomic.Pointer[remoteConfig]
|
||||
messages chan any
|
||||
}
|
||||
|
||||
func newRemote(g Globals, remotePeerIP byte) *Remote {
|
||||
r := &Remote{
|
||||
Globals: g,
|
||||
RemotePeerIP: remotePeerIP,
|
||||
limiter: ratelimiter.New(ratelimiter.Config{
|
||||
FillPeriod: 20 * time.Millisecond,
|
||||
MaxWaitCount: 1,
|
||||
}),
|
||||
dupCheck: newDupCheck(0),
|
||||
sendCounter: uint64(time.Now().Unix()<<30) + 1,
|
||||
messages: make(chan any, 8),
|
||||
}
|
||||
r.config.Store(&remoteConfig{})
|
||||
return r
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
func (r *Remote) conf() remoteConfig {
|
||||
return *(r.config.Load())
|
||||
}
|
||||
|
||||
func (r *Remote) updateConf(conf remoteConfig) {
|
||||
old := r.config.Load()
|
||||
r.config.Store(&conf)
|
||||
|
||||
if !old.CanRelay() && conf.CanRelay() {
|
||||
r.RelayHandler.Add(r)
|
||||
}
|
||||
|
||||
if old.CanRelay() && !conf.CanRelay() {
|
||||
r.RelayHandler.Remove(r)
|
||||
}
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
func (r *Remote) sendUDP(b []byte, addr netip.AddrPort) {
|
||||
if err := r.limiter.Limit(); err != nil {
|
||||
r.logf("Rate limiter")
|
||||
return
|
||||
}
|
||||
if _, err := r.SendUDP(b, addr); err != nil {
|
||||
r.logf("Failed to send URP packet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
func (r *Remote) encryptData(conf remoteConfig, packet []byte) []byte {
|
||||
h := Header{
|
||||
StreamID: dataStreamID,
|
||||
Counter: atomic.AddUint64(&r.sendCounter, 1),
|
||||
SourceIP: r.Globals.LocalPeerIP,
|
||||
DestIP: r.RemotePeerIP,
|
||||
}
|
||||
return conf.DataCipher.Encrypt(h, packet, packet[len(packet):cap(packet)])
|
||||
}
|
||||
|
||||
func (r *Remote) encryptControl(conf remoteConfig, packet []byte) []byte {
|
||||
h := Header{
|
||||
StreamID: controlStreamID,
|
||||
Counter: atomic.AddUint64(&r.sendCounter, 1),
|
||||
SourceIP: r.LocalPeerIP,
|
||||
DestIP: r.RemotePeerIP,
|
||||
}
|
||||
return conf.ControlCipher.Encrypt(h, packet, packet[len(packet):cap(packet)])
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
// SendDataTo sends a data packet to the remote, called by the IFReader.
|
||||
func (r *Remote) SendDataTo(data []byte) {
|
||||
conf := r.conf()
|
||||
if !conf.Up {
|
||||
r.logf("Cannot send: link down")
|
||||
return
|
||||
}
|
||||
|
||||
if conf.Direct {
|
||||
r.sendDataDirect(conf, data)
|
||||
} else {
|
||||
r.sendDataRelayed(conf, data)
|
||||
}
|
||||
}
|
||||
|
||||
// sendDataRelayed sends data to the remote via the relay.
|
||||
func (r *Remote) sendDataRelayed(conf remoteConfig, data []byte) {
|
||||
relay := r.RelayHandler.Load()
|
||||
|
||||
if relay == nil {
|
||||
r.logf("Connot send: no relay")
|
||||
return
|
||||
}
|
||||
|
||||
relay.relayData(r.encryptData(conf, data))
|
||||
}
|
||||
|
||||
// sendDataDirect sends data to the remote directly.
|
||||
func (r *Remote) sendDataDirect(conf remoteConfig, data []byte) {
|
||||
r.logf("Sending data direct...")
|
||||
r.sendUDP(r.encryptData(conf, data), conf.DirectAddr)
|
||||
}
|
||||
|
||||
func (r *Remote) relayData(enc []byte) {
|
||||
conf := r.conf()
|
||||
if !conf.Up || !conf.Direct {
|
||||
r.logf("Cannot relay: not up or not a direct connection")
|
||||
return
|
||||
}
|
||||
r.sendDataDirect(conf, enc)
|
||||
}
|
||||
|
||||
func (r *Remote) sendControl(conf remoteConfig, data []byte) {
|
||||
if conf.Direct {
|
||||
r.sendControlDirect(conf, data)
|
||||
} else {
|
||||
r.sendControlRelayed(conf, data)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Remote) sendControlToAddr(buf []byte, addr netip.AddrPort) {
|
||||
enc := r.encryptControl(r.conf(), buf)
|
||||
r.sendUDP(enc, addr)
|
||||
}
|
||||
|
||||
func (r *Remote) sendControlDirect(conf remoteConfig, data []byte) {
|
||||
r.logf("Sending control direct...")
|
||||
enc := r.encryptControl(conf, data)
|
||||
r.sendUDP(enc, conf.DirectAddr)
|
||||
}
|
||||
|
||||
func (r *Remote) sendControlRelayed(conf remoteConfig, data []byte) {
|
||||
r.logf("Sending control relayed...")
|
||||
relay := r.RelayHandler.Load()
|
||||
|
||||
if relay == nil {
|
||||
r.logf("Connot send: no relay")
|
||||
return
|
||||
}
|
||||
|
||||
relay.relayData(r.encryptControl(conf, data))
|
||||
}
|
||||
|
||||
func (r *Remote) forwardPacket(data []byte) {
|
||||
conf := r.conf()
|
||||
if !conf.Up || !conf.Direct {
|
||||
r.logf("Cannot forward to %d: not a direct connection", conf.Peer.PeerIP)
|
||||
return
|
||||
}
|
||||
r.sendUDP(data, conf.DirectAddr)
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
// HandlePacket is called by the ConnReader to handle an incoming packet.
|
||||
func (r *Remote) HandlePacket(h Header, srcAddr netip.AddrPort, data []byte) {
|
||||
switch h.StreamID {
|
||||
case controlStreamID:
|
||||
r.handleControlPacket(h, srcAddr, data)
|
||||
case dataStreamID:
|
||||
r.handleDataPacket(h, data)
|
||||
default:
|
||||
r.logf("Unknown stream ID: %d", h.StreamID)
|
||||
}
|
||||
}
|
||||
|
||||
// Handle a control packet. Decrypt, verify, etc.
|
||||
func (r *Remote) handleControlPacket(h Header, srcAddr netip.AddrPort, data []byte) {
|
||||
conf := r.conf()
|
||||
if conf.ControlCipher == nil {
|
||||
r.logf("No control cipher")
|
||||
return
|
||||
}
|
||||
|
||||
dec, ok := conf.ControlCipher.Decrypt(data, data[len(data):cap(data)])
|
||||
if !ok {
|
||||
r.logf("Failed to decrypt control packet")
|
||||
return
|
||||
}
|
||||
|
||||
if r.dupCheck.IsDup(h.Counter) {
|
||||
r.logf("Dropping control packet as duplicate: %d", h.Counter)
|
||||
return
|
||||
}
|
||||
|
||||
msg, err := parseControlMsg(h.SourceIP, srcAddr, dec)
|
||||
if err != nil {
|
||||
r.logf("Failed to parse control packet: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case r.messages <- msg:
|
||||
default:
|
||||
r.logf("Dropping control message")
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Remote) handleDataPacket(h Header, data []byte) {
|
||||
conf := r.conf()
|
||||
if conf.DataCipher == nil {
|
||||
return
|
||||
}
|
||||
|
||||
dec, ok := conf.DataCipher.Decrypt(data, data[len(data):cap(data)])
|
||||
if !ok {
|
||||
r.logf("Failed to decrypt data packet")
|
||||
return
|
||||
}
|
||||
|
||||
if r.dupCheck.IsDup(h.Counter) {
|
||||
r.logf("Dropping data packet as duplicate: %d", h.Counter)
|
||||
return
|
||||
}
|
||||
|
||||
// For local.
|
||||
if h.DestIP == r.LocalPeerIP {
|
||||
if _, err := r.IFace.Write(dec); err != nil {
|
||||
log.Fatalf("Failed to write to interface: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Forward.
|
||||
dest := r.RemotePeers[h.DestIP].Load()
|
||||
dest.forwardPacket(dec)
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
// HandleLocalDiscoveryPacket is called by the MCReader.
|
||||
func (r *Remote) HandleLocalDiscoveryPacket(h Header, srcAddr netip.AddrPort, data []byte) {
|
||||
conf := r.conf()
|
||||
if conf.Peer.PubSignKey == nil {
|
||||
r.logf("No signing key for discovery packet.")
|
||||
return
|
||||
}
|
||||
|
||||
if !verifyLocalDiscoveryPacket(data, data[len(data):cap(data)], conf.Peer.PubSignKey) {
|
||||
r.logf("Invalid signature on discovery packet.")
|
||||
return
|
||||
}
|
||||
|
||||
msg := controlMsg[packetLocalDiscovery]{
|
||||
SrcIP: h.SourceIP,
|
||||
SrcAddr: srcAddr,
|
||||
}
|
||||
r.logf("Got local discovery packet from %v.", srcAddr)
|
||||
|
||||
select {
|
||||
case r.messages <- msg:
|
||||
default:
|
||||
r.logf("Dropping discovery message.")
|
||||
}
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
// HandlePeerUpdate is called by the HubPoller when it gets a new version of
|
||||
// the associated peer configuration.
|
||||
func (r *Remote) HandlePeerUpdate(msg peerUpdateMsg) {
|
||||
r.messages <- msg
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
func (s *Remote) logf(format string, args ...any) {
|
||||
conf := s.conf()
|
||||
|
||||
b := strings.Builder{}
|
||||
name := ""
|
||||
if conf.Peer != nil {
|
||||
name = conf.Peer.Name
|
||||
}
|
||||
b.WriteString(fmt.Sprintf("%03d", s.RemotePeerIP))
|
||||
|
||||
b.WriteString(fmt.Sprintf("%30s: ", name))
|
||||
|
||||
if conf.Server {
|
||||
b.WriteString("SERVER | ")
|
||||
} else {
|
||||
b.WriteString("CLIENT | ")
|
||||
}
|
||||
|
||||
if conf.Direct {
|
||||
b.WriteString("DIRECT | ")
|
||||
} else {
|
||||
b.WriteString("RELAYED | ")
|
||||
}
|
||||
|
||||
if conf.Up {
|
||||
b.WriteString("UP | ")
|
||||
} else {
|
||||
b.WriteString("DOWN | ")
|
||||
}
|
||||
|
||||
log.Printf(b.String()+format, args...)
|
||||
}
|
||||
Reference in New Issue
Block a user