Refactor - now wireguard based. (#7)
This commit is contained in:
380
peer/remote.go
380
peer/remote.go
@@ -1,351 +1,75 @@
|
||||
package peer
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"net/netip"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"vppn/m"
|
||||
"time"
|
||||
|
||||
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
||||
|
||||
"vppn/peer/control"
|
||||
"vppn/peer/wginterface"
|
||||
)
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
// The remoteConfig is the shared, immutable configuration for a remote
|
||||
// peer. It's read and written atomically. See remote.config.
|
||||
// ----------------------------------------------------------------------------
|
||||
type PeerState string
|
||||
|
||||
type remoteConfig struct {
|
||||
Up bool // True if peer is up and we can send data.
|
||||
Server bool // True if role is server.
|
||||
Direct bool // True if this is a direct connection.
|
||||
DirectAddr netip.AddrPort // Remote address if directly connected.
|
||||
ControlCipher *controlCipher
|
||||
DataCipher *dataCipher
|
||||
Peer *m.Peer
|
||||
const (
|
||||
StateRelayed = PeerState("RELAY")
|
||||
StateProbing = PeerState("PROBE")
|
||||
StateDirect = PeerState("DIRECT")
|
||||
)
|
||||
|
||||
type Peer struct {
|
||||
wgPeer wgtypes.Peer
|
||||
VPNIP netip.Addr // VPN IP address.
|
||||
Name string // Human-readable DNS label.
|
||||
IsRelay bool // Peer is a relay.
|
||||
IsPublic bool // Peer has a public IP.
|
||||
EndpointV4 netip.AddrPort // Reported IPv4 endpoint.
|
||||
EndpointV6 netip.AddrPort // Reported IPv6 endpoint.
|
||||
RTT time.Duration // Round-trip time.
|
||||
State PeerState // Current routing state; updated on each devXxx call.
|
||||
Role control.Role // Client initiates pings; server responds.
|
||||
SignPubKey [32]byte // nacl/sign public key for verifying multicast beacons.
|
||||
}
|
||||
|
||||
// CanRelay returns true if the remote configuration is able to relay packets.
|
||||
// to other hosts.
|
||||
func (rc remoteConfig) CanRelay() bool {
|
||||
return rc.Up && rc.Direct && rc.Peer.Relay
|
||||
// PubKey is the wireguard public key.
|
||||
func (p *Peer) PubKey() wgtypes.Key {
|
||||
return p.wgPeer.PublicKey
|
||||
}
|
||||
|
||||
// A Remote represents a remote peer and contains functions for handling
|
||||
// incoming control, data, and multicast packets, peer udpates, as well as
|
||||
// sending, forwarding, and relaying packets.
|
||||
type Remote struct {
|
||||
Globals
|
||||
RemotePeerIP byte // Immutable.
|
||||
|
||||
dupCheck *dupCheck
|
||||
sendCounter uint64 // init to startupCount << 48. Atomic access only.
|
||||
|
||||
// config should be accessed via conf() and updateConf(...) methods.
|
||||
config atomic.Pointer[remoteConfig]
|
||||
messages chan any
|
||||
}
|
||||
|
||||
func newRemote(g Globals, remotePeerIP byte) *Remote {
|
||||
r := &Remote{
|
||||
Globals: g,
|
||||
RemotePeerIP: remotePeerIP,
|
||||
dupCheck: newDupCheck(0),
|
||||
sendCounter: (uint64(g.StartupCount) << 48) + 1,
|
||||
messages: make(chan any, 8),
|
||||
func (p *Peer) WGEndpoint() netip.AddrPort {
|
||||
ep := p.wgPeer.Endpoint
|
||||
if ep == nil {
|
||||
return netip.AddrPort{}
|
||||
}
|
||||
r.config.Store(&remoteConfig{})
|
||||
return r
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
func (r *Remote) conf() remoteConfig {
|
||||
return *(r.config.Load())
|
||||
}
|
||||
|
||||
func (r *Remote) updateConf(conf remoteConfig) {
|
||||
old := r.config.Load()
|
||||
r.config.Store(&conf)
|
||||
|
||||
if !old.CanRelay() && conf.CanRelay() {
|
||||
r.RelayHandler.Add(r)
|
||||
}
|
||||
|
||||
if old.CanRelay() && !conf.CanRelay() {
|
||||
r.RelayHandler.Remove(r)
|
||||
}
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
func (r *Remote) sendUDP(b []byte, addr netip.AddrPort) {
|
||||
if _, err := r.SendUDP(b, addr); err != nil {
|
||||
r.logf("Failed to send UDP packet: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
func (r *Remote) encryptData(conf remoteConfig, destIP byte, packet []byte) []byte {
|
||||
h := Header{
|
||||
StreamID: dataStreamID,
|
||||
Counter: atomic.AddUint64(&r.sendCounter, 1),
|
||||
SourceIP: r.Globals.LocalPeerIP,
|
||||
DestIP: destIP,
|
||||
}
|
||||
return conf.DataCipher.Encrypt(h, packet, packet[len(packet):cap(packet)])
|
||||
}
|
||||
|
||||
func (r *Remote) encryptControl(conf remoteConfig, packet []byte) []byte {
|
||||
h := Header{
|
||||
StreamID: controlStreamID,
|
||||
Counter: atomic.AddUint64(&r.sendCounter, 1),
|
||||
SourceIP: r.LocalPeerIP,
|
||||
DestIP: r.RemotePeerIP,
|
||||
}
|
||||
return conf.ControlCipher.Encrypt(h, packet, packet[len(packet):cap(packet)])
|
||||
}
|
||||
|
||||
func (r *Remote) Status() (RemoteStatus, bool) {
|
||||
conf := r.conf()
|
||||
if conf.Peer == nil {
|
||||
return RemoteStatus{}, false
|
||||
}
|
||||
|
||||
return RemoteStatus{
|
||||
PeerIP: conf.Peer.PeerIP,
|
||||
Up: conf.Up,
|
||||
Name: conf.Peer.Name,
|
||||
PublicIP: conf.Peer.PublicIP,
|
||||
Port: conf.Peer.Port,
|
||||
Relay: conf.Peer.Relay,
|
||||
Server: conf.Server,
|
||||
Direct: conf.Direct,
|
||||
DirectAddr: conf.DirectAddr,
|
||||
}, true
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
// SendDataTo sends a data packet to the remote, called by the IFReader.
|
||||
func (r *Remote) SendDataTo(data []byte) {
|
||||
conf := r.conf()
|
||||
if !conf.Up {
|
||||
r.logf("Cannot send: link down")
|
||||
return
|
||||
}
|
||||
|
||||
// Direct:
|
||||
|
||||
if conf.Direct {
|
||||
r.sendUDP(r.encryptData(conf, conf.Peer.PeerIP, data), conf.DirectAddr)
|
||||
return
|
||||
}
|
||||
|
||||
// Relayed:
|
||||
relay := r.RelayHandler.Load()
|
||||
|
||||
if relay == nil {
|
||||
r.logf("Connot send: no relay")
|
||||
return
|
||||
}
|
||||
|
||||
relay.relayData(conf.Peer.PeerIP, r.encryptData(conf, conf.Peer.PeerIP, data))
|
||||
}
|
||||
|
||||
func (r *Remote) relayData(toIP byte, enc []byte) {
|
||||
conf := r.conf()
|
||||
if !conf.Up || !conf.Direct {
|
||||
r.logf("Cannot relay: not up or not a direct connection")
|
||||
return
|
||||
}
|
||||
r.sendUDP(r.encryptData(conf, toIP, enc), conf.DirectAddr)
|
||||
}
|
||||
|
||||
func (r *Remote) sendControl(conf remoteConfig, data []byte) {
|
||||
// Direct:
|
||||
|
||||
if conf.Direct {
|
||||
enc := r.encryptControl(conf, data)
|
||||
r.sendUDP(enc, conf.DirectAddr)
|
||||
return
|
||||
}
|
||||
|
||||
// Relayed:
|
||||
|
||||
relay := r.RelayHandler.Load()
|
||||
|
||||
if relay == nil {
|
||||
r.logf("Connot send: no relay")
|
||||
return
|
||||
}
|
||||
|
||||
relay.relayData(conf.Peer.PeerIP, r.encryptControl(conf, data))
|
||||
}
|
||||
|
||||
func (r *Remote) sendControlToAddr(buf []byte, addr netip.AddrPort) {
|
||||
enc := r.encryptControl(r.conf(), buf)
|
||||
r.sendUDP(enc, addr)
|
||||
}
|
||||
|
||||
func (r *Remote) forwardPacket(data []byte) {
|
||||
conf := r.conf()
|
||||
if !conf.Up || !conf.Direct {
|
||||
r.logf("Cannot forward to %d: not a direct connection", conf.Peer.PeerIP)
|
||||
return
|
||||
}
|
||||
r.sendUDP(data, conf.DirectAddr)
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
// HandlePacket is called by the ConnReader to handle an incoming packet.
|
||||
func (r *Remote) HandlePacket(h Header, srcAddr netip.AddrPort, data []byte) {
|
||||
switch h.StreamID {
|
||||
case controlStreamID:
|
||||
r.handleControlPacket(h, srcAddr, data)
|
||||
case dataStreamID:
|
||||
r.handleDataPacket(h, data)
|
||||
default:
|
||||
r.logf("Unknown stream ID: %d", h.StreamID)
|
||||
}
|
||||
}
|
||||
|
||||
// Handle a control packet. Decrypt, verify, etc.
|
||||
func (r *Remote) handleControlPacket(h Header, srcAddr netip.AddrPort, data []byte) {
|
||||
conf := r.conf()
|
||||
if conf.ControlCipher == nil {
|
||||
r.logf("No control cipher")
|
||||
return
|
||||
}
|
||||
|
||||
dec, ok := conf.ControlCipher.Decrypt(data, data[len(data):cap(data)])
|
||||
addr, ok := netip.AddrFromSlice(ep.IP)
|
||||
if !ok {
|
||||
r.logf("Failed to decrypt control packet")
|
||||
return
|
||||
}
|
||||
|
||||
if r.dupCheck.IsDup(h.Counter) {
|
||||
r.logf("Dropping control packet as duplicate: %d", h.Counter)
|
||||
return
|
||||
}
|
||||
|
||||
msg, err := parseControlMsg(h.SourceIP, srcAddr, dec)
|
||||
if err != nil {
|
||||
r.logf("Failed to parse control packet: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case r.messages <- msg:
|
||||
default:
|
||||
r.logf("Dropping control message")
|
||||
return netip.AddrPort{}
|
||||
}
|
||||
return netip.AddrPortFrom(addr.Unmap(), uint16(ep.Port))
|
||||
}
|
||||
|
||||
func (r *Remote) handleDataPacket(h Header, data []byte) {
|
||||
conf := r.conf()
|
||||
if conf.DataCipher == nil {
|
||||
return
|
||||
}
|
||||
|
||||
dec, ok := conf.DataCipher.Decrypt(data, data[len(data):cap(data)])
|
||||
if !ok {
|
||||
r.logf("Failed to decrypt data packet")
|
||||
return
|
||||
}
|
||||
|
||||
if r.dupCheck.IsDup(h.Counter) {
|
||||
r.logf("Dropping data packet as duplicate: %d", h.Counter)
|
||||
return
|
||||
}
|
||||
|
||||
// For local.
|
||||
if h.DestIP == r.LocalPeerIP {
|
||||
if _, err := r.IFace.Write(dec); err != nil {
|
||||
// This could be a malformed packet from a peer, so we don't crash if it
|
||||
// happens.
|
||||
r.logf("Failed to write to interface: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// Forward.
|
||||
dest := r.RemotePeers[h.DestIP].Load()
|
||||
dest.forwardPacket(dec)
|
||||
func (p *Peer) LastHandshakeTime() time.Time {
|
||||
return p.wgPeer.LastHandshakeTime
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
// HandleLocalDiscoveryPacket is called by the MCReader.
|
||||
func (r *Remote) HandleLocalDiscoveryPacket(h Header, srcAddr netip.AddrPort, data []byte) {
|
||||
conf := r.conf()
|
||||
if conf.Peer == nil {
|
||||
r.logf("No peer for discovery packet.")
|
||||
return
|
||||
}
|
||||
|
||||
if conf.Peer.PubSignKey == nil {
|
||||
r.logf("No signing key for discovery packet.")
|
||||
return
|
||||
}
|
||||
|
||||
if !verifyLocalDiscoveryPacket(data, data[len(data):cap(data)], conf.Peer.PubSignKey) {
|
||||
r.logf("Invalid signature on discovery packet.")
|
||||
return
|
||||
}
|
||||
|
||||
msg := controlMsg[packetLocalDiscovery]{
|
||||
SrcIP: h.SourceIP,
|
||||
SrcAddr: srcAddr,
|
||||
}
|
||||
|
||||
select {
|
||||
case r.messages <- msg:
|
||||
default:
|
||||
r.logf("Dropping discovery message.")
|
||||
}
|
||||
func (p *Peer) Up() bool {
|
||||
return time.Since(p.wgPeer.LastHandshakeTime) < wginterface.SessionTimeout
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
// HandlePeerUpdate is called by the HubPoller when it gets a new version of
|
||||
// the associated peer configuration.
|
||||
func (r *Remote) HandlePeerUpdate(msg peerUpdateMsg) {
|
||||
r.messages <- msg
|
||||
func (p *Peer) CanRelay() bool {
|
||||
return p.IsRelay && p.Up()
|
||||
}
|
||||
|
||||
// ----------------------------------------------------------------------------
|
||||
|
||||
func (s *Remote) logf(format string, args ...any) {
|
||||
conf := s.conf()
|
||||
|
||||
b := strings.Builder{}
|
||||
name := ""
|
||||
if conf.Peer != nil {
|
||||
name = conf.Peer.Name
|
||||
}
|
||||
b.WriteString(fmt.Sprintf("%03d", s.RemotePeerIP))
|
||||
|
||||
b.WriteString(fmt.Sprintf("%30s: ", name))
|
||||
|
||||
if conf.Server {
|
||||
b.WriteString("SERVER | ")
|
||||
} else {
|
||||
b.WriteString("CLIENT | ")
|
||||
}
|
||||
|
||||
if conf.Direct {
|
||||
b.WriteString("DIRECT | ")
|
||||
} else {
|
||||
b.WriteString("RELAYED | ")
|
||||
}
|
||||
|
||||
if conf.Up {
|
||||
b.WriteString("UP | ")
|
||||
} else {
|
||||
b.WriteString("DOWN | ")
|
||||
}
|
||||
|
||||
log.Printf(b.String()+format, args...)
|
||||
func (p *Peer) PreferredEndpoint() netip.AddrPort {
|
||||
return preferredEndpoint(p.EndpointV4, p.EndpointV6)
|
||||
}
|
||||
|
||||
func (p *Peer) UpdateEndpoints(v4, v6 netip.AddrPort) {
|
||||
if v4.IsValid() {
|
||||
p.EndpointV4 = v4
|
||||
}
|
||||
if v6.IsValid() {
|
||||
p.EndpointV6 = v6
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user