Added accumulator response parts and utils to accumulate from stream and normal response parts
Type
- [ ] Refactor
- [x] Feature
- [ ] Bug Fix
- [ ] Optimization
- [ ] Documentation Update
Description
This pull request adds 3 new ai response parts--TextAccumulatedPart, ReasoningAccumulatedPart, and ToolPart--and utilities to convert from streaming and normal parts to accumulated parts.
TextAccumulatedPart: same asTextPartbut withstatefield representing either"streaming"or"done"ReasoningAccumulatedPart: same asReasoningPartbut withstatefield representing either"streaming"or"done"ToolPartcombines all of the other tool parts into one part, keeping each previous context, and distinguish each state by theirvalue.statefield, where it holds values such as:"params-start": when params are started to streaming in"params-streaming": when params are streaming in"params-malformed": when params are malformed"params-done": when params are done streaming"result-error": when the tool returned an error result"result-done": when the tool call result is returned
These all accumulated parts and other necessary parts are represented by AccumulatedPart.
utilities:
mergeAccumulatedParts: merges two accumulated parts into one accumulated partsaccumulateParts: accumulate over normal parts given already existing accumulated partsaccumulateStreamParts: accumulate over stream parts given already existing accumulated parts
Related
- Related Issue #
- Closes #
🦋 Changeset detected
Latest commit: aee4a16e83eed3c2cb793532023911cef2540824
The changes in this PR will be included in the next version bump.
This PR includes changesets to release 6 packages
| Name | Type |
|---|---|
| @effect/ai | Minor |
| @effect/ai-amazon-bedrock | Major |
| @effect/ai-anthropic | Major |
| @effect/ai-google | Major |
| @effect/ai-openai | Major |
| @effect/ai-openrouter | Major |
Not sure what this means? Click here to learn what changesets are.
Click here if you're a maintainer who wants to add another changeset to this PR
@MrGeniusProgrammer - what is the concrete use case for this? Can you provide an example of how you would use it?
This is valuable to display a UI streaming part for the frontend where accumulated parts are more suited as it is necessary to get the status of the streaming part (are they finished? did it error? did the tool started streaming in?).
// rpcClient("ChatGenerate") returns a Stream.Stream<Response.StreamPart[]>
rpcClient("ChatGenerate").pipe(
// throttle the streaming response to not aggresively update the UI
Stream.throttle({
cost: Chunk.size,
duration: "250 millis",
units: 32
}),
// accumulate the stream parts
Stream.scanEffect([], Response.accumulateStreamParts),
// create a UI Assistant message with the accumulated parts
Stream.map((accumulatedParts) =>
UiAssistantMessage.make({
role: "assistant",
content: accumulatedParts
})
)
);
Gotcha ok - that makes sense. Would you be willing to add some test cases to cover these new methods? I'm trying to be more diligent about testing in the AI packages now as I add new functionality.
@IMax153 I believe discussing whether to accumulate parts based on window or by id is needed. As of right now, the current implementation goes with the former for the text and reasoning part and latter for the tool part. For example, mergeAccumulatedParts accumulates text part by merging the adjacent text parts into one text part. If a different part is found on the adjacent pair or if it has a status 'done', the text part ends. Another consideration is the order preservation of the parts. Also, what is your take on the naming of state in the TextAccumulatedPart, ReasoningAccumulatedPart, and ToolPart? Should it be renamed to status instead?
I think, in general, we should accumulate parts based upon ID given that most providers give that information to us as a "stable" way to identify linked parts.
Regarding state -> status, I like status I think more.
Sorry for being late (I was sick). I have implemented the suggested changes. Do you mind going through them?
@IMax153 let me know if there are any changes that needs to be implemented?