vppn/node/conn.go

173 lines
3.3 KiB
Go

package node
import (
"log"
"net"
"net/netip"
"sync"
"sync/atomic"
"vppn/fasttime"
)
type connWriter struct {
*net.UDPConn
lock sync.Mutex
localIP byte
buf []byte
buf2 []byte
counters [256]uint64
routing *routingTable
}
func newConnWriter(conn *net.UDPConn, localIP byte, routing *routingTable) *connWriter {
w := &connWriter{
UDPConn: conn,
localIP: localIP,
buf: make([]byte, bufferSize),
buf2: make([]byte, bufferSize),
routing: routing,
}
for i := range w.counters {
w.counters[i] = uint64(fasttime.Now() << 30)
}
return w
}
func (w *connWriter) WriteTo(remoteIP, stream byte, data []byte) {
dstPeer := w.routing.Get(remoteIP)
if dstPeer == nil {
log.Printf("No peer: %d", remoteIP)
return
}
if stream == streamData && !dstPeer.Up {
log.Printf("Peer down: %d", remoteIP)
return
}
var viaPeer *peer
if dstPeer.Mediated {
viaPeer = w.routing.mediator.Load()
if viaPeer == nil || viaPeer.Addr == nil {
log.Printf("Mediator not connected")
return
}
} else if dstPeer.Addr == nil {
log.Printf("Peer doesn't have address: %d", remoteIP)
return
}
w.WriteToPeer(dstPeer, viaPeer, stream, data)
}
func (w *connWriter) WriteToPeer(dstPeer, viaPeer *peer, stream byte, data []byte) {
w.lock.Lock()
addr := dstPeer.Addr
h := header{
Counter: atomic.AddUint64(&w.counters[dstPeer.IP], 1),
SourceIP: w.localIP,
DestIP: dstPeer.IP,
Stream: stream,
}
buf := encryptPacketAsym(&h, dstPeer.SharedKey, data, w.buf)
if viaPeer != nil {
h := header{
Counter: atomic.AddUint64(&w.counters[viaPeer.IP], 1),
SourceIP: w.localIP,
DestIP: dstPeer.IP,
Forward: 1,
Stream: stream,
}
buf = encryptPacketAsym(&h, viaPeer.SharedKey, buf, w.buf2)
addr = viaPeer.Addr
}
if _, err := w.WriteToUDPAddrPort(buf, *addr); err != nil {
log.Fatalf("Failed to write to UDP port: %v", err)
}
w.lock.Unlock()
}
func (w *connWriter) Forward(dstIP byte, packet []byte) {
dstPeer := w.routing.Get(dstIP)
if dstPeer == nil || dstPeer.Addr == nil {
log.Printf("No peer: %d", dstIP)
return
}
if _, err := w.WriteToUDPAddrPort(packet, *dstPeer.Addr); err != nil {
log.Fatalf("Failed to write to UDP port: %v", err)
}
}
// ----------------------------------------------------------------------------
type connReader struct {
*net.UDPConn
localIP byte
dupChecks [256]*dupCheck
routing *routingTable
buf []byte
}
func newConnReader(conn *net.UDPConn, localIP byte, routing *routingTable) *connReader {
r := &connReader{
UDPConn: conn,
localIP: localIP,
routing: routing,
buf: make([]byte, bufferSize),
}
for i := range r.dupChecks {
r.dupChecks[i] = newDupCheck(0)
}
return r
}
func (r *connReader) Read(buf []byte) (remoteAddr netip.AddrPort, h header, data []byte) {
var (
n int
err error
)
for {
n, remoteAddr, err = r.ReadFromUDPAddrPort(buf[:bufferSize])
if err != nil {
log.Fatalf("Failed to read from UDP port: %v", err)
}
data = buf[:n]
if n < headerSize {
continue // Packet it soo short.
}
h.Parse(data)
peer := r.routing.Get(h.SourceIP)
if peer == nil {
continue
}
out, ok := decryptPacketAsym(peer.SharedKey, data, r.buf)
if !ok {
continue
}
out, data = data, out
if r.dupChecks[h.SourceIP].IsDup(h.Counter) {
log.Printf("Duplicate: %d", h.Counter)
continue
}
return
}
}