use std::net::SocketAddr; use niom_turn::alloc::AllocationManager; use niom_turn::auth::{compute_a1_md5, InMemoryStore}; use tokio::io::AsyncWriteExt; use tokio::net::{TcpListener, TcpStream, UdpSocket}; use tokio::time::{timeout, Duration}; use crate::support::stream::{StreamFrame, StreamFramer}; use crate::support::stun_builders::{ build_allocate_request, build_channel_bind_request, build_create_permission_request, build_send_request, }; use crate::support::{default_test_credentials, init_tracing_with, test_auth_manager}; mod support; async fn start_tcp_test_server( auth: niom_turn::auth::AuthManager, allocs: AllocationManager, ) -> SocketAddr { let tcp_listener = TcpListener::bind("127.0.0.1:0").await.expect("tcp bind"); let tcp_addr = tcp_listener.local_addr().expect("tcp addr"); tokio::spawn(async move { loop { let (stream, peer) = match tcp_listener.accept().await { Ok(conn) => conn, Err(_) => break, }; let auth_clone = auth.clone(); let alloc_clone = allocs.clone(); tokio::spawn(async move { if let Err(e) = niom_turn::turn_stream::handle_turn_stream_connection( stream, peer, auth_clone, alloc_clone, ) .await { tracing::info!("tcp connection ended: {:?}", e); } }); } }); tcp_addr } #[tokio::test] async fn tcp_stream_resyncs_after_garbage_and_still_processes_allocate() { init_tracing_with("warn,niom_turn=info"); let (username, password) = default_test_credentials(); let auth = test_auth_manager(username, password); let allocs = AllocationManager::new(); let server_addr = start_tcp_test_server(auth.clone(), allocs.clone()).await; let mut stream = TcpStream::connect(server_addr).await.expect("tcp connect"); // Send garbage bytes that look like a STUN header with a huge length but invalid cookie. // Without resync, the server could wait for 20+65535 bytes and stall parsing. let garbage = [0x00, 0x01, 0xFF, 0xFF, 0x00, 0x00, 0x00, 0x00]; stream.write_all(&garbage).await.expect("write garbage"); // Now send a valid Allocate request; server should still respond with a 401 challenge. let allocate = build_allocate_request(None, None, None, None, None); stream.write_all(&allocate).await.expect("write allocate"); let mut framer = StreamFramer::new(); let frame = timeout(Duration::from_secs(2), framer.read_frame(&mut stream)) .await .expect("timeout response") .expect("read response"); match frame { StreamFrame::Stun(msg) => { assert_eq!(msg.header.msg_type & 0x0110, niom_turn::constants::CLASS_ERROR); msg.attributes .iter() .find(|a| a.typ == niom_turn::constants::ATTR_NONCE) .expect("nonce attr"); } other => panic!("expected STUN challenge, got: {:?}", other), } } #[tokio::test] async fn tcp_peer_data_is_delivered_over_stream_as_data_indication() { init_tracing_with("warn,niom_turn=info"); let (username, password) = default_test_credentials(); let auth = test_auth_manager(username, password); let allocs = AllocationManager::new(); let server_addr = start_tcp_test_server(auth.clone(), allocs.clone()).await; let mut stream = TcpStream::connect(server_addr).await.expect("tcp connect"); let client_addr = stream.local_addr().expect("client addr"); // 1) Allocate without auth -> 401 + NONCE let allocate = build_allocate_request(None, None, None, None, None); stream.write_all(&allocate).await.expect("write allocate"); let mut framer = StreamFramer::new(); let challenge = timeout(Duration::from_secs(2), framer.read_frame(&mut stream)) .await .expect("timeout challenge") .expect("read challenge"); let nonce = match challenge { StreamFrame::Stun(msg) => { assert_eq!(msg.header.msg_type & 0x0110, niom_turn::constants::CLASS_ERROR); let nonce_attr = msg .attributes .iter() .find(|a| a.typ == niom_turn::constants::ATTR_NONCE) .expect("nonce attr"); String::from_utf8(nonce_attr.value.clone()).expect("nonce utf8") } _ => panic!("expected STUN 401 challenge"), }; // 2) Authenticated allocate let key = compute_a1_md5(username, auth.realm(), password); let allocate = build_allocate_request( Some(username), Some(auth.realm()), Some(&nonce), Some(&key), Some(600), ); stream.write_all(&allocate).await.expect("write auth allocate"); let alloc_success = timeout(Duration::from_secs(2), framer.read_frame(&mut stream)) .await .expect("timeout alloc success") .expect("read alloc success"); match alloc_success { StreamFrame::Stun(msg) => { assert_eq!(msg.header.msg_type & 0x0110, niom_turn::constants::CLASS_SUCCESS); msg.attributes .iter() .find(|a| a.typ == niom_turn::constants::ATTR_XOR_RELAYED_ADDRESS) .expect("xor-relayed attr"); } _ => panic!("expected STUN allocate success"), } let relay_addr = allocs .get_allocation(&client_addr) .expect("allocation exists") .relay_addr; // 3) CreatePermission let peer_sock = UdpSocket::bind("127.0.0.1:0").await.expect("peer bind"); let perm = build_create_permission_request( username, auth.realm(), &nonce, &key, &peer_sock.local_addr().unwrap(), ); stream.write_all(&perm).await.expect("write create permission"); let perm_resp = timeout(Duration::from_secs(2), framer.read_frame(&mut stream)) .await .expect("timeout perm resp") .expect("read perm resp"); match perm_resp { StreamFrame::Stun(msg) => assert_eq!(msg.header.msg_type & 0x0110, niom_turn::constants::CLASS_SUCCESS), _ => panic!("expected STUN permission success"), } // 4) Send indication -> peer should receive UDP payload let payload = b"hello-turn-tcp"; let send = build_send_request( username, auth.realm(), &nonce, &key, &peer_sock.local_addr().unwrap(), payload, ); stream.write_all(&send).await.expect("write send"); // This implementation responds to SEND with a success response. let send_resp = timeout(Duration::from_secs(2), framer.read_frame(&mut stream)) .await .expect("timeout send resp") .expect("read send resp"); match send_resp { StreamFrame::Stun(msg) => { assert_eq!( msg.header.msg_type, niom_turn::constants::METHOD_SEND | niom_turn::constants::CLASS_SUCCESS ); } _ => panic!("expected STUN send success"), } let mut peer_buf = [0u8; 1500]; let (n, from) = timeout(Duration::from_secs(2), peer_sock.recv_from(&mut peer_buf)) .await .expect("timeout peer recv") .expect("peer recv"); assert_eq!(&peer_buf[..n], payload); assert!(from.ip().is_loopback()); // 5) Peer -> relay -> client: should come back over TCP as Data Indication let back = b"peer-reply"; peer_sock .send_to(back, relay_addr) .await .expect("peer send back"); let frame = timeout(Duration::from_secs(2), framer.read_frame(&mut stream)) .await .expect("timeout data indication") .expect("read data indication"); match frame { StreamFrame::Stun(msg) => { assert_eq!(msg.header.msg_type, niom_turn::constants::METHOD_DATA | niom_turn::constants::CLASS_INDICATION); let data_attr = msg .attributes .iter() .find(|a| a.typ == niom_turn::constants::ATTR_DATA) .expect("data attr"); assert_eq!(data_attr.value.as_slice(), back); } _ => panic!("expected STUN data indication"), } // sanity: allocation exists by client addr assert!(allocs.get_allocation(&client_addr).is_some()); } #[tokio::test] async fn tcp_channel_data_round_trip_works() { init_tracing_with("warn,niom_turn=info"); let (username, password) = default_test_credentials(); let auth = test_auth_manager(username, password); let allocs = AllocationManager::new(); let server_addr = start_tcp_test_server(auth.clone(), allocs.clone()).await; let mut stream = TcpStream::connect(server_addr).await.expect("tcp connect"); let allocate = build_allocate_request(None, None, None, None, None); stream.write_all(&allocate).await.expect("write allocate"); let mut framer = StreamFramer::new(); let challenge = timeout(Duration::from_secs(2), framer.read_frame(&mut stream)) .await .expect("timeout challenge") .expect("read challenge"); let nonce = match challenge { StreamFrame::Stun(msg) => { let nonce_attr = msg .attributes .iter() .find(|a| a.typ == niom_turn::constants::ATTR_NONCE) .expect("nonce attr"); String::from_utf8(nonce_attr.value.clone()).expect("nonce utf8") } _ => panic!("expected STUN 401 challenge"), }; let key = compute_a1_md5(username, auth.realm(), password); let allocate = build_allocate_request( Some(username), Some(auth.realm()), Some(&nonce), Some(&key), Some(600), ); stream.write_all(&allocate).await.expect("write auth allocate"); let alloc_success = timeout(Duration::from_secs(2), framer.read_frame(&mut stream)) .await .expect("timeout alloc success") .expect("read alloc success"); match alloc_success { StreamFrame::Stun(msg) => { msg.attributes .iter() .find(|a| a.typ == niom_turn::constants::ATTR_XOR_RELAYED_ADDRESS) .expect("xor-relayed attr"); } _ => panic!("expected STUN allocate success"), } let client_addr = stream.local_addr().expect("client addr"); let relay_addr = allocs .get_allocation(&client_addr) .expect("allocation exists") .relay_addr; let peer_sock = UdpSocket::bind("127.0.0.1:0").await.expect("peer bind"); // Permission let perm = build_create_permission_request( username, auth.realm(), &nonce, &key, &peer_sock.local_addr().unwrap(), ); stream.write_all(&perm).await.expect("write create permission"); let _ = timeout(Duration::from_secs(2), framer.read_frame(&mut stream)) .await .expect("timeout perm resp") .expect("read perm resp"); // ChannelBind let channel: u16 = 0x4000; let bind = build_channel_bind_request( username, auth.realm(), &nonce, &key, channel, &peer_sock.local_addr().unwrap(), ); stream.write_all(&bind).await.expect("write channel bind"); let bind_resp = timeout(Duration::from_secs(2), framer.read_frame(&mut stream)) .await .expect("timeout bind resp") .expect("read bind resp"); match bind_resp { StreamFrame::Stun(msg) => assert_eq!(msg.header.msg_type & 0x0110, niom_turn::constants::CLASS_SUCCESS), _ => panic!("expected STUN channel bind success"), } // Client -> Server -> Peer via ChannelData let payload = b"chan-hello"; let ch = niom_turn::stun::build_channel_data(channel, payload); stream.write_all(&ch).await.expect("write channel data"); let mut peer_buf = [0u8; 1500]; let (n, from) = timeout(Duration::from_secs(2), peer_sock.recv_from(&mut peer_buf)) .await .expect("timeout peer recv") .expect("peer recv"); assert_eq!(&peer_buf[..n], payload); assert!(from.ip().is_loopback()); // Peer -> Relay -> Client as ChannelData let back = b"chan-back"; peer_sock.send_to(back, relay_addr).await.expect("peer send back"); let frame = timeout(Duration::from_secs(2), framer.read_frame(&mut stream)) .await .expect("timeout channel back") .expect("read channel back"); match frame { StreamFrame::ChannelData { channel: chn, payload } => { assert_eq!(chn, channel); assert_eq!(payload.as_slice(), back); } _ => panic!("expected ChannelData frame"), } }