Refactor
This commit is contained in:
parent
e91cbfe957
commit
5f0b00ff46
@ -1,5 +1,9 @@
|
||||
# vppn: Virtual Potentially Private Network
|
||||
|
||||
## TO DO
|
||||
|
||||
* Double buffering in IFReader and ConnReader ?
|
||||
|
||||
## Hub Server Configuration
|
||||
|
||||
```
|
||||
@ -9,7 +13,6 @@ adduser user
|
||||
# Enable ssh.
|
||||
cp -r ~/.ssh /home/user/
|
||||
chown -R user:user /home/user/.ssh
|
||||
|
||||
```
|
||||
|
||||
Upload `hub` executable:
|
||||
@ -56,7 +59,6 @@ Install the binary somewhere, for example `~/bin/vppn`.
|
||||
|
||||
Create systemd file in `/etc/systemd/system/vppn.service`.
|
||||
|
||||
|
||||
```
|
||||
[Service]
|
||||
AmbientCapabilities=CAP_NET_BIND_SERVICE CAP_NET_ADMIN
|
||||
@ -73,7 +75,6 @@ WantedBy=multi-user.target
|
||||
|
||||
Add and start the service:
|
||||
|
||||
|
||||
```
|
||||
systemctl daemon-reload
|
||||
systemctl enable vppn
|
||||
|
@ -1,56 +1,34 @@
|
||||
package peer
|
||||
|
||||
import (
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"net/netip"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
type connReader struct {
|
||||
// Input
|
||||
readFromUDPAddrPort func([]byte) (int, netip.AddrPort, error)
|
||||
|
||||
// Output
|
||||
writeToUDPAddrPort func([]byte, netip.AddrPort) (int, error)
|
||||
iface io.Writer
|
||||
handleControlMsg func(fromIP byte, pkt any)
|
||||
|
||||
localIP byte
|
||||
rt *atomic.Pointer[routingTable]
|
||||
|
||||
buf []byte
|
||||
decBuf []byte
|
||||
type ConnReader struct {
|
||||
Globals
|
||||
conn *net.UDPConn
|
||||
buf []byte
|
||||
}
|
||||
|
||||
func newConnReader(
|
||||
readFromUDPAddrPort func([]byte) (int, netip.AddrPort, error),
|
||||
writeToUDPAddrPort func([]byte, netip.AddrPort) (int, error),
|
||||
iface io.Writer,
|
||||
handleControlMsg func(fromIP byte, pkt any),
|
||||
rt *atomic.Pointer[routingTable],
|
||||
) *connReader {
|
||||
return &connReader{
|
||||
readFromUDPAddrPort: readFromUDPAddrPort,
|
||||
writeToUDPAddrPort: writeToUDPAddrPort,
|
||||
iface: iface,
|
||||
handleControlMsg: handleControlMsg,
|
||||
localIP: rt.Load().LocalIP,
|
||||
rt: rt,
|
||||
buf: newBuf(),
|
||||
decBuf: newBuf(),
|
||||
func NewConnReader(g Globals, conn *net.UDPConn) *ConnReader {
|
||||
return &ConnReader{
|
||||
Globals: g,
|
||||
conn: conn,
|
||||
buf: make([]byte, bufferSize),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *connReader) Run() {
|
||||
func (r *ConnReader) Run() {
|
||||
for {
|
||||
r.handleNextPacket()
|
||||
}
|
||||
}
|
||||
|
||||
func (r *connReader) handleNextPacket() {
|
||||
func (r *ConnReader) handleNextPacket() {
|
||||
buf := r.buf[:bufferSize]
|
||||
n, remoteAddr, err := r.readFromUDPAddrPort(buf)
|
||||
n, remoteAddr, err := r.conn.ReadFromUDPAddrPort(buf)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to read from UDP port: %v", err)
|
||||
}
|
||||
@ -64,78 +42,5 @@ func (r *connReader) handleNextPacket() {
|
||||
buf = buf[:n]
|
||||
h := parseHeader(buf)
|
||||
|
||||
rt := r.rt.Load()
|
||||
peer := rt.Peers[h.SourceIP]
|
||||
|
||||
switch h.StreamID {
|
||||
case controlStreamID:
|
||||
r.handleControlPacket(remoteAddr, peer, h, buf)
|
||||
case dataStreamID:
|
||||
r.handleDataPacket(rt, peer, h, buf)
|
||||
default:
|
||||
r.logf("Unknown stream ID: %d", h.StreamID)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *connReader) handleControlPacket(
|
||||
remoteAddr netip.AddrPort,
|
||||
peer remotePeer,
|
||||
h header,
|
||||
enc []byte,
|
||||
) {
|
||||
if peer.ControlCipher == nil {
|
||||
r.logf("No control cipher for peer: %d", h.SourceIP)
|
||||
return
|
||||
}
|
||||
|
||||
if h.DestIP != r.localIP {
|
||||
r.logf("Incorrect destination IP on control packet: %d", h.DestIP)
|
||||
return
|
||||
}
|
||||
|
||||
msg, err := peer.DecryptControlPacket(remoteAddr, h, enc, r.decBuf)
|
||||
if err != nil {
|
||||
r.logf("Failed to decrypt control packet: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
r.handleControlMsg(h.SourceIP, msg)
|
||||
}
|
||||
|
||||
func (r *connReader) handleDataPacket(
|
||||
rt *routingTable,
|
||||
peer remotePeer,
|
||||
h header,
|
||||
enc []byte,
|
||||
) {
|
||||
if !peer.Up {
|
||||
r.logf("Not connected (recv).")
|
||||
return
|
||||
}
|
||||
|
||||
data, err := peer.DecryptDataPacket(h, enc, r.decBuf)
|
||||
if err != nil {
|
||||
r.logf("Failed to decrypt data packet: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
if h.DestIP == r.localIP {
|
||||
if _, err := r.iface.Write(data); err != nil {
|
||||
// Could be invalid data from peer. Don't crash.
|
||||
log.Printf("Failed to write to interface: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
remote := rt.Peers[h.DestIP]
|
||||
if !remote.Direct {
|
||||
r.logf("Unable to relay data to %d.", h.DestIP)
|
||||
return
|
||||
}
|
||||
|
||||
r.writeToUDPAddrPort(data, remote.DirectAddr)
|
||||
}
|
||||
|
||||
func (r *connReader) logf(format string, args ...any) {
|
||||
log.Printf("[ConnReader] "+format, args...)
|
||||
r.RemotePeers[h.SourceIP].Load().HandlePacket(h, remoteAddr, buf)
|
||||
}
|
||||
|
@ -8,8 +8,8 @@ import (
|
||||
"vppn/m"
|
||||
)
|
||||
|
||||
type localConfig struct {
|
||||
PeerIP byte
|
||||
type LocalConfig struct {
|
||||
LocalPeerIP byte
|
||||
Network []byte
|
||||
PubKey []byte
|
||||
PrivKey []byte
|
||||
@ -17,6 +17,10 @@ type localConfig struct {
|
||||
PrivSignKey []byte
|
||||
}
|
||||
|
||||
type startupCount struct {
|
||||
Count uint16
|
||||
}
|
||||
|
||||
func configDir(netName string) string {
|
||||
d, err := os.UserHomeDir()
|
||||
if err != nil {
|
||||
@ -33,6 +37,10 @@ func peerStatePath(netName string) string {
|
||||
return filepath.Join(configDir(netName), "state.json")
|
||||
}
|
||||
|
||||
func startupCountPath(netName string) string {
|
||||
return filepath.Join(configDir(netName), "startup_count.json")
|
||||
}
|
||||
|
||||
func storeJson(x any, outPath string) error {
|
||||
outDir := filepath.Dir(outPath)
|
||||
_ = os.MkdirAll(outDir, 0700)
|
||||
@ -65,7 +73,7 @@ func storeJson(x any, outPath string) error {
|
||||
return os.Rename(tmpPath, outPath)
|
||||
}
|
||||
|
||||
func storePeerConfig(netName string, pc localConfig) error {
|
||||
func storePeerConfig(netName string, pc LocalConfig) error {
|
||||
return storeJson(pc, peerConfigPath(netName))
|
||||
}
|
||||
|
||||
@ -82,10 +90,18 @@ func loadJson(dataPath string, ptr any) error {
|
||||
return json.Unmarshal(data, ptr)
|
||||
}
|
||||
|
||||
func loadPeerConfig(netName string) (pc localConfig, err error) {
|
||||
func loadPeerConfig(netName string) (pc LocalConfig, err error) {
|
||||
return pc, loadJson(peerConfigPath(netName), &pc)
|
||||
}
|
||||
|
||||
func loadNetworkState(netName string) (ps m.NetworkState, err error) {
|
||||
return ps, loadJson(peerStatePath(netName), &ps)
|
||||
}
|
||||
|
||||
func loadStartupCount(netName string) (c startupCount, err error) {
|
||||
return c, loadJson(startupCountPath(netName), &c)
|
||||
}
|
||||
|
||||
func storeStartupCount(netName string, c startupCount) error {
|
||||
return storeJson(c, startupCountPath(netName))
|
||||
}
|
||||
|
12
peer/main.go
12
peer/main.go
@ -6,18 +6,18 @@ import (
|
||||
)
|
||||
|
||||
func Main() {
|
||||
conf := mainArgs{}
|
||||
args := mainArgs{}
|
||||
|
||||
flag.StringVar(&conf.NetName, "name", "", "[REQUIRED] The network name.")
|
||||
flag.StringVar(&conf.HubAddress, "hub-address", "", "[REQUIRED] The hub address.")
|
||||
flag.StringVar(&conf.APIKey, "api-key", "", "[REQUIRED] The node's API key.")
|
||||
flag.StringVar(&args.NetName, "name", "", "[REQUIRED] The network name.")
|
||||
flag.StringVar(&args.HubAddress, "hub-address", "", "[REQUIRED] The hub address.")
|
||||
flag.StringVar(&args.APIKey, "api-key", "", "[REQUIRED] The node's API key.")
|
||||
flag.Parse()
|
||||
|
||||
if conf.NetName == "" || conf.HubAddress == "" || conf.APIKey == "" {
|
||||
if args.NetName == "" || args.HubAddress == "" || args.APIKey == "" {
|
||||
flag.Usage()
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
peer := newPeerMain(conf)
|
||||
peer := newPeerMain(args)
|
||||
peer.Run()
|
||||
}
|
||||
|
@ -3,27 +3,19 @@ package peer
|
||||
import (
|
||||
"log"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
)
|
||||
|
||||
func runMCReader(
|
||||
rt *atomic.Pointer[routingTable],
|
||||
handleControlMsg func(destIP byte, msg any),
|
||||
) {
|
||||
func RunMCReader(g Globals) {
|
||||
for {
|
||||
runMCReaderInner(rt, handleControlMsg)
|
||||
runMCReaderInner(g)
|
||||
time.Sleep(broadcastErrorTimeoutInterval)
|
||||
}
|
||||
}
|
||||
|
||||
func runMCReaderInner(
|
||||
rt *atomic.Pointer[routingTable],
|
||||
handleControlMsg func(destIP byte, msg any),
|
||||
) {
|
||||
func runMCReaderInner(g Globals) {
|
||||
var (
|
||||
raw = newBuf()
|
||||
buf = newBuf()
|
||||
buf = make([]byte, bufferSize)
|
||||
logf = func(s string, args ...any) {
|
||||
log.Printf("[MCReader] "+s, args...)
|
||||
}
|
||||
@ -37,35 +29,20 @@ func runMCReaderInner(
|
||||
|
||||
for {
|
||||
conn.SetReadDeadline(time.Now().Add(32 * time.Second))
|
||||
n, remoteAddr, err := conn.ReadFromUDPAddrPort(raw[:bufferSize])
|
||||
n, remoteAddr, err := conn.ReadFromUDPAddrPort(buf[:bufferSize])
|
||||
if err != nil {
|
||||
logf("Failed to read from UDP port): %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
raw = raw[:n]
|
||||
h, ok := headerFromLocalDiscoveryPacket(raw)
|
||||
buf = buf[:n]
|
||||
h, ok := headerFromLocalDiscoveryPacket(buf)
|
||||
if !ok {
|
||||
logf("Failed to open discovery packet?")
|
||||
continue
|
||||
}
|
||||
log.Printf("Got local discovery from %v: %v", remoteAddr, h)
|
||||
|
||||
peer := rt.Load().Peers[h.SourceIP]
|
||||
if peer.PubSignKey == nil {
|
||||
logf("No signing key for peer %d.", h.SourceIP)
|
||||
continue
|
||||
}
|
||||
|
||||
if !verifyLocalDiscoveryPacket(raw, buf, peer.PubSignKey) {
|
||||
logf("Invalid signature from peer: %d", h.SourceIP)
|
||||
continue
|
||||
}
|
||||
|
||||
msg := controlMsg[packetLocalDiscovery]{
|
||||
SrcIP: h.SourceIP,
|
||||
SrcAddr: remoteAddr,
|
||||
}
|
||||
logf("Got discovery packet from peer %d.", h.SourceIP)
|
||||
handleControlMsg(h.SourceIP, msg)
|
||||
g.RemotePeers[h.SourceIP].Load().HandleLocalDiscoveryPacket(h, remoteAddr, buf)
|
||||
}
|
||||
}
|
||||
|
91
peer/peer.go
91
peer/peer.go
@ -6,23 +6,23 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"math"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/netip"
|
||||
"net/url"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
<<<<<<< HEAD
|
||||
"os"
|
||||
=======
|
||||
>>>>>>> 69f2536 (WIP)
|
||||
"vppn/m"
|
||||
)
|
||||
|
||||
type peerMain struct {
|
||||
conf localConfig
|
||||
rt *atomic.Pointer[routingTable]
|
||||
ifReader *ifReader
|
||||
connReader *connReader
|
||||
iface io.Writer
|
||||
hubPoller *hubPoller
|
||||
super *supervisor
|
||||
Globals
|
||||
ifReader *IFReader
|
||||
connReader *ConnReader
|
||||
hubPoller *HubPoller
|
||||
}
|
||||
|
||||
type mainArgs struct {
|
||||
@ -53,12 +53,31 @@ func newPeerMain(args mainArgs) *peerMain {
|
||||
log.Fatalf("Failed to load network state: %v", err)
|
||||
}
|
||||
|
||||
iface, err := openInterface(config.Network, config.PeerIP, args.NetName)
|
||||
<<<<<<< HEAD
|
||||
startupCount, err := loadStartupCount(args.NetName)
|
||||
if err != nil {
|
||||
if !os.IsNotExist(err) {
|
||||
log.Fatalf("Failed to load startup count: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if startupCount.Count == math.MaxUint16 {
|
||||
log.Fatalf("Startup counter overflow.")
|
||||
}
|
||||
startupCount.Count += 1
|
||||
|
||||
if err := storeStartupCount(args.NetName, startupCount); err != nil {
|
||||
log.Fatalf("Failed to write startup count: %v", err)
|
||||
}
|
||||
|
||||
=======
|
||||
>>>>>>> 69f2536 (WIP)
|
||||
iface, err := openInterface(config.Network, config.LocalPeerIP, args.NetName)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to open interface: %v", err)
|
||||
}
|
||||
|
||||
localPeer := state.Peers[config.PeerIP]
|
||||
localPeer := state.Peers[config.LocalPeerIP]
|
||||
|
||||
myAddr, err := net.ResolveUDPAddr("udp", fmt.Sprintf(":%d", localPeer.Port))
|
||||
if err != nil {
|
||||
@ -74,59 +93,47 @@ func newPeerMain(args mainArgs) *peerMain {
|
||||
conn.SetReadBuffer(1024 * 1024 * 8)
|
||||
conn.SetWriteBuffer(1024 * 1024 * 8)
|
||||
|
||||
// Wrap write function - this is necessary to avoid starvation.
|
||||
writeLock := sync.Mutex{}
|
||||
writeToUDPAddrPort := func(b []byte, addr netip.AddrPort) (n int, err error) {
|
||||
writeLock.Lock()
|
||||
n, err = conn.WriteToUDPAddrPort(b, addr)
|
||||
if err != nil {
|
||||
logf("Failed to write packet: %v", err)
|
||||
}
|
||||
writeLock.Unlock()
|
||||
return n, err
|
||||
}
|
||||
|
||||
var localAddr netip.AddrPort
|
||||
ip, localAddrValid := netip.AddrFromSlice(localPeer.PublicIP)
|
||||
if localAddrValid {
|
||||
localAddr = netip.AddrPortFrom(ip, localPeer.Port)
|
||||
}
|
||||
|
||||
rt := newRoutingTable(localPeer.PeerIP, localAddr)
|
||||
rtPtr := &atomic.Pointer[routingTable]{}
|
||||
rtPtr.Store(&rt)
|
||||
<<<<<<< HEAD
|
||||
g := NewGlobals(config, startupCount, localAddr, conn, iface)
|
||||
=======
|
||||
g := NewGlobals(config, localAddr, conn, iface)
|
||||
>>>>>>> 69f2536 (WIP)
|
||||
|
||||
ifReader := newIFReader(iface, writeToUDPAddrPort, rtPtr)
|
||||
super := newSupervisor(writeToUDPAddrPort, rtPtr, config.PrivKey)
|
||||
connReader := newConnReader(conn.ReadFromUDPAddrPort, writeToUDPAddrPort, iface, super.HandleControlMsg, rtPtr)
|
||||
hubPoller, err := newHubPoller(config.PeerIP, args.NetName, args.HubAddress, args.APIKey, super.HandleControlMsg)
|
||||
hubPoller, err := NewHubPoller(g, args.NetName, args.HubAddress, args.APIKey)
|
||||
if err != nil {
|
||||
log.Fatalf("Failed to create hub poller: %v", err)
|
||||
}
|
||||
|
||||
return &peerMain{
|
||||
conf: config,
|
||||
rt: rtPtr,
|
||||
iface: iface,
|
||||
ifReader: ifReader,
|
||||
connReader: connReader,
|
||||
Globals: g,
|
||||
ifReader: NewIFReader(g),
|
||||
connReader: NewConnReader(g, conn),
|
||||
hubPoller: hubPoller,
|
||||
super: super,
|
||||
}
|
||||
}
|
||||
|
||||
func (p *peerMain) Run() {
|
||||
for i := range p.RemotePeers {
|
||||
remote := p.RemotePeers[i].Load()
|
||||
go newRemoteFSM(remote).Run()
|
||||
}
|
||||
|
||||
go p.ifReader.Run()
|
||||
go p.connReader.Run()
|
||||
p.super.Start()
|
||||
|
||||
if !p.rt.Load().LocalAddr.IsValid() {
|
||||
go runMCWriter(p.conf.PeerIP, p.conf.PrivSignKey)
|
||||
go runMCReader(p.rt, p.super.HandleControlMsg)
|
||||
if !p.LocalAddrValid {
|
||||
go RunMCWriter(p.LocalPeerIP, p.PrivSignKey)
|
||||
go RunMCReader(p.Globals)
|
||||
}
|
||||
|
||||
go p.hubPoller.Run()
|
||||
|
||||
select {}
|
||||
}
|
||||
|
||||
@ -171,8 +178,8 @@ func initPeerWithHub(args mainArgs) {
|
||||
log.Fatalf("Failed to parse configuration: %v\n%s", err, data)
|
||||
}
|
||||
|
||||
config := localConfig{}
|
||||
config.PeerIP = initResp.PeerIP
|
||||
config := LocalConfig{}
|
||||
config.LocalPeerIP = initResp.PeerIP
|
||||
config.Network = initResp.Network
|
||||
config.PubKey = keys.PubKey
|
||||
config.PrivKey = keys.PrivKey
|
||||
|
Loading…
x
Reference in New Issue
Block a user