package multicast

import (
	"bytes"
	"errors"
	"fmt"
	"log"
	"net"
	"net/netip"
	"time"

	"git.crumpington.com/lib/ratelimiter"
)

// Receiver runs the multicast receive loop forever. Whenever the inner
// receiver returns, its error (if any) is logged and the loop sleeps for
// errorTimeout before listening again. Receiver never returns.
//
// selfVPNIP must be an IPv4 (or IPv4-in-IPv6) address: the address helpers
// used below call netip.Addr.As4, which panics otherwise.
func Receiver(selfVPNIP netip.Addr, ch chan<- Packet) {
	for {
		if err := receiver(selfVPNIP, ch); err != nil {
			log.Printf("[MCReader] %v", err)
		}
		time.Sleep(errorTimeout)
	}
}

// receiver joins the multicast group derived from selfVPNIP and forwards
// accepted packets to ch. It returns only on a bind, deadline, or
// non-timeout read error.
//
// A datagram is dropped when:
//   - its length is not exactly SignedPacketSize,
//   - its PeerIP equals the last octet of selfVPNIP (our own broadcast),
//   - its timestamp is more than maxPacketAge away from now, in either
//     direction,
//   - the per-source-address rate limiter rejects it.
//
// The send on ch is a plain channel send, so it blocks until the consumer
// receives the packet.
func receiver(selfVPNIP netip.Addr, ch chan<- Packet) error {
	// Upper bound on how long a single read may block before it is retried.
	const readTimeout = 32 * time.Second

	// One limiter per source address.
	// NOTE(review): entries are never evicted, so this map grows with the
	// number of distinct source addresses seen — confirm that is acceptable.
	limiters := map[netip.Addr]*ratelimiter.Limiter{}

	selfIP := selfVPNIP.As4()[3]
	addr := multicastAddr(selfVPNIP)

	log.Printf("[MC Receiver] Listening on %v.", addr)

	conn, err := net.ListenMulticastUDP("udp", nil, addr)
	if err != nil {
		return fmt.Errorf("bind: %w", err)
	}
	defer conn.Close()

	buf := make([]byte, SignedPacketSize+1) // +1 to detect oversized packets

	for {
		if err := conn.SetReadDeadline(time.Now().Add(readTimeout)); err != nil {
			return fmt.Errorf("set read deadline: %w", err)
		}

		n, src, err := conn.ReadFromUDPAddrPort(buf)
		if err != nil {
			// A timeout only means nothing arrived within readTimeout;
			// re-arm the deadline and keep listening.
			var netErr net.Error
			if errors.As(err, &netErr) && netErr.Timeout() {
				continue
			}
			return fmt.Errorf("read: %w", err)
		}

		// Anything that is not exactly one signed packet is ignored; the
		// extra buffer byte makes oversized datagrams show up as a longer n.
		if n != SignedPacketSize {
			continue
		}

		packet := unmarshal(buf[:n])

		// Ignore our own broadcasts.
		if packet.PeerIP == selfIP {
			continue
		}

		// Slightly cheaper than limiting.
		age := time.Since(time.Unix(packet.Timestamp, 0))
		if age > maxPacketAge || age < -maxPacketAge {
			continue
		}

		srcAddr := src.Addr().Unmap()

		lim, ok := limiters[srcAddr]
		if !ok {
			// NOTE(review): presumably allows about one packet per half
			// broadcast interval per source, with no queuing — confirm
			// against the ratelimiter package.
			lim = ratelimiter.New(ratelimiter.Config{
				BurstLimit:   1,
				FillPeriod:   broadcastInterval / 2,
				MaxWaitCount: 0,
			})
			limiters[srcAddr] = lim
		}

		if err := lim.Limit(); err != nil {
			log.Printf("Rate limited packet from peer IP %d.", packet.PeerIP)
			continue
		}

		// buf is reused on the next read, so detach the signed bytes before
		// handing the packet to the consumer.
		packet.Signed = bytes.Clone(packet.Signed)
		packet.Src = srcAddr
		ch <- packet
	}
}

// multicastAddr returns the UDP address 239.b.c.0:4560, where b and c are the
// second and third octets of vpnIP. It panics if vpnIP is not an IPv4 (or
// IPv4-in-IPv6) address, because netip.Addr.As4 does.
func multicastAddr(vpnIP netip.Addr) *net.UDPAddr {
	octets := vpnIP.As4()
	group := netip.AddrFrom4([4]byte{239, octets[1], octets[2], 0})
	return net.UDPAddrFromAddrPort(netip.AddrPortFrom(group, 4560))
}