chatgpt_rs icon indicating copy to clipboard operation
chatgpt_rs copied to clipboard

1.2.3 introduced a regression on streams.

Open fmeyer opened this issue 1 year ago • 0 comments

When processing streams asynchronously with 1.2.3, I often end up with a panic, whereas with 1.1.3 everything works fine.

/// Sends `prompt` through the streaming API, echoes every content delta to
/// standard output as it arrives, and returns all deltas concatenated.
///
/// # Errors
/// Propagates the error returned by `send_message_streaming`.
///
/// # Panics
/// Panics if flushing standard output fails or if the buffer mutex is poisoned.
async fn process_message_stream(client: ChatGPT, prompt: &str) -> chatgpt::Result<String> {
	let stream = client.send_message_streaming(prompt).await?;

	// Lock-protected list of the deltas received so far.
	let buffer = Arc::new(Mutex::new(Vec::<String>::new()));

	// The closure gets a handle of its own, so `buffer` stays usable once the
	// stream has been drained.
	let shared = Arc::clone(&buffer);
	stream
		.for_each(move |each| {
			// Every per-chunk future is `async move`, so it needs its own handle too.
			let chunks = Arc::clone(&shared);
			async move {
				// Only content chunks are handled; every other variant is ignored.
				if let ResponseChunk::Content { delta, response_index: _ } = each {
					// Echo the fragment without a trailing newline.
					print!("{delta}");
					// `print!` does not flush on its own, so do it by hand.
					stdout().lock().flush().unwrap();
					// Record the fragment; the guard is released at the end of the statement.
					chunks.lock().unwrap().push(delta);
				}
			}
		})
		.await;

	// All chunk futures have completed; read the collected fragments back out.
	let collected = buffer.lock().unwrap();

	Ok(collected.concat())
}
Stream closed abruptly!: Transport(reqwest::Error { kind: Body, source: TimedOut })
stack backtrace:
   0: rust_begin_unwind
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/std/src/panicking.rs:645:5
   1: core::panicking::panic_fmt
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/panicking.rs:72:14
   2: core::result::unwrap_failed
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/result.rs:1653:5
   3: core::result::Result<T,E>::expect
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/result.rs:1034:23
   4: chatgpt::client::ChatGPT::process_streaming_response::{{closure}}::{{closure}}
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/chatgpt_rs-1.2.3/./src/client.rs:301:34
   5: <T as futures_util::fns::FnMut1<A>>::call_mut
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/fns.rs:28:9
   6: <futures_util::stream::stream::map::Map<St,F> as futures_core::stream::Stream>::poll_next::{{closure}}
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/stream/stream/map.rs:59:33
   7: core::option::Option<T>::map
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/option.rs:1072:29
   8: <futures_util::stream::stream::map::Map<St,F> as futures_core::stream::Stream>::poll_next
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/stream/stream/map.rs:59:21
   9: <futures_util::stream::stream::for_each::ForEach<St,Fut,F> as core::future::future::Future>::poll
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/futures-util-0.3.28/src/stream/stream/for_each.rs:70:47
  10: tldw::summarizer::process_message_stream::{{closure}}
             at ./src/summarizer.rs:67:4
  11: tldw::summarizer::process_short_input::{{closure}}
             at ./src/summarizer.rs:109:59
  12: tldw::main::{{closure}}
             at ./src/main.rs:76:69
  13: tokio::runtime::park::CachedParkThread::block_on::{{closure}}
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/park.rs:282:63
  14: tokio::runtime::coop::with_budget
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/coop.rs:107:5
  15: tokio::runtime::coop::budget
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/coop.rs:73:5
  16: tokio::runtime::park::CachedParkThread::block_on
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/park.rs:282:31
  17: tokio::runtime::context::blocking::BlockingRegionGuard::block_on
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/context/blocking.rs:66:9
  18: tokio::runtime::scheduler::multi_thread::MultiThread::block_on::{{closure}}
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/scheduler/multi_thread/mod.rs:87:13
  19: tokio::runtime::context::runtime::enter_runtime
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/context/runtime.rs:65:16
  20: tokio::runtime::scheduler::multi_thread::MultiThread::block_on
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/scheduler/multi_thread/mod.rs:86:9
  21: tokio::runtime::runtime::Runtime::block_on
             at /home/fm/.cargo/registry/src/index.crates.io-6f17d22bba15001f/tokio-1.33.0/src/runtime/runtime.rs:350:45
  22: tldw::main
             at ./src/main.rs:88:2
  23: core::ops::function::FnOnce::call_once
             at /rustc/82e1608dfa6e0b5569232559e3d385fea5a93112/library/core/src/ops/function.rs:250:5
note: Some details are omitted, run with `RUST_BACKTRACE=full` for a verbose backtrace.
fm:tldw [master]$

fmeyer avatar Dec 28 '23 20:12 fmeyer