diff --git a/src/services/signaling.rs b/src/services/signaling.rs index c7abeef..2530de7 100644 --- a/src/services/signaling.rs +++ b/src/services/signaling.rs @@ -1,3 +1,5 @@ +mod reconnect; + use std::{cell::RefCell, rc::Rc}; use crate::{ @@ -6,6 +8,7 @@ use crate::{ use dioxus::prelude::*; use futures::StreamExt; use gloo_timers::future::TimeoutFuture; +use reconnect::{DisconnectReason, ReconnectController}; use wasm_bindgen::{prelude::Closure, JsCast, JsValue}; use wasm_bindgen_futures::spawn_local; use web_sys::{ @@ -51,11 +54,6 @@ pub fn use_signaling() -> SignalingService { use_context::() } -fn retry_delay_ms(attempt: u32) -> u32 { - let capped = attempt.min(5); - 1000 * (1u32 << capped) -} - #[derive(Props, Clone, PartialEq)] pub struct SignalingProviderProps { pub children: Element, @@ -76,7 +74,7 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element { let in_call = use_signal(|| false); let audio_muted = use_signal(|| false); let last_error = use_signal(|| None::); - let reconnect_attempts = use_signal(|| 0u32); + let reconnect_ctrl = use_signal(ReconnectController::default); let connect_slot: Rc>>> = Rc::new(RefCell::new(None)); let cfg_signal: Signal = use_context(); @@ -305,7 +303,7 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element { let initiator_connection_signal = initiator_connection.clone(); let responder_connection_signal = responder_connection.clone(); let last_error_signal = last_error.clone(); - let reconnect_attempts_signal = reconnect_attempts.clone(); + let reconnect_ctrl_signal = reconnect_ctrl.clone(); let connect_slot_ref = connect_slot.clone(); Rc::new(move || { @@ -316,38 +314,44 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element { let responder_connection = responder_connection_signal.clone(); let cfg_signal = cfg_signal_handle.clone(); let error_signal = last_error_signal.clone(); - let reconnect_attempts_handle = reconnect_attempts_signal.clone(); + let reconnect_ctrl_handle = reconnect_ctrl_signal.clone(); let connect_slot_ref = connect_slot_ref.clone(); if *connected_handle.read() || websocket_handle.read().is_some() { return; } - let attempt_index = *reconnect_attempts_handle.read(); - if attempt_index == 0 { - ws_status_handle.set("Verbinde...".to_string()); - } else { - ws_status_handle.set(format!("Verbinde... (Versuch {})", attempt_index + 1)); - } + let attempt_status = { + let current = reconnect_ctrl_handle.read().clone(); + let mut updated = current; + let status = updated.begin_attempt(); + let mut writer = reconnect_ctrl_handle.clone(); + writer.set(updated); + status + }; + ws_status_handle.set(attempt_status.status_message.clone()); let mut err_handle = error_signal.clone(); err_handle.set(None); - let schedule_retry: Rc = { - let reconnect_attempts_handle = reconnect_attempts_handle.clone(); + let schedule_retry: Rc = { + let reconnect_ctrl_handle = reconnect_ctrl_handle.clone(); let ws_status_signal = ws_status_signal.clone(); let connect_slot_ref = connect_slot_ref.clone(); - Rc::new(move || { - let attempt = *reconnect_attempts_handle.read(); - let delay_ms = retry_delay_ms(attempt); - let mut attempts_writer = reconnect_attempts_handle.clone(); - attempts_writer.set(attempt.saturating_add(1)); - let seconds = (delay_ms / 1000).max(1); + Rc::new(move |reason| { + let retry_plan = { + let current = reconnect_ctrl_handle.read().clone(); + let mut updated = current; + let plan = updated.schedule_retry(reason); + let mut writer = reconnect_ctrl_handle.clone(); + writer.set(updated); + plan + }; let mut status_writer = ws_status_signal.clone(); - status_writer.set(format!("Reconnect in {}s...", seconds)); + status_writer.set(retry_plan.status_message.clone()); let connect_slot_clone = connect_slot_ref.clone(); spawn_local(async move { - TimeoutFuture::new(delay_ms).await; + TimeoutFuture::new(retry_plan.delay_ms).await; if let Some(cb) = connect_slot_clone.borrow().as_ref() { cb(); } @@ -371,15 +375,21 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element { let mut ws_status_clone = ws_status_handle.clone(); let mut connected_clone = connected_handle.clone(); let error_for_open = error_signal.clone(); - let reconnect_for_open = reconnect_attempts_handle.clone(); + let reconnect_for_open = reconnect_ctrl_handle.clone(); let onopen = Closure::wrap(Box::new(move |_: web_sys::Event| { log::info!("✅ WebSocket verbunden!"); - ws_status_clone.set("Verbunden".to_string()); + let status = { + let current = reconnect_for_open.read().clone(); + let mut updated = current; + let status = updated.handle_success(); + let mut writer = reconnect_for_open.clone(); + writer.set(updated); + status + }; + ws_status_clone.set(status); connected_clone.set(true); let mut handle = error_for_open.clone(); handle.set(None); - let mut reset_attempts = reconnect_for_open.clone(); - reset_attempts.set(0); }) as Box); @@ -396,7 +406,7 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element { handle.set(Some("Verbindung zum Signaling-Server beendet".to_string())); let mut ws_handle = websocket_for_close.clone(); ws_handle.set(None); - schedule_on_close(); + schedule_on_close(DisconnectReason::Closed); }) as Box); @@ -416,7 +426,7 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element { let mut ws_handle = websocket_for_error.clone(); if ws_handle.read().is_some() { ws_handle.set(None); - schedule_on_error(); + schedule_on_error(DisconnectReason::Error); } }) as Box); @@ -573,7 +583,7 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element { "Verbindung zum Signaling-Server fehlgeschlagen: {:?}", e ))); - schedule_retry(); + schedule_retry(DisconnectReason::Error); } } }) @@ -585,11 +595,11 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element { } let connect_action = { - let reconnect_attempts = reconnect_attempts.clone(); + let reconnect_ctrl_signal = reconnect_ctrl.clone(); let connect_logic = connect_logic.clone(); Rc::new(move || { - let mut attempts = reconnect_attempts.clone(); - attempts.set(0); + let mut ctrl_signal = reconnect_ctrl_signal.clone(); + ctrl_signal.set(ReconnectController::default()); connect_logic(); }) }; diff --git a/src/services/signaling/reconnect.rs b/src/services/signaling/reconnect.rs new file mode 100644 index 0000000..5380379 --- /dev/null +++ b/src/services/signaling/reconnect.rs @@ -0,0 +1,123 @@ +use std::fmt::{self, Display, Formatter}; + +/// Controls reconnect attempts and status messages for the signaling WebSocket. +#[derive(Clone, Debug, Default, PartialEq, Eq)] +pub struct ReconnectController { + attempt: u32, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct AttemptStatus { + pub attempt_index: u32, + pub status_message: String, +} + +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct RetryPlan { + pub next_attempt_index: u32, + pub delay_ms: u32, + pub status_message: String, +} + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum DisconnectReason { + Closed, + Error, +} + +impl Display for DisconnectReason { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match self { + DisconnectReason::Closed => write!(f, "Close"), + DisconnectReason::Error => write!(f, "Error"), + } + } +} + +impl ReconnectController { + #[allow(dead_code)] + pub fn new() -> Self { + Self { attempt: 0 } + } + + pub fn reset(&mut self) { + self.attempt = 0; + } + + pub fn begin_attempt(&mut self) -> AttemptStatus { + let message = if self.attempt == 0 { + "Verbinde...".to_string() + } else { + format!("Verbinde... (Versuch {})", self.attempt + 1) + }; + AttemptStatus { + attempt_index: self.attempt, + status_message: message, + } + } + + pub fn handle_success(&mut self) -> String { + self.reset(); + "Verbunden".to_string() + } + + pub fn schedule_retry(&mut self, _reason: DisconnectReason) -> RetryPlan { + let delay_ms = retry_delay_ms(self.attempt); + let next_attempt = self.attempt.saturating_add(1); + let seconds = (delay_ms / 1000).max(1); + let status = format!("Reconnect in {}s...", seconds); + self.attempt = next_attempt; + RetryPlan { + next_attempt_index: next_attempt, + delay_ms, + status_message: status, + } + } + + #[allow(dead_code)] + pub fn current_attempt(&self) -> u32 { + self.attempt + } +} + +pub fn retry_delay_ms(attempt: u32) -> u32 { + let capped = attempt.min(5); + 1000 * (1u32 << capped) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn attempt_messages_change_with_retry() { + let mut ctrl = ReconnectController::new(); + assert_eq!(ctrl.begin_attempt().status_message, "Verbinde..."); + let plan = ctrl.schedule_retry(DisconnectReason::Error); + assert_eq!(plan.next_attempt_index, 1); + assert_eq!(plan.status_message, "Reconnect in 1s..."); + let status = ctrl.begin_attempt(); + assert_eq!(status.status_message, "Verbinde... (Versuch 2)"); + } + + #[test] + fn success_resets_attempts() { + let mut ctrl = ReconnectController::new(); + ctrl.schedule_retry(DisconnectReason::Error); + let status = ctrl.handle_success(); + assert_eq!(status, "Verbunden"); + assert_eq!(ctrl.current_attempt(), 0); + assert_eq!(ctrl.begin_attempt().status_message, "Verbinde..."); + } + + #[test] + fn delay_caps_after_five_attempts() { + assert_eq!(retry_delay_ms(0), 1000); + assert_eq!(retry_delay_ms(1), 2000); + assert_eq!(retry_delay_ms(2), 4000); + assert_eq!(retry_delay_ms(3), 8000); + assert_eq!(retry_delay_ms(4), 16000); + assert_eq!(retry_delay_ms(5), 32000); + assert_eq!(retry_delay_ms(6), 32000); + } +} diff --git a/tests/error_actions_tests.rs b/tests/error_actions_tests.rs new file mode 100644 index 0000000..0dbb998 --- /dev/null +++ b/tests/error_actions_tests.rs @@ -0,0 +1,32 @@ +use std::cell::RefCell; +use std::rc::Rc; + +use niom_webrtc::components::ErrorActions; + +#[test] +fn report_and_clear_delegate_to_callbacks() { + let reported: Rc>> = Rc::new(RefCell::new(Vec::new())); + let cleared: Rc> = Rc::new(RefCell::new(0)); + + let report_fn = { + let reported = Rc::clone(&reported); + Rc::new(move |msg: String| { + reported.borrow_mut().push(msg); + }) as Rc + }; + + let clear_fn = { + let cleared = Rc::clone(&cleared); + Rc::new(move || { + *cleared.borrow_mut() += 1; + }) as Rc + }; + + let actions = ErrorActions::new(report_fn, clear_fn); + actions.report("boom"); + actions.report("bam"); + actions.clear(); + + assert_eq!(reported.borrow().as_slice(), ["boom", "bam"]); + assert_eq!(*cleared.borrow(), 1); +}