263 lines
6.2 KiB
Go
263 lines
6.2 KiB
Go
package node
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"net/netip"
|
|
"sync/atomic"
|
|
"time"
|
|
"vppn/m"
|
|
)
|
|
|
|
type remotePeers [256]*remotePeer
|
|
|
|
type peerRoutingData struct {
|
|
up bool
|
|
relay bool
|
|
controlCipher *controlCipher
|
|
dataCipher *dataCipher
|
|
remoteAddr netip.AddrPort
|
|
relayIP byte // Non-zero if we should relay.
|
|
}
|
|
|
|
type remotePeer struct {
|
|
// Immutable data.
|
|
localIP byte
|
|
remoteIP byte
|
|
iface *ifWriter
|
|
conn *connWriter
|
|
|
|
// Shared state.
|
|
peers *remotePeers
|
|
published *atomic.Pointer[peerRoutingData]
|
|
|
|
// Only used in HandlePacket / Not synchronized.
|
|
dupCheck *dupCheck
|
|
decryptBuf []byte
|
|
|
|
// Only used in SendData / Not synchronized.
|
|
encryptBuf []byte
|
|
|
|
// Used for sending control and data packets. Atomic access only.
|
|
counter uint64
|
|
|
|
// For communicating with the supervisor thread.
|
|
peerUpdates chan *m.Peer
|
|
controlPackets chan controlPacket
|
|
}
|
|
|
|
func newRemotePeer(conf m.PeerConfig, remoteIP byte, iface *ifWriter, conn *connWriter, peers *remotePeers) *remotePeer {
|
|
rp := &remotePeer{
|
|
localIP: conf.PeerIP,
|
|
remoteIP: remoteIP,
|
|
iface: iface,
|
|
conn: conn,
|
|
peers: peers,
|
|
published: &atomic.Pointer[peerRoutingData]{},
|
|
dupCheck: newDupCheck(0),
|
|
decryptBuf: make([]byte, bufferSize),
|
|
encryptBuf: make([]byte, bufferSize),
|
|
counter: uint64(time.Now().Unix()) << 30,
|
|
peerUpdates: make(chan *m.Peer),
|
|
controlPackets: make(chan controlPacket, 512),
|
|
}
|
|
|
|
pd := peerRoutingData{}
|
|
rp.published.Store(&pd)
|
|
|
|
//go newPeerSuper(rp).Run()
|
|
go rp.supervise(conf)
|
|
return rp
|
|
}
|
|
|
|
func (rp *remotePeer) logf(msg string, args ...any) {
|
|
log.Printf(fmt.Sprintf("[%03d] ", rp.remoteIP)+msg, args...)
|
|
}
|
|
|
|
func (rp *remotePeer) HandlePeerUpdate(peer *m.Peer) {
|
|
rp.peerUpdates <- peer
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
|
|
// HandlePacket accepts a raw data packet coming in from the network.
|
|
//
|
|
// This function is called by a single thread.
|
|
func (rp *remotePeer) HandlePacket(addr netip.AddrPort, h header, data []byte) {
|
|
switch h.StreamID {
|
|
case controlStreamID:
|
|
rp.handleControlPacket(addr, h, data)
|
|
|
|
case dataStreamID:
|
|
rp.handleDataPacket(data)
|
|
|
|
case relayStreamID:
|
|
rp.handleRelayPacket(h, data)
|
|
|
|
default:
|
|
rp.logf("Unknown stream ID: %d", h.StreamID)
|
|
}
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
|
|
func (rp *remotePeer) handleControlPacket(addr netip.AddrPort, h header, data []byte) {
|
|
routingData := rp.published.Load()
|
|
if routingData.controlCipher == nil {
|
|
rp.logf("Not connected (control).")
|
|
return
|
|
}
|
|
|
|
if h.DestIP != rp.localIP {
|
|
rp.logf("Incorrect destination IP on control packet.")
|
|
return
|
|
}
|
|
|
|
out, ok := routingData.controlCipher.Decrypt(data, rp.decryptBuf)
|
|
if !ok {
|
|
rp.logf("Failed to decrypt control packet.")
|
|
return
|
|
}
|
|
|
|
if len(out) == 0 {
|
|
rp.logf("Empty control packet from: %d", h.SourceIP)
|
|
return
|
|
}
|
|
|
|
if rp.dupCheck.IsDup(h.Counter) {
|
|
rp.logf("Duplicate control packet: %d", h.Counter)
|
|
return
|
|
}
|
|
|
|
pkt := controlPacket{
|
|
SrcIP: h.SourceIP,
|
|
RemoteAddr: addr,
|
|
}
|
|
|
|
if err := pkt.ParsePayload(out); err != nil {
|
|
rp.logf("Failed to parse control packet: %v", err)
|
|
return
|
|
}
|
|
|
|
select {
|
|
case rp.controlPackets <- pkt:
|
|
default:
|
|
rp.logf("Dropping control packet.")
|
|
}
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
|
|
func (rp *remotePeer) handleDataPacket(data []byte) {
|
|
routingData := rp.published.Load()
|
|
if routingData.dataCipher == nil {
|
|
rp.logf("Not connected (recv).")
|
|
return
|
|
}
|
|
|
|
dec, ok := routingData.dataCipher.Decrypt(data, rp.decryptBuf)
|
|
if !ok {
|
|
rp.logf("Failed to decrypt data packet.")
|
|
return
|
|
}
|
|
|
|
rp.iface.Write(dec)
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
|
|
func (rp *remotePeer) handleRelayPacket(h header, data []byte) {
|
|
routingData := rp.published.Load()
|
|
if routingData.dataCipher == nil {
|
|
rp.logf("Not connected (recv).")
|
|
return
|
|
}
|
|
|
|
dec, ok := routingData.dataCipher.Decrypt(data, rp.decryptBuf)
|
|
if !ok {
|
|
rp.logf("Failed to decrypt data packet.")
|
|
return
|
|
}
|
|
|
|
rp.peers[h.DestIP].SendAsIs(dec)
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
|
|
// SendData sends data coming from the interface going to the network.
|
|
//
|
|
// This function is called by a single thread.
|
|
func (rp *remotePeer) SendData(data []byte) {
|
|
rp.encryptAndSend(dataStreamID, rp.remoteIP, data)
|
|
}
|
|
|
|
func (rp *remotePeer) HandleInterfacePacket(data []byte) {
|
|
routingData := rp.published.Load()
|
|
|
|
if routingData.dataCipher == nil {
|
|
rp.logf("Not connected (handle interface).")
|
|
return
|
|
}
|
|
|
|
h := header{
|
|
StreamID: dataStreamID,
|
|
Counter: atomic.AddUint64(&rp.counter, 1),
|
|
SourceIP: rp.localIP,
|
|
DestIP: rp.remoteIP,
|
|
}
|
|
|
|
enc := routingData.dataCipher.Encrypt(h, data, rp.encryptBuf)
|
|
|
|
if routingData.relayIP != 0 {
|
|
rp.peers[routingData.relayIP].RelayFor(rp.remoteIP, enc)
|
|
} else {
|
|
rp.SendData(data)
|
|
}
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
|
|
func (rp *remotePeer) CanRelay() bool {
|
|
data := rp.published.Load()
|
|
return data.relay && data.up
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
|
|
func (rp *remotePeer) RelayFor(destIP byte, data []byte) {
|
|
rp.encryptAndSend(relayStreamID, destIP, data)
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
|
|
func (rp *remotePeer) encryptAndSend(streamID byte, destIP byte, data []byte) {
|
|
routingData := rp.published.Load()
|
|
if routingData.dataCipher == nil || routingData.remoteAddr == zeroAddrPort {
|
|
rp.logf("Not connected (encrypt and send).")
|
|
return
|
|
}
|
|
|
|
h := header{
|
|
StreamID: streamID,
|
|
Counter: atomic.AddUint64(&rp.counter, 1),
|
|
SourceIP: rp.localIP,
|
|
DestIP: destIP,
|
|
}
|
|
|
|
enc := routingData.dataCipher.Encrypt(h, data, rp.encryptBuf)
|
|
rp.conn.WriteTo(enc, routingData.remoteAddr)
|
|
}
|
|
|
|
// ----------------------------------------------------------------------------
|
|
|
|
// SendAsIs is used when forwarding already-encrypted data from one peer to
|
|
// another.
|
|
func (rp *remotePeer) SendAsIs(data []byte) {
|
|
routingData := rp.published.Load()
|
|
if routingData.remoteAddr == zeroAddrPort {
|
|
rp.logf("Not connected (send direct).")
|
|
return
|
|
}
|
|
rp.conn.WriteTo(data, routingData.remoteAddr)
|
|
}
|