chatgpt_rs
chatgpt_rs copied to clipboard
1.2.3 introduced a regression on streams.
when async processing streams with 1.2.3
I often end up in a panic whereas with 1.1.3
everything works fine
async fn process_message_stream(client: ChatGPT, prompt: &str) -> chatgpt::Result<String> {
let stream = client.send_message_streaming(prompt).await?;
// Wrapping the buffer in an Arc and Mutex
let buffer = Arc::new(Mutex::new(Vec::<String>::new()));
// Iterating over stream contents
stream
.for_each({
// Cloning the Arc to be moved into the outer move closure
let buffer = Arc::clone(&buffer);
move |each| {
// Cloning the Arc again to be moved into the async block
let buffer_clone = Arc::clone(&buffer);
async move {
match each {
ResponseChunk::Content { delta, response_index: _ } => {
// Printing part of response without the newline
print!("{delta}");
// print!(".");
// Manually flushing the standard output, as `print` macro does not do
// that
stdout().lock().flush().unwrap();
// Appending delta to buffer
let mut locked_buffer = buffer_clone.lock().unwrap();
locked_buffer.push(delta);
},
_ => {},
}
}
}
})
.await;
// Use buffer outside of for_each, by locking and dereferencing
let final_buffer = buffer.lock().unwrap();
Ok(final_buffer.join(""))
}
Stream closed abruptly!: Transport(reqwest::Error { kind: Body, source: TimedOut })
stack backtrace:
0: rust_begin_unwind
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:645:5
1: core::panicking::panic_fmt
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panicking.rs:72:14
2: core::result::unwrap_failed
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/result.rs:1653:5
3: core::result::Result<T,E>::expect
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/result.rs:1034:23
4: chatgpt::client::ChatGPT::process_streaming_response::{{closure}}::{{closure}}
at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/chatgpt_rs-1.2.3/./src/client.rs:301:34
5: <T as futures_util::fns::FnMut1<A>>::call_mut
at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/fns.rs:28:9
6: <futures_util::stream::stream::map::Map<St,F> as futures_core::stream::Stream>::poll_next::{{closure}}
at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/stream/stream/map.rs:59:33
7: core::option::Option<T>::map
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/option.rs:1072:29
8: <futures_util::stream::stream::map::Map<St,F> as futures_core::stream::Stream>::poll_next
at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/stream/stream/map.rs:59:21
9: <futures_util::stream::stream::for_each::ForEach<St,Fut,F> as core::future::future::Future>::poll
at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/stream/stream/for_each.rs:70:47
10: tldw::summarizer::process_message_stream::{{closure}}
at ./src/summarizer.rs:67:4
11: tldw::summarizer::process_short_input::{{closure}}
at ./src/summarizer.rs:109:59
12: tldw::main::{{closure}}
at ./src/main.rs:76:69
13: tokio::runtime::park::CachedParkThread::block_on::{{closure}}
at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/park.rs:282:63
14: tokio::runtime::coop::with_budget
at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/coop.rs:107:5
15: tokio::runtime::coop::budget
at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/coop.rs:73:5
16: tokio::runtime::park::CachedParkThread::block_on
at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/park.rs:282:31
17: tokio::runtime::context::blocking::BlockingRegionGuard::block_on
at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/context/blocking.rs:66:9
18: tokio::runtime::scheduler::multi_thread::MultiThread::block_on::{{closure}}
at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/scheduler/multi_thread/mod.rs:87:13
19: tokio::runtime::context::runtime::enter_runtime
at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/context/runtime.rs:65:16
20: tokio::runtime::scheduler::multi_thread::MultiThread::block_on
at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/scheduler/multi_thread/mod.rs:86:9
21: tokio::runtime::runtime::Runtime::block_on
at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/runtime.rs:350:45
22: tldw::main
at ./src/main.rs:88:2
23: core::ops::function::FnOnce::call_once
at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
fm:tldw [master]$