154 lines
5.1 KiB
Rust
154 lines
5.1 KiB
Rust
use std::net::SocketAddr;
|
|
use std::sync::Arc;
|
|
|
|
use niom_turn::alloc::AllocationManager;
|
|
use niom_turn::auth::InMemoryStore;
|
|
use niom_turn::config::LimitsOptions;
|
|
use niom_turn::rate_limit::RateLimiters;
|
|
use tokio::io::AsyncWriteExt;
|
|
use tokio::net::{TcpListener, TcpStream};
|
|
use tokio::time::{timeout, Duration};
|
|
|
|
use crate::support::stream::{StreamFrame, StreamFramer};
|
|
use crate::support::stun_builders::{build_allocate_request, build_binding_request};
|
|
use crate::support::{default_test_credentials, init_tracing_with, test_auth_manager};
|
|
|
|
mod support;
|
|
|
|
async fn start_tcp_test_server(
|
|
auth: niom_turn::auth::AuthManager<InMemoryStore>,
|
|
allocs: AllocationManager,
|
|
rate_limiters: Arc<RateLimiters>,
|
|
) -> SocketAddr {
|
|
let tcp_listener = TcpListener::bind("127.0.0.1:0").await.expect("tcp bind");
|
|
let tcp_addr = tcp_listener.local_addr().expect("tcp addr");
|
|
|
|
tokio::spawn(async move {
|
|
loop {
|
|
let (stream, peer) = match tcp_listener.accept().await {
|
|
Ok(conn) => conn,
|
|
Err(_) => break,
|
|
};
|
|
let auth_clone = auth.clone();
|
|
let alloc_clone = allocs.clone();
|
|
let rl = rate_limiters.clone();
|
|
tokio::spawn(async move {
|
|
let _ = niom_turn::turn_stream::handle_turn_stream_connection_with_limits(
|
|
stream, peer, auth_clone, alloc_clone, rl,
|
|
)
|
|
.await;
|
|
});
|
|
}
|
|
});
|
|
|
|
tcp_addr
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn tcp_binding_is_rate_limited_by_ip() {
|
|
init_tracing_with("warn,niom_turn=info");
|
|
|
|
// Configure a very small burst to make the test deterministic.
|
|
let mut limits = LimitsOptions::default();
|
|
limits.binding_rps = Some(1);
|
|
limits.binding_burst = Some(1);
|
|
let rate_limiters = Arc::new(RateLimiters::from_limits(&limits));
|
|
|
|
let (username, password) = default_test_credentials();
|
|
let auth = test_auth_manager(username, password);
|
|
let allocs = AllocationManager::new();
|
|
let server_addr = start_tcp_test_server(auth.clone(), allocs.clone(), rate_limiters.clone()).await;
|
|
|
|
let mut stream = TcpStream::connect(server_addr).await.expect("tcp connect");
|
|
|
|
// Fire multiple Binding requests quickly; with burst=1 we should only get 1 success response.
|
|
for _ in 0..3 {
|
|
let req = build_binding_request();
|
|
stream.write_all(&req).await.expect("write binding");
|
|
}
|
|
|
|
let mut framer = StreamFramer::new();
|
|
let mut responses = 0usize;
|
|
|
|
// Read for a short bounded period.
|
|
let deadline = tokio::time::Instant::now() + Duration::from_millis(150);
|
|
loop {
|
|
let now = tokio::time::Instant::now();
|
|
if now >= deadline {
|
|
break;
|
|
}
|
|
let remaining = deadline - now;
|
|
|
|
let frame = match timeout(remaining, framer.read_frame(&mut stream)).await {
|
|
Ok(Ok(f)) => f,
|
|
Ok(Err(e)) => panic!("read_frame error: {e:?}"),
|
|
Err(_) => break,
|
|
};
|
|
|
|
match frame {
|
|
StreamFrame::Stun(msg) => {
|
|
assert_eq!(msg.header.msg_type & 0x0110, 0x0100);
|
|
responses += 1;
|
|
}
|
|
other => panic!("expected STUN response, got: {other:?}"),
|
|
}
|
|
}
|
|
|
|
assert_eq!(responses, 1, "expected exactly 1 Binding response under burst=1");
|
|
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn tcp_unauth_challenge_is_rate_limited_by_ip() {
|
|
init_tracing_with("warn,niom_turn=info");
|
|
|
|
let mut limits = LimitsOptions::default();
|
|
limits.unauth_rps = Some(1);
|
|
limits.unauth_burst = Some(1);
|
|
let rate_limiters = Arc::new(RateLimiters::from_limits(&limits));
|
|
|
|
let (username, password) = default_test_credentials();
|
|
let auth = test_auth_manager(username, password);
|
|
let allocs = AllocationManager::new();
|
|
let server_addr = start_tcp_test_server(auth.clone(), allocs.clone(), rate_limiters.clone()).await;
|
|
|
|
let mut stream = TcpStream::connect(server_addr).await.expect("tcp connect");
|
|
|
|
for _ in 0..3 {
|
|
let req = build_allocate_request(None, None, None, None, None);
|
|
stream.write_all(&req).await.expect("write allocate");
|
|
}
|
|
|
|
let mut framer = StreamFramer::new();
|
|
let mut responses = 0usize;
|
|
|
|
let deadline = tokio::time::Instant::now() + Duration::from_millis(150);
|
|
loop {
|
|
let now = tokio::time::Instant::now();
|
|
if now >= deadline {
|
|
break;
|
|
}
|
|
let remaining = deadline - now;
|
|
|
|
let frame = match timeout(remaining, framer.read_frame(&mut stream)).await {
|
|
Ok(Ok(f)) => f,
|
|
Ok(Err(e)) => panic!("read_frame error: {e:?}"),
|
|
Err(_) => break,
|
|
};
|
|
|
|
match frame {
|
|
StreamFrame::Stun(msg) => {
|
|
assert_eq!(msg.header.msg_type & 0x0110, niom_turn::constants::CLASS_ERROR);
|
|
msg.attributes
|
|
.iter()
|
|
.find(|a| a.typ == niom_turn::constants::ATTR_NONCE)
|
|
.expect("nonce attr");
|
|
responses += 1;
|
|
}
|
|
other => panic!("expected STUN response, got: {other:?}"),
|
|
}
|
|
}
|
|
|
|
assert_eq!(responses, 1, "expected exactly 1 unauth challenge under burst=1");
|
|
}
|