niom-turn/tests/support/stream.rs

79 lines
2.3 KiB
Rust

#![allow(dead_code)]
use std::io;
use niom_turn::models::stun::StunMessage;
use niom_turn::stun::parse_message;
use tokio::io::AsyncRead;
use tokio::io::AsyncReadExt;
#[derive(Debug)]
pub enum StreamFrame {
Stun(StunMessage),
ChannelData { channel: u16, payload: Vec<u8> },
}
#[derive(Default)]
pub struct StreamFramer {
buffer: Vec<u8>,
}
impl StreamFramer {
pub fn new() -> Self {
Self { buffer: Vec::new() }
}
fn try_pop_next(&mut self) -> Option<io::Result<StreamFrame>> {
if self.buffer.len() < 4 {
return None;
}
// ChannelData: channel number 0x4000..=0x7FFF (top bits 01)
let channel = u16::from_be_bytes([self.buffer[0], self.buffer[1]]);
if (channel & 0xC000) == 0x4000 {
let len = u16::from_be_bytes([self.buffer[2], self.buffer[3]]) as usize;
let total = 4 + len;
if self.buffer.len() < total {
return None;
}
let frame = self.buffer.drain(..total).collect::<Vec<u8>>();
return Some(Ok(StreamFrame::ChannelData {
channel,
payload: frame[4..].to_vec(),
}));
}
// STUN over stream: 20 byte header + length.
if self.buffer.len() < 20 {
return None;
}
let len = u16::from_be_bytes([self.buffer[2], self.buffer[3]]) as usize;
let total = 20 + len;
if self.buffer.len() < total {
return None;
}
let chunk = self.buffer.drain(..total).collect::<Vec<u8>>();
match parse_message(&chunk) {
Ok(msg) => Some(Ok(StreamFrame::Stun(msg))),
Err(e) => Some(Err(io::Error::new(io::ErrorKind::InvalidData, format!(
"parse stun: {e:?}"
)))),
}
}
pub async fn read_frame<R: AsyncRead + Unpin>(&mut self, reader: &mut R) -> io::Result<StreamFrame> {
loop {
if let Some(frame) = self.try_pop_next() {
return frame;
}
let mut tmp = [0u8; 4096];
let n = reader.read(&mut tmp).await?;
if n == 0 {
return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "stream closed"));
}
self.buffer.extend_from_slice(&tmp[..n]);
}
}
}