//! Shared server routines for UDP TURN handling so integration tests can reuse the core loop.

use std::sync::Arc;

use tokio::net::UdpSocket;
use tracing::{error, warn};

use crate::alloc::{AllocationManager, ClientSink};
use crate::alloc::AllocationError;
use crate::auth::{AuthManager, AuthStatus, InMemoryStore};
use crate::constants::*;
use crate::rate_limit::RateLimiters;
use crate::stun::{
    build_401_response, build_allocate_success_with_integrity_mode, build_error_response,
    build_error_response_with_integrity_mode, build_lifetime_success_with_integrity_mode,
    build_success_response_with_integrity_mode, decode_xor_peer_address,
    extract_lifetime_seconds, extract_requested_transport_protocol, parse_channel_data,
    parse_message, validate_fingerprint_if_present,
};
use std::time::Duration;

/// Main UDP reader loop shared between binary entry point and integration tests.
///
/// Thin convenience wrapper: forwards `udp`, `auth` and `allocs` unchanged to
/// [`udp_reader_loop_with_limits`], supplying a freshly built
/// `RateLimiters::disabled()` as the rate-limiter set.
///
/// # Errors
///
/// Returns whatever error [`udp_reader_loop_with_limits`] returns.
pub async fn udp_reader_loop(
    udp: Arc<UdpSocket>,
    // NOTE(review): generic argument restored from the otherwise-unused
    // `InMemoryStore` import — confirm against `crate::auth`.
    auth: AuthManager<InMemoryStore>,
    allocs: AllocationManager,
) -> anyhow::Result<()> {
    udp_reader_loop_with_limits(udp, auth, allocs, Arc::new(RateLimiters::disabled())).await
}

/// UDP reader loop with explicit (per-server) rate limiters.
pub async fn udp_reader_loop_with_limits( udp: Arc, auth: AuthManager, allocs: AllocationManager, rate_limiters: Arc, ) -> anyhow::Result<()> { let mut buf = vec![0u8; 1500]; loop { let (len, peer) = udp.recv_from(&mut buf).await?; tracing::debug!("got {} bytes from {}", len, peer); if let Some((channel, payload)) = parse_channel_data(&buf[..len]) { crate::metrics::inc_channel_data(); let allocation = match allocs.get_allocation(&peer) { Some(a) => a, None => { warn!("channel data without allocation from {}", peer); continue; } }; let target = match allocation.channel_peer(channel) { Some(addr) => addr, None => { warn!( "channel data with unknown channel 0x{:04x} from {}", channel, peer ); continue; } }; if !allocation.is_peer_allowed(&target) { warn!( "channel data target {} no longer permitted for {}", target, peer ); continue; } match allocation.send_to_peer(target, payload).await { Ok(sent) => tracing::debug!( "forwarded {} bytes via channel 0x{:04x} from {} to {}", sent, channel, peer, target ), Err(e) => error!( "failed to forward channel data 0x{:04x} from {} to {}: {:?}", channel, peer, target, e ), } continue; } if let Ok(msg) = parse_message(&buf[..len]) { if !validate_fingerprint_if_present(&msg) { tracing::debug!("dropping STUN/TURN message from {} due to invalid FINGERPRINT", peer); continue; } crate::metrics::inc_stun_messages(); tracing::info!( "STUN/TURN message from {} type=0x{:04x} len={}", peer, msg.header.msg_type, len ); let requires_auth = matches!( msg.header.msg_type, METHOD_ALLOCATE | METHOD_CREATE_PERMISSION | METHOD_CHANNEL_BIND | METHOD_SEND | METHOD_REFRESH ); if requires_auth { let (key, mi_mode) = match auth.authenticate(&msg, &peer).await { AuthStatus::Granted { username, key, mi_mode, } => { tracing::debug!( "TURN auth ok for {} as {} (0x{:04x})", peer, username, msg.header.msg_type ); (key, mi_mode) } AuthStatus::Challenge { nonce } => { crate::metrics::inc_auth_challenge(); if !rate_limiters.allow_unauth(peer.ip()) { 
crate::metrics::inc_rate_limited(); continue; } let resp = build_401_response( &msg.header, auth.realm(), &nonce, 401, "Unauthorized", ); let _ = udp.send_to(&resp, &peer).await; continue; } AuthStatus::StaleNonce { nonce } => { crate::metrics::inc_auth_stale(); if !rate_limiters.allow_unauth(peer.ip()) { crate::metrics::inc_rate_limited(); continue; } let resp = build_401_response( &msg.header, auth.realm(), &nonce, 438, "Stale Nonce", ); let _ = udp.send_to(&resp, &peer).await; continue; } AuthStatus::Reject { code, reason } => { crate::metrics::inc_auth_reject(); let resp = build_error_response(&msg.header, code, reason); let _ = udp.send_to(&resp, &peer).await; continue; } }; match msg.header.msg_type { METHOD_ALLOCATE => { crate::metrics::inc_allocate_total(); // TURN Allocate MUST include REQUESTED-TRANSPORT; WebRTC expects UDP (17). match extract_requested_transport_protocol(&msg) { Some(IPPROTO_UDP) => {} Some(_) => { crate::metrics::inc_allocate_fail(); let resp = build_error_response_with_integrity_mode( &msg.header, 442, "Unsupported Transport", &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; continue; } None => { crate::metrics::inc_allocate_fail(); let resp = build_error_response_with_integrity_mode( &msg.header, 400, "Missing REQUESTED-TRANSPORT", &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; continue; } } let requested_lifetime = extract_lifetime_seconds(&msg) .map(|secs| Duration::from_secs(secs as u64)) .filter(|d| !d.is_zero()); match allocs .allocate_for(peer, ClientSink::Udp { sock: udp.clone(), addr: peer, }) .await { Ok(relay_addr) => { let applied = match allocs.refresh_allocation(peer, requested_lifetime) { Ok(d) => d, Err(e) => { tracing::error!( "failed to apply lifetime for {}: {:?}", peer, e ); let resp = build_error_response_with_integrity_mode( &msg.header, 500, "Allocate Failed", &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; continue; } }; let lifetime_secs = applied.as_secs().min(u32::MAX as u64) 
as u32; let advertised = allocs.relay_addr_for_response(relay_addr); let resp = build_allocate_success_with_integrity_mode( &msg.header, &advertised, lifetime_secs, &key, mi_mode, ); tracing::info!( "allocated relay {} for {} lifetime={}s", relay_addr, peer, lifetime_secs ); crate::metrics::inc_allocate_success(); let _ = udp.send_to(&resp, &peer).await; } Err(e) => { tracing::error!("allocate failed: {:?}", e); let (code, reason) = match e.downcast_ref::() { Some(AllocationError::AllocationQuotaExceeded) => { (486, "Allocation Quota Reached") } _ => (500, "Allocate Failed"), }; crate::metrics::inc_allocate_fail(); let resp = build_error_response_with_integrity_mode( &msg.header, code, reason, &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; } } continue; } METHOD_CREATE_PERMISSION => { if allocs.get_allocation(&peer).is_none() { warn!("create-permission without allocation from {}", peer); let resp = build_error_response_with_integrity_mode( &msg.header, 437, "Allocation Mismatch", &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; continue; } let mut added = 0usize; for attr in msg .attributes .iter() .filter(|a| a.typ == ATTR_XOR_PEER_ADDRESS) { if let Some(peer_addr) = decode_xor_peer_address(&attr.value, &msg.header.transaction_id) { match allocs.add_permission(peer, peer_addr) { Ok(()) => { tracing::info!( "added permission for {} -> {}", peer, peer_addr ); crate::metrics::inc_permission_added(); added += 1; } Err(e) => { tracing::error!( "failed to persist permission {} -> {}: {:?}", peer, peer_addr, e ); if matches!( e.downcast_ref::(), Some(AllocationError::PermissionQuotaExceeded) ) { let resp = build_error_response_with_integrity_mode( &msg.header, 508, "Insufficient Capacity", &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; continue; } } } } else { tracing::warn!("invalid XOR-PEER-ADDRESS in request from {}", peer); } } if added == 0 { let resp = build_error_response_with_integrity_mode( &msg.header, 400, "No valid 
XOR-PEER-ADDRESS", &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; } else { let resp = build_success_response_with_integrity_mode(&msg.header, &key, mi_mode); let _ = udp.send_to(&resp, &peer).await; } continue; } METHOD_CHANNEL_BIND => { let allocation = match allocs.get_allocation(&peer) { Some(a) => a, None => { warn!("channel-bind without allocation from {}", peer); let resp = build_error_response_with_integrity_mode( &msg.header, 437, "Allocation Mismatch", &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; continue; } }; let channel_attr = msg.attributes.iter().find(|a| a.typ == ATTR_CHANNEL_NUMBER); let peer_attr = msg .attributes .iter() .find(|a| a.typ == ATTR_XOR_PEER_ADDRESS); let (channel_attr, peer_attr) = match (channel_attr, peer_attr) { (Some(c), Some(p)) => (c, p), _ => { let resp = build_error_response_with_integrity_mode( &msg.header, 400, "Missing CHANNEL-NUMBER or XOR-PEER-ADDRESS", &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; continue; } }; let channel = u16::from_be_bytes([channel_attr.value[0], channel_attr.value[1]]); let peer_addr = match decode_xor_peer_address( &peer_attr.value, &msg.header.transaction_id, ) { Some(addr) => addr, None => { let resp = build_error_response_with_integrity_mode( &msg.header, 400, "Invalid XOR-PEER-ADDRESS", &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; continue; } }; if !allocation.is_peer_allowed(&peer_addr) { match allocs.add_permission(peer, peer_addr) { Ok(()) => { tracing::info!( "added implicit permission for {} -> {} (via CHANNEL-BIND)", peer, peer_addr ); crate::metrics::inc_permission_added(); } Err(e) => { tracing::error!( "failed to add implicit permission {} -> {}: {:?}", peer, peer_addr, e ); let (code, reason) = match e.downcast_ref::() { Some(AllocationError::PermissionQuotaExceeded) => { (508, "Insufficient Capacity") } _ => (403, "Peer Not Permitted"), }; let resp = build_error_response_with_integrity_mode( &msg.header, code, reason, &key, 
mi_mode, ); let _ = udp.send_to(&resp, &peer).await; continue; } } } if let Err(e) = allocs.add_channel_binding(peer, channel, peer_addr) { tracing::error!( "failed to persist channel binding {} -> {} (0x{:04x}): {:?}", peer, peer_addr, channel, e ); let (code, reason) = match e.downcast_ref::() { Some(AllocationError::ChannelQuotaExceeded) => { (508, "Insufficient Capacity") } _ => (500, "Channel Bind Failed"), }; let resp = build_error_response_with_integrity_mode( &msg.header, code, reason, &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; continue; } crate::metrics::inc_channel_binding_added(); let resp = build_success_response_with_integrity_mode(&msg.header, &key, mi_mode); let _ = udp.send_to(&resp, &peer).await; continue; } METHOD_SEND => { let allocation = match allocs.get_allocation(&peer) { Some(a) => a, None => { warn!("send indication without allocation from {}", peer); let resp = build_error_response_with_integrity_mode( &msg.header, 437, "Allocation Mismatch", &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; continue; } }; let peer_attr = msg .attributes .iter() .find(|a| a.typ == ATTR_XOR_PEER_ADDRESS); let data_attr = msg.attributes.iter().find(|a| a.typ == ATTR_DATA); let (peer_attr, data_attr) = match (peer_attr, data_attr) { (Some(p), Some(d)) => (p, d), _ => { let resp = build_error_response_with_integrity_mode( &msg.header, 400, "Missing DATA or XOR-PEER-ADDRESS", &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; continue; } }; let peer_addr = match decode_xor_peer_address( &peer_attr.value, &msg.header.transaction_id, ) { Some(addr) => addr, None => { let resp = build_error_response_with_integrity_mode( &msg.header, 400, "Invalid XOR-PEER-ADDRESS", &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; continue; } }; if !allocation.is_peer_allowed(&peer_addr) { let resp = build_error_response_with_integrity_mode( &msg.header, 403, "Peer Not Permitted", &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; 
continue; } match allocation.send_to_peer(peer_addr, &data_attr.value).await { Ok(sent) => { tracing::info!( "forwarded {} bytes from {} to {}", sent, peer, peer_addr ); let resp = build_success_response_with_integrity_mode(&msg.header, &key, mi_mode); let _ = udp.send_to(&resp, &peer).await; } Err(e) => { tracing::error!( "failed to send payload from {} to {}: {:?}", peer, peer_addr, e ); let resp = build_error_response_with_integrity_mode( &msg.header, 500, "Peer Send Failed", &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; } } continue; } METHOD_REFRESH => { let requested = extract_lifetime_seconds(&msg) .map(|secs| Duration::from_secs(secs as u64)); match allocs.refresh_allocation(peer, requested) { Ok(applied) => { if applied.is_zero() { tracing::info!("allocation for {} released", peer); } else { tracing::debug!( "allocation for {} refreshed to {}s", peer, applied.as_secs() ); } let resp = build_lifetime_success_with_integrity_mode( &msg.header, applied.as_secs().min(u32::MAX as u64) as u32, &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; } Err(_) => { let resp = build_error_response_with_integrity_mode( &msg.header, 437, "Allocation Mismatch", &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; } } continue; } _ => { let resp = build_error_response_with_integrity_mode( &msg.header, 420, "Unknown TURN Method", &key, mi_mode, ); let _ = udp.send_to(&resp, &peer).await; continue; } } } match msg.header.msg_type { METHOD_BINDING => { if rate_limiters.allow_binding(peer.ip()) { let resp = crate::stun::build_binding_success(&msg.header, &peer); let _ = udp.send_to(&resp, &peer).await; } else { crate::metrics::inc_rate_limited(); } } _ => { if !rate_limiters.allow_unauth(peer.ip()) { crate::metrics::inc_rate_limited(); continue; } let nonce = auth.mint_nonce(&peer); let resp = build_401_response(&msg.header, auth.realm(), &nonce, 401, "Unauthorized"); if let Err(e) = udp.send_to(&resp, &peer).await { error!("failed to send 401: {:?}", 
e); } } } } else { tracing::debug!("Non-STUN or parse error from {} len={}", peer, len); } } }