use actix_web::{web, App, Error, HttpRequest, HttpResponse, HttpServer, Result}; use futures_util::StreamExt; use serde::{Deserialize, Serialize}; use std::sync::Arc; use tokio::sync::broadcast; #[derive(Clone)] struct AppState { broadcaster: Arc> } #[derive(Debug, Serialize, Deserialize, Clone)] struct SignalingMessage { from: String, to: String, msg_type: String, // "offer", "answer", "ice-candidate", "text", "screen-share" data: String // SDP, ICE, Text, Screen-Share metadata } #[actix_web::main] async fn main() -> std::io::Result<()> { env_logger::init(); let (tx, _rx) = broadcast::channel::(1000); let app_state = AppState { broadcaster: Arc::new(tx) }; println!("Server is running on http://localhost:8080"); HttpServer::new(move || { App::new() .app_data(web::Data::new(app_state.clone())) .route("/ws", web::get().to(websocket_handler)) }) .bind("127.0.0.1:8080")? .run() .await } async fn websocket_handler( req: HttpRequest, body: web::Payload, data: web::Data ) -> Result { let (response, mut session, mut msg_stream) = actix_ws::handle(&req, body)?; let broadcaster = data.broadcaster.clone(); let mut rx = broadcaster.subscribe(); // Handle incoming messages let broadcaster_clone = broadcaster.clone(); actix_web::rt::spawn(async move { while let Some(Ok(msg)) = msg_stream.next().await { match msg { actix_ws::Message::Text(text) => { println!("Received text message: {}", text); if let Ok(signal_msg) = serde_json::from_str::(&text) { // Broadcast the message to all subscribers (group chat) // or handle private messages based on `to` field let _ = broadcaster_clone.send(signal_msg); } } actix_ws::Message::Close(_) => break, _ => {}, } } }); // Handle outgoing messages actix_web::rt::spawn(async move { while let Ok(msg) = rx.recv().await { if let Ok(json) = serde_json::to_string(&msg) { if session.text(json).await.is_err() { break; } } } }); Ok(response) }