vppn/node/peer.go
2024-12-20 21:06:16 +01:00

253 lines
5.8 KiB
Go

package node
import (
"fmt"
"log"
"net/netip"
"sync/atomic"
"time"
"vppn/m"
)
// remotePeers is a fixed-size table of remote peers. Code in this file indexes
// it with a one-byte peer IP (for example a packet's DestIP or a relay IP).
// An entry may be nil — presumably when no peer is configured for that IP;
// verify against the code that populates the table.
type remotePeers [256]*remotePeer
// peerData is the connection state that a remotePeer publishes through an
// atomic pointer so that the packet handlers can read a consistent snapshot.
type peerData struct {
// NOTE(review): up and relay are not read anywhere in the visible code;
// presumably they describe link state and whether the remote peer can act
// as a relay — confirm against the supervisor.
up bool
relay bool
// Nil until set; handleControlPacket treats nil as "not connected".
controlCipher *controlCipher
// Nil until set; the data send/receive paths treat nil as "not connected".
dataCipher *dataCipher
// Address packets are written to. The send paths treat zeroAddrPort as
// "not connected".
remoteAddr netip.AddrPort
relayIP byte // Non-zero if we should relay.
}
// remotePeer holds everything needed to exchange packets with one remote
// peer: the writers for the local interface and the network connection, the
// published connection state, scratch buffers, and the channels used to talk
// to the supervisor.
type remotePeer struct {
// Immutable data.
localIP byte
remoteIP byte
iface *ifWriter
conn *connWriter
// Shared state.
// peers is the table of all remote peers; it is used to reach the relay
// peer and relay destinations. published holds the current peerData.
peers *remotePeers
published *atomic.Pointer[peerData]
// Only used in HandlePacket / Not synchronized.
dupCheck *dupCheck
decryptBuf []byte
// Only used on the send path (sendData and HandleInterfacePacket) / Not
// synchronized.
encryptBuf []byte
// Used for sending control and data packets. Atomic access only.
// Incremented with atomic.AddUint64 for every outgoing header.
counter uint64
// For communicating with the supervisor thread.
// handleControlPacket sends on controlPackets without blocking and drops
// the packet when the channel is full.
peerUpdates chan *m.Peer
controlPackets chan controlPacket
}
// newRemotePeer builds the remotePeer for remoteIP and starts its supervisor
// goroutine before returning.
//
// The peer starts with an empty peerData published, so the packet handlers
// report "not connected" until a populated peerData is stored. The packet
// counter is seeded with the current Unix time shifted left by 30 bits.
func newRemotePeer(conf m.PeerConfig, remoteIP byte, iface *ifWriter, conn *connWriter, peers *remotePeers) *remotePeer {
	rp := new(remotePeer)

	// Immutable data.
	rp.localIP = conf.PeerIP
	rp.remoteIP = remoteIP
	rp.iface = iface
	rp.conn = conn

	// Shared state.
	rp.peers = peers
	rp.published = new(atomic.Pointer[peerData])

	// Per-direction scratch state.
	rp.dupCheck = newDupCheck(0)
	rp.decryptBuf = make([]byte, bufferSize)
	rp.encryptBuf = make([]byte, bufferSize)
	rp.counter = uint64(time.Now().Unix()) << 30

	// Supervisor channels: updates are unbuffered, control packets queue up
	// to 512 entries.
	rp.peerUpdates = make(chan *m.Peer)
	rp.controlPackets = make(chan controlPacket, 512)

	rp.published.Store(new(peerData))

	go rp.supervise(conf, remoteIP, conn, peers)
	return rp
}
// logf logs a formatted message prefixed with the remote peer's IP byte,
// rendered as a zero-padded three-digit number in square brackets. msg and
// args are used as the format string and arguments for log.Printf.
func (rp *remotePeer) logf(msg string, args ...any) {
	prefix := fmt.Sprintf("[%03d] ", rp.remoteIP)
	log.Printf(prefix+msg, args...)
}
// HandlePeerUpdate forwards a peer update to the supervisor over the
// peerUpdates channel. The send blocks until the update is received.
func (rp *remotePeer) HandlePeerUpdate(peer *m.Peer) {
	updates := rp.peerUpdates
	updates <- peer
}
// ----------------------------------------------------------------------------
// HandlePacket dispatches a raw packet received from the network to the
// handler for its stream (control, data or relay). Packets with any other
// stream ID are logged and dropped.
//
// This function is called by a single thread.
func (rp *remotePeer) HandlePacket(addr netip.AddrPort, h header, data []byte) {
	id := h.StreamID

	if id == controlStreamID {
		rp.handleControlPacket(addr, h, data)
		return
	}
	if id == dataStreamID {
		rp.handleDataPacket(data)
		return
	}
	if id == relayStreamID {
		rp.handleRelayPacket(h, data)
		return
	}

	rp.logf("Unknown stream ID: %d", id)
}
// ----------------------------------------------------------------------------
// handleControlPacket validates and decrypts an incoming control packet and
// queues it on rp.controlPackets.
//
// The packet is discarded, with a log line, when no control cipher is
// published, the destination IP is not ours, decryption fails, the payload is
// empty, the counter is a duplicate, the payload cannot be parsed, or the
// queue is full.
func (rp *remotePeer) handleControlPacket(addr netip.AddrPort, h header, data []byte) {
	state := rp.published.Load()
	cc := state.controlCipher
	if cc == nil {
		log.Printf("Shared: %+v", *state)
		rp.logf("Not connected (control).")
		return
	}

	if rp.localIP != h.DestIP {
		rp.logf("Incorrect destination IP on control packet.")
		return
	}

	plain, ok := cc.Decrypt(data, rp.decryptBuf)
	if !ok {
		rp.logf("Failed to decrypt control packet.")
		return
	}

	if len(plain) == 0 {
		rp.logf("Empty control packet from: %d", h.SourceIP)
		return
	}

	// The duplicate check runs only after the packet has decrypted
	// successfully.
	if rp.dupCheck.IsDup(h.Counter) {
		rp.logf("Duplicate control packet: %d", h.Counter)
		return
	}

	var pkt controlPacket
	pkt.SrcIP = h.SourceIP
	pkt.RemoteAddr = addr

	err := pkt.ParsePayload(plain)
	if err != nil {
		rp.logf("Failed to parse control packet: %v", err)
		return
	}

	// Never block the receive path: drop the packet if the queue is full.
	select {
	case rp.controlPackets <- pkt:
	default:
		rp.logf("Dropping control packet.")
	}
}
// ----------------------------------------------------------------------------
// handleDataPacket decrypts an incoming data packet with the published data
// cipher and writes the plaintext to the local interface. The packet is
// dropped, with a log line, when no data cipher is published or decryption
// fails.
func (rp *remotePeer) handleDataPacket(data []byte) {
	dc := rp.published.Load().dataCipher
	if dc == nil {
		rp.logf("Not connected (recv).")
		return
	}

	plain, ok := dc.Decrypt(data, rp.decryptBuf)
	if ok {
		rp.iface.Write(plain)
		return
	}

	rp.logf("Failed to decrypt data packet.")
}
// ----------------------------------------------------------------------------
// handleRelayPacket handles a packet that the remote peer asked us to relay.
//
// The outer layer is decrypted with the data cipher published for this peer
// and the resulting payload is forwarded unchanged to the peer identified by
// h.DestIP. The packet is dropped, with a log line, when no data cipher is
// published, decryption fails, or no peer exists for the destination IP.
func (rp *remotePeer) handleRelayPacket(h header, data []byte) {
	shared := rp.published.Load()
	if shared.dataCipher == nil {
		rp.logf("Not connected (recv).")
		return
	}

	dec, ok := shared.dataCipher.Decrypt(data, rp.decryptBuf)
	if !ok {
		rp.logf("Failed to decrypt data packet.")
		return
	}

	// DestIP comes from the network. Entries in the peer table may be nil,
	// and calling sendDirect on a nil peer would panic.
	dest := rp.peers[h.DestIP]
	if dest == nil {
		rp.logf("Relay destination %d not found.", h.DestIP)
		return
	}

	dest.sendDirect(dec)
}
// ----------------------------------------------------------------------------
// SendData encrypts data coming from the local interface and sends it to the
// remote peer on the data stream.
//
// This function is called by a single thread.
func (rp *remotePeer) SendData(data []byte) {
	dest := rp.remoteIP
	rp.sendData(dataStreamID, dest, data)
}
// HandleInterfacePacket sends a packet read from the local interface to the
// remote peer, either directly or through the published relay peer.
//
// Direct (relayIP == 0): the packet is handed to SendData, which builds the
// header, encrypts and writes it to the remote address.
//
// Relayed (relayIP != 0): the packet is first encrypted for the remote peer,
// then passed to the relay peer's RelayData with the remote peer's IP as the
// destination. The relay forwards relay packets to the peer named by the
// header's DestIP, so the destination must be the final recipient and not
// the relay itself.
//
// The packet is dropped, with a log line, when no data cipher is published or
// the relay peer does not exist.
func (rp *remotePeer) HandleInterfacePacket(data []byte) {
	shared := rp.published.Load()
	if shared.dataCipher == nil {
		rp.logf("Not connected (handle interface).")
		return
	}

	if shared.relayIP == 0 {
		// SendData performs the encryption itself; encrypting here as well
		// would waste work and consume an extra counter value.
		rp.SendData(data)
		return
	}

	relay := rp.peers[shared.relayIP]
	if relay == nil {
		rp.logf("Relay peer %d not found.", shared.relayIP)
		return
	}

	// Inner packet: encrypted end-to-end for the remote peer.
	h := header{
		StreamID: dataStreamID,
		Counter:  atomic.AddUint64(&rp.counter, 1),
		SourceIP: rp.localIP,
		DestIP:   rp.remoteIP,
	}
	enc := shared.dataCipher.Encrypt(h, data, rp.encryptBuf)

	// Outer packet: addressed to the final destination, sent via the relay.
	relay.RelayData(rp.remoteIP, enc)
}
// ----------------------------------------------------------------------------
// RelayData encrypts data and sends it to this peer on the relay stream, with
// destIP recorded as the destination IP in the packet header.
func (rp *remotePeer) RelayData(destIP byte, data []byte) {
	payload := data
	rp.sendData(relayStreamID, destIP, payload)
}
// ----------------------------------------------------------------------------
// sendData encrypts data with the published data cipher and writes it to the
// published remote address.
//
// The header carries the given stream ID and destination IP, our local IP as
// the source, and the next value of the packet counter. Nothing is sent, and
// the counter is left untouched, when no data cipher or no remote address is
// published.
func (rp *remotePeer) sendData(streamID byte, destIP byte, data []byte) {
	state := rp.published.Load()

	connected := state.dataCipher != nil && state.remoteAddr != zeroAddrPort
	if !connected {
		rp.logf("Not connected (send).")
		return
	}

	var hdr header
	hdr.StreamID = streamID
	hdr.Counter = atomic.AddUint64(&rp.counter, 1)
	hdr.SourceIP = rp.localIP
	hdr.DestIP = destIP

	packet := state.dataCipher.Encrypt(hdr, data, rp.encryptBuf)
	rp.conn.WriteTo(packet, state.remoteAddr)
}
// sendDirect writes data to the published remote address as-is, without
// adding a header or encrypting. Nothing is sent when no remote address is
// published.
func (rp *remotePeer) sendDirect(data []byte) {
	addr := rp.published.Load().remoteAddr
	if addr != zeroAddrPort {
		rp.conn.WriteTo(data, addr)
		return
	}

	rp.logf("Not connected (send).")
}