// Source: niom-turn/src/server.rs (644 lines, 30 KiB, Rust)

//! Shared server routines for UDP TURN handling so integration tests can reuse the core loop.
use std::sync::Arc;
use tokio::net::UdpSocket;
use tracing::{error, warn};
use crate::alloc::{AllocationManager, ClientSink};
use crate::alloc::AllocationError;
use crate::auth::{AuthManager, AuthStatus, InMemoryStore};
use crate::constants::*;
use crate::rate_limit::RateLimiters;
use crate::stun::{
build_401_response, build_allocate_success_with_integrity_mode, build_error_response,
build_error_response_with_integrity_mode, build_lifetime_success_with_integrity_mode,
build_success_response_with_integrity_mode, decode_xor_peer_address,
extract_lifetime_seconds, extract_requested_transport_protocol, parse_channel_data,
parse_message, validate_fingerprint_if_present,
};
use std::time::Duration;
/// Entry point for the UDP reader loop, shared by the binary and the integration tests.
///
/// Builds a limiter set via `RateLimiters::disabled()` and hands everything to
/// [`udp_reader_loop_with_limits`]; the result of that call is returned unchanged.
pub async fn udp_reader_loop(
    udp: Arc<UdpSocket>,
    auth: AuthManager<InMemoryStore>,
    allocs: AllocationManager,
) -> anyhow::Result<()> {
    let no_limits = Arc::new(RateLimiters::disabled());
    udp_reader_loop_with_limits(udp, auth, allocs, no_limits).await
}
/// UDP reader loop with explicit (per-server) rate limiters.
pub async fn udp_reader_loop_with_limits(
udp: Arc<UdpSocket>,
auth: AuthManager<InMemoryStore>,
allocs: AllocationManager,
rate_limiters: Arc<RateLimiters>,
) -> anyhow::Result<()> {
let mut buf = vec![0u8; 1500];
loop {
let (len, peer) = udp.recv_from(&mut buf).await?;
tracing::debug!("got {} bytes from {}", len, peer);
if let Some((channel, payload)) = parse_channel_data(&buf[..len]) {
crate::metrics::inc_channel_data();
let allocation = match allocs.get_allocation(&peer) {
Some(a) => a,
None => {
warn!("channel data without allocation from {}", peer);
continue;
}
};
let target = match allocation.channel_peer(channel) {
Some(addr) => addr,
None => {
warn!(
"channel data with unknown channel 0x{:04x} from {}",
channel, peer
);
continue;
}
};
if !allocation.is_peer_allowed(&target) {
warn!(
"channel data target {} no longer permitted for {}",
target, peer
);
continue;
}
match allocation.send_to_peer(target, payload).await {
Ok(sent) => tracing::debug!(
"forwarded {} bytes via channel 0x{:04x} from {} to {}",
sent,
channel,
peer,
target
),
Err(e) => error!(
"failed to forward channel data 0x{:04x} from {} to {}: {:?}",
channel, peer, target, e
),
}
continue;
}
if let Ok(msg) = parse_message(&buf[..len]) {
if !validate_fingerprint_if_present(&msg) {
tracing::debug!("dropping STUN/TURN message from {} due to invalid FINGERPRINT", peer);
continue;
}
crate::metrics::inc_stun_messages();
tracing::info!(
"STUN/TURN message from {} type=0x{:04x} len={}",
peer,
msg.header.msg_type,
len
);
let requires_auth = matches!(
msg.header.msg_type,
METHOD_ALLOCATE
| METHOD_CREATE_PERMISSION
| METHOD_CHANNEL_BIND
| METHOD_SEND
| METHOD_REFRESH
);
if requires_auth {
let (key, mi_mode) = match auth.authenticate(&msg, &peer).await {
AuthStatus::Granted {
username,
key,
mi_mode,
} => {
tracing::debug!(
"TURN auth ok for {} as {} (0x{:04x})",
peer,
username,
msg.header.msg_type
);
(key, mi_mode)
}
AuthStatus::Challenge { nonce } => {
crate::metrics::inc_auth_challenge();
if !rate_limiters.allow_unauth(peer.ip()) {
crate::metrics::inc_rate_limited();
continue;
}
let resp = build_401_response(
&msg.header,
auth.realm(),
&nonce,
401,
"Unauthorized",
);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
AuthStatus::StaleNonce { nonce } => {
crate::metrics::inc_auth_stale();
if !rate_limiters.allow_unauth(peer.ip()) {
crate::metrics::inc_rate_limited();
continue;
}
let resp = build_401_response(
&msg.header,
auth.realm(),
&nonce,
438,
"Stale Nonce",
);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
AuthStatus::Reject { code, reason } => {
crate::metrics::inc_auth_reject();
let resp = build_error_response(&msg.header, code, reason);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
};
match msg.header.msg_type {
METHOD_ALLOCATE => {
crate::metrics::inc_allocate_total();
// TURN Allocate MUST include REQUESTED-TRANSPORT; WebRTC expects UDP (17).
match extract_requested_transport_protocol(&msg) {
Some(IPPROTO_UDP) => {}
Some(_) => {
crate::metrics::inc_allocate_fail();
let resp = build_error_response_with_integrity_mode(
&msg.header,
442,
"Unsupported Transport",
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
None => {
crate::metrics::inc_allocate_fail();
let resp = build_error_response_with_integrity_mode(
&msg.header,
400,
"Missing REQUESTED-TRANSPORT",
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
}
let requested_lifetime = extract_lifetime_seconds(&msg)
.map(|secs| Duration::from_secs(secs as u64))
.filter(|d| !d.is_zero());
match allocs
.allocate_for(peer, ClientSink::Udp {
sock: udp.clone(),
addr: peer,
})
.await
{
Ok(relay_addr) => {
let applied =
match allocs.refresh_allocation(peer, requested_lifetime) {
Ok(d) => d,
Err(e) => {
tracing::error!(
"failed to apply lifetime for {}: {:?}",
peer,
e
);
let resp = build_error_response_with_integrity_mode(
&msg.header,
500,
"Allocate Failed",
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
};
let lifetime_secs = applied.as_secs().min(u32::MAX as u64) as u32;
let advertised = allocs.relay_addr_for_response(relay_addr);
let resp = build_allocate_success_with_integrity_mode(
&msg.header,
&advertised,
lifetime_secs,
&key,
mi_mode,
);
tracing::info!(
"allocated relay {} for {} lifetime={}s",
relay_addr,
peer,
lifetime_secs
);
crate::metrics::inc_allocate_success();
let _ = udp.send_to(&resp, &peer).await;
}
Err(e) => {
tracing::error!("allocate failed: {:?}", e);
let (code, reason) = match e.downcast_ref::<AllocationError>() {
Some(AllocationError::AllocationQuotaExceeded) => {
(486, "Allocation Quota Reached")
}
_ => (500, "Allocate Failed"),
};
crate::metrics::inc_allocate_fail();
let resp = build_error_response_with_integrity_mode(
&msg.header,
code,
reason,
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
}
}
continue;
}
METHOD_CREATE_PERMISSION => {
if allocs.get_allocation(&peer).is_none() {
warn!("create-permission without allocation from {}", peer);
let resp =
build_error_response_with_integrity_mode(
&msg.header,
437,
"Allocation Mismatch",
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
let mut added = 0usize;
for attr in msg
.attributes
.iter()
.filter(|a| a.typ == ATTR_XOR_PEER_ADDRESS)
{
if let Some(peer_addr) =
decode_xor_peer_address(&attr.value, &msg.header.transaction_id)
{
match allocs.add_permission(peer, peer_addr) {
Ok(()) => {
tracing::info!(
"added permission for {} -> {}",
peer,
peer_addr
);
crate::metrics::inc_permission_added();
added += 1;
}
Err(e) => {
tracing::error!(
"failed to persist permission {} -> {}: {:?}",
peer,
peer_addr,
e
);
if matches!(
e.downcast_ref::<AllocationError>(),
Some(AllocationError::PermissionQuotaExceeded)
) {
let resp = build_error_response_with_integrity_mode(
&msg.header,
508,
"Insufficient Capacity",
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
}
}
} else {
tracing::warn!("invalid XOR-PEER-ADDRESS in request from {}", peer);
}
}
if added == 0 {
let resp = build_error_response_with_integrity_mode(
&msg.header,
400,
"No valid XOR-PEER-ADDRESS",
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
} else {
let resp =
build_success_response_with_integrity_mode(&msg.header, &key, mi_mode);
let _ = udp.send_to(&resp, &peer).await;
}
continue;
}
METHOD_CHANNEL_BIND => {
let allocation = match allocs.get_allocation(&peer) {
Some(a) => a,
None => {
warn!("channel-bind without allocation from {}", peer);
let resp = build_error_response_with_integrity_mode(
&msg.header,
437,
"Allocation Mismatch",
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
};
let channel_attr =
msg.attributes.iter().find(|a| a.typ == ATTR_CHANNEL_NUMBER);
let peer_attr = msg
.attributes
.iter()
.find(|a| a.typ == ATTR_XOR_PEER_ADDRESS);
let (channel_attr, peer_attr) = match (channel_attr, peer_attr) {
(Some(c), Some(p)) => (c, p),
_ => {
let resp = build_error_response_with_integrity_mode(
&msg.header,
400,
"Missing CHANNEL-NUMBER or XOR-PEER-ADDRESS",
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
};
let channel =
u16::from_be_bytes([channel_attr.value[0], channel_attr.value[1]]);
let peer_addr = match decode_xor_peer_address(
&peer_attr.value,
&msg.header.transaction_id,
) {
Some(addr) => addr,
None => {
let resp = build_error_response_with_integrity_mode(
&msg.header,
400,
"Invalid XOR-PEER-ADDRESS",
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
};
if !allocation.is_peer_allowed(&peer_addr) {
match allocs.add_permission(peer, peer_addr) {
Ok(()) => {
tracing::info!(
"added implicit permission for {} -> {} (via CHANNEL-BIND)",
peer,
peer_addr
);
crate::metrics::inc_permission_added();
}
Err(e) => {
tracing::error!(
"failed to add implicit permission {} -> {}: {:?}",
peer,
peer_addr,
e
);
let (code, reason) = match e.downcast_ref::<AllocationError>() {
Some(AllocationError::PermissionQuotaExceeded) => {
(508, "Insufficient Capacity")
}
_ => (403, "Peer Not Permitted"),
};
let resp = build_error_response_with_integrity_mode(
&msg.header,
code,
reason,
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
}
}
if let Err(e) = allocs.add_channel_binding(peer, channel, peer_addr) {
tracing::error!(
"failed to persist channel binding {} -> {} (0x{:04x}): {:?}",
peer,
peer_addr,
channel,
e
);
let (code, reason) = match e.downcast_ref::<AllocationError>() {
Some(AllocationError::ChannelQuotaExceeded) => {
(508, "Insufficient Capacity")
}
_ => (500, "Channel Bind Failed"),
};
let resp = build_error_response_with_integrity_mode(
&msg.header,
code,
reason,
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
crate::metrics::inc_channel_binding_added();
let resp = build_success_response_with_integrity_mode(&msg.header, &key, mi_mode);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
METHOD_SEND => {
let allocation = match allocs.get_allocation(&peer) {
Some(a) => a,
None => {
warn!("send indication without allocation from {}", peer);
let resp = build_error_response_with_integrity_mode(
&msg.header,
437,
"Allocation Mismatch",
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
};
let peer_attr = msg
.attributes
.iter()
.find(|a| a.typ == ATTR_XOR_PEER_ADDRESS);
let data_attr = msg.attributes.iter().find(|a| a.typ == ATTR_DATA);
let (peer_attr, data_attr) = match (peer_attr, data_attr) {
(Some(p), Some(d)) => (p, d),
_ => {
let resp = build_error_response_with_integrity_mode(
&msg.header,
400,
"Missing DATA or XOR-PEER-ADDRESS",
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
};
let peer_addr = match decode_xor_peer_address(
&peer_attr.value,
&msg.header.transaction_id,
) {
Some(addr) => addr,
None => {
let resp = build_error_response_with_integrity_mode(
&msg.header,
400,
"Invalid XOR-PEER-ADDRESS",
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
};
if !allocation.is_peer_allowed(&peer_addr) {
let resp = build_error_response_with_integrity_mode(
&msg.header,
403,
"Peer Not Permitted",
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
match allocation.send_to_peer(peer_addr, &data_attr.value).await {
Ok(sent) => {
tracing::info!(
"forwarded {} bytes from {} to {}",
sent,
peer,
peer_addr
);
let resp =
build_success_response_with_integrity_mode(&msg.header, &key, mi_mode);
let _ = udp.send_to(&resp, &peer).await;
}
Err(e) => {
tracing::error!(
"failed to send payload from {} to {}: {:?}",
peer,
peer_addr,
e
);
let resp = build_error_response_with_integrity_mode(
&msg.header,
500,
"Peer Send Failed",
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
}
}
continue;
}
METHOD_REFRESH => {
let requested = extract_lifetime_seconds(&msg)
.map(|secs| Duration::from_secs(secs as u64));
match allocs.refresh_allocation(peer, requested) {
Ok(applied) => {
if applied.is_zero() {
tracing::info!("allocation for {} released", peer);
} else {
tracing::debug!(
"allocation for {} refreshed to {}s",
peer,
applied.as_secs()
);
}
let resp = build_lifetime_success_with_integrity_mode(
&msg.header,
applied.as_secs().min(u32::MAX as u64) as u32,
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
}
Err(_) => {
let resp = build_error_response_with_integrity_mode(
&msg.header,
437,
"Allocation Mismatch",
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
}
}
continue;
}
_ => {
let resp = build_error_response_with_integrity_mode(
&msg.header,
420,
"Unknown TURN Method",
&key,
mi_mode,
);
let _ = udp.send_to(&resp, &peer).await;
continue;
}
}
}
match msg.header.msg_type {
METHOD_BINDING => {
if rate_limiters.allow_binding(peer.ip()) {
let resp = crate::stun::build_binding_success(&msg.header, &peer);
let _ = udp.send_to(&resp, &peer).await;
} else {
crate::metrics::inc_rate_limited();
}
}
_ => {
if !rate_limiters.allow_unauth(peer.ip()) {
crate::metrics::inc_rate_limited();
continue;
}
let nonce = auth.mint_nonce(&peer);
let resp =
build_401_response(&msg.header, auth.realm(), &nonce, 401, "Unauthorized");
if let Err(e) = udp.send_to(&resp, &peer).await {
error!("failed to send 401: {:?}", e);
}
}
}
} else {
tracing::debug!("Non-STUN or parse error from {} len={}", peer, len);
}
}
}