feat(signaling): add exponential reconnect backoff

This commit is contained in:
ghost 2025-11-03 15:34:52 +01:00
parent a75b995cb0
commit fa6f292b9f
11 changed files with 454 additions and 59 deletions

1
Cargo.lock generated
View File

@ -2937,6 +2937,7 @@ dependencies = [
"dioxus-logger",
"futures",
"gloo-net",
"gloo-timers",
"js-sys",
"log",
"serde",

View File

@ -53,6 +53,7 @@ serde = { version = "1.0.142", features = ["derive"] }
serde_json = "1.0.100"
futures = "0.3.31"
gloo-net = "0.6"
gloo-timers = { version = "0.3", features = ["futures"] }
[dev-dependencies]
tempfile = "3.6"

View File

@ -8,7 +8,7 @@ Dieses Dokument beschreibt, wie das Voice-Channel-Modul mit Signaling-Server (`n
- **TURN (`niom-turn`)**: UDP/TLS TURN-Server mit STUN-Fallback. Sorgt für NAT-Traversal, sobald ICE-Candidates verwendet werden.
## Flow (MVP Heutiger Stand)
1. **WebSocket-Aufbau**: `ConnectionPanel` verbindet sich zu `ws://localhost:3478/ws`.
1. **WebSocket-Aufbau & Reconnect**: `ConnectionPanel` verbindet sich zu `ws://localhost:3478/ws` und nutzt den `SignalingProvider`, der bei Fehlern mit exponentiellem Backoff erneut verbindet.
2. **Peer-ID**: Clients generieren lokale IDs; Remote-ID wird manuell eingetragen.
3. **Offer/Answer**:
- Initiator (`CallControls`) baut `RtcPeerConnection`, hängt lokale Audio-Tracks an und erzeugt ein Offer (SDP).

View File

@ -13,7 +13,7 @@
## Ablauf
1. **Peer-ID generieren** (`use_effect` mit Timestamp + Random).
2. **WebSocket verbinden** (`ws://localhost:3478/ws`): setzt `onopen/onclose/onmessage` Handler.
2. **WebSocket verbinden** (`ws://localhost:3478/ws`): setzt `onopen/onclose/onmessage` Handler und löst bei Verbindungsabbruch automatisch exponentielles Backoff-Reconnect aus.
3. **Offer-Coroutine** (`use_coroutine`): empfängt `SignalingMessage` vom Typ `offer`, baut PeerConnection (Responder) und sendet Answer.
4. **ICE-Kandidaten**: `onicecandidate` sendet Kandidaten zurück; empfangene Kandidaten werden an vorhandene PCs angehängt.
5. **Audio-Wiedergabe**: `ontrack` erstellt `<audio>` Element und setzt `srcObject`.
@ -22,4 +22,4 @@
- Room-basierte IDs statt manueller Eingabe.
- UI-Feedback bei kopierter Peer-ID (Clipboard API).
- Mehr Statusanzeigen (ICE connected, TURN usage).
- Fehler-Handling für WebSocket-Disconnects mit Auto-Reconnect.
- Countdown/Versuchsindikator für laufende Reconnects im UI anzeigen.

View File

@ -11,12 +11,18 @@ Diese Dokumentationssammlung beschreibt das MVP-Modul "Voice Channel" im Projekt
## Aktueller Fokus
- Discord-Voice-Channel UI als reines Modul ohne restliche App-Shell.
- Saubere Trennung von Initiator/Responder-Logik.
- Automatisches WebSocket-Reconnect mit exponentiellem Backoff (Stand 03.11.2025).
- Testbarkeit im Browser (WASM) und auf CLI-Ebene.
## Offene ToDos (Stand 02.11.2025)
1. WebRTC-/Signaling-Logik aus Komponenten in dedizierte Hooks/Services auslagern (z.B. `use_signaling`, `use_peer_connection`) und globalen State für Teilnehmer & Sessions einführen.
## Offene ToDos (Stand 03.11.2025)
1. UI-Feedback für Reconnect-Zustand verbessern (Versuchs-/Countdown-Anzeige, Fehlerbanner konsolidieren).
2. WebRTC-/Signaling-Logik weiter aus Komponenten in dedizierte Hooks/Services auslagern (z.B. `use_signaling`, `use_peer_connection`) und globalen State für Teilnehmer & Sessions einführen.
- Ziel: UI-Komponenten konsumieren nur noch lesende Signale & Events, Logik wird separat testbar.
2. TURN-Infrastruktur produktionsreif aufsetzen (Zertifikate, Auth, Monitoring) und E2E-Tests (Peer↔Peer via TURN) ergänzen.
3. UI modularisieren: Geräte-Setup, Fehlerbanner, Status-Badges, Vorbereitung auf Video-/Screen-Sharing-Tiles.
4. Signaling-Server erweitern (Raum-/Teilnehmermodell, AuthZ, robustes Error-Handling) und Schnittstellen dokumentieren.
5. CI-Pipeline mit `fmt`/`clippy`/Tests, Smoke-Tests (Web + CLI) und Playwright-Szenarien für Browserflows anlegen.
3. TURN-Infrastruktur produktionsreif aufsetzen (Zertifikate, Auth, Monitoring) und E2E-Tests (Peer↔Peer via TURN) ergänzen.
4. UI modularisieren: Geräte-Setup, Fehlerbanner, Status-Badges, Vorbereitung auf Video-/Screen-Sharing-Tiles.
5. Signaling-Server erweitern (Raum-/Teilnehmermodell, AuthZ, robustes Error-Handling) und Schnittstellen dokumentieren.
6. CI-Pipeline mit `fmt`/`clippy`/Tests, Smoke-Tests (Web + CLI) und Playwright-Szenarien für Browserflows anlegen.
## Changelog 03.11.2025
- Signaling-Service verbindet nach Verbindungsfehlern automatisch mit exponentiellem Backoff (`TimeoutFuture`).
- `ErrorActions`-Helper in UI integriert, um Fehlerzustände zentral zu setzen/zu löschen.

View File

@ -1,3 +1,4 @@
use super::ErrorActions;
use crate::services::signaling::use_signaling;
use dioxus::prelude::*;
@ -13,16 +14,20 @@ pub fn CallControls() -> Element {
let connected = *state.connected.read();
let mic_ready = *state.mic_granted.read();
let in_call = *state.in_call.read();
let audio_muted_signal = state.audio_muted.clone();
let muted = *audio_muted_signal.read();
let has_target = !remote_id_value.is_empty();
let request_microphone = actions.request_microphone.clone();
let start_call = actions.start_call.clone();
let leave_call = actions.leave_call.clone();
let mut audio_muted = use_signal(|| false);
let mut mute_signal = audio_muted.clone();
let mut reset_signal = audio_muted.clone();
let muted = *audio_muted.read();
let toggle_mute = actions.toggle_mute.clone();
let mute_signal_for_click = audio_muted_signal.clone();
let error_actions =
ErrorActions::new(actions.report_error.clone(), actions.clear_error.clone());
let errors_for_mic = error_actions.clone();
let errors_for_start = error_actions.clone();
let errors_for_leave = error_actions.clone();
rsx! {
div { class: "call-controls",
@ -38,6 +43,7 @@ pub fn CallControls() -> Element {
disabled: mic_ready,
onclick: move |_| {
log::info!("Requesting microphone permission");
errors_for_mic.clear();
request_microphone();
},
if mic_ready { "Mic ready" } else { "Enable mic" }
@ -46,11 +52,11 @@ pub fn CallControls() -> Element {
class: "ctrl-btn ctrl-btn--primary",
disabled: !mic_ready || !connected || !has_target || in_call,
onclick: move |_| {
errors_for_start.clear();
if in_call {
return;
}
log::info!("Launching WebRTC call as initiator");
audio_muted.set(false);
start_call();
},
if in_call { "In call" } else { "Start call" }
@ -59,9 +65,9 @@ pub fn CallControls() -> Element {
class: if muted { "ctrl-btn ctrl-btn--muted" } else { "ctrl-btn" },
disabled: !in_call,
onclick: move |_| {
let current = *mute_signal.read();
mute_signal.set(!current);
log::info!("Audio {}", if current { "unmuted" } else { "muted" });
let was_muted = *mute_signal_for_click.read();
(toggle_mute)();
log::info!("Audio {}", if was_muted { "unmuted" } else { "muted" });
},
if muted { "Unmute" } else { "Mute" }
}
@ -69,7 +75,7 @@ pub fn CallControls() -> Element {
class: "ctrl-btn ctrl-btn--danger",
disabled: !in_call,
onclick: move |_| {
reset_signal.set(false);
errors_for_leave.clear();
leave_call();
log::info!("Call ended");
},

View File

@ -1,3 +1,4 @@
use super::ErrorActions;
use crate::services::signaling::use_signaling;
use dioxus::prelude::*;
use wasm_bindgen::JsCast;
@ -20,6 +21,10 @@ pub fn ConnectionPanel() -> Element {
let reconnect = actions.connect.clone();
let set_remote_id_action = actions.set_remote_id.clone();
let error_actions =
ErrorActions::new(actions.report_error.clone(), actions.clear_error.clone());
let errors_for_copy = error_actions.clone();
let errors_for_input = error_actions.clone();
rsx! {
div { class: "connection-panel",
@ -84,6 +89,7 @@ pub fn ConnectionPanel() -> Element {
class: "icon-btn",
onclick: move |_| {
let copy_target = peer_id_for_copy.clone();
let error_actions = errors_for_copy.clone();
spawn_local(async move {
if let Some(window) = web_sys::window() {
let navigator = window.navigator();
@ -94,20 +100,41 @@ pub fn ConnectionPanel() -> Element {
let promise = clipboard.write_text(&copy_target);
if let Err(err) = JsFuture::from(promise).await {
log::warn!("Clipboard write failed: {:?}", err);
error_actions.report(format!(
"Clipboard write failed: {:?}",
err
));
} else {
log::info!("Peer ID copied to clipboard");
error_actions.clear();
}
}
Err(err) => log::warn!("Clipboard handle cast failed: {:?}", err),
Err(err) => {
log::warn!("Clipboard handle cast failed: {:?}", err);
error_actions.report(format!(
"Clipboard handle cast failed: {:?}",
err
));
}
}
}
Ok(_) => {
log::warn!("Clipboard API undefined on navigator");
error_actions.report("Clipboard API undefined".to_string());
}
Err(err) => {
log::warn!("Clipboard lookup failed: {:?}", err);
error_actions.report(format!(
"Clipboard lookup failed: {:?}",
err
));
}
Err(err) => log::warn!("Clipboard lookup failed: {:?}", err),
}
} else {
log::warn!("Clipboard copy skipped: no window available");
error_actions.report(
"Clipboard copy skipped: no window available".to_string(),
);
}
});
},
@ -123,6 +150,7 @@ pub fn ConnectionPanel() -> Element {
placeholder: "Paste peer ID",
value: "{remote_id_value}",
oninput: move |event| {
errors_for_input.clear();
(set_remote_id_action.clone())(event.value());
}
}

View File

@ -0,0 +1,24 @@
use std::rc::Rc;
#[derive(Clone)]
pub struct ErrorActions {
report_fn: Rc<dyn Fn(String)>,
clear_fn: Rc<dyn Fn()>,
}
impl ErrorActions {
pub fn new(report_fn: Rc<dyn Fn(String)>, clear_fn: Rc<dyn Fn()>) -> Self {
Self {
report_fn,
clear_fn,
}
}
pub fn report(&self, message: impl Into<String>) {
(self.report_fn.clone())(message.into());
}
pub fn clear(&self) {
(self.clear_fn.clone())();
}
}

View File

@ -1,9 +1,11 @@
mod call_controls;
mod connection_panel;
mod error_actions;
mod status_display;
mod voice_channel;
pub use call_controls::CallControls;
pub use connection_panel::ConnectionPanel;
pub use error_actions::ErrorActions;
pub use status_display::StatusDisplay;
pub use voice_channel::VoiceChannelLayout;

View File

@ -5,11 +5,15 @@ use dioxus::prelude::*;
pub fn StatusDisplay() -> Element {
let service = use_signaling();
let state = service.state.clone();
let actions = service.actions.clone();
let connected = *state.connected.read();
let ws_status = state.ws_status.read().clone();
let in_call = *state.in_call.read();
let hint_text = format!(
let error_message = state.last_error.read().clone();
let clear_error = actions.clear_error.clone();
let default_hint = format!(
"{} · {}",
ws_status,
if in_call {
@ -26,7 +30,18 @@ pub fn StatusDisplay() -> Element {
class: if connected { "status-widget__value status-widget__value--online" } else { "status-widget__value status-widget__value--offline" },
if connected { "Online" } else { "Offline" }
}
span { class: "status-widget__hint", "{hint_text}" }
if let Some(err) = error_message.clone() {
span { class: "status-widget__hint status-widget__hint--error", "⚠️ {err}" }
button {
class: "status-widget__dismiss",
onclick: move |_| {
(clear_error.clone())();
},
"Dismiss"
}
} else {
span { class: "status-widget__hint", "{default_hint}" }
}
}
}
}

View File

@ -1,10 +1,11 @@
use std::rc::Rc;
use std::{cell::RefCell, rc::Rc};
use crate::{
config::Config, constants::DEFAULT_SIGNALING_URL, models::SignalingMessage, utils::MediaManager,
};
use dioxus::prelude::*;
use futures::StreamExt;
use gloo_timers::future::TimeoutFuture;
use wasm_bindgen::{prelude::Closure, JsCast, JsValue};
use wasm_bindgen_futures::spawn_local;
use web_sys::{
@ -24,6 +25,8 @@ pub struct SignalingState {
pub pending_answer: Signal<Option<String>>,
pub mic_granted: Signal<bool>,
pub in_call: Signal<bool>,
pub audio_muted: Signal<bool>,
pub last_error: Signal<Option<String>>,
}
#[derive(Clone)]
@ -33,6 +36,9 @@ pub struct SignalingActions {
pub request_microphone: Rc<dyn Fn()>,
pub start_call: Rc<dyn Fn()>,
pub leave_call: Rc<dyn Fn()>,
pub toggle_mute: Rc<dyn Fn()>,
pub report_error: Rc<dyn Fn(String)>,
pub clear_error: Rc<dyn Fn()>,
}
#[derive(Clone)]
@ -45,6 +51,11 @@ pub fn use_signaling() -> SignalingService {
use_context::<SignalingService>()
}
fn retry_delay_ms(attempt: u32) -> u32 {
let capped = attempt.min(5);
1000 * (1u32 << capped)
}
#[derive(Props, Clone, PartialEq)]
pub struct SignalingProviderProps {
pub children: Element,
@ -63,6 +74,10 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
let pending_answer = use_signal(|| None::<String>);
let mic_granted = use_signal(|| false);
let in_call = use_signal(|| false);
let audio_muted = use_signal(|| false);
let last_error = use_signal(|| None::<String>);
let reconnect_attempts = use_signal(|| 0u32);
let connect_slot: Rc<RefCell<Option<Rc<dyn Fn()>>>> = Rc::new(RefCell::new(None));
let cfg_signal: Signal<Config> = use_context();
@ -72,6 +87,7 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
let peer_id = peer_id.clone();
let pending_answer_signal = pending_answer.clone();
let initiator_connection = initiator_connection.clone();
let last_error_signal = last_error.clone();
use_coroutine(move |mut rx| async move {
while let Some(msg) = rx.next().await {
let SignalingMessage {
@ -169,6 +185,9 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
}
Err(e) => {
log::error!("❌ Responder PeerConnection-Fehler: {}", e);
let mut err_handle = last_error_signal.clone();
err_handle
.set(Some(format!("Peer connection error (responder): {}", e)));
continue;
}
}
@ -194,22 +213,32 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
}
}
}
Err(e) => log::error!("❌ Responder Answer-Fehler: {}", e),
Err(e) => {
log::error!("❌ Responder Answer-Fehler: {}", e);
let mut err_handle = last_error_signal.clone();
err_handle.set(Some(format!("Failed to generate answer: {}", e)));
}
}
} else if msg_type == "answer" {
log::info!("🔀 Answer empfangen - leite an Initiator-PeerConnection weiter");
let data_clone = data.clone();
if let Some(pc) = initiator_connection.read().as_ref() {
let pc_clone = pc.clone();
let err_signal = last_error_signal.clone();
spawn_local(async move {
match MediaManager::handle_answer(&pc_clone, &data_clone).await {
Ok(_) => {
log::info!("✅ Answer erfolgreich gesetzt auf Initiator-PC")
}
Err(e) => log::error!(
"❌ Fehler beim Setzen der Answer auf Initiator-PC: {}",
e
),
Err(e) => {
log::error!(
"❌ Fehler beim Setzen der Answer auf Initiator-PC: {}",
e
);
let mut handle = err_signal.clone();
handle
.set(Some(format!("Failed to apply remote answer: {}", e)));
}
}
});
} else {
@ -221,24 +250,40 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
let data_clone = data.clone();
if let Some(pc) = initiator_connection.read().as_ref() {
let pc_clone = pc.clone();
let err_signal = last_error_signal.clone();
spawn_local(async move {
match MediaManager::add_ice_candidate(&pc_clone, &data_clone) {
Ok(_) => log::info!("✅ Kandidat zur Initiator-PC hinzugefügt"),
Err(e) => log::error!(
"❌ Kandidat konnte nicht hinzugefügt werden: {}",
e
),
Err(e) => {
log::error!(
"❌ Kandidat konnte nicht hinzugefügt werden: {}",
e
);
let mut handle = err_signal.clone();
handle.set(Some(format!(
"Failed to add ICE candidate (initiator): {}",
e
)));
}
}
});
} else if let Some(pc) = peer_connection.read().as_ref() {
let pc_clone = pc.clone();
let err_signal = last_error_signal.clone();
spawn_local(async move {
match MediaManager::add_ice_candidate(&pc_clone, &data_clone) {
Ok(_) => log::info!("✅ Kandidat zur Responder-PC hinzugefügt"),
Err(e) => log::error!(
"❌ Kandidat konnte nicht hinzugefügt werden: {}",
e
),
Err(e) => {
log::error!(
"❌ Kandidat konnte nicht hinzugefügt werden: {}",
e
);
let mut handle = err_signal.clone();
handle.set(Some(format!(
"Failed to add ICE candidate (responder): {}",
e
)));
}
}
});
} else {
@ -259,6 +304,9 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
let cfg_signal_handle = cfg_signal.clone();
let initiator_connection_signal = initiator_connection.clone();
let responder_connection_signal = responder_connection.clone();
let last_error_signal = last_error.clone();
let reconnect_attempts_signal = reconnect_attempts.clone();
let connect_slot_ref = connect_slot.clone();
Rc::new(move || {
let mut ws_status_handle = ws_status_signal.clone();
@ -267,12 +315,45 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
let initiator_connection = initiator_connection_signal.clone();
let responder_connection = responder_connection_signal.clone();
let cfg_signal = cfg_signal_handle.clone();
let error_signal = last_error_signal.clone();
let reconnect_attempts_handle = reconnect_attempts_signal.clone();
let connect_slot_ref = connect_slot_ref.clone();
if *connected_handle.read() || websocket_handle.read().is_some() {
return;
}
ws_status_handle.set("Verbinde...".to_string());
let attempt_index = *reconnect_attempts_handle.read();
if attempt_index == 0 {
ws_status_handle.set("Verbinde...".to_string());
} else {
ws_status_handle.set(format!("Verbinde... (Versuch {})", attempt_index + 1));
}
let mut err_handle = error_signal.clone();
err_handle.set(None);
let schedule_retry: Rc<dyn Fn()> = {
let reconnect_attempts_handle = reconnect_attempts_handle.clone();
let ws_status_signal = ws_status_signal.clone();
let connect_slot_ref = connect_slot_ref.clone();
Rc::new(move || {
let attempt = *reconnect_attempts_handle.read();
let delay_ms = retry_delay_ms(attempt);
let mut attempts_writer = reconnect_attempts_handle.clone();
attempts_writer.set(attempt.saturating_add(1));
let seconds = (delay_ms / 1000).max(1);
let mut status_writer = ws_status_signal.clone();
status_writer.set(format!("Reconnect in {}s...", seconds));
let connect_slot_clone = connect_slot_ref.clone();
spawn_local(async move {
TimeoutFuture::new(delay_ms).await;
if let Some(cb) = connect_slot_clone.borrow().as_ref() {
cb();
}
});
})
};
let endpoint = cfg_signal.read().server.signaling_url.trim().to_string();
let target = if endpoint.is_empty() {
@ -289,45 +370,101 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
let mut ws_status_clone = ws_status_handle.clone();
let mut connected_clone = connected_handle.clone();
let error_for_open = error_signal.clone();
let reconnect_for_open = reconnect_attempts_handle.clone();
let onopen = Closure::wrap(Box::new(move |_: web_sys::Event| {
log::info!("✅ WebSocket verbunden!");
ws_status_clone.set("Verbunden".to_string());
connected_clone.set(true);
let mut handle = error_for_open.clone();
handle.set(None);
let mut reset_attempts = reconnect_for_open.clone();
reset_attempts.set(0);
})
as Box<dyn FnMut(web_sys::Event)>);
let mut ws_status_clone2 = ws_status_handle.clone();
let mut connected_clone2 = connected_handle.clone();
let error_for_close = error_signal.clone();
let websocket_for_close = websocket_signal.clone();
let schedule_on_close = schedule_retry.clone();
let onclose = Closure::wrap(Box::new(move |_: web_sys::CloseEvent| {
log::warn!("❌ WebSocket getrennt");
ws_status_clone2.set("Getrennt".to_string());
connected_clone2.set(false);
let mut handle = error_for_close.clone();
handle.set(Some("Verbindung zum Signaling-Server beendet".to_string()));
let mut ws_handle = websocket_for_close.clone();
ws_handle.set(None);
schedule_on_close();
})
as Box<dyn FnMut(web_sys::CloseEvent)>);
let error_for_event = error_signal.clone();
let websocket_for_error = websocket_signal.clone();
let ws_status_for_error = ws_status_handle.clone();
let connected_for_error = connected_handle.clone();
let schedule_on_error = schedule_retry.clone();
let onerror = Closure::wrap(Box::new(move |_: web_sys::Event| {
log::error!("❌ WebSocket Fehler gemeldet");
let mut handle = error_for_event.clone();
handle.set(Some("Signaling-Server Fehler".to_string()));
let mut status = ws_status_for_error.clone();
status.set("Fehler".to_string());
let mut connected_signal = connected_for_error.clone();
connected_signal.set(false);
let mut ws_handle = websocket_for_error.clone();
if ws_handle.read().is_some() {
ws_handle.set(None);
schedule_on_error();
}
})
as Box<dyn FnMut(web_sys::Event)>);
let offer_tx = offer_handler.clone();
let initiator_for_router = initiator_connection.clone();
let responder_for_router = responder_connection.clone();
let pending_answer_signal = pending_answer.clone();
let error_for_message = error_signal.clone();
let onmessage = Closure::wrap(Box::new(move |e: MessageEvent| {
if let Some(text) = e.data().as_string() {
log::info!("📨 WebSocket Nachricht: {}", text);
if let Ok(msg) = serde_json::from_str::<SignalingMessage>(&text) {
match msg.msg_type.as_str() {
match serde_json::from_str::<SignalingMessage>(&text) {
Ok(msg) => match msg.msg_type.as_str() {
"offer" => offer_tx.send(msg),
"answer" => {
let data_clone = msg.data.clone();
if let Some(pc) = initiator_for_router.read().as_ref() {
let pc_clone = pc.clone();
let err_signal = error_for_message.clone();
spawn_local(async move {
match MediaManager::handle_answer(&pc_clone, &data_clone).await {
Ok(_) => log::info!("✅ Answer erfolgreich gesetzt auf Initiator-PC"),
Err(e) => log::error!("❌ Fehler beim Setzen der Answer auf Initiator-PC: {}", e),
match MediaManager::handle_answer(
&pc_clone,
&data_clone,
)
.await
{
Ok(_) => log::info!(
"✅ Answer erfolgreich gesetzt auf Initiator-PC"
),
Err(e) => {
log::error!(
"❌ Fehler beim Setzen der Answer auf Initiator-PC: {}",
e
);
let mut handle = err_signal.clone();
handle.set(Some(format!(
"Failed to apply remote answer: {}",
e
)));
}
}
});
} else {
log::warn!("⚠️ Keine Initiator-PeerConnection vorhanden - buffer Answer");
log::warn!(
"⚠️ Keine Initiator-PeerConnection vorhanden - buffer Answer"
);
let mut pending_handle = pending_answer_signal.clone();
pending_handle.set(Some(data_clone));
}
@ -336,24 +473,58 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
let data_clone = msg.data.clone();
if let Some(pc) = initiator_for_router.read().as_ref() {
let pc_clone = pc.clone();
let err_signal = error_for_message.clone();
spawn_local(async move {
match MediaManager::add_ice_candidate(&pc_clone, &data_clone) {
Ok(_) => log::info!("✅ Kandidat zur Initiator-PC hinzugefügt"),
Err(e) => log::error!("❌ Kandidat konnte nicht hinzugefügt werden: {}", e),
match MediaManager::add_ice_candidate(
&pc_clone,
&data_clone,
) {
Ok(_) => log::info!(
"✅ Kandidat zur Initiator-PC hinzugefügt"
),
Err(e) => {
log::error!(
"❌ Kandidat konnte nicht hinzugefügt werden: {}",
e
);
let mut handle = err_signal.clone();
handle.set(Some(format!(
"Failed to add ICE candidate (initiator): {}",
e
)));
}
}
});
} else if let Some(pc) =
responder_for_router.read().as_ref()
{
let pc_clone = pc.clone();
let err_signal = error_for_message.clone();
spawn_local(async move {
match MediaManager::add_ice_candidate(&pc_clone, &data_clone) {
Ok(_) => log::info!("✅ Kandidat zur Responder-PC hinzugefügt"),
Err(e) => log::error!("❌ Kandidat konnte nicht hinzugefügt werden: {}", e),
match MediaManager::add_ice_candidate(
&pc_clone,
&data_clone,
) {
Ok(_) => log::info!(
"✅ Kandidat zur Responder-PC hinzugefügt"
),
Err(e) => {
log::error!(
"❌ Kandidat konnte nicht hinzugefügt werden: {}",
e
);
let mut handle = err_signal.clone();
handle.set(Some(format!(
"Failed to add ICE candidate (responder): {}",
e
)));
}
}
});
} else {
log::warn!("⚠️ Kein PeerConnection verfügbar, um Kandidaten hinzuzufügen");
log::warn!(
"⚠️ Kein PeerConnection verfügbar, um Kandidaten hinzuzufügen"
);
}
}
"text" => {
@ -365,6 +536,17 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
}
}
_ => log::info!("❓ Unbekannte Nachricht: {}", msg.msg_type),
},
Err(e) => {
log::error!(
"❌ Signaling Nachricht konnte nicht gelesen werden: {}",
e
);
let mut handle = error_for_message.clone();
handle.set(Some(format!(
"Received malformed signaling message: {}",
e
)));
}
}
}
@ -374,21 +556,44 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
socket.set_onopen(Some(onopen.as_ref().unchecked_ref()));
socket.set_onclose(Some(onclose.as_ref().unchecked_ref()));
socket.set_onmessage(Some(onmessage.as_ref().unchecked_ref()));
socket.set_onerror(Some(onerror.as_ref().unchecked_ref()));
onopen.forget();
onclose.forget();
onmessage.forget();
onerror.forget();
websocket_handle.set(Some(socket));
websocket_handle.set(Some(socket.clone()));
}
Err(e) => {
log::error!("❌ WebSocket Fehler: {:?}", e);
ws_status_handle.set("Verbindungsfehler".to_string());
let mut handle = error_signal.clone();
handle.set(Some(format!(
"Verbindung zum Signaling-Server fehlgeschlagen: {:?}",
e
)));
schedule_retry();
}
}
})
};
{
let mut slot = connect_slot.borrow_mut();
*slot = Some(connect_logic.clone());
}
let connect_action = {
let reconnect_attempts = reconnect_attempts.clone();
let connect_logic = connect_logic.clone();
Rc::new(move || {
let mut attempts = reconnect_attempts.clone();
attempts.set(0);
connect_logic();
})
};
use_effect({
let peer_id = peer_id.clone();
move || {
@ -403,28 +608,35 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
});
use_effect({
let connect_logic = connect_logic.clone();
let connect_action = connect_action.clone();
move || {
connect_logic();
connect_action();
}
});
{
let pending = pending_answer.clone();
let initiator_connection = initiator_connection.clone();
let last_error_signal = last_error.clone();
use_effect(move || {
if let Some(answer_sdp) = pending.read().clone() {
if let Some(pc) = initiator_connection.read().as_ref() {
let pc_clone = pc.clone();
let answer_clone = answer_sdp.clone();
let mut pending_signal = pending.clone();
let err_signal = last_error_signal.clone();
spawn_local(async move {
match MediaManager::handle_answer(&pc_clone, &answer_clone).await {
Ok(_) => log::info!(
"✅ Gepufferte Answer erfolgreich gesetzt auf Initiator-PC"
),
Err(e) => {
log::error!("❌ Fehler beim Setzen der gepufferten Answer: {}", e)
log::error!("❌ Fehler beim Setzen der gepufferten Answer: {}", e);
let mut handle = err_signal.clone();
handle.set(Some(format!(
"Failed to apply buffered remote answer: {}",
e
)));
}
}
pending_signal.set(None);
@ -447,29 +659,50 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
let mic_granted = mic_granted.clone();
let local_media = local_media.clone();
let initiator_connection = initiator_connection.clone();
let audio_muted = audio_muted.clone();
let last_error = last_error.clone();
Rc::new(move || {
let mut mic_granted_signal = mic_granted.clone();
let mut local_media_signal = local_media.clone();
let initiator_connection_signal = initiator_connection.clone();
let audio_muted_signal = audio_muted.clone();
let mut error_signal = last_error.clone();
spawn(async move {
let mut manager = MediaManager::new();
match manager.request_microphone_access().await {
Ok(stream) => {
log::info!("Microphone granted");
error_signal.set(None);
mic_granted_signal.set(true);
local_media_signal.set(Some(stream.clone()));
if *audio_muted_signal.read() {
let tracks = stream.get_audio_tracks();
for idx in 0..tracks.length() {
if let Ok(track) =
tracks.get(idx).dyn_into::<web_sys::MediaStreamTrack>()
{
track.set_enabled(false);
}
}
}
if let Some(pc) = initiator_connection_signal.read().as_ref() {
if let Err(e) = MediaManager::add_stream_to_pc(pc, &stream) {
log::warn!("Failed to attach local tracks: {}", e);
error_signal
.set(Some(format!("Failed to attach local audio: {}", e)));
} else {
error_signal.set(None);
}
}
}
Err(e) => log::error!("Microphone request failed: {}", e),
Err(e) => {
log::error!("Microphone request failed: {}", e);
error_signal.set(Some(format!("Microphone access failed: {}", e)));
}
}
});
})
};
let start_call = {
let initiator_connection = initiator_connection.clone();
let websocket = websocket.clone();
@ -478,6 +711,7 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
let local_media = local_media.clone();
let mic_granted = mic_granted.clone();
let in_call = in_call.clone();
let last_error = last_error.clone();
Rc::new(move || {
let initiator_signal = initiator_connection.clone();
let websocket_signal = websocket.clone();
@ -486,16 +720,36 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
let local_media_signal = local_media.clone();
let mic_granted_signal = mic_granted.clone();
let in_call_signal = in_call.clone();
let mut error_signal = last_error.clone();
spawn(async move {
if *in_call_signal.read() {
log::info!("Call already active; ignoring start request");
error_signal.set(Some("Call already active".to_string()));
return;
}
if !*mic_granted_signal.read() {
log::warn!("Mic not granted, cannot start call");
error_signal.set(Some(
"Microphone access missing allow the mic before starting a call"
.to_string(),
));
return;
}
if websocket_signal.read().is_none() {
log::warn!("No signaling socket available for call start");
error_signal.set(Some(
"No signaling connection connect before starting a call".to_string(),
));
return;
}
if remote_id_signal.read().trim().is_empty() {
log::warn!("Remote ID missing cannot dial");
error_signal.set(Some("Target ID fehlt".to_string()));
return;
}
error_signal.set(None);
let pc = if initiator_signal.read().is_none() {
match MediaManager::create_peer_connection() {
@ -575,6 +829,8 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
}
Err(e) => {
log::error!("Failed to create initiator peer connection: {}", e);
error_signal
.set(Some(format!("Failed to create peer connection: {}", e)));
return;
}
}
@ -585,6 +841,7 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
if let Some(local) = local_media_signal.read().as_ref() {
if let Err(e) = MediaManager::add_stream_to_pc(&pc, local) {
log::warn!("Failed to attach local tracks before offer: {}", e);
error_signal.set(Some(format!("Failed to attach local audio: {}", e)));
}
}
@ -606,10 +863,21 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
);
let mut in_call_writer = in_call_signal.clone();
in_call_writer.set(true);
error_signal.set(None);
} else {
error_signal.set(Some("Failed to encode offer".to_string()));
}
} else {
error_signal.set(Some(
"No signaling connection connect before starting a call"
.to_string(),
));
}
}
Err(e) => log::error!("Offer creation failed: {}", e),
Err(e) => {
log::error!("Offer creation failed: {}", e);
error_signal.set(Some(format!("Failed to create offer: {}", e)));
}
}
});
})
@ -619,12 +887,14 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
let initiator_connection = initiator_connection.clone();
let in_call = in_call.clone();
let mic_granted = mic_granted.clone();
let audio_muted = audio_muted.clone();
let local_media = local_media.clone();
Rc::new(move || {
let mut initiator_signal = initiator_connection.clone();
let mut in_call_signal = in_call.clone();
let mut local_media_signal = local_media.clone();
let mut mic_granted_signal = mic_granted.clone();
let mut audio_muted_signal = audio_muted.clone();
if *in_call_signal.read() {
if let Some(pc) = initiator_signal.read().as_ref() {
@ -645,6 +915,43 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
}
mic_granted_signal.set(false);
in_call_signal.set(false);
audio_muted_signal.set(false);
})
};
let toggle_mute = {
let local_media = local_media.clone();
let audio_muted = audio_muted.clone();
Rc::new(move || {
let mut muted_signal = audio_muted.clone();
let media_signal = local_media.clone();
let target_state = !*muted_signal.read();
muted_signal.set(target_state);
let current_stream = media_signal.read().as_ref().cloned();
if let Some(stream) = current_stream {
let tracks = stream.get_audio_tracks();
for idx in 0..tracks.length() {
if let Ok(track) = tracks.get(idx).dyn_into::<web_sys::MediaStreamTrack>() {
track.set_enabled(!target_state);
}
}
}
})
};
let report_error = {
let last_error = last_error.clone();
Rc::new(move |message: String| {
let mut handle = last_error.clone();
handle.set(Some(message));
})
};
let clear_error = {
let last_error = last_error.clone();
Rc::new(move || {
let mut handle = last_error.clone();
handle.set(None);
})
};
@ -661,13 +968,18 @@ pub fn SignalingProvider(props: SignalingProviderProps) -> Element {
pending_answer: pending_answer.clone(),
mic_granted: mic_granted.clone(),
in_call: in_call.clone(),
audio_muted: audio_muted.clone(),
last_error: last_error.clone(),
},
actions: SignalingActions {
connect: connect_logic.clone(),
connect: connect_action.clone(),
set_remote_id,
request_microphone,
start_call,
leave_call,
toggle_mute,
report_error,
clear_error,
},
};