vppn/node/peer-states.go

377 lines
8.5 KiB
Go

package node
import (
"fmt"
"log"
"math/rand"
"net/netip"
"sync/atomic"
"time"
"vppn/m"
)
// peerState is a single state of the per-peer connection state machine.
//
// Every handler returns the state to switch to. The implementations in this
// file return nil when the current state should be kept.
type peerState interface {
	// Name returns a short label for the state.
	Name() string

	// Handlers for incoming control packets. Each receives the remote address
	// together with the packet.
	OnSyn(rAddr netip.AddrPort, p synPacket) peerState
	OnSynAck(rAddr netip.AddrPort, p synAckPacket) peerState
	OnAck(rAddr netip.AddrPort, p ackPacket) peerState

	// OnPeerUpdate is called with the latest peer description. When it
	// changed, the machine is reset. Handled by the base state.
	OnPeerUpdate(peer *m.Peer) peerState

	// OnPing and OnPong are used to determine the up flag and the data cipher
	// of the published routing data.
	OnPing(rAddr netip.AddrPort, p pingPacket) peerState
	OnPong(rAddr netip.AddrPort, p pongPacket) peerState

	// Timer callbacks.
	OnPingTimer() peerState
	OnTimeoutTimer() peerState
}
// ----------------------------------------------------------------------------
// stateBase carries the data shared by all peer states. Every concrete state
// in this file embeds *stateBase, so its methods also act as the default
// handlers for events a state does not override.
type stateBase struct {
// The purpose of this state machine is to manage this published data.
// publish() stores a copy of staged here.
published *atomic.Pointer[peerRoutingData]
staged peerRoutingData // Local copy of shared data. See publish().
// The other remote peers. Indexed by peer IP byte when picking or using a
// relay (see selectRelay and sendControlPacket).
peers *remotePeers
// Immutable data.
localIP byte // Written to the SourceIP field of outgoing control headers.
localPub bool // Consulted by selectStateFromPeer when choosing the initial state.
remoteIP byte // Written to the DestIP field of outgoing control headers; also the log prefix.
privKey []byte // Combined with the peer's EncPubKey to build the control cipher.
conn *connWriter // Used for sends that do not go through a relay.
// For sending to peer. Incremented atomically for every control packet.
counter *uint64
// Mutable peer data.
peer *m.Peer // Current peer description; nil when there is none.
remotePub bool // Set by selectStateFromPeer when the peer's PublicIP parses.
// Timers
pingTimer *time.Timer
timeoutTimer *time.Timer
// Scratch buffers handed to Marshal (buf) and Encrypt (encBuf) when sending
// control packets.
buf []byte
encBuf []byte
}
// Name is the label reported by any state that does not define its own Name.
func (s *stateBase) Name() string {
	return "idle"
}
// OnPeerUpdate compares the incoming peer description with the current one.
// It returns nil when nothing changed (both are nil, or both are set and carry
// the same Version); otherwise the state machine is restarted from peer.
func (s *stateBase) OnPeerUpdate(peer *m.Peer) peerState {
	current := s.peer

	switch {
	case peer == nil && current == nil:
		// Still no peer.
		return nil
	case peer != nil && current != nil && peer.Version == current.Version:
		// Same version of the same peer.
		return nil
	}

	return s.selectStateFromPeer(peer)
}
// selectStateFromPeer installs peer as the current peer description, resets
// the staged routing data and returns the state the connection starts in:
//
//   - no peer: the no-peer state;
//   - the peer's PublicIP parses as an address: server when this node is also
//     public and has the lower IP byte, client otherwise;
//   - the peer has no usable PublicIP: server when this node is public,
//     relay selection otherwise.
//
// The staged data is published when the function returns.
func (s *stateBase) selectStateFromPeer(peer *m.Peer) peerState {
	s.peer = peer
	s.staged = peerRoutingData{}
	// remotePub describes the peer being installed. Clear it here so a value
	// left over from a previous peer description cannot survive an update in
	// which the peer disappears or loses its public address.
	s.remotePub = false
	defer s.publish()

	if peer == nil {
		return newStateNoPeer(s)
	}

	s.staged.controlCipher = newControlCipher(s.privKey, peer.EncPubKey)

	ip, isValid := netip.AddrFromSlice(peer.PublicIP)
	if isValid {
		s.remotePub = true
		s.staged.remoteAddr = netip.AddrPortFrom(ip, peer.Port)
		s.staged.relay = peer.Mediator

		// Both ends public: the lower IP byte acts as server so that exactly
		// one side initiates.
		if s.localPub && s.localIP < s.remoteIP {
			return newStateServer(s)
		}
		return newStateClient(s)
	}

	// The peer is not directly reachable. If we are, wait for it to contact
	// us; otherwise a relay is needed.
	if s.localPub {
		return newStateServer(s)
	}
	return newStateSelectRelay(s)
}
// Default event handlers. A concrete state overrides only the events it cares
// about; every other event ends up here.

// OnSyn ignores the packet and keeps the current state.
func (s *stateBase) OnSyn(_ netip.AddrPort, _ synPacket) peerState {
	return nil
}

// OnSynAck ignores the packet and keeps the current state.
func (s *stateBase) OnSynAck(_ netip.AddrPort, _ synAckPacket) peerState {
	return nil
}

// OnAck ignores the packet and keeps the current state.
func (s *stateBase) OnAck(_ netip.AddrPort, _ ackPacket) peerState {
	return nil
}

// OnPing ignores the packet and keeps the current state.
func (s *stateBase) OnPing(_ netip.AddrPort, _ pingPacket) peerState {
	return nil
}

// OnPong ignores the packet and keeps the current state.
func (s *stateBase) OnPong(_ netip.AddrPort, _ pongPacket) peerState {
	return nil
}

// OnPingTimer does nothing and keeps the current state.
func (s *stateBase) OnPingTimer() peerState {
	return nil
}

// OnTimeoutTimer restarts the state machine from the current peer
// description.
func (s *stateBase) OnTimeoutTimer() peerState {
	current := s.peer
	return s.selectStateFromPeer(current)
}
// Helpers.
// resetPingTimer re-arms the ping timer to fire after pingInterval.
func (s *stateBase) resetPingTimer() {
	s.pingTimer.Reset(pingInterval)
}

// resetTimeoutTimer re-arms the timeout timer to fire after timeoutInterval.
func (s *stateBase) resetTimeoutTimer() {
	s.timeoutTimer.Reset(timeoutInterval)
}

// stopPingTimer stops the ping timer.
func (s *stateBase) stopPingTimer() {
	s.pingTimer.Stop()
}

// stopTimeoutTimer stops the timeout timer.
func (s *stateBase) stopTimeoutTimer() {
	s.timeoutTimer.Stop()
}
// logf writes a formatted log line prefixed with the remote peer's IP byte,
// zero-padded to three digits and wrapped in brackets.
func (s *stateBase) logf(msg string, args ...any) {
	prefix := fmt.Sprintf("[%03d] ", s.remoteIP)
	log.Printf(prefix+msg, args...)
}
// publish stores a copy of the staged routing data in s.published. Because a
// copy is stored, later edits to s.staged stay private until the next call.
func (s *stateBase) publish() {
	snapshot := new(peerRoutingData)
	*snapshot = s.staged
	s.published.Store(snapshot)
}
// selectRelay picks, uniformly at random, the index of one peer that reports
// CanRelay. It returns 0 when no peer qualifies; callers treat 0 as "no relay
// available".
func (s *stateBase) selectRelay() byte {
	candidates := make([]byte, 0, 8)
	for ip, p := range s.peers {
		if !p.CanRelay() {
			continue
		}
		candidates = append(candidates, byte(ip))
	}

	if n := len(candidates); n > 0 {
		return candidates[rand.Intn(n)]
	}
	return 0
}
// sendPing sends a ping control packet carrying sharedKey to the peer.
func (s *stateBase) sendPing(sharedKey [32]byte) {
	pkt := newPingPacket(sharedKey)
	s.sendControlPacket(pkt)
}

// sendPong answers ping with a pong control packet built from the ping's
// SentAt value.
func (s *stateBase) sendPong(ping pingPacket) {
	pkt := newPongPacket(ping.SentAt)
	s.sendControlPacket(pkt)
}
// sendControlPacket marshals pkt, encrypts it with the staged control cipher
// under a header carrying the next counter value, and sends the result to the
// peer. When a relay is staged (relayIP != 0) the packet is handed to that
// relay; otherwise it is written directly to the staged remote address.
func (s *stateBase) sendControlPacket(pkt interface{ Marshal([]byte) []byte }) {
	payload := pkt.Marshal(s.buf)

	hdr := header{
		StreamID: controlStreamID,
		Counter:  atomic.AddUint64(s.counter, 1),
		SourceIP: s.localIP,
		DestIP:   s.remoteIP,
	}
	wire := s.staged.controlCipher.Encrypt(hdr, payload, s.encBuf)

	relayIP := s.staged.relayIP
	if relayIP == 0 {
		s.conn.WriteTo(wire, s.staged.remoteAddr)
		return
	}
	s.peers[relayIP].RelayFor(s.remoteIP, wire)
}
// ----------------------------------------------------------------------------
// stateNoPeer is the state used while there is no peer description. It adds
// no handlers of its own, so every event falls through to stateBase.
type stateNoPeer struct{ *stateBase }

// newStateNoPeer enters the no-peer state on b: both timers are stopped and
// the staged routing data is published.
func newStateNoPeer(b *stateBase) *stateNoPeer {
	b.stopPingTimer()
	b.stopTimeoutTimer()
	b.publish()
	return &stateNoPeer{stateBase: b}
}
// ----------------------------------------------------------------------------
// stateClient is the state in which this node initiates a direct connection:
// it creates the data cipher, sends its key to the peer in periodic pings and
// treats pongs as proof that the peer is reachable.
type stateClient struct {
	sharedKey [32]byte // Key of the data cipher created in newStateClient; sent in every ping.
	*stateBase
}

// newStateClient enters the client state on b. It creates a fresh data
// cipher, publishes it, sends the first ping and arms both timers.
func newStateClient(b *stateBase) peerState {
	s := &stateClient{stateBase: b}
	s.staged.dataCipher = newDataCipher()
	s.sharedKey = s.staged.dataCipher.Key()

	// Publish before pinging, so the data cipher is already visible locally
	// when the peer learns the key from the ping. Publishing first and
	// creating the cipher afterwards would leave the new cipher unpublished
	// until some later publish. This order matches newStateClientRelayed.
	s.publish()

	s.sendPing(s.sharedKey)
	s.resetPingTimer()
	s.resetTimeoutTimer()
	return s
}

// Name returns "client".
func (s *stateClient) Name() string { return "client" }

// OnPong marks the peer as up (publishing only on the transition) and pushes
// the timeout back.
func (s *stateClient) OnPong(addr netip.AddrPort, p pongPacket) peerState {
	if !s.staged.up {
		s.staged.up = true
		s.publish()
	}
	s.resetTimeoutTimer()
	return nil
}

// OnPingTimer sends the next ping and re-arms the ping timer.
func (s *stateClient) OnPingTimer() peerState {
	s.sendPing(s.sharedKey)
	s.resetPingTimer()
	return nil
}

// OnTimeoutTimer marks the peer as down and publishes that. The state is
// kept: pings continue, and the next pong brings the peer back up.
func (s *stateClient) OnTimeoutTimer() peerState {
	s.staged.up = false
	s.publish()
	return nil
}
// ----------------------------------------------------------------------------
// stateServer is the state in which this node waits for the peer to contact
// it directly. It sends nothing on its own; it learns the peer's address and
// data key from incoming pings and answers each with a pong.
type stateServer struct {
	*stateBase
}

// newStateServer enters the server state on b. The staged routing data is
// published and both timers are stopped; the timeout timer is armed by
// OnPing once the peer has been heard from.
func newStateServer(b *stateBase) peerState {
	s := &stateServer{b}
	s.publish()
	s.stopPingTimer()
	s.stopTimeoutTimer()
	return s
}

// Name returns "server".
func (s *stateServer) Name() string { return "server" }

// OnPing records a changed peer address and/or shared key, answers with a
// pong and pushes the timeout back.
//
// All changes are published together in a single publish. Publishing after
// the address change alone could expose up == true while the data cipher is
// still nil or stale.
//
// Re-arming the timeout here means that, when pings stop arriving, the
// inherited stateBase.OnTimeoutTimer restarts the state machine and the peer
// is no longer reported as up.
func (s *stateServer) OnPing(addr netip.AddrPort, p pingPacket) peerState {
	changed := false

	if addr != s.staged.remoteAddr {
		s.logf("Got new peer address: %v", addr)
		s.staged.remoteAddr = addr
		changed = true
	}

	if s.staged.dataCipher == nil || p.SharedKey != s.staged.dataCipher.Key() {
		s.logf("Got new shared key.")
		s.staged.dataCipher = newDataCipherFromKey(p.SharedKey)
		changed = true
	}

	if changed {
		s.staged.up = true
		s.publish()
	}

	s.sendPong(p)
	s.resetTimeoutTimer()
	return nil
}
// ----------------------------------------------------------------------------
// stateSelectRelay is the state used when neither side can be reached
// directly. It keeps looking for a relay and, once one is found, hands over
// to the relayed client or server state.
type stateSelectRelay struct {
	*stateBase
}

// newStateSelectRelay enters relay selection on b. The data cipher is dropped
// and the peer is published as down. If a relay is available right away the
// relayed role state is returned directly; otherwise the ping timer is armed
// to retry and the timeout timer is stopped.
func newStateSelectRelay(b *stateBase) peerState {
	s := &stateSelectRelay{stateBase: b}
	s.staged.up = false
	s.staged.dataCipher = nil
	s.publish()

	relay := s.selectRelay()
	if relay == 0 {
		s.resetPingTimer()
		s.stopTimeoutTimer()
		return s
	}

	s.staged.relayIP = relay
	return s.selectRole()
}

// selectRole returns the relayed server state when the local IP byte is lower
// than the remote one, and the relayed client state otherwise.
func (s *stateSelectRelay) selectRole() peerState {
	if s.localIP >= s.remoteIP {
		return newStateClientRelayed(s.stateBase)
	}
	return newStateServerRelayed(s.stateBase)
}

// Name returns "select-relay".
func (s *stateSelectRelay) Name() string {
	return "select-relay"
}

// OnPingTimer retries relay selection. Without a relay the ping timer is
// re-armed and the state is kept; with one, the relay is staged and the
// relayed role state is returned.
func (s *stateSelectRelay) OnPingTimer() peerState {
	relay := s.selectRelay()
	if relay == 0 {
		s.resetPingTimer()
		return nil
	}

	s.logf("Got relay IP: %d", relay)
	s.staged.relayIP = relay
	return s.selectRole()
}
// ----------------------------------------------------------------------------
// stateClientRelayed is the client role when traffic goes through a relay:
// it owns the data key, pings the peer periodically and goes back to relay
// selection when pongs stop arriving.
type stateClientRelayed struct {
	sharedKey [32]byte // Key of the data cipher created by the constructor.
	*stateBase
}

// newStateClientRelayed enters the relayed client state on b: a fresh data
// cipher is staged and published, the first ping is sent and both timers are
// armed.
func newStateClientRelayed(b *stateBase) peerState {
	s := &stateClientRelayed{stateBase: b}

	b.staged.dataCipher = newDataCipher()
	s.sharedKey = b.staged.dataCipher.Key()
	b.publish()

	s.sendPing(s.sharedKey)
	b.resetPingTimer()
	b.resetTimeoutTimer()
	return s
}

// Name returns "client-relayed".
func (s *stateClientRelayed) Name() string {
	return "client-relayed"
}

// OnPong marks the peer as up, publishing only when that is a change, and
// pushes the timeout back.
func (s *stateClientRelayed) OnPong(_ netip.AddrPort, _ pongPacket) peerState {
	if wasUp := s.staged.up; !wasUp {
		s.staged.up = true
		s.publish()
	}
	s.resetTimeoutTimer()
	return nil
}

// OnPingTimer sends the next ping and re-arms the ping timer.
func (s *stateClientRelayed) OnPingTimer() peerState {
	key := s.sharedKey
	s.sendPing(key)
	s.resetPingTimer()
	return nil
}

// OnTimeoutTimer gives up on the current relay and returns to relay
// selection.
func (s *stateClientRelayed) OnTimeoutTimer() peerState {
	base := s.stateBase
	return newStateSelectRelay(base)
}
// ----------------------------------------------------------------------------
// stateServerRelayed is the server role when traffic goes through a relay:
// it waits for pings, adopts the shared key they carry and goes back to relay
// selection when pings stop arriving.
type stateServerRelayed struct {
	*stateBase
}

// newStateServerRelayed enters the relayed server state on b: the ping timer
// is stopped and the timeout timer is armed.
//
// NOTE(review): unlike the other constructors this one does not publish, so a
// relayIP staged by relay selection first becomes visible when OnPing
// publishes — confirm that is intended.
func newStateServerRelayed(b *stateBase) peerState {
	b.stopPingTimer()
	b.resetTimeoutTimer()
	return &stateServerRelayed{stateBase: b}
}

// Name returns "server-relayed".
func (s *stateServerRelayed) Name() string {
	return "server-relayed"
}

// OnPing adopts the ping's shared key when there is no data cipher yet or the
// key differs (marking the peer up and publishing), then answers with a pong
// and pushes the timeout back.
func (s *stateServerRelayed) OnPing(_ netip.AddrPort, p pingPacket) peerState {
	if cur := s.staged.dataCipher; cur == nil || cur.Key() != p.SharedKey {
		s.logf("Got new shared key.")
		s.staged.dataCipher = newDataCipherFromKey(p.SharedKey)
		s.staged.up = true
		s.publish()
	}

	s.sendPong(p)
	s.resetTimeoutTimer()
	return nil
}

// OnTimeoutTimer gives up on the current relay and returns to relay
// selection.
func (s *stateServerRelayed) OnTimeoutTimer() peerState {
	base := s.stateBase
	return newStateSelectRelay(base)
}