use std::net::SocketAddr; use std::sync::Arc; use niom_turn::alloc::AllocationManager; use niom_turn::auth::InMemoryStore; use niom_turn::config::LimitsOptions; use niom_turn::rate_limit::RateLimiters; use tokio::io::AsyncWriteExt; use tokio::net::{TcpListener, TcpStream}; use tokio::time::{timeout, Duration}; use crate::support::stream::{StreamFrame, StreamFramer}; use crate::support::stun_builders::{build_allocate_request, build_binding_request}; use crate::support::{default_test_credentials, init_tracing_with, test_auth_manager}; mod support; async fn start_tcp_test_server( auth: niom_turn::auth::AuthManager, allocs: AllocationManager, rate_limiters: Arc, ) -> SocketAddr { let tcp_listener = TcpListener::bind("127.0.0.1:0").await.expect("tcp bind"); let tcp_addr = tcp_listener.local_addr().expect("tcp addr"); tokio::spawn(async move { loop { let (stream, peer) = match tcp_listener.accept().await { Ok(conn) => conn, Err(_) => break, }; let auth_clone = auth.clone(); let alloc_clone = allocs.clone(); let rl = rate_limiters.clone(); tokio::spawn(async move { let _ = niom_turn::turn_stream::handle_turn_stream_connection_with_limits( stream, peer, auth_clone, alloc_clone, rl, ) .await; }); } }); tcp_addr } #[tokio::test] async fn tcp_binding_is_rate_limited_by_ip() { init_tracing_with("warn,niom_turn=info"); // Configure a very small burst to make the test deterministic. let mut limits = LimitsOptions::default(); limits.binding_rps = Some(1); limits.binding_burst = Some(1); let rate_limiters = Arc::new(RateLimiters::from_limits(&limits)); let (username, password) = default_test_credentials(); let auth = test_auth_manager(username, password); let allocs = AllocationManager::new(); let server_addr = start_tcp_test_server(auth.clone(), allocs.clone(), rate_limiters.clone()).await; let mut stream = TcpStream::connect(server_addr).await.expect("tcp connect"); // Fire multiple Binding requests quickly; with burst=1 we should only get 1 success response. for _ in 0..3 { let req = build_binding_request(); stream.write_all(&req).await.expect("write binding"); } let mut framer = StreamFramer::new(); let mut responses = 0usize; // Read for a short bounded period. let deadline = tokio::time::Instant::now() + Duration::from_millis(150); loop { let now = tokio::time::Instant::now(); if now >= deadline { break; } let remaining = deadline - now; let frame = match timeout(remaining, framer.read_frame(&mut stream)).await { Ok(Ok(f)) => f, Ok(Err(e)) => panic!("read_frame error: {e:?}"), Err(_) => break, }; match frame { StreamFrame::Stun(msg) => { assert_eq!(msg.header.msg_type & 0x0110, 0x0100); responses += 1; } other => panic!("expected STUN response, got: {other:?}"), } } assert_eq!(responses, 1, "expected exactly 1 Binding response under burst=1"); } #[tokio::test] async fn tcp_unauth_challenge_is_rate_limited_by_ip() { init_tracing_with("warn,niom_turn=info"); let mut limits = LimitsOptions::default(); limits.unauth_rps = Some(1); limits.unauth_burst = Some(1); let rate_limiters = Arc::new(RateLimiters::from_limits(&limits)); let (username, password) = default_test_credentials(); let auth = test_auth_manager(username, password); let allocs = AllocationManager::new(); let server_addr = start_tcp_test_server(auth.clone(), allocs.clone(), rate_limiters.clone()).await; let mut stream = TcpStream::connect(server_addr).await.expect("tcp connect"); for _ in 0..3 { let req = build_allocate_request(None, None, None, None, None); stream.write_all(&req).await.expect("write allocate"); } let mut framer = StreamFramer::new(); let mut responses = 0usize; let deadline = tokio::time::Instant::now() + Duration::from_millis(150); loop { let now = tokio::time::Instant::now(); if now >= deadline { break; } let remaining = deadline - now; let frame = match timeout(remaining, framer.read_frame(&mut stream)).await { Ok(Ok(f)) => f, Ok(Err(e)) => panic!("read_frame error: {e:?}"), Err(_) => break, }; match frame { StreamFrame::Stun(msg) => { assert_eq!(msg.header.msg_type & 0x0110, niom_turn::constants::CLASS_ERROR); msg.attributes .iter() .find(|a| a.typ == niom_turn::constants::ATTR_NONCE) .expect("nonce attr"); responses += 1; } other => panic!("expected STUN response, got: {other:?}"), } } assert_eq!(responses, 1, "expected exactly 1 unauth challenge under burst=1"); }