vppn/peer/remote.go
2025-08-26 19:12:07 +02:00

328 lines
7.8 KiB
Go

package peer
import (
"fmt"
"log"
"net/netip"
"strings"
"sync/atomic"
"vppn/m"
)
// ----------------------------------------------------------------------------
// The remoteConfig is the shared, immutable configuration for a remote
// peer. A pointer to the current value is held in Remote.config and is
// loaded and stored atomically; a new configuration replaces the old one as a
// whole (see Remote.conf and Remote.updateConf).
// ----------------------------------------------------------------------------
type remoteConfig struct {
Up bool // True if peer is up and we can send data.
Server bool // True if role is server.
Direct bool // True if this is a direct connection.
DirectAddr netip.AddrPort // Remote address if directly connected.
ControlCipher *controlCipher // Encrypts/decrypts control-stream packets. May be nil; handleControlPacket checks for that.
DataCipher *dataCipher // Encrypts/decrypts data-stream packets. May be nil; handleDataPacket checks for that.
Peer *m.Peer // Peer description. Nil in the empty configuration stored by newRemote.
}
// CanRelay reports whether the remote described by this configuration can
// relay packets to other hosts: the link must be up, the connection must be
// direct, and the peer must have its Relay flag set.
//
// A configuration without a Peer (such as the empty one stored by newRemote)
// can never relay; the explicit nil check keeps that true without relying on
// Up and Direct to short-circuit the dereference.
func (rc remoteConfig) CanRelay() bool {
	return rc.Up && rc.Direct && rc.Peer != nil && rc.Peer.Relay
}
// A Remote represents a remote peer and contains functions for handling
// incoming control, data, and multicast packets, peer updates, as well as
// sending, forwarding, and relaying packets.
type Remote struct {
Globals
RemotePeerIP byte // Immutable.
dupCheck *dupCheck // Queried with IsDup(h.Counter) by the packet handlers to drop duplicate packets.
sendCounter uint64 // init to (startupCount << 48) + 1. Atomic access only.
// config should be accessed via conf() and updateConf(...) methods.
config atomic.Pointer[remoteConfig]
messages chan any // Buffered queue of parsed control messages, discovery messages and peer updates.
}
// newRemote creates the Remote for the peer identified by remotePeerIP.
//
// The returned Remote starts with an empty remoteConfig (not up, no ciphers,
// no peer), a fresh duplicate checker and a message queue with room for 8
// entries.
func newRemote(g Globals, remotePeerIP byte) *Remote {
	remote := new(Remote)
	remote.Globals = g
	remote.RemotePeerIP = remotePeerIP
	remote.dupCheck = newDupCheck(0)
	remote.messages = make(chan any, 8)

	// The startup count is shifted left by 48 bits and the counter then
	// starts at 1 within that range.
	remote.sendCounter = (uint64(g.StartupCount) << 48) + 1

	// Store an empty configuration so conf() never sees a nil pointer.
	remote.config.Store(&remoteConfig{})
	return remote
}
// ----------------------------------------------------------------------------
// conf returns a copy of the configuration currently stored in r.config.
func (r *Remote) conf() remoteConfig {
	current := r.config.Load()
	return *current
}
// updateConf installs conf as the remote's current configuration and keeps
// the relay handler in sync: the remote is added when it becomes able to
// relay and removed when it stops being able to.
//
// The previous configuration is obtained with Swap so that reading the old
// value and publishing the new one is a single atomic step; the Add/Remove
// decision is therefore always based on the configuration that was actually
// replaced.
func (r *Remote) updateConf(conf remoteConfig) {
	old := r.config.Swap(&conf)

	couldRelay, canRelay := old.CanRelay(), conf.CanRelay()
	switch {
	case !couldRelay && canRelay:
		r.RelayHandler.Add(r)
	case couldRelay && !canRelay:
		r.RelayHandler.Remove(r)
	}
}
// ----------------------------------------------------------------------------
// sendUDP writes b to addr through the SendUDP function from Globals. A
// failure is logged and otherwise ignored.
func (r *Remote) sendUDP(b []byte, addr netip.AddrPort) {
	_, err := r.SendUDP(b, addr)
	if err == nil {
		return
	}
	r.logf("Failed to send UDP packet: %v", err)
}
// ----------------------------------------------------------------------------
// encryptData encrypts packet on the data stream using conf.DataCipher. The
// header carries the next send counter value, the local peer IP as source and
// destIP as destination. The slice between len(packet) and cap(packet) is
// handed to the cipher as its third argument (presumably the output buffer —
// confirm against dataCipher.Encrypt).
func (r *Remote) encryptData(conf remoteConfig, destIP byte, packet []byte) []byte {
	var hdr Header
	hdr.StreamID = dataStreamID
	hdr.Counter = atomic.AddUint64(&r.sendCounter, 1)
	hdr.SourceIP = r.LocalPeerIP
	hdr.DestIP = destIP

	spare := packet[len(packet):cap(packet)]
	return conf.DataCipher.Encrypt(hdr, packet, spare)
}
// encryptControl encrypts packet on the control stream using
// conf.ControlCipher. The header carries the next send counter value and is
// addressed from the local peer IP to this remote's peer IP. The slice
// between len(packet) and cap(packet) is handed to the cipher as its third
// argument (presumably the output buffer — confirm against
// controlCipher.Encrypt).
func (r *Remote) encryptControl(conf remoteConfig, packet []byte) []byte {
	var hdr Header
	hdr.StreamID = controlStreamID
	hdr.Counter = atomic.AddUint64(&r.sendCounter, 1)
	hdr.SourceIP = r.LocalPeerIP
	hdr.DestIP = r.RemotePeerIP

	spare := packet[len(packet):cap(packet)]
	return conf.ControlCipher.Encrypt(hdr, packet, spare)
}
// ----------------------------------------------------------------------------
// SendDataTo sends a data packet to the remote, called by the IFReader.
//
// The packet is dropped (with a log line) if the link is down. On a direct
// connection it is encrypted and sent straight to the remote's address.
// Otherwise it is encrypted for the remote and handed to the current relay,
// which wraps and sends it; if no relay is available the packet is dropped.
func (r *Remote) SendDataTo(data []byte) {
	conf := r.conf()
	if !conf.Up {
		r.logf("Cannot send: link down")
		return
	}

	// Direct:
	if conf.Direct {
		r.sendUDP(r.encryptData(conf, conf.Peer.PeerIP, data), conf.DirectAddr)
		return
	}

	// Relayed:
	relay := r.RelayHandler.Load()
	if relay == nil {
		r.logf("Cannot send: no relay")
		return
	}

	relay.relayData(conf.Peer.PeerIP, r.encryptData(conf, conf.Peer.PeerIP, data))
}
// relayData is called on the relay's Remote. It encrypts enc once more on the
// data stream, addressed to toIP, and sends the result to this remote's
// direct address. The packet is dropped with a log line unless this remote is
// up and directly connected.
func (r *Remote) relayData(toIP byte, enc []byte) {
	conf := r.conf()
	if conf.Up && conf.Direct {
		wrapped := r.encryptData(conf, toIP, enc)
		r.sendUDP(wrapped, conf.DirectAddr)
		return
	}
	r.logf("Cannot relay: not up or not a direct connection")
}
// sendControl encrypts data as a control packet using the supplied conf and
// sends it to the remote.
//
// On a direct connection the packet goes straight to conf.DirectAddr.
// Otherwise it is handed to the current relay; if no relay is available the
// packet is dropped with a log line. Unlike SendDataTo, conf.Up is not
// checked here.
func (r *Remote) sendControl(conf remoteConfig, data []byte) {
	// Direct:
	if conf.Direct {
		enc := r.encryptControl(conf, data)
		r.sendUDP(enc, conf.DirectAddr)
		return
	}

	// Relayed:
	relay := r.RelayHandler.Load()
	if relay == nil {
		r.logf("Cannot send: no relay")
		return
	}

	relay.relayData(conf.Peer.PeerIP, r.encryptControl(conf, data))
}
// sendControlToAddr encrypts buf as a control packet with the current
// configuration and sends it to the explicitly given addr rather than to the
// configured direct address.
func (r *Remote) sendControlToAddr(buf []byte, addr netip.AddrPort) {
	r.sendUDP(r.encryptControl(r.conf(), buf), addr)
}
// forwardPacket sends data as-is (no encryption is applied here) to this
// remote's direct address. The packet is dropped with a log line unless the
// remote is up and directly connected.
func (r *Remote) forwardPacket(data []byte) {
	conf := r.conf()
	if !conf.Up || !conf.Direct {
		// Identify the target by RemotePeerIP rather than conf.Peer.PeerIP:
		// conf.Peer is nil for a remote that has never received a peer
		// configuration (newRemote stores an empty remoteConfig), and that is
		// precisely a state in which this branch is taken.
		r.logf("Cannot forward to %d: not a direct connection", r.RemotePeerIP)
		return
	}
	r.sendUDP(data, conf.DirectAddr)
}
// ----------------------------------------------------------------------------
// HandlePacket is the entry point used by the ConnReader for every incoming
// packet. It dispatches on the header's stream ID to the control or data
// handler; packets on any other stream are logged and dropped.
func (r *Remote) HandlePacket(h Header, srcAddr netip.AddrPort, data []byte) {
	switch id := h.StreamID; id {
	case dataStreamID:
		r.handleDataPacket(h, data)
	case controlStreamID:
		r.handleControlPacket(h, srcAddr, data)
	default:
		r.logf("Unknown stream ID: %d", id)
	}
}
// handleControlPacket processes a packet received on the control stream: it
// decrypts the packet with the current control cipher, drops duplicates based
// on the header counter, parses the payload into a control message and queues
// that message on r.messages. The send never blocks; if the queue is full the
// message is dropped. Every failure is logged.
func (r *Remote) handleControlPacket(h Header, srcAddr netip.AddrPort, data []byte) {
	cipher := r.conf().ControlCipher
	if cipher == nil {
		r.logf("No control cipher")
		return
	}

	// The slice between len and cap of data is passed as the second argument
	// (presumably the output buffer — confirm against controlCipher.Decrypt).
	plain, decrypted := cipher.Decrypt(data, data[len(data):cap(data)])
	if !decrypted {
		r.logf("Failed to decrypt control packet")
		return
	}

	// Only packets that decrypted successfully reach the duplicate check.
	if counter := h.Counter; r.dupCheck.IsDup(counter) {
		r.logf("Dropping control packet as duplicate: %d", counter)
		return
	}

	parsed, err := parseControlMsg(h.SourceIP, srcAddr, plain)
	if err != nil {
		r.logf("Failed to parse control packet: %v", err)
		return
	}

	select {
	case r.messages <- parsed:
		return
	default:
	}
	r.logf("Dropping control message")
}
// handleDataPacket processes a packet received on the data stream. The packet
// is silently ignored when no data cipher is configured. Otherwise it is
// decrypted and checked for duplicates; a packet addressed to the local peer
// is written to the interface, and any other packet is passed to the Remote
// registered for its destination IP for forwarding.
func (r *Remote) handleDataPacket(h Header, data []byte) {
	cipher := r.conf().DataCipher
	if cipher == nil {
		return
	}

	// The slice between len and cap of data is passed as the second argument
	// (presumably the output buffer — confirm against dataCipher.Decrypt).
	plain, decrypted := cipher.Decrypt(data, data[len(data):cap(data)])
	if !decrypted {
		r.logf("Failed to decrypt data packet")
		return
	}

	if counter := h.Counter; r.dupCheck.IsDup(counter) {
		r.logf("Dropping data packet as duplicate: %d", counter)
		return
	}

	// Not addressed to us: hand it to the destination's Remote.
	if h.DestIP != r.LocalPeerIP {
		r.RemotePeers[h.DestIP].Load().forwardPacket(plain)
		return
	}

	// Addressed to us: deliver to the local interface. A write error may just
	// mean a malformed packet from a peer, so it is logged rather than
	// treated as fatal.
	_, err := r.IFace.Write(plain)
	if err != nil {
		r.logf("Failed to write to interface: %v", err)
	}
}
// ----------------------------------------------------------------------------
// HandleLocalDiscoveryPacket is called by the MCReader.
//
// The packet's signature is verified against the peer's public signing key.
// A valid packet is turned into a local-discovery control message carrying
// the header's source IP and the sender's address, and queued on r.messages
// without blocking; the message is dropped if the queue is full.
//
// Packets are rejected when no signing key is known. That includes a remote
// that has no peer configuration at all: conf.Peer is nil until one has been
// stored, so it is checked before the key is read.
func (r *Remote) HandleLocalDiscoveryPacket(h Header, srcAddr netip.AddrPort, data []byte) {
	conf := r.conf()
	if conf.Peer == nil || conf.Peer.PubSignKey == nil {
		r.logf("No signing key for discovery packet.")
		return
	}

	if !verifyLocalDiscoveryPacket(data, data[len(data):cap(data)], conf.Peer.PubSignKey) {
		r.logf("Invalid signature on discovery packet.")
		return
	}

	msg := controlMsg[packetLocalDiscovery]{
		SrcIP:   h.SourceIP,
		SrcAddr: srcAddr,
	}

	select {
	case r.messages <- msg:
	default:
		r.logf("Dropping discovery message.")
	}
}
// ----------------------------------------------------------------------------
// HandlePeerUpdate is called by the HubPoller when it gets a new version of
// the associated peer configuration. The update is queued on r.messages with
// a plain channel send, so unlike the packet handlers (which drop messages
// when the queue is full) this call blocks until there is room.
func (r *Remote) HandlePeerUpdate(msg peerUpdateMsg) {
r.messages <- msg
}
// ----------------------------------------------------------------------------
// logf writes one log line for this remote. The line starts with a status
// prefix — the remote's peer IP (zero-padded to 3 digits), the peer name
// (right-aligned in 30 columns, empty if no peer is configured), the role
// (SERVER/CLIENT), the path (DIRECT/RELAYED) and the link state (UP/DOWN) —
// followed by the message produced from format and args.
//
// The prefix is written to the output verbatim and is never made part of the
// format string, so a '%' in the peer name cannot be misread as a formatting
// verb.
func (r *Remote) logf(format string, args ...any) {
	conf := r.conf()

	name := ""
	if conf.Peer != nil {
		name = conf.Peer.Name
	}

	role := "CLIENT | "
	if conf.Server {
		role = "SERVER | "
	}

	path := "RELAYED | "
	if conf.Direct {
		path = "DIRECT | "
	}

	state := "DOWN | "
	if conf.Up {
		state = "UP | "
	}

	var b strings.Builder
	fmt.Fprintf(&b, "%03d%30s: ", r.RemotePeerIP, name)
	b.WriteString(role)
	b.WriteString(path)
	b.WriteString(state)
	fmt.Fprintf(&b, format, args...)

	log.Print(b.String())
}