vppn/peer/peer-netreader.go
2024-12-08 09:45:29 +01:00

159 lines
3.2 KiB
Go

package peer
import (
"fmt"
"io"
"log"
"runtime/debug"
"vppn/fasttime"
)
func (peer *Peer) netReader() {
defer func() {
if r := recover(); r != nil {
fmt.Println("stacktrace from panic: \n" + string(debug.Stack()))
}
}()
var (
n int
//srcAddr *net.UDPAddr
nonce Nonce
packet = make([]byte, BUFFER_SIZE)
decrypted = make([]byte, BUFFER_SIZE)
toWrite []byte
route *route
ok bool
err error
conn = peer.conn
ip = peer.ip
counters = [2][256]uint64{} // Counter by stream and IP.
ifaceChan = make(chan []byte, 1024)
reqPool = make(chan []byte, 1024)
)
for range cap(reqPool) {
reqPool <- make([]byte, BUFFER_SIZE)
}
go ifWriter(ifaceChan, peer.iface, reqPool)
NEXT_PACKET:
n, _, err = conn.ReadFromUDPAddrPort(packet[:BUFFER_SIZE])
if err != nil {
log.Fatalf("Failed to read UDP packet: %v", err)
}
if n < NONCE_SIZE {
log.Printf("Dropping short UDP packet: %d", n)
goto NEXT_PACKET
}
packet = packet[:n]
ParseNonceBytes(packet[n-NONCE_SIZE:], &nonce)
// Drop after 8 seconds.
if CounterTimestamp(nonce.Counter) < fasttime.Now()-8 {
log.Printf("Dropping old packet: %d", CounterTimestamp(nonce.Counter))
goto NEXT_PACKET
}
if nonce.StreamID > 1 {
log.Printf("Dropping invalid stream ID: %v", nonce)
goto NEXT_PACKET
}
// Check source counter.
if nonce.Counter <= counters[nonce.StreamID][nonce.SourceIP] {
log.Printf("Dropping packet with bad counter: %v", nonce)
goto NEXT_PACKET
}
counters[nonce.StreamID][nonce.SourceIP] = nonce.Counter
route = peer.router.GetRoute(nonce.SourceIP)
if route == nil {
log.Printf("Dropping packet without route: %v", nonce)
goto NEXT_PACKET
}
switch ip {
case nonce.DestIP:
goto DECRYPT
case nonce.ViaIP:
goto VALIDATE_SIGNATURE
default:
log.Printf("Bad packet: %v", nonce)
goto NEXT_PACKET
}
DECRYPT:
decrypted, ok = decryptPacket(route.EncSharedKey, packet, decrypted)
if !ok {
log.Printf("Failed to decrypt packet: %v", nonce)
goto NEXT_PACKET
}
switch nonce.StreamID {
case STREAM_DATA:
goto WRITE_IFACE_DATA
case STREAM_ROUTING:
goto WRITE_ROUTING_PACKET
default:
log.Printf("Invalid stream ID: %d", nonce.StreamID)
goto NEXT_PACKET
}
WRITE_IFACE_DATA:
//toWrite = toWrite[:len(decrypted)]
//copy(toWrite, decrypted)
toWrite = <-reqPool
ifaceChan <- append(toWrite[:0], decrypted...)
goto NEXT_PACKET
WRITE_ROUTING_PACKET:
//peer.processRoutingPacket(decrypted)
//peer.routeManager.ProcessPacket(decrypted)
goto NEXT_PACKET
VALIDATE_SIGNATURE:
// We don't forward twice.
if route.Mediator {
log.Printf("Dropping double-forward packet: %v", nonce)
goto NEXT_PACKET
}
decrypted, ok = openPacket(route.SignPubKey, packet, decrypted)
if !ok {
log.Printf("Failed to open signed packet: %v", nonce)
goto NEXT_PACKET
}
if _, err = conn.WriteToUDPAddrPort(decrypted, route.Addr); err != nil {
log.Fatalf("Failed to forward packet: %v", err)
}
goto NEXT_PACKET
}
func ifWriter(in chan []byte, iface io.ReadWriteCloser, out chan []byte) {
var err error
for packet := range in {
if _, err = iface.Write(packet); err != nil {
log.Printf("Size: %d", len(packet))
log.Fatalf("Failed to write to interface: %v", err)
}
out <- packet
}
}