rodio
rodio copied to clipboard
Is there support for streaming audio?
Hi,
I have tried integrating rodio with a websocket, as and when 4096 bytes of u8 data comes in, I have tried to pass it as i16 for play_raw api. The speaker is too noisy.
let mut converted: Vec<i16> = data.into_iter().map(|i|i as i16).collect();
rodio::play_raw(&endpoint, SamplesBuffer::new(1,44100, converted).convert_samples());
The same binary stream is sent to a node js server to write to a wav file plays just fine. So the binary stream seems to be proper.
var fileWriter = new wav.FileWriter(outFile, {
channels: 1,
sampleRate: 48000,
bitDepth: 16
});
client.on('stream', function(stream, meta) {
stream.pipe(fileWriter);
Is there something I am missing here to make it work with rodio?
You might find more luck by creating your own Source that can have data appended to it after being added to the sink rather than just using the play_raw api. This will allow rodio to control the pace of playback by itself.
What do you mean exactly by "The speaker is too noisy."? Too loud?
I understand what you’re saying, maybe you can try this code
let device = rodio::default_output_device().unwrap();
let sink = Sink::new(&device);
let mut decoder = Decoder::new(File::open("audio.mp3").unwrap());
let mut count = 0;
loop {
match decoder.next_frame() {
Ok(Frame { data, sample_rate, channels, .. }) => {
println!("Decoded {} samples {} {}", data.len(), sample_rate, channels);
let buff = SamplesBuffer::new(channels as u16, sample_rate as u32, data);
sink.append(buff);
println!("{}", count);
while count > 26 {
// sleep and wait for rodio to drain a bit
count = 0;
thread::sleep(Duration::from_millis(10));
}
count += 1;
println!("finish append");
// thread::sleep(Duration::from_secs(1));
},
Err(Error::Eof) => break,
Err(e) => panic!("{:?}", e),
}
}
sink.sleep_until_end()
I'm kinda new to rust so I don't know if this is the best way of doing it, but I had success with audio streaming in a pet project of mine using https://github.com/bernhardfritz/pitunes/blob/master/pitunes_client/src/http_stream_reader.rs
Hey,
I'm a little bit late to the party. With the symphonia -Decoder i got streamin to work!
But at the moment it seams that rodio does not allow symphonia to select the audio format by it self.
Is there any reasen behind that? If not I could make a pull request.
I added this funktio to decoder/mod.rs and it seams to work for my usecase.
pub fn new_symphonia_no_hint(data: R) -> Result<Decoder<R>, DecoderError> {
let mss = MediaSourceStream::new(
Box::new(ReadSeekSource::new(data)) as Box<dyn MediaSource>,
Default::default(),
);
match symphonia::SymphoniaDecoder::new(mss, None) {
Err(e) => Err(e),
Ok(decoder) => {
return Ok(Decoder(DecoderImpl::Symphonia(decoder)));
}
}
}```
Hear is the rest of the code to get it up and running. Please be aware that I`m new to rust and this is still work in progress. Alsow the stream can not seek so any attempt will result in a panic.
use std::fs::File; use std::io::BufReader; use ringbuf::ring_buffer::RbBase; use rodio::{Decoder, OutputStream, Sink}; use tokio::sync::broadcast; use std::sync::{Arc, Mutex, mpsc}; use bytes::Bytes; use ringbuf::{Rb, StaticRb};
async fn play_url(){ let (data_sender, receiver) = mpsc::channel(); let _handle = play_chunks_in_thread(receiver); fetch_data(data_sender).await; }
async fn fetch_data(data_sender: mpsc::Senderbytes::Bytes){ let client = reqwest::Client::new();
// Make a request to the stream URL
let url = "http://regiocast.streamabc.net/regc-radiobobmittelalter7445036-mp3-192-8025500?sABC=65p6s021%230%232qqpnss01895rqr0s8oq129o03s183o0%23fgernzf.enqvbobo.qr&aw_0_1st.playerid=streams.radiobob.de&amsparams=playerid:streams.radiobob.de;skey:1707536417"; // Replace with your stream URL
let mut respons = client.get(url).send().await.expect("url error");
// Check if the request was successful
// Check if the request was successful
if !respons.status().is_success() {
panic!("Request failed with status: {}", respons.status());
}
while let Some(chunk) = respons.chunk().await.expect("chunk error") {
println!("Read {} bytes from stream", chunk.len());
data_sender.send(chunk).expect("sending failed");
}
}
pub struct WebMediaSource { receiver: Arc<Mutex<mpsc::Receiver<Bytes>>>, ring_buffer: ringbuf::StaticRb<u8, 10240>, }
impl WebMediaSource { pub fn new(receiver: mpsc::Receiver<Bytes>) -> Self { Self { receiver: Arc::new(Mutex::new(receiver)), ring_buffer: StaticRb::<u8, 10240>::default()} } }
impl symphonia::core::io::MediaSource for WebMediaSource {
fn is_seekable(&self) -> bool{
false
}
fn byte_len(&self) -> Option
impl std::io::Read for WebMediaSource {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result
match receiver.recv() {
Ok(bytes) => {
let len = *[bytes.len(), buf.len()].iter().min().unwrap();
buf[..len].copy_from_slice(&bytes[..len]);
if len < bytes.len(){
let mut writer = bytes[len..].into_iter().map(|v| *v);
self.ring_buffer.push_iter(&mut writer);
if !writer.next().is_none() {
println!("data is dropped");
}
}
return Ok(len);
}
Err(_) => {return Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "Receive timeout"));}
}
}
let start_size = self.ring_buffer.capacity() - self.ring_buffer.free_len();
let len = *[start_size, buf.len()].iter().min().unwrap();
self.ring_buffer.pop_slice(&mut buf[0..len]);
let end_size = self.ring_buffer.capacity() - self.ring_buffer.free_len();
let filled = start_size - end_size;
Ok(filled)
}
}
impl std::io::Seek for WebMediaSource {
fn seek(&mut self, _pos: std::io::SeekFrom) -> std::io::Result
fn play_chunks_in_thread(data_receiver: mpsc::Receiver<Bytes>) -> tokio::task::JoinHandle<()> {
tokio::task::spawn_blocking(
{move || {
let (_stream, stream_handle) = OutputStream::try_default().unwrap();
let source = Decoder::new_symphonia_no_hint(WebMediaSource::new(data_receiver)).unwrap();
let sink = Sink::try_new(&stream_handle).unwrap();
sink.append(source);
sink.play();
sink.sleep_until_end();
}})
}