weave
feat(integration): Add Anthropic Messages.stream support
Anthropic has two types of streaming: one by event and another by message.
We are missing support for `Messages.stream`, and this PR adds it. In this mode, the anthropic library returns a context manager:

    class MessageStreamManager:
        """Wrapper over MessageStream that is returned by `.stream()`.

        ```py
        with client.messages.stream(...) as stream:
            for chunk in stream:
                ...
        ```
        """

        def __init__(
            self,
            api_request: Callable[[], Stream[RawMessageStreamEvent]],
        ) -> None:
            self.__stream: MessageStream | None = None
            self.__api_request = api_request

        def __enter__(self) -> MessageStream:
            raw_stream = self.__api_request()
            self.__stream = MessageStream(
                cast_to=raw_stream._cast_to,
                response=raw_stream.response,
                client=raw_stream._client,
            )
            return self.__stream

        def __exit__(
            self,
            exc_type: type[BaseException] | None,
            exc: BaseException | None,
            exc_tb: TracebackType | None,
        ) -> None:
            if self.__stream is not None:
                self.__stream.close()

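
For readers who want to see that lifecycle in isolation, here is a minimal sketch that imitates the enter/exit logic with stand-in classes. `DemoStream`, `DemoStreamManager`, and `fake_request` are hypothetical names made up for this example, not part of the anthropic library, and the real manager additionally re-wraps the raw stream in a `MessageStream`. The point it illustrates is that the request is only issued on `__enter__`, and that the manager itself is not iterable, which is why an integration that only wraps iterators misses it.

```python
from typing import Callable, Iterator, List, Optional


class DemoStream:
    """Hypothetical stand-in for MessageStream: iterable and closable."""

    def __init__(self, chunks: List[str]) -> None:
        self._chunks = iter(chunks)
        self.closed = False

    def __iter__(self) -> Iterator[str]:
        return self._chunks

    def close(self) -> None:
        self.closed = True


class DemoStreamManager:
    """Hypothetical stand-in that imitates the manager's enter/exit logic."""

    def __init__(self, api_request: Callable[[], DemoStream]) -> None:
        self._stream: Optional[DemoStream] = None
        self._api_request = api_request

    def __enter__(self) -> DemoStream:
        # The request is issued here, not when the manager is created.
        self._stream = self._api_request()
        return self._stream

    def __exit__(self, exc_type, exc, exc_tb) -> None:
        # Close the stream if one was opened.
        if self._stream is not None:
            self._stream.close()


request_log: List[str] = []


def fake_request() -> DemoStream:
    """Pretend API call that records when it runs."""
    request_log.append("request sent")
    return DemoStream(["Hel", "lo"])


manager = DemoStreamManager(fake_request)
assert request_log == []  # nothing has been requested yet

with manager as stream:
    text = "".join(stream)

assert text == "Hello" and stream.closed
```
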
I am curious whether patching `_IteratorWrapper` with this hack is a good idea:

    def __enter__(self) -> "_IteratorWrapper":
        if hasattr(self._iterator, "__enter__"):
            return self._iterator.__enter__()
        return self

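
To make the trade-off concrete, here is a minimal sketch with stand-ins. `RecordingIteratorWrapper` and `fake_stream_manager` are hypothetical names for this example, not weave's `_IteratorWrapper` or Anthropic's classes. Returning `self._iterator.__enter__()` directly hands the caller the inner stream, so inside the `with` block chunks no longer pass through the wrapper. One possible variant, shown below, enters the wrapped manager, swaps in the stream it returns, and returns the wrapper itself.

```python
from contextlib import contextmanager
from typing import Any, Callable, Iterator, List


@contextmanager
def fake_stream_manager(
    chunks: List[str], events: List[str]
) -> Iterator[Iterator[str]]:
    """Hypothetical manager: yields an iterator on enter, records the close."""
    events.append("opened")
    try:
        yield iter(chunks)
    finally:
        events.append("closed")


class RecordingIteratorWrapper:
    """Illustrative wrapper that reports every chunk it yields."""

    def __init__(self, iterator: Any, on_chunk: Callable[[str], None]) -> None:
        self._iterator = iterator
        self._on_chunk = on_chunk
        self._manager: Any = None

    def __iter__(self) -> "RecordingIteratorWrapper":
        return self

    def __next__(self) -> str:
        chunk = next(self._iterator)
        self._on_chunk(chunk)
        return chunk

    def __enter__(self) -> "RecordingIteratorWrapper":
        # If the wrapped object is a context manager, enter it and iterate
        # over what it returns, but keep the wrapper in front of the caller.
        if hasattr(self._iterator, "__enter__"):
            self._manager = self._iterator
            self._iterator = self._manager.__enter__()
        return self

    def __exit__(self, exc_type, exc, exc_tb):
        # Forward the exit to the wrapped manager so it can clean up.
        if self._manager is not None:
            return self._manager.__exit__(exc_type, exc, exc_tb)
        return None


events: List[str] = []
seen: List[str] = []

wrapped = RecordingIteratorWrapper(
    fake_stream_manager(["Hel", "lo"], events), seen.append
)
with wrapped as stream:
    received = list(stream)
```

In this sketch `stream` is still the wrapper, so `seen` collects the same chunks the caller receives. The cost is that attributes of the inner stream are no longer reachable unless the wrapper forwards them explicitly.
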
cc @tssweeney
Preview this PR with FeatureBee: https://beta.wandb.ai/?betaVersion=518d65fd364e6ac4a93a86cb0585e0882faf9f41
Ok, now it is working but...
- Had to change some things in `accumulator.py`, like being able to subclass `_IteratorWrapper`
- This is kind of brittle, but I suppose all the integrations are brittle.
- Couldn't find a way to deal with all the possible streaming scenarios that this lib offers
I think this is ready for review @tssweeney