Run generator.__anext__ in the same task to avoid anyio RuntimeError
When using anyio cancel scopes inside an observed async generator, we were getting
RuntimeError: Attempted to exit cancel scope in a different task than it was entered in
Because the __anext__ call was happening in a new task.
To avoid this, manually restore the context when calling __anext__.
Related: https://github.com/pydantic/pydantic-ai/issues/2818
[!IMPORTANT] Fixes
RuntimeErrorin_ContextPreservedAsyncGeneratorWrapper.__anext__()by manually restoring context to ensure it runs in the same task.
- Behavior:
- Fixes
RuntimeErrorin__anext__()of_ContextPreservedAsyncGeneratorWrapperby manually restoring context to ensure it runs in the same task.- Removes use of
asyncio.create_taskfor__anext__to prevent task context issues with AnyIO.- Misc:
- Adds context restoration logic in
finallyblock of__anext__()to reset context variables.This description was created by
for 1c919661de6433468b7b5dc037c2d1f567ce3761. You can customize this summary. It will automatically update as commits are pushed.
Disclaimer: Experimental PR review
Greptile Overview
Greptile Summary
Fixed RuntimeError when using anyio cancel scopes inside @observe-decorated async generators by manually restoring context variables instead of using asyncio.create_task().
- Replaced
asyncio.create_task(generator.__anext__(), context=self.context)with directawait generator.__anext__()call - Manually set/reset context variables using
var.set(value)andvar.reset(token)to preserve context without creating a new task - Removed Python 3.10 fallback code (try-except block) since manual context restoration works across all Python versions
- Ensures anyio cancel scopes are entered and exited in the same task, preventing "Attempted to exit cancel scope in a different task" errors
Confidence Score: 4/5
- Safe to merge with minor caveats - the fix correctly addresses the anyio RuntimeError and simplifies the code
- The implementation correctly fixes the anyio cancel scope issue by ensuring
__anext__runs in the same task. Manual context variable management usingvar.set()andvar.reset()is the standard approach for context preservation. The change also removes unnecessary Python version-specific code. Score is 4/5 rather than 5/5 due to lack of test coverage for this specific scenario and potential edge cases with context variable cleanup if exceptions occur between set and reset operations (though the finally block mitigates this) - No files require special attention - the change is localized and well-implemented
Important Files Changed
File Analysis
| Filename | Score | Overview |
|---|---|---|
| langfuse/_client/observe.py | 4/5 | Fixed anyio RuntimeError by manually restoring context instead of using asyncio.create_task, removed Python 3.10 fallback code |
Sequence Diagram
sequenceDiagram
participant User
participant Wrapper as _ContextPreservedAsyncGeneratorWrapper
participant CtxVars as contextvars
participant Generator as Async Generator
User->>Wrapper: __anext__()
activate Wrapper
Note over Wrapper,CtxVars: Manual context restoration
Wrapper->>CtxVars: Iterate over context.items()
loop For each context var
Wrapper->>CtxVars: var.set(value)
CtxVars-->>Wrapper: Return token
Wrapper->>Wrapper: Store (var, token)
end
Note over Wrapper,Generator: Execute in same task
Wrapper->>Generator: __anext__()
activate Generator
Note over Generator: anyio cancel scopes work correctly<br/>as we're in the same task
Generator-->>Wrapper: Return item
deactivate Generator
Wrapper->>Wrapper: items.append(item)
Note over Wrapper,CtxVars: Restore original context
loop For each (var, token)
Wrapper->>CtxVars: var.reset(token)
end
Wrapper-->>User: Return item
deactivate Wrapper