From 2bdf4789bdde5ae8d1e9291576f03ffce28ec0ac Mon Sep 17 00:00:00 2001 From: ghost Date: Tue, 4 Nov 2025 17:37:30 +0100 Subject: [PATCH] test(signaling): cover offer/answer routing --- src/models/signaling_message.rs | 2 +- src/services/signaling.rs | 243 +++++++++++++---------- src/services/signaling/message_router.rs | 145 ++++++++++++++ 3 files changed, 280 insertions(+), 110 deletions(-) create mode 100644 src/services/signaling/message_router.rs diff --git a/src/models/signaling_message.rs b/src/models/signaling_message.rs index 24c7a6b..f201e31 100644 --- a/src/models/signaling_message.rs +++ b/src/models/signaling_message.rs @@ -1,6 +1,6 @@ use serde::{Deserialize, Serialize}; -#[derive(Debug, Serialize, Deserialize, Clone)] +#[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)] pub struct SignalingMessage { pub from: String, pub to: String, diff --git a/src/services/signaling.rs b/src/services/signaling.rs index 2530de7..71325db 100644 --- a/src/services/signaling.rs +++ b/src/services/signaling.rs @@ -1,3 +1,4 @@ +mod message_router; mod reconnect; use std::{cell::RefCell, rc::Rc}; @@ -8,6 +9,7 @@ use crate::{ use dioxus::prelude::*; use futures::StreamExt; use gloo_timers::future::TimeoutFuture; +use message_router::{Directive as MessageDirective, MessageRouter, RouterState}; use reconnect::{DisconnectReason, ReconnectController}; use wasm_bindgen::{prelude::Closure, JsCast, JsValue}; use wasm_bindgen_futures::spawn_local; @@ -322,11 +324,10 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element { } let attempt_status = { - let current = reconnect_ctrl_handle.read().clone(); - let mut updated = current; - let status = updated.begin_attempt(); + let mut ctrl = reconnect_ctrl_handle.read().clone(); + let status = ctrl.begin_attempt(); let mut writer = reconnect_ctrl_handle.clone(); - writer.set(updated); + writer.set(ctrl); status }; ws_status_handle.set(attempt_status.status_message.clone()); @@ -340,11 +341,10 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element { let connect_slot_ref = connect_slot_ref.clone(); Rc::new(move |reason| { let retry_plan = { - let current = reconnect_ctrl_handle.read().clone(); - let mut updated = current; - let plan = updated.schedule_retry(reason); + let mut ctrl = reconnect_ctrl_handle.read().clone(); + let plan = ctrl.schedule_retry(reason); let mut writer = reconnect_ctrl_handle.clone(); - writer.set(updated); + writer.set(ctrl); plan }; let mut status_writer = ws_status_signal.clone(); @@ -379,11 +379,10 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element { let onopen = Closure::wrap(Box::new(move |_: web_sys::Event| { log::info!("✅ WebSocket verbunden!"); let status = { - let current = reconnect_for_open.read().clone(); - let mut updated = current; - let status = updated.handle_success(); + let mut ctrl = reconnect_for_open.read().clone(); + let status = ctrl.handle_success(); let mut writer = reconnect_for_open.clone(); - writer.set(updated); + writer.set(ctrl); status }; ws_status_clone.set(status); @@ -441,112 +440,138 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element { log::info!("📨 WebSocket Nachricht: {}", text); match serde_json::from_str::(&text) { - Ok(msg) => match msg.msg_type.as_str() { - "offer" => offer_tx.send(msg), - "answer" => { - let data_clone = msg.data.clone(); - if let Some(pc) = initiator_for_router.read().as_ref() { - let pc_clone = pc.clone(); - let err_signal = error_for_message.clone(); - spawn_local(async move { - match MediaManager::handle_answer( - &pc_clone, - &data_clone, - ) - .await - { - Ok(_) => log::info!( - "✅ Answer erfolgreich gesetzt auf Initiator-PC" - ), - Err(e) => { - log::error!( - "❌ Fehler beim Setzen der Answer auf Initiator-PC: {}", - e - ); - let mut handle = err_signal.clone(); - handle.set(Some(format!( - "Failed to apply remote answer: {}", - e - ))); - } - } - }); - } else { - log::warn!( - "⚠️ Keine Initiator-PeerConnection vorhanden - buffer Answer" - ); - let mut pending_handle = pending_answer_signal.clone(); - pending_handle.set(Some(data_clone)); + Ok(msg) => { + let state = RouterState { + initiator_present: initiator_for_router.read().is_some(), + responder_present: responder_for_router.read().is_some(), + }; + match MessageRouter::default().route(msg, state) { + MessageDirective::ForwardOffer(message) => { + offer_tx.send(message) } - } - "candidate" => { - let data_clone = msg.data.clone(); - if let Some(pc) = initiator_for_router.read().as_ref() { - let pc_clone = pc.clone(); - let err_signal = error_for_message.clone(); - spawn_local(async move { - match MediaManager::add_ice_candidate( - &pc_clone, - &data_clone, - ) { - Ok(_) => log::info!( - "✅ Kandidat zur Initiator-PC hinzugefügt" - ), - Err(e) => { - log::error!( - "❌ Kandidat konnte nicht hinzugefügt werden: {}", - e - ); - let mut handle = err_signal.clone(); - handle.set(Some(format!( - "Failed to add ICE candidate (initiator): {}", - e - ))); + MessageDirective::ApplyAnswer(data) => { + if let Some(pc) = initiator_for_router.read().as_ref() { + let pc_clone = pc.clone(); + let err_signal = error_for_message.clone(); + let answer_data = data.clone(); + spawn_local(async move { + match MediaManager::handle_answer( + &pc_clone, + &answer_data, + ) + .await + { + Ok(_) => log::info!( + "✅ Answer erfolgreich gesetzt auf Initiator-PC" + ), + Err(e) => { + log::error!( + "❌ Fehler beim Setzen der Answer auf Initiator-PC: {}", + e + ); + let mut handle = err_signal.clone(); + handle.set(Some(format!( + "Failed to apply remote answer: {}", + e + ))); + } } - } - }); - } else if let Some(pc) = - responder_for_router.read().as_ref() - { - let pc_clone = pc.clone(); - let err_signal = error_for_message.clone(); - spawn_local(async move { - match MediaManager::add_ice_candidate( - &pc_clone, - &data_clone, - ) { - Ok(_) => log::info!( - "✅ Kandidat zur Responder-PC hinzugefügt" - ), - Err(e) => { - log::error!( - "❌ Kandidat konnte nicht hinzugefügt werden: {}", - e - ); - let mut handle = err_signal.clone(); - handle.set(Some(format!( - "Failed to add ICE candidate (responder): {}", - e - ))); + }); + } else { + log::warn!( + "⚠️ Keine Initiator-PeerConnection vorhanden - buffer Answer" + ); + let mut pending_handle = + pending_answer_signal.clone(); + pending_handle.set(Some(data)); + } + } + MessageDirective::BufferAnswer(data) => { + let mut pending_handle = pending_answer_signal.clone(); + pending_handle.set(Some(data)); + } + MessageDirective::AddCandidateInitiator(candidate) => { + if let Some(pc) = initiator_for_router.read().as_ref() { + let pc_clone = pc.clone(); + let err_signal = error_for_message.clone(); + let candidate_payload = candidate.clone(); + spawn_local(async move { + match MediaManager::add_ice_candidate( + &pc_clone, + &candidate_payload, + ) { + Ok(_) => log::info!( + "✅ Kandidat zur Initiator-PC hinzugefügt" + ), + Err(e) => { + log::error!( + "❌ Kandidat konnte nicht hinzugefügt werden: {}", + e + ); + let mut handle = err_signal.clone(); + handle.set(Some(format!( + "Failed to add ICE candidate (initiator): {}", + e + ))); + } } - } - }); - } else { + }); + } else { + log::warn!( + "⚠️ Initiator-PC verschwunden, Kandidat verworfen" + ); + } + } + MessageDirective::AddCandidateResponder(candidate) => { + if let Some(pc) = responder_for_router.read().as_ref() { + let pc_clone = pc.clone(); + let err_signal = error_for_message.clone(); + let candidate_payload = candidate.clone(); + spawn_local(async move { + match MediaManager::add_ice_candidate( + &pc_clone, + &candidate_payload, + ) { + Ok(_) => log::info!( + "✅ Kandidat zur Responder-PC hinzugefügt" + ), + Err(e) => { + log::error!( + "❌ Kandidat konnte nicht hinzugefügt werden: {}", + e + ); + let mut handle = err_signal.clone(); + handle.set(Some(format!( + "Failed to add ICE candidate (responder): {}", + e + ))); + } + } + }); + } else { + log::warn!( + "⚠️ Responder-PC fehlt, Kandidat verworfen" + ); + } + } + MessageDirective::IgnoreCandidate => { log::warn!( "⚠️ Kein PeerConnection verfügbar, um Kandidaten hinzuzufügen" ); } - } - "text" => { - if let Some(window) = web_sys::window() { - let _ = window.alert_with_message(&format!( - "Nachricht von {}:\n{}", - msg.from, msg.data - )); + MessageDirective::ShowText { from, body } => { + if let Some(window) = web_sys::window() { + let _ = window.alert_with_message(&format!( + "Nachricht von {}:\n{}", + from, body + )); + } + } + MessageDirective::Unknown(kind) => { + log::info!("❓ Unbekannte Nachricht: {}", kind); } } - _ => log::info!("❓ Unbekannte Nachricht: {}", msg.msg_type), - }, + } Err(e) => { log::error!( "❌ Signaling Nachricht konnte nicht gelesen werden: {}", diff --git a/src/services/signaling/message_router.rs b/src/services/signaling/message_router.rs new file mode 100644 index 0000000..a25d650 --- /dev/null +++ b/src/services/signaling/message_router.rs @@ -0,0 +1,145 @@ +use crate::models::SignalingMessage; + +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub struct RouterState { + pub initiator_present: bool, + pub responder_present: bool, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub enum Directive { + ForwardOffer(SignalingMessage), + ApplyAnswer(String), + BufferAnswer(String), + AddCandidateInitiator(String), + AddCandidateResponder(String), + IgnoreCandidate, + ShowText { from: String, body: String }, + Unknown(String), +} + +#[derive(Default)] +pub struct MessageRouter; + +impl MessageRouter { + pub fn route(&self, message: SignalingMessage, state: RouterState) -> Directive { + match message.msg_type.as_str() { + "offer" => Directive::ForwardOffer(message), + "answer" => { + if state.initiator_present { + Directive::ApplyAnswer(message.data) + } else { + Directive::BufferAnswer(message.data) + } + } + "candidate" => { + if state.initiator_present { + Directive::AddCandidateInitiator(message.data) + } else if state.responder_present { + Directive::AddCandidateResponder(message.data) + } else { + Directive::IgnoreCandidate + } + } + "text" => Directive::ShowText { + from: message.from, + body: message.data, + }, + other => Directive::Unknown(other.to_string()), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn sample_message(msg_type: &str, data: &str) -> SignalingMessage { + SignalingMessage { + from: "alice".into(), + to: "bob".into(), + msg_type: msg_type.into(), + data: data.into(), + } + } + + #[test] + fn routes_offer() { + let router = MessageRouter::default(); + let msg = sample_message("offer", "sdp"); + let directive = router.route(msg.clone(), RouterState::default()); + assert_eq!(directive, Directive::ForwardOffer(msg)); + } + + #[test] + fn routes_answer_based_on_initiator_presence() { + let router = MessageRouter::default(); + let msg = sample_message("answer", "sdp"); + let directive_with_pc = router.route( + msg.clone(), + RouterState { + initiator_present: true, + responder_present: false, + }, + ); + assert_eq!(directive_with_pc, Directive::ApplyAnswer("sdp".into())); + + let directive_without_pc = router.route( + msg.clone(), + RouterState { + initiator_present: false, + responder_present: false, + }, + ); + assert_eq!(directive_without_pc, Directive::BufferAnswer("sdp".into())); + } + + #[test] + fn routes_candidate_preferring_initiator_then_responder() { + let router = MessageRouter::default(); + let msg = sample_message("candidate", "ice"); + + let directive_init = router.route( + msg.clone(), + RouterState { + initiator_present: true, + responder_present: true, + }, + ); + assert_eq!( + directive_init, + Directive::AddCandidateInitiator("ice".into()) + ); + + let directive_resp = router.route( + msg.clone(), + RouterState { + initiator_present: false, + responder_present: true, + }, + ); + assert_eq!( + directive_resp, + Directive::AddCandidateResponder("ice".into()) + ); + + let directive_none = router.route(msg, RouterState::default()); + assert_eq!(directive_none, Directive::IgnoreCandidate); + } + + #[test] + fn routes_text_and_unknown() { + let router = MessageRouter::default(); + let text = router.route(sample_message("text", "hi"), RouterState::default()); + assert_eq!( + text, + Directive::ShowText { + from: "alice".into(), + body: "hi".into(), + } + ); + + let unknown = router.route(sample_message("foo", "bar"), RouterState::default()); + assert_eq!(unknown, Directive::Unknown("foo".into())); + } +}