diff --git a/README.md b/README.md index 29ae92c..f24bbd7 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,9 @@ # vppn: Virtual Potentially Private Network +## TO DO + +* Double buffering in IFReader and ConnReader ? + ## Hub Server Configuration ``` @@ -9,7 +13,6 @@ adduser user # Enable ssh. cp -r ~/.ssh /home/user/ chown -R user:user /home/user/.ssh - ``` Upload `hub` executable: @@ -56,7 +59,6 @@ Install the binary somewhere, for example `~/bin/vppn`. Create systemd file in `/etc/systemd/system/vppn.service`. - ``` [Service] AmbientCapabilities=CAP_NET_BIND_SERVICE CAP_NET_ADMIN @@ -73,7 +75,6 @@ WantedBy=multi-user.target Add and start the service: - ``` systemctl daemon-reload systemctl enable vppn diff --git a/peer/connreader.go b/peer/connreader.go index f627982..5427227 100644 --- a/peer/connreader.go +++ b/peer/connreader.go @@ -1,56 +1,34 @@ package peer import ( - "io" "log" + "net" "net/netip" - "sync/atomic" ) -type connReader struct { - // Input - readFromUDPAddrPort func([]byte) (int, netip.AddrPort, error) - - // Output - writeToUDPAddrPort func([]byte, netip.AddrPort) (int, error) - iface io.Writer - handleControlMsg func(fromIP byte, pkt any) - - localIP byte - rt *atomic.Pointer[routingTable] - - buf []byte - decBuf []byte +type ConnReader struct { + Globals + conn *net.UDPConn + buf []byte } -func newConnReader( - readFromUDPAddrPort func([]byte) (int, netip.AddrPort, error), - writeToUDPAddrPort func([]byte, netip.AddrPort) (int, error), - iface io.Writer, - handleControlMsg func(fromIP byte, pkt any), - rt *atomic.Pointer[routingTable], -) *connReader { - return &connReader{ - readFromUDPAddrPort: readFromUDPAddrPort, - writeToUDPAddrPort: writeToUDPAddrPort, - iface: iface, - handleControlMsg: handleControlMsg, - localIP: rt.Load().LocalIP, - rt: rt, - buf: newBuf(), - decBuf: newBuf(), +func NewConnReader(g Globals, conn *net.UDPConn) *ConnReader { + return &ConnReader{ + Globals: g, + conn: conn, + buf: make([]byte, bufferSize), } } -func (r *connReader) Run() { +func (r *ConnReader) Run() { for { r.handleNextPacket() } } -func (r *connReader) handleNextPacket() { +func (r *ConnReader) handleNextPacket() { buf := r.buf[:bufferSize] - n, remoteAddr, err := r.readFromUDPAddrPort(buf) + n, remoteAddr, err := r.conn.ReadFromUDPAddrPort(buf) if err != nil { log.Fatalf("Failed to read from UDP port: %v", err) } @@ -64,78 +42,5 @@ func (r *connReader) handleNextPacket() { buf = buf[:n] h := parseHeader(buf) - rt := r.rt.Load() - peer := rt.Peers[h.SourceIP] - - switch h.StreamID { - case controlStreamID: - r.handleControlPacket(remoteAddr, peer, h, buf) - case dataStreamID: - r.handleDataPacket(rt, peer, h, buf) - default: - r.logf("Unknown stream ID: %d", h.StreamID) - } -} - -func (r *connReader) handleControlPacket( - remoteAddr netip.AddrPort, - peer remotePeer, - h header, - enc []byte, -) { - if peer.ControlCipher == nil { - r.logf("No control cipher for peer: %d", h.SourceIP) - return - } - - if h.DestIP != r.localIP { - r.logf("Incorrect destination IP on control packet: %d", h.DestIP) - return - } - - msg, err := peer.DecryptControlPacket(remoteAddr, h, enc, r.decBuf) - if err != nil { - r.logf("Failed to decrypt control packet: %v", err) - return - } - - r.handleControlMsg(h.SourceIP, msg) -} - -func (r *connReader) handleDataPacket( - rt *routingTable, - peer remotePeer, - h header, - enc []byte, -) { - if !peer.Up { - r.logf("Not connected (recv).") - return - } - - data, err := peer.DecryptDataPacket(h, enc, r.decBuf) - if err != nil { - r.logf("Failed to decrypt data packet: %v", err) - return - } - - if h.DestIP == r.localIP { - if _, err := r.iface.Write(data); err != nil { - // Could be invalid data from peer. Don't crash. - log.Printf("Failed to write to interface: %v", err) - } - return - } - - remote := rt.Peers[h.DestIP] - if !remote.Direct { - r.logf("Unable to relay data to %d.", h.DestIP) - return - } - - r.writeToUDPAddrPort(data, remote.DirectAddr) -} - -func (r *connReader) logf(format string, args ...any) { - log.Printf("[ConnReader] "+format, args...) + r.RemotePeers[h.SourceIP].Load().HandlePacket(h, remoteAddr, buf) } diff --git a/peer/files.go b/peer/files.go index a7d9566..f4ee973 100644 --- a/peer/files.go +++ b/peer/files.go @@ -8,8 +8,8 @@ import ( "vppn/m" ) -type localConfig struct { - PeerIP byte +type LocalConfig struct { + LocalPeerIP byte Network []byte PubKey []byte PrivKey []byte @@ -17,6 +17,10 @@ type localConfig struct { PrivSignKey []byte } +type startupCount struct { + Count uint16 +} + func configDir(netName string) string { d, err := os.UserHomeDir() if err != nil { @@ -33,6 +37,10 @@ func peerStatePath(netName string) string { return filepath.Join(configDir(netName), "state.json") } +func startupCountPath(netName string) string { + return filepath.Join(configDir(netName), "startup_count.json") +} + func storeJson(x any, outPath string) error { outDir := filepath.Dir(outPath) _ = os.MkdirAll(outDir, 0700) @@ -65,7 +73,7 @@ func storeJson(x any, outPath string) error { return os.Rename(tmpPath, outPath) } -func storePeerConfig(netName string, pc localConfig) error { +func storePeerConfig(netName string, pc LocalConfig) error { return storeJson(pc, peerConfigPath(netName)) } @@ -82,10 +90,18 @@ func loadJson(dataPath string, ptr any) error { return json.Unmarshal(data, ptr) } -func loadPeerConfig(netName string) (pc localConfig, err error) { +func loadPeerConfig(netName string) (pc LocalConfig, err error) { return pc, loadJson(peerConfigPath(netName), &pc) } func loadNetworkState(netName string) (ps m.NetworkState, err error) { return ps, loadJson(peerStatePath(netName), &ps) } + +func loadStartupCount(netName string) (c startupCount, err error) { + return c, loadJson(startupCountPath(netName), &c) +} + +func storeStartupCount(netName string, c startupCount) error { + return storeJson(c, startupCountPath(netName)) +} diff --git a/peer/main.go b/peer/main.go index 819effb..bd20de6 100644 --- a/peer/main.go +++ b/peer/main.go @@ -6,18 +6,18 @@ import ( ) func Main() { - conf := mainArgs{} + args := mainArgs{} - flag.StringVar(&conf.NetName, "name", "", "[REQUIRED] The network name.") - flag.StringVar(&conf.HubAddress, "hub-address", "", "[REQUIRED] The hub address.") - flag.StringVar(&conf.APIKey, "api-key", "", "[REQUIRED] The node's API key.") + flag.StringVar(&args.NetName, "name", "", "[REQUIRED] The network name.") + flag.StringVar(&args.HubAddress, "hub-address", "", "[REQUIRED] The hub address.") + flag.StringVar(&args.APIKey, "api-key", "", "[REQUIRED] The node's API key.") flag.Parse() - if conf.NetName == "" || conf.HubAddress == "" || conf.APIKey == "" { + if args.NetName == "" || args.HubAddress == "" || args.APIKey == "" { flag.Usage() os.Exit(1) } - peer := newPeerMain(conf) + peer := newPeerMain(args) peer.Run() } diff --git a/peer/mcreader.go b/peer/mcreader.go index 3c1086d..5bbfe71 100644 --- a/peer/mcreader.go +++ b/peer/mcreader.go @@ -3,27 +3,19 @@ package peer import ( "log" "net" - "sync/atomic" "time" ) -func runMCReader( - rt *atomic.Pointer[routingTable], - handleControlMsg func(destIP byte, msg any), -) { +func RunMCReader(g Globals) { for { - runMCReaderInner(rt, handleControlMsg) + runMCReaderInner(g) time.Sleep(broadcastErrorTimeoutInterval) } } -func runMCReaderInner( - rt *atomic.Pointer[routingTable], - handleControlMsg func(destIP byte, msg any), -) { +func runMCReaderInner(g Globals) { var ( - raw = newBuf() - buf = newBuf() + buf = make([]byte, bufferSize) logf = func(s string, args ...any) { log.Printf("[MCReader] "+s, args...) } @@ -37,35 +29,20 @@ func runMCReaderInner( for { conn.SetReadDeadline(time.Now().Add(32 * time.Second)) - n, remoteAddr, err := conn.ReadFromUDPAddrPort(raw[:bufferSize]) + n, remoteAddr, err := conn.ReadFromUDPAddrPort(buf[:bufferSize]) if err != nil { logf("Failed to read from UDP port): %v", err) return } - raw = raw[:n] - h, ok := headerFromLocalDiscoveryPacket(raw) + buf = buf[:n] + h, ok := headerFromLocalDiscoveryPacket(buf) if !ok { logf("Failed to open discovery packet?") continue } + log.Printf("Got local discovery from %v: %v", remoteAddr, h) - peer := rt.Load().Peers[h.SourceIP] - if peer.PubSignKey == nil { - logf("No signing key for peer %d.", h.SourceIP) - continue - } - - if !verifyLocalDiscoveryPacket(raw, buf, peer.PubSignKey) { - logf("Invalid signature from peer: %d", h.SourceIP) - continue - } - - msg := controlMsg[packetLocalDiscovery]{ - SrcIP: h.SourceIP, - SrcAddr: remoteAddr, - } - logf("Got discovery packet from peer %d.", h.SourceIP) - handleControlMsg(h.SourceIP, msg) + g.RemotePeers[h.SourceIP].Load().HandleLocalDiscoveryPacket(h, remoteAddr, buf) } } diff --git a/peer/peer.go b/peer/peer.go index ade03ce..49ebe28 100644 --- a/peer/peer.go +++ b/peer/peer.go @@ -6,23 +6,23 @@ import ( "fmt" "io" "log" + "math" "net" "net/http" "net/netip" "net/url" - "sync" - "sync/atomic" +<<<<<<< HEAD + "os" +======= +>>>>>>> 69f2536 (WIP) "vppn/m" ) type peerMain struct { - conf localConfig - rt *atomic.Pointer[routingTable] - ifReader *ifReader - connReader *connReader - iface io.Writer - hubPoller *hubPoller - super *supervisor + Globals + ifReader *IFReader + connReader *ConnReader + hubPoller *HubPoller } type mainArgs struct { @@ -53,12 +53,31 @@ func newPeerMain(args mainArgs) *peerMain { log.Fatalf("Failed to load network state: %v", err) } - iface, err := openInterface(config.Network, config.PeerIP, args.NetName) +<<<<<<< HEAD + startupCount, err := loadStartupCount(args.NetName) + if err != nil { + if !os.IsNotExist(err) { + log.Fatalf("Failed to load startup count: %v", err) + } + } + + if startupCount.Count == math.MaxUint16 { + log.Fatalf("Startup counter overflow.") + } + startupCount.Count += 1 + + if err := storeStartupCount(args.NetName, startupCount); err != nil { + log.Fatalf("Failed to write startup count: %v", err) + } + +======= +>>>>>>> 69f2536 (WIP) + iface, err := openInterface(config.Network, config.LocalPeerIP, args.NetName) if err != nil { log.Fatalf("Failed to open interface: %v", err) } - localPeer := state.Peers[config.PeerIP] + localPeer := state.Peers[config.LocalPeerIP] myAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", localPeer.Port)) if err != nil { @@ -74,59 +93,47 @@ func newPeerMain(args mainArgs) *peerMain { conn.SetReadBuffer(1024 * 1024 * 8) conn.SetWriteBuffer(1024 * 1024 * 8) - // Wrap write function - this is necessary to avoid starvation. - writeLock := sync.Mutex{} - writeToUDPAddrPort := func(b []byte, addr netip.AddrPort) (n int, err error) { - writeLock.Lock() - n, err = conn.WriteToUDPAddrPort(b, addr) - if err != nil { - logf("Failed to write packet: %v", err) - } - writeLock.Unlock() - return n, err - } - var localAddr netip.AddrPort ip, localAddrValid := netip.AddrFromSlice(localPeer.PublicIP) if localAddrValid { localAddr = netip.AddrPortFrom(ip, localPeer.Port) } - rt := newRoutingTable(localPeer.PeerIP, localAddr) - rtPtr := &atomic.Pointer[routingTable]{} - rtPtr.Store(&rt) +<<<<<<< HEAD + g := NewGlobals(config, startupCount, localAddr, conn, iface) +======= + g := NewGlobals(config, localAddr, conn, iface) +>>>>>>> 69f2536 (WIP) - ifReader := newIFReader(iface, writeToUDPAddrPort, rtPtr) - super := newSupervisor(writeToUDPAddrPort, rtPtr, config.PrivKey) - connReader := newConnReader(conn.ReadFromUDPAddrPort, writeToUDPAddrPort, iface, super.HandleControlMsg, rtPtr) - hubPoller, err := newHubPoller(config.PeerIP, args.NetName, args.HubAddress, args.APIKey, super.HandleControlMsg) + hubPoller, err := NewHubPoller(g, args.NetName, args.HubAddress, args.APIKey) if err != nil { log.Fatalf("Failed to create hub poller: %v", err) } return &peerMain{ - conf: config, - rt: rtPtr, - iface: iface, - ifReader: ifReader, - connReader: connReader, + Globals: g, + ifReader: NewIFReader(g), + connReader: NewConnReader(g, conn), hubPoller: hubPoller, - super: super, } } func (p *peerMain) Run() { + for i := range p.RemotePeers { + remote := p.RemotePeers[i].Load() + go newRemoteFSM(remote).Run() + } go p.ifReader.Run() go p.connReader.Run() - p.super.Start() - if !p.rt.Load().LocalAddr.IsValid() { - go runMCWriter(p.conf.PeerIP, p.conf.PrivSignKey) - go runMCReader(p.rt, p.super.HandleControlMsg) + if !p.LocalAddrValid { + go RunMCWriter(p.LocalPeerIP, p.PrivSignKey) + go RunMCReader(p.Globals) } go p.hubPoller.Run() + select {} } @@ -171,8 +178,8 @@ func initPeerWithHub(args mainArgs) { log.Fatalf("Failed to parse configuration: %v\n%s", err, data) } - config := localConfig{} - config.PeerIP = initResp.PeerIP + config := LocalConfig{} + config.LocalPeerIP = initResp.PeerIP config.Network = initResp.Network config.PubKey = keys.PubKey config.PrivKey = keys.PrivKey