#![allow(dead_code)] use std::io; use niom_turn::models::stun::StunMessage; use niom_turn::stun::parse_message; use tokio::io::AsyncRead; use tokio::io::AsyncReadExt; #[derive(Debug)] pub enum StreamFrame { Stun(StunMessage), ChannelData { channel: u16, payload: Vec }, } #[derive(Default)] pub struct StreamFramer { buffer: Vec, } impl StreamFramer { pub fn new() -> Self { Self { buffer: Vec::new() } } fn try_pop_next(&mut self) -> Option> { if self.buffer.len() < 4 { return None; } // ChannelData: channel number 0x4000..=0x7FFF (top bits 01) let channel = u16::from_be_bytes([self.buffer[0], self.buffer[1]]); if (channel & 0xC000) == 0x4000 { let len = u16::from_be_bytes([self.buffer[2], self.buffer[3]]) as usize; let total = 4 + len; if self.buffer.len() < total { return None; } let frame = self.buffer.drain(..total).collect::>(); return Some(Ok(StreamFrame::ChannelData { channel, payload: frame[4..].to_vec(), })); } // STUN over stream: 20 byte header + length. if self.buffer.len() < 20 { return None; } let len = u16::from_be_bytes([self.buffer[2], self.buffer[3]]) as usize; let total = 20 + len; if self.buffer.len() < total { return None; } let chunk = self.buffer.drain(..total).collect::>(); match parse_message(&chunk) { Ok(msg) => Some(Ok(StreamFrame::Stun(msg))), Err(e) => Some(Err(io::Error::new(io::ErrorKind::InvalidData, format!( "parse stun: {e:?}" )))), } } pub async fn read_frame(&mut self, reader: &mut R) -> io::Result { loop { if let Some(frame) = self.try_pop_next() { return frame; } let mut tmp = [0u8; 4096]; let n = reader.read(&mut tmp).await?; if n == 0 { return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "stream closed")); } self.buffer.extend_from_slice(&tmp[..n]); } } }