644 lines
30 KiB
Rust
644 lines
30 KiB
Rust
//! Shared server routines for UDP TURN handling so integration tests can reuse the core loop.
|
|
use std::sync::Arc;
|
|
use tokio::net::UdpSocket;
|
|
use tracing::{error, warn};
|
|
|
|
use crate::alloc::{AllocationManager, ClientSink};
|
|
use crate::alloc::AllocationError;
|
|
use crate::auth::{AuthManager, AuthStatus, InMemoryStore};
|
|
use crate::constants::*;
|
|
use crate::rate_limit::RateLimiters;
|
|
use crate::stun::{
|
|
build_401_response, build_allocate_success_with_integrity_mode, build_error_response,
|
|
build_error_response_with_integrity_mode, build_lifetime_success_with_integrity_mode,
|
|
build_success_response_with_integrity_mode, decode_xor_peer_address,
|
|
extract_lifetime_seconds, extract_requested_transport_protocol, parse_channel_data,
|
|
parse_message, validate_fingerprint_if_present,
|
|
};
|
|
use std::time::Duration;
|
|
|
|
/// Main UDP reader loop shared between binary entry point and integration tests.
|
|
pub async fn udp_reader_loop(
|
|
udp: Arc<UdpSocket>,
|
|
auth: AuthManager<InMemoryStore>,
|
|
allocs: AllocationManager,
|
|
) -> anyhow::Result<()> {
|
|
udp_reader_loop_with_limits(udp, auth, allocs, Arc::new(RateLimiters::disabled())).await
|
|
}
|
|
|
|
/// UDP reader loop with explicit (per-server) rate limiters.
|
|
pub async fn udp_reader_loop_with_limits(
|
|
udp: Arc<UdpSocket>,
|
|
auth: AuthManager<InMemoryStore>,
|
|
allocs: AllocationManager,
|
|
rate_limiters: Arc<RateLimiters>,
|
|
) -> anyhow::Result<()> {
|
|
let mut buf = vec![0u8; 1500];
|
|
loop {
|
|
let (len, peer) = udp.recv_from(&mut buf).await?;
|
|
tracing::debug!("got {} bytes from {}", len, peer);
|
|
|
|
if let Some((channel, payload)) = parse_channel_data(&buf[..len]) {
|
|
crate::metrics::inc_channel_data();
|
|
let allocation = match allocs.get_allocation(&peer) {
|
|
Some(a) => a,
|
|
None => {
|
|
warn!("channel data without allocation from {}", peer);
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let target = match allocation.channel_peer(channel) {
|
|
Some(addr) => addr,
|
|
None => {
|
|
warn!(
|
|
"channel data with unknown channel 0x{:04x} from {}",
|
|
channel, peer
|
|
);
|
|
continue;
|
|
}
|
|
};
|
|
|
|
if !allocation.is_peer_allowed(&target) {
|
|
warn!(
|
|
"channel data target {} no longer permitted for {}",
|
|
target, peer
|
|
);
|
|
continue;
|
|
}
|
|
|
|
match allocation.send_to_peer(target, payload).await {
|
|
Ok(sent) => tracing::debug!(
|
|
"forwarded {} bytes via channel 0x{:04x} from {} to {}",
|
|
sent,
|
|
channel,
|
|
peer,
|
|
target
|
|
),
|
|
Err(e) => error!(
|
|
"failed to forward channel data 0x{:04x} from {} to {}: {:?}",
|
|
channel, peer, target, e
|
|
),
|
|
}
|
|
continue;
|
|
}
|
|
|
|
if let Ok(msg) = parse_message(&buf[..len]) {
|
|
if !validate_fingerprint_if_present(&msg) {
|
|
tracing::debug!("dropping STUN/TURN message from {} due to invalid FINGERPRINT", peer);
|
|
continue;
|
|
}
|
|
crate::metrics::inc_stun_messages();
|
|
tracing::info!(
|
|
"STUN/TURN message from {} type=0x{:04x} len={}",
|
|
peer,
|
|
msg.header.msg_type,
|
|
len
|
|
);
|
|
let requires_auth = matches!(
|
|
msg.header.msg_type,
|
|
METHOD_ALLOCATE
|
|
| METHOD_CREATE_PERMISSION
|
|
| METHOD_CHANNEL_BIND
|
|
| METHOD_SEND
|
|
| METHOD_REFRESH
|
|
);
|
|
|
|
if requires_auth {
|
|
let (key, mi_mode) = match auth.authenticate(&msg, &peer).await {
|
|
AuthStatus::Granted {
|
|
username,
|
|
key,
|
|
mi_mode,
|
|
} => {
|
|
tracing::debug!(
|
|
"TURN auth ok for {} as {} (0x{:04x})",
|
|
peer,
|
|
username,
|
|
msg.header.msg_type
|
|
);
|
|
(key, mi_mode)
|
|
}
|
|
AuthStatus::Challenge { nonce } => {
|
|
crate::metrics::inc_auth_challenge();
|
|
if !rate_limiters.allow_unauth(peer.ip()) {
|
|
crate::metrics::inc_rate_limited();
|
|
continue;
|
|
}
|
|
let resp = build_401_response(
|
|
&msg.header,
|
|
auth.realm(),
|
|
&nonce,
|
|
401,
|
|
"Unauthorized",
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
AuthStatus::StaleNonce { nonce } => {
|
|
crate::metrics::inc_auth_stale();
|
|
if !rate_limiters.allow_unauth(peer.ip()) {
|
|
crate::metrics::inc_rate_limited();
|
|
continue;
|
|
}
|
|
let resp = build_401_response(
|
|
&msg.header,
|
|
auth.realm(),
|
|
&nonce,
|
|
438,
|
|
"Stale Nonce",
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
AuthStatus::Reject { code, reason } => {
|
|
crate::metrics::inc_auth_reject();
|
|
let resp = build_error_response(&msg.header, code, reason);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
};
|
|
|
|
match msg.header.msg_type {
|
|
METHOD_ALLOCATE => {
|
|
crate::metrics::inc_allocate_total();
|
|
|
|
// TURN Allocate MUST include REQUESTED-TRANSPORT; WebRTC expects UDP (17).
|
|
match extract_requested_transport_protocol(&msg) {
|
|
Some(IPPROTO_UDP) => {}
|
|
Some(_) => {
|
|
crate::metrics::inc_allocate_fail();
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
442,
|
|
"Unsupported Transport",
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
None => {
|
|
crate::metrics::inc_allocate_fail();
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
400,
|
|
"Missing REQUESTED-TRANSPORT",
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
}
|
|
|
|
let requested_lifetime = extract_lifetime_seconds(&msg)
|
|
.map(|secs| Duration::from_secs(secs as u64))
|
|
.filter(|d| !d.is_zero());
|
|
|
|
match allocs
|
|
.allocate_for(peer, ClientSink::Udp {
|
|
sock: udp.clone(),
|
|
addr: peer,
|
|
})
|
|
.await
|
|
{
|
|
Ok(relay_addr) => {
|
|
let applied =
|
|
match allocs.refresh_allocation(peer, requested_lifetime) {
|
|
Ok(d) => d,
|
|
Err(e) => {
|
|
tracing::error!(
|
|
"failed to apply lifetime for {}: {:?}",
|
|
peer,
|
|
e
|
|
);
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
500,
|
|
"Allocate Failed",
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let lifetime_secs = applied.as_secs().min(u32::MAX as u64) as u32;
|
|
let advertised = allocs.relay_addr_for_response(relay_addr);
|
|
let resp = build_allocate_success_with_integrity_mode(
|
|
&msg.header,
|
|
&advertised,
|
|
lifetime_secs,
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
tracing::info!(
|
|
"allocated relay {} for {} lifetime={}s",
|
|
relay_addr,
|
|
peer,
|
|
lifetime_secs
|
|
);
|
|
crate::metrics::inc_allocate_success();
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
}
|
|
Err(e) => {
|
|
tracing::error!("allocate failed: {:?}", e);
|
|
let (code, reason) = match e.downcast_ref::<AllocationError>() {
|
|
Some(AllocationError::AllocationQuotaExceeded) => {
|
|
(486, "Allocation Quota Reached")
|
|
}
|
|
_ => (500, "Allocate Failed"),
|
|
};
|
|
crate::metrics::inc_allocate_fail();
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
code,
|
|
reason,
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
}
|
|
}
|
|
continue;
|
|
}
|
|
METHOD_CREATE_PERMISSION => {
|
|
if allocs.get_allocation(&peer).is_none() {
|
|
warn!("create-permission without allocation from {}", peer);
|
|
let resp =
|
|
build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
437,
|
|
"Allocation Mismatch",
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
|
|
let mut added = 0usize;
|
|
for attr in msg
|
|
.attributes
|
|
.iter()
|
|
.filter(|a| a.typ == ATTR_XOR_PEER_ADDRESS)
|
|
{
|
|
if let Some(peer_addr) =
|
|
decode_xor_peer_address(&attr.value, &msg.header.transaction_id)
|
|
{
|
|
match allocs.add_permission(peer, peer_addr) {
|
|
Ok(()) => {
|
|
tracing::info!(
|
|
"added permission for {} -> {}",
|
|
peer,
|
|
peer_addr
|
|
);
|
|
crate::metrics::inc_permission_added();
|
|
added += 1;
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(
|
|
"failed to persist permission {} -> {}: {:?}",
|
|
peer,
|
|
peer_addr,
|
|
e
|
|
);
|
|
if matches!(
|
|
e.downcast_ref::<AllocationError>(),
|
|
Some(AllocationError::PermissionQuotaExceeded)
|
|
) {
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
508,
|
|
"Insufficient Capacity",
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
tracing::warn!("invalid XOR-PEER-ADDRESS in request from {}", peer);
|
|
}
|
|
}
|
|
|
|
if added == 0 {
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
400,
|
|
"No valid XOR-PEER-ADDRESS",
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
} else {
|
|
let resp =
|
|
build_success_response_with_integrity_mode(&msg.header, &key, mi_mode);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
}
|
|
continue;
|
|
}
|
|
METHOD_CHANNEL_BIND => {
|
|
let allocation = match allocs.get_allocation(&peer) {
|
|
Some(a) => a,
|
|
None => {
|
|
warn!("channel-bind without allocation from {}", peer);
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
437,
|
|
"Allocation Mismatch",
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let channel_attr =
|
|
msg.attributes.iter().find(|a| a.typ == ATTR_CHANNEL_NUMBER);
|
|
let peer_attr = msg
|
|
.attributes
|
|
.iter()
|
|
.find(|a| a.typ == ATTR_XOR_PEER_ADDRESS);
|
|
let (channel_attr, peer_attr) = match (channel_attr, peer_attr) {
|
|
(Some(c), Some(p)) => (c, p),
|
|
_ => {
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
400,
|
|
"Missing CHANNEL-NUMBER or XOR-PEER-ADDRESS",
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let channel =
|
|
u16::from_be_bytes([channel_attr.value[0], channel_attr.value[1]]);
|
|
let peer_addr = match decode_xor_peer_address(
|
|
&peer_attr.value,
|
|
&msg.header.transaction_id,
|
|
) {
|
|
Some(addr) => addr,
|
|
None => {
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
400,
|
|
"Invalid XOR-PEER-ADDRESS",
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
};
|
|
|
|
if !allocation.is_peer_allowed(&peer_addr) {
|
|
match allocs.add_permission(peer, peer_addr) {
|
|
Ok(()) => {
|
|
tracing::info!(
|
|
"added implicit permission for {} -> {} (via CHANNEL-BIND)",
|
|
peer,
|
|
peer_addr
|
|
);
|
|
crate::metrics::inc_permission_added();
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(
|
|
"failed to add implicit permission {} -> {}: {:?}",
|
|
peer,
|
|
peer_addr,
|
|
e
|
|
);
|
|
let (code, reason) = match e.downcast_ref::<AllocationError>() {
|
|
Some(AllocationError::PermissionQuotaExceeded) => {
|
|
(508, "Insufficient Capacity")
|
|
}
|
|
_ => (403, "Peer Not Permitted"),
|
|
};
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
code,
|
|
reason,
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
if let Err(e) = allocs.add_channel_binding(peer, channel, peer_addr) {
|
|
tracing::error!(
|
|
"failed to persist channel binding {} -> {} (0x{:04x}): {:?}",
|
|
peer,
|
|
peer_addr,
|
|
channel,
|
|
e
|
|
);
|
|
let (code, reason) = match e.downcast_ref::<AllocationError>() {
|
|
Some(AllocationError::ChannelQuotaExceeded) => {
|
|
(508, "Insufficient Capacity")
|
|
}
|
|
_ => (500, "Channel Bind Failed"),
|
|
};
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
code,
|
|
reason,
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
|
|
crate::metrics::inc_channel_binding_added();
|
|
let resp = build_success_response_with_integrity_mode(&msg.header, &key, mi_mode);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
METHOD_SEND => {
|
|
let allocation = match allocs.get_allocation(&peer) {
|
|
Some(a) => a,
|
|
None => {
|
|
warn!("send indication without allocation from {}", peer);
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
437,
|
|
"Allocation Mismatch",
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let peer_attr = msg
|
|
.attributes
|
|
.iter()
|
|
.find(|a| a.typ == ATTR_XOR_PEER_ADDRESS);
|
|
let data_attr = msg.attributes.iter().find(|a| a.typ == ATTR_DATA);
|
|
let (peer_attr, data_attr) = match (peer_attr, data_attr) {
|
|
(Some(p), Some(d)) => (p, d),
|
|
_ => {
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
400,
|
|
"Missing DATA or XOR-PEER-ADDRESS",
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
};
|
|
|
|
let peer_addr = match decode_xor_peer_address(
|
|
&peer_attr.value,
|
|
&msg.header.transaction_id,
|
|
) {
|
|
Some(addr) => addr,
|
|
None => {
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
400,
|
|
"Invalid XOR-PEER-ADDRESS",
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
};
|
|
|
|
if !allocation.is_peer_allowed(&peer_addr) {
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
403,
|
|
"Peer Not Permitted",
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
|
|
match allocation.send_to_peer(peer_addr, &data_attr.value).await {
|
|
Ok(sent) => {
|
|
tracing::info!(
|
|
"forwarded {} bytes from {} to {}",
|
|
sent,
|
|
peer,
|
|
peer_addr
|
|
);
|
|
let resp =
|
|
build_success_response_with_integrity_mode(&msg.header, &key, mi_mode);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(
|
|
"failed to send payload from {} to {}: {:?}",
|
|
peer,
|
|
peer_addr,
|
|
e
|
|
);
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
500,
|
|
"Peer Send Failed",
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
}
|
|
}
|
|
continue;
|
|
}
|
|
METHOD_REFRESH => {
|
|
let requested = extract_lifetime_seconds(&msg)
|
|
.map(|secs| Duration::from_secs(secs as u64));
|
|
|
|
match allocs.refresh_allocation(peer, requested) {
|
|
Ok(applied) => {
|
|
if applied.is_zero() {
|
|
tracing::info!("allocation for {} released", peer);
|
|
} else {
|
|
tracing::debug!(
|
|
"allocation for {} refreshed to {}s",
|
|
peer,
|
|
applied.as_secs()
|
|
);
|
|
}
|
|
let resp = build_lifetime_success_with_integrity_mode(
|
|
&msg.header,
|
|
applied.as_secs().min(u32::MAX as u64) as u32,
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
}
|
|
Err(_) => {
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
437,
|
|
"Allocation Mismatch",
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
}
|
|
}
|
|
continue;
|
|
}
|
|
_ => {
|
|
let resp = build_error_response_with_integrity_mode(
|
|
&msg.header,
|
|
420,
|
|
"Unknown TURN Method",
|
|
&key,
|
|
mi_mode,
|
|
);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
continue;
|
|
}
|
|
}
|
|
}
|
|
|
|
match msg.header.msg_type {
|
|
METHOD_BINDING => {
|
|
if rate_limiters.allow_binding(peer.ip()) {
|
|
let resp = crate::stun::build_binding_success(&msg.header, &peer);
|
|
let _ = udp.send_to(&resp, &peer).await;
|
|
} else {
|
|
crate::metrics::inc_rate_limited();
|
|
}
|
|
}
|
|
_ => {
|
|
if !rate_limiters.allow_unauth(peer.ip()) {
|
|
crate::metrics::inc_rate_limited();
|
|
continue;
|
|
}
|
|
let nonce = auth.mint_nonce(&peer);
|
|
let resp =
|
|
build_401_response(&msg.header, auth.realm(), &nonce, 401, "Unauthorized");
|
|
if let Err(e) = udp.send_to(&resp, &peer).await {
|
|
error!("failed to send 401: {:?}", e);
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
tracing::debug!("Non-STUN or parse error from {} len={}", peer, len);
|
|
}
|
|
}
|
|
}
|