test(signaling): cover reconnect backoff and error helper
This commit is contained in:
parent
fa6f292b9f
commit
c6e1934ceb
@ -1,3 +1,5 @@
|
||||
mod reconnect;
|
||||
|
||||
use std::{cell::RefCell, rc::Rc};
|
||||
|
||||
use crate::{
|
||||
@ -6,6 +8,7 @@ use crate::{
|
||||
use dioxus::prelude::*;
|
||||
use futures::StreamExt;
|
||||
use gloo_timers::future::TimeoutFuture;
|
||||
use reconnect::{DisconnectReason, ReconnectController};
|
||||
use wasm_bindgen::{prelude::Closure, JsCast, JsValue};
|
||||
use wasm_bindgen_futures::spawn_local;
|
||||
use web_sys::{
|
||||
@ -51,11 +54,6 @@ pub fn use_signaling() -> SignalingService {
|
||||
use_context::<SignalingService>()
|
||||
}
|
||||
|
||||
fn retry_delay_ms(attempt: u32) -> u32 {
|
||||
let capped = attempt.min(5);
|
||||
1000 * (1u32 << capped)
|
||||
}
|
||||
|
||||
#[derive(Props, Clone, PartialEq)]
|
||||
pub struct SignalingProviderProps {
|
||||
pub children: Element,
|
||||
@ -76,7 +74,7 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
|
||||
let in_call = use_signal(|| false);
|
||||
let audio_muted = use_signal(|| false);
|
||||
let last_error = use_signal(|| None::<String>);
|
||||
let reconnect_attempts = use_signal(|| 0u32);
|
||||
let reconnect_ctrl = use_signal(ReconnectController::default);
|
||||
let connect_slot: Rc<RefCell<Option<Rc<dyn Fn()>>>> = Rc::new(RefCell::new(None));
|
||||
|
||||
let cfg_signal: Signal<Config> = use_context();
|
||||
@ -305,7 +303,7 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
|
||||
let initiator_connection_signal = initiator_connection.clone();
|
||||
let responder_connection_signal = responder_connection.clone();
|
||||
let last_error_signal = last_error.clone();
|
||||
let reconnect_attempts_signal = reconnect_attempts.clone();
|
||||
let reconnect_ctrl_signal = reconnect_ctrl.clone();
|
||||
let connect_slot_ref = connect_slot.clone();
|
||||
|
||||
Rc::new(move || {
|
||||
@ -316,38 +314,44 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
|
||||
let responder_connection = responder_connection_signal.clone();
|
||||
let cfg_signal = cfg_signal_handle.clone();
|
||||
let error_signal = last_error_signal.clone();
|
||||
let reconnect_attempts_handle = reconnect_attempts_signal.clone();
|
||||
let reconnect_ctrl_handle = reconnect_ctrl_signal.clone();
|
||||
let connect_slot_ref = connect_slot_ref.clone();
|
||||
|
||||
if *connected_handle.read() || websocket_handle.read().is_some() {
|
||||
return;
|
||||
}
|
||||
|
||||
let attempt_index = *reconnect_attempts_handle.read();
|
||||
if attempt_index == 0 {
|
||||
ws_status_handle.set("Verbinde...".to_string());
|
||||
} else {
|
||||
ws_status_handle.set(format!("Verbinde... (Versuch {})", attempt_index + 1));
|
||||
}
|
||||
let attempt_status = {
|
||||
let current = reconnect_ctrl_handle.read().clone();
|
||||
let mut updated = current;
|
||||
let status = updated.begin_attempt();
|
||||
let mut writer = reconnect_ctrl_handle.clone();
|
||||
writer.set(updated);
|
||||
status
|
||||
};
|
||||
ws_status_handle.set(attempt_status.status_message.clone());
|
||||
|
||||
let mut err_handle = error_signal.clone();
|
||||
err_handle.set(None);
|
||||
|
||||
let schedule_retry: Rc<dyn Fn()> = {
|
||||
let reconnect_attempts_handle = reconnect_attempts_handle.clone();
|
||||
let schedule_retry: Rc<dyn Fn(DisconnectReason)> = {
|
||||
let reconnect_ctrl_handle = reconnect_ctrl_handle.clone();
|
||||
let ws_status_signal = ws_status_signal.clone();
|
||||
let connect_slot_ref = connect_slot_ref.clone();
|
||||
Rc::new(move || {
|
||||
let attempt = *reconnect_attempts_handle.read();
|
||||
let delay_ms = retry_delay_ms(attempt);
|
||||
let mut attempts_writer = reconnect_attempts_handle.clone();
|
||||
attempts_writer.set(attempt.saturating_add(1));
|
||||
let seconds = (delay_ms / 1000).max(1);
|
||||
Rc::new(move |reason| {
|
||||
let retry_plan = {
|
||||
let current = reconnect_ctrl_handle.read().clone();
|
||||
let mut updated = current;
|
||||
let plan = updated.schedule_retry(reason);
|
||||
let mut writer = reconnect_ctrl_handle.clone();
|
||||
writer.set(updated);
|
||||
plan
|
||||
};
|
||||
let mut status_writer = ws_status_signal.clone();
|
||||
status_writer.set(format!("Reconnect in {}s...", seconds));
|
||||
status_writer.set(retry_plan.status_message.clone());
|
||||
let connect_slot_clone = connect_slot_ref.clone();
|
||||
spawn_local(async move {
|
||||
TimeoutFuture::new(delay_ms).await;
|
||||
TimeoutFuture::new(retry_plan.delay_ms).await;
|
||||
if let Some(cb) = connect_slot_clone.borrow().as_ref() {
|
||||
cb();
|
||||
}
|
||||
@ -371,15 +375,21 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
|
||||
let mut ws_status_clone = ws_status_handle.clone();
|
||||
let mut connected_clone = connected_handle.clone();
|
||||
let error_for_open = error_signal.clone();
|
||||
let reconnect_for_open = reconnect_attempts_handle.clone();
|
||||
let reconnect_for_open = reconnect_ctrl_handle.clone();
|
||||
let onopen = Closure::wrap(Box::new(move |_: web_sys::Event| {
|
||||
log::info!("✅ WebSocket verbunden!");
|
||||
ws_status_clone.set("Verbunden".to_string());
|
||||
let status = {
|
||||
let current = reconnect_for_open.read().clone();
|
||||
let mut updated = current;
|
||||
let status = updated.handle_success();
|
||||
let mut writer = reconnect_for_open.clone();
|
||||
writer.set(updated);
|
||||
status
|
||||
};
|
||||
ws_status_clone.set(status);
|
||||
connected_clone.set(true);
|
||||
let mut handle = error_for_open.clone();
|
||||
handle.set(None);
|
||||
let mut reset_attempts = reconnect_for_open.clone();
|
||||
reset_attempts.set(0);
|
||||
})
|
||||
as Box<dyn FnMut(web_sys::Event)>);
|
||||
|
||||
@ -396,7 +406,7 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
|
||||
handle.set(Some("Verbindung zum Signaling-Server beendet".to_string()));
|
||||
let mut ws_handle = websocket_for_close.clone();
|
||||
ws_handle.set(None);
|
||||
schedule_on_close();
|
||||
schedule_on_close(DisconnectReason::Closed);
|
||||
})
|
||||
as Box<dyn FnMut(web_sys::CloseEvent)>);
|
||||
|
||||
@ -416,7 +426,7 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
|
||||
let mut ws_handle = websocket_for_error.clone();
|
||||
if ws_handle.read().is_some() {
|
||||
ws_handle.set(None);
|
||||
schedule_on_error();
|
||||
schedule_on_error(DisconnectReason::Error);
|
||||
}
|
||||
})
|
||||
as Box<dyn FnMut(web_sys::Event)>);
|
||||
@ -573,7 +583,7 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
|
||||
"Verbindung zum Signaling-Server fehlgeschlagen: {:?}",
|
||||
e
|
||||
)));
|
||||
schedule_retry();
|
||||
schedule_retry(DisconnectReason::Error);
|
||||
}
|
||||
}
|
||||
})
|
||||
@ -585,11 +595,11 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
|
||||
}
|
||||
|
||||
let connect_action = {
|
||||
let reconnect_attempts = reconnect_attempts.clone();
|
||||
let reconnect_ctrl_signal = reconnect_ctrl.clone();
|
||||
let connect_logic = connect_logic.clone();
|
||||
Rc::new(move || {
|
||||
let mut attempts = reconnect_attempts.clone();
|
||||
attempts.set(0);
|
||||
let mut ctrl_signal = reconnect_ctrl_signal.clone();
|
||||
ctrl_signal.set(ReconnectController::default());
|
||||
connect_logic();
|
||||
})
|
||||
};
|
||||
|
||||
123
src/services/signaling/reconnect.rs
Normal file
123
src/services/signaling/reconnect.rs
Normal file
@ -0,0 +1,123 @@
|
||||
use std::fmt::{self, Display, Formatter};
|
||||
|
||||
/// Controls reconnect attempts and status messages for the signaling WebSocket.
|
||||
#[derive(Clone, Debug, Default, PartialEq, Eq)]
|
||||
pub struct ReconnectController {
|
||||
attempt: u32,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct AttemptStatus {
|
||||
pub attempt_index: u32,
|
||||
pub status_message: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub struct RetryPlan {
|
||||
pub next_attempt_index: u32,
|
||||
pub delay_ms: u32,
|
||||
pub status_message: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
|
||||
pub enum DisconnectReason {
|
||||
Closed,
|
||||
Error,
|
||||
}
|
||||
|
||||
impl Display for DisconnectReason {
|
||||
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
|
||||
match self {
|
||||
DisconnectReason::Closed => write!(f, "Close"),
|
||||
DisconnectReason::Error => write!(f, "Error"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ReconnectController {
|
||||
#[allow(dead_code)]
|
||||
pub fn new() -> Self {
|
||||
Self { attempt: 0 }
|
||||
}
|
||||
|
||||
pub fn reset(&mut self) {
|
||||
self.attempt = 0;
|
||||
}
|
||||
|
||||
pub fn begin_attempt(&mut self) -> AttemptStatus {
|
||||
let message = if self.attempt == 0 {
|
||||
"Verbinde...".to_string()
|
||||
} else {
|
||||
format!("Verbinde... (Versuch {})", self.attempt + 1)
|
||||
};
|
||||
AttemptStatus {
|
||||
attempt_index: self.attempt,
|
||||
status_message: message,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_success(&mut self) -> String {
|
||||
self.reset();
|
||||
"Verbunden".to_string()
|
||||
}
|
||||
|
||||
pub fn schedule_retry(&mut self, _reason: DisconnectReason) -> RetryPlan {
|
||||
let delay_ms = retry_delay_ms(self.attempt);
|
||||
let next_attempt = self.attempt.saturating_add(1);
|
||||
let seconds = (delay_ms / 1000).max(1);
|
||||
let status = format!("Reconnect in {}s...", seconds);
|
||||
self.attempt = next_attempt;
|
||||
RetryPlan {
|
||||
next_attempt_index: next_attempt,
|
||||
delay_ms,
|
||||
status_message: status,
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
pub fn current_attempt(&self) -> u32 {
|
||||
self.attempt
|
||||
}
|
||||
}
|
||||
|
||||
pub fn retry_delay_ms(attempt: u32) -> u32 {
|
||||
let capped = attempt.min(5);
|
||||
1000 * (1u32 << capped)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn attempt_messages_change_with_retry() {
|
||||
let mut ctrl = ReconnectController::new();
|
||||
assert_eq!(ctrl.begin_attempt().status_message, "Verbinde...");
|
||||
let plan = ctrl.schedule_retry(DisconnectReason::Error);
|
||||
assert_eq!(plan.next_attempt_index, 1);
|
||||
assert_eq!(plan.status_message, "Reconnect in 1s...");
|
||||
let status = ctrl.begin_attempt();
|
||||
assert_eq!(status.status_message, "Verbinde... (Versuch 2)");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn success_resets_attempts() {
|
||||
let mut ctrl = ReconnectController::new();
|
||||
ctrl.schedule_retry(DisconnectReason::Error);
|
||||
let status = ctrl.handle_success();
|
||||
assert_eq!(status, "Verbunden");
|
||||
assert_eq!(ctrl.current_attempt(), 0);
|
||||
assert_eq!(ctrl.begin_attempt().status_message, "Verbinde...");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn delay_caps_after_five_attempts() {
|
||||
assert_eq!(retry_delay_ms(0), 1000);
|
||||
assert_eq!(retry_delay_ms(1), 2000);
|
||||
assert_eq!(retry_delay_ms(2), 4000);
|
||||
assert_eq!(retry_delay_ms(3), 8000);
|
||||
assert_eq!(retry_delay_ms(4), 16000);
|
||||
assert_eq!(retry_delay_ms(5), 32000);
|
||||
assert_eq!(retry_delay_ms(6), 32000);
|
||||
}
|
||||
}
|
||||
32
tests/error_actions_tests.rs
Normal file
32
tests/error_actions_tests.rs
Normal file
@ -0,0 +1,32 @@
|
||||
use std::cell::RefCell;
|
||||
use std::rc::Rc;
|
||||
|
||||
use niom_webrtc::components::ErrorActions;
|
||||
|
||||
#[test]
|
||||
fn report_and_clear_delegate_to_callbacks() {
|
||||
let reported: Rc<RefCell<Vec<String>>> = Rc::new(RefCell::new(Vec::new()));
|
||||
let cleared: Rc<RefCell<u32>> = Rc::new(RefCell::new(0));
|
||||
|
||||
let report_fn = {
|
||||
let reported = Rc::clone(&reported);
|
||||
Rc::new(move |msg: String| {
|
||||
reported.borrow_mut().push(msg);
|
||||
}) as Rc<dyn Fn(String)>
|
||||
};
|
||||
|
||||
let clear_fn = {
|
||||
let cleared = Rc::clone(&cleared);
|
||||
Rc::new(move || {
|
||||
*cleared.borrow_mut() += 1;
|
||||
}) as Rc<dyn Fn()>
|
||||
};
|
||||
|
||||
let actions = ErrorActions::new(report_fn, clear_fn);
|
||||
actions.report("boom");
|
||||
actions.report("bam");
|
||||
actions.clear();
|
||||
|
||||
assert_eq!(reported.borrow().as_slice(), ["boom", "bam"]);
|
||||
assert_eq!(*cleared.borrow(), 1);
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user