test(signaling): cover offer/answer routing

This commit is contained in:
ghost 2025-11-04 17:37:30 +01:00
parent c6e1934ceb
commit 2bdf4789bd
3 changed files with 280 additions and 110 deletions

View File

@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone, PartialEq, Eq)]
pub struct SignalingMessage { pub struct SignalingMessage {
pub from: String, pub from: String,
pub to: String, pub to: String,

View File

@ -1,3 +1,4 @@
mod message_router;
mod reconnect; mod reconnect;
use std::{cell::RefCell, rc::Rc}; use std::{cell::RefCell, rc::Rc};
@ -8,6 +9,7 @@ use crate::{
use dioxus::prelude::*; use dioxus::prelude::*;
use futures::StreamExt; use futures::StreamExt;
use gloo_timers::future::TimeoutFuture; use gloo_timers::future::TimeoutFuture;
use message_router::{Directive as MessageDirective, MessageRouter, RouterState};
use reconnect::{DisconnectReason, ReconnectController}; use reconnect::{DisconnectReason, ReconnectController};
use wasm_bindgen::{prelude::Closure, JsCast, JsValue}; use wasm_bindgen::{prelude::Closure, JsCast, JsValue};
use wasm_bindgen_futures::spawn_local; use wasm_bindgen_futures::spawn_local;
@ -322,11 +324,10 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
} }
let attempt_status = { let attempt_status = {
let current = reconnect_ctrl_handle.read().clone(); let mut ctrl = reconnect_ctrl_handle.read().clone();
let mut updated = current; let status = ctrl.begin_attempt();
let status = updated.begin_attempt();
let mut writer = reconnect_ctrl_handle.clone(); let mut writer = reconnect_ctrl_handle.clone();
writer.set(updated); writer.set(ctrl);
status status
}; };
ws_status_handle.set(attempt_status.status_message.clone()); ws_status_handle.set(attempt_status.status_message.clone());
@ -340,11 +341,10 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
let connect_slot_ref = connect_slot_ref.clone(); let connect_slot_ref = connect_slot_ref.clone();
Rc::new(move |reason| { Rc::new(move |reason| {
let retry_plan = { let retry_plan = {
let current = reconnect_ctrl_handle.read().clone(); let mut ctrl = reconnect_ctrl_handle.read().clone();
let mut updated = current; let plan = ctrl.schedule_retry(reason);
let plan = updated.schedule_retry(reason);
let mut writer = reconnect_ctrl_handle.clone(); let mut writer = reconnect_ctrl_handle.clone();
writer.set(updated); writer.set(ctrl);
plan plan
}; };
let mut status_writer = ws_status_signal.clone(); let mut status_writer = ws_status_signal.clone();
@ -379,11 +379,10 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
let onopen = Closure::wrap(Box::new(move |_: web_sys::Event| { let onopen = Closure::wrap(Box::new(move |_: web_sys::Event| {
log::info!("✅ WebSocket verbunden!"); log::info!("✅ WebSocket verbunden!");
let status = { let status = {
let current = reconnect_for_open.read().clone(); let mut ctrl = reconnect_for_open.read().clone();
let mut updated = current; let status = ctrl.handle_success();
let status = updated.handle_success();
let mut writer = reconnect_for_open.clone(); let mut writer = reconnect_for_open.clone();
writer.set(updated); writer.set(ctrl);
status status
}; };
ws_status_clone.set(status); ws_status_clone.set(status);
@ -441,112 +440,138 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
log::info!("📨 WebSocket Nachricht: {}", text); log::info!("📨 WebSocket Nachricht: {}", text);
match serde_json::from_str::<SignalingMessage>(&text) { match serde_json::from_str::<SignalingMessage>(&text) {
Ok(msg) => match msg.msg_type.as_str() { Ok(msg) => {
"offer" => offer_tx.send(msg), let state = RouterState {
"answer" => { initiator_present: initiator_for_router.read().is_some(),
let data_clone = msg.data.clone(); responder_present: responder_for_router.read().is_some(),
if let Some(pc) = initiator_for_router.read().as_ref() { };
let pc_clone = pc.clone(); match MessageRouter::default().route(msg, state) {
let err_signal = error_for_message.clone(); MessageDirective::ForwardOffer(message) => {
spawn_local(async move { offer_tx.send(message)
match MediaManager::handle_answer(
&pc_clone,
&data_clone,
)
.await
{
Ok(_) => log::info!(
"✅ Answer erfolgreich gesetzt auf Initiator-PC"
),
Err(e) => {
log::error!(
"❌ Fehler beim Setzen der Answer auf Initiator-PC: {}",
e
);
let mut handle = err_signal.clone();
handle.set(Some(format!(
"Failed to apply remote answer: {}",
e
)));
}
}
});
} else {
log::warn!(
"⚠️ Keine Initiator-PeerConnection vorhanden - buffer Answer"
);
let mut pending_handle = pending_answer_signal.clone();
pending_handle.set(Some(data_clone));
} }
} MessageDirective::ApplyAnswer(data) => {
"candidate" => { if let Some(pc) = initiator_for_router.read().as_ref() {
let data_clone = msg.data.clone(); let pc_clone = pc.clone();
if let Some(pc) = initiator_for_router.read().as_ref() { let err_signal = error_for_message.clone();
let pc_clone = pc.clone(); let answer_data = data.clone();
let err_signal = error_for_message.clone(); spawn_local(async move {
spawn_local(async move { match MediaManager::handle_answer(
match MediaManager::add_ice_candidate( &pc_clone,
&pc_clone, &answer_data,
&data_clone, )
) { .await
Ok(_) => log::info!( {
"✅ Kandidat zur Initiator-PC hinzugefügt" Ok(_) => log::info!(
), "✅ Answer erfolgreich gesetzt auf Initiator-PC"
Err(e) => { ),
log::error!( Err(e) => {
"❌ Kandidat konnte nicht hinzugefügt werden: {}", log::error!(
e "❌ Fehler beim Setzen der Answer auf Initiator-PC: {}",
); e
let mut handle = err_signal.clone(); );
handle.set(Some(format!( let mut handle = err_signal.clone();
"Failed to add ICE candidate (initiator): {}", handle.set(Some(format!(
e "Failed to apply remote answer: {}",
))); e
)));
}
} }
} });
}); } else {
} else if let Some(pc) = log::warn!(
responder_for_router.read().as_ref() "⚠️ Keine Initiator-PeerConnection vorhanden - buffer Answer"
{ );
let pc_clone = pc.clone(); let mut pending_handle =
let err_signal = error_for_message.clone(); pending_answer_signal.clone();
spawn_local(async move { pending_handle.set(Some(data));
match MediaManager::add_ice_candidate( }
&pc_clone, }
&data_clone, MessageDirective::BufferAnswer(data) => {
) { let mut pending_handle = pending_answer_signal.clone();
Ok(_) => log::info!( pending_handle.set(Some(data));
"✅ Kandidat zur Responder-PC hinzugefügt" }
), MessageDirective::AddCandidateInitiator(candidate) => {
Err(e) => { if let Some(pc) = initiator_for_router.read().as_ref() {
log::error!( let pc_clone = pc.clone();
"❌ Kandidat konnte nicht hinzugefügt werden: {}", let err_signal = error_for_message.clone();
e let candidate_payload = candidate.clone();
); spawn_local(async move {
let mut handle = err_signal.clone(); match MediaManager::add_ice_candidate(
handle.set(Some(format!( &pc_clone,
"Failed to add ICE candidate (responder): {}", &candidate_payload,
e ) {
))); Ok(_) => log::info!(
"✅ Kandidat zur Initiator-PC hinzugefügt"
),
Err(e) => {
log::error!(
"❌ Kandidat konnte nicht hinzugefügt werden: {}",
e
);
let mut handle = err_signal.clone();
handle.set(Some(format!(
"Failed to add ICE candidate (initiator): {}",
e
)));
}
} }
} });
}); } else {
} else { log::warn!(
"⚠️ Initiator-PC verschwunden, Kandidat verworfen"
);
}
}
MessageDirective::AddCandidateResponder(candidate) => {
if let Some(pc) = responder_for_router.read().as_ref() {
let pc_clone = pc.clone();
let err_signal = error_for_message.clone();
let candidate_payload = candidate.clone();
spawn_local(async move {
match MediaManager::add_ice_candidate(
&pc_clone,
&candidate_payload,
) {
Ok(_) => log::info!(
"✅ Kandidat zur Responder-PC hinzugefügt"
),
Err(e) => {
log::error!(
"❌ Kandidat konnte nicht hinzugefügt werden: {}",
e
);
let mut handle = err_signal.clone();
handle.set(Some(format!(
"Failed to add ICE candidate (responder): {}",
e
)));
}
}
});
} else {
log::warn!(
"⚠️ Responder-PC fehlt, Kandidat verworfen"
);
}
}
MessageDirective::IgnoreCandidate => {
log::warn!( log::warn!(
"⚠️ Kein PeerConnection verfügbar, um Kandidaten hinzuzufügen" "⚠️ Kein PeerConnection verfügbar, um Kandidaten hinzuzufügen"
); );
} }
} MessageDirective::ShowText { from, body } => {
"text" => { if let Some(window) = web_sys::window() {
if let Some(window) = web_sys::window() { let _ = window.alert_with_message(&format!(
let _ = window.alert_with_message(&format!( "Nachricht von {}:\n{}",
"Nachricht von {}:\n{}", from, body
msg.from, msg.data ));
)); }
}
MessageDirective::Unknown(kind) => {
log::info!("❓ Unbekannte Nachricht: {}", kind);
} }
} }
_ => log::info!("❓ Unbekannte Nachricht: {}", msg.msg_type), }
},
Err(e) => { Err(e) => {
log::error!( log::error!(
"❌ Signaling Nachricht konnte nicht gelesen werden: {}", "❌ Signaling Nachricht konnte nicht gelesen werden: {}",

View File

@ -0,0 +1,145 @@
use crate::models::SignalingMessage;
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub struct RouterState {
pub initiator_present: bool,
pub responder_present: bool,
}
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum Directive {
ForwardOffer(SignalingMessage),
ApplyAnswer(String),
BufferAnswer(String),
AddCandidateInitiator(String),
AddCandidateResponder(String),
IgnoreCandidate,
ShowText { from: String, body: String },
Unknown(String),
}
#[derive(Default)]
pub struct MessageRouter;
impl MessageRouter {
pub fn route(&self, message: SignalingMessage, state: RouterState) -> Directive {
match message.msg_type.as_str() {
"offer" => Directive::ForwardOffer(message),
"answer" => {
if state.initiator_present {
Directive::ApplyAnswer(message.data)
} else {
Directive::BufferAnswer(message.data)
}
}
"candidate" => {
if state.initiator_present {
Directive::AddCandidateInitiator(message.data)
} else if state.responder_present {
Directive::AddCandidateResponder(message.data)
} else {
Directive::IgnoreCandidate
}
}
"text" => Directive::ShowText {
from: message.from,
body: message.data,
},
other => Directive::Unknown(other.to_string()),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
fn sample_message(msg_type: &str, data: &str) -> SignalingMessage {
SignalingMessage {
from: "alice".into(),
to: "bob".into(),
msg_type: msg_type.into(),
data: data.into(),
}
}
#[test]
fn routes_offer() {
let router = MessageRouter::default();
let msg = sample_message("offer", "sdp");
let directive = router.route(msg.clone(), RouterState::default());
assert_eq!(directive, Directive::ForwardOffer(msg));
}
#[test]
fn routes_answer_based_on_initiator_presence() {
let router = MessageRouter::default();
let msg = sample_message("answer", "sdp");
let directive_with_pc = router.route(
msg.clone(),
RouterState {
initiator_present: true,
responder_present: false,
},
);
assert_eq!(directive_with_pc, Directive::ApplyAnswer("sdp".into()));
let directive_without_pc = router.route(
msg.clone(),
RouterState {
initiator_present: false,
responder_present: false,
},
);
assert_eq!(directive_without_pc, Directive::BufferAnswer("sdp".into()));
}
#[test]
fn routes_candidate_preferring_initiator_then_responder() {
let router = MessageRouter::default();
let msg = sample_message("candidate", "ice");
let directive_init = router.route(
msg.clone(),
RouterState {
initiator_present: true,
responder_present: true,
},
);
assert_eq!(
directive_init,
Directive::AddCandidateInitiator("ice".into())
);
let directive_resp = router.route(
msg.clone(),
RouterState {
initiator_present: false,
responder_present: true,
},
);
assert_eq!(
directive_resp,
Directive::AddCandidateResponder("ice".into())
);
let directive_none = router.route(msg, RouterState::default());
assert_eq!(directive_none, Directive::IgnoreCandidate);
}
#[test]
fn routes_text_and_unknown() {
let router = MessageRouter::default();
let text = router.route(sample_message("text", "hi"), RouterState::default());
assert_eq!(
text,
Directive::ShowText {
from: "alice".into(),
body: "hi".into(),
}
);
let unknown = router.route(sample_message("foo", "bar"), RouterState::default());
assert_eq!(unknown, Directive::Unknown("foo".into()));
}
}