Cannot convert langchain's ReadableStream to node ReadableStream
Checked other resources
- [X] I added a very descriptive title to this issue.
- [X] I searched the LangChain.js documentation with the integrated search.
- [X] I used the GitHub search to find a similar question and didn't find it.
- [X] I am sure that this is a bug in LangChain.js rather than my code.
- [ ] The bug is not resolved by updating to the latest stable version of LangChain (or the specific integration package).
Example Code
import { RunnableSequence } from "@langchain/core/runnables";
import { ReadableStream } from "node:stream/web";
let answer_stream = ReadableStream.from(["default", "fallback", "stream"]);
if (req.app.locals.llm_chain) {
const llm_chain: RunnableSequence<string, string> = req.app.locals.llm_chain;
answer_stream = await llm_chain.stream("Sample question")
}
Error Message and Stack Trace (if applicable)
Property 'values' is missing in type 'IterableReadableStream<string>' but required in type 'ReadableStream<string>'.ts(2741) web.d.ts(154, 9): 'values' is declared here.
Description
In my business logic, I am trying to extract a stream from langchain, but also have a node ReadableStream as backup in certain cases.
However, it seems langchain's ReadableStream coming from lib.dom.d.ts in typescript/lib is not compatible with node's ReadableStream coming from web.d.ts in node:stream/web . It's missing values method.
Here's the ReadableStream from node:stream/web
/** This Streams API interface represents a readable stream of byte data. */
interface ReadableStream<R = any> {
readonly locked: boolean;
cancel(reason?: any): Promise<void>;
getReader(): ReadableStreamDefaultReader<R>;
getReader(options: { mode: "byob" }): ReadableStreamBYOBReader;
pipeThrough<T>(transform: ReadableWritablePair<T, R>, options?: StreamPipeOptions): ReadableStream<T>;
pipeTo(destination: WritableStream<R>, options?: StreamPipeOptions): Promise<void>;
tee(): [ReadableStream<R>, ReadableStream<R>];
values(options?: { preventCancel?: boolean }): AsyncIterableIterator<R>;
[Symbol.asyncIterator](): AsyncIterableIterator<R>;
}
And here's ReadableStream from typescript/lib which langchain depends on -
/**
* This Streams API interface represents a readable stream of byte data. The Fetch API offers a concrete instance of a ReadableStream through the body property of a Response object.
*
* [MDN Reference](https://developer.mozilla.org/docs/Web/API/ReadableStream)
*/
interface ReadableStream<R = any> {
/** [MDN Reference](https://developer.mozilla.org/docs/Web/API/ReadableStream/locked) */
readonly locked: boolean;
/** [MDN Reference](https://developer.mozilla.org/docs/Web/API/ReadableStream/cancel) */
cancel(reason?: any): Promise<void>;
/** [MDN Reference](https://developer.mozilla.org/docs/Web/API/ReadableStream/getReader) */
getReader(options: { mode: "byob" }): ReadableStreamBYOBReader;
getReader(): ReadableStreamDefaultReader<R>;
getReader(options?: ReadableStreamGetReaderOptions): ReadableStreamReader<R>;
/** [MDN Reference](https://developer.mozilla.org/docs/Web/API/ReadableStream/pipeThrough) */
pipeThrough<T>(transform: ReadableWritablePair<T, R>, options?: StreamPipeOptions): ReadableStream<T>;
/** [MDN Reference](https://developer.mozilla.org/docs/Web/API/ReadableStream/pipeTo) */
pipeTo(destination: WritableStream<R>, options?: StreamPipeOptions): Promise<void>;
/** [MDN Reference](https://developer.mozilla.org/docs/Web/API/ReadableStream/tee) */
tee(): [ReadableStream<R>, ReadableStream<R>];
}
System Info
node -v v20.12.1
Windows
Hello, @muditjai! I'm here to assist with your issue and help you become a contributor.
To resolve the compatibility issue between LangChain's ReadableStream and Node's ReadableStream, you can use the IterableReadableStreamV0 class provided in the @langchain/core/utils/stream file. This class extends the ReadableStream and implements the AsyncIterable interface, ensuring compatibility across different environments.
Here's an example of how you can use the IterableReadableStreamV0 class to wrap a Node ReadableStream:
import { IterableReadableStreamV0 } from '@langchain/core/utils/stream';
// Assuming you have a Node ReadableStream instance
const nodeReadableStream = getNodeReadableStream(); // Replace with your actual Node ReadableStream
// Wrap the Node ReadableStream with IterableReadableStreamV0
const iterableStream = IterableReadableStreamV0.fromReadableStream(nodeReadableStream);
// Now you can use async iteration with the iterableStream
for await (const chunk of iterableStream) {
console.log(chunk);
}
This approach ensures that the ReadableStream can be used with async iteration syntax, making it compatible with both LangChain and Node environments. Additionally, the convertEventStreamToIterableReadableDataStream function in the @langchain/core/utils/stream file can be used to convert an event stream into an iterable readable data stream, further enhancing compatibility [1][2][3].
To continue talking to Dosu, mention @dosu.
Thanks for the heads up - will have a look as soon as possible
Having some trouble figuring this out :(
Is it just the types? Does it work if you just type it as any?
Thanks for taking a look.
It's both an issue with type conversion and missing support for .values() method.
See the stream.values({ preventCancel: true }) example in https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream, without values() we can't read the stream in 2 separate for loops. In my use case, I need to read first k characters in a loop, do some processing on it and then continue reading rest of stream. Currently I'm implementing this in a while loop using .read() on a reader instance obtained through getReader().
The type conversion I was able to workaround by typing it as any.