bug: Async Tasks in CrewAI Create Separate Traces from Crew in Langfuse
Describe the bug
When I integrate Langfuse with CrewAI and mark tasks with async_execution=True, tasks that run concurrently are assigned different trace_ids instead of being grouped under the current crew's trace.
langfuse 3.5.0, crewai 0.193.2
Steps to reproduce
1. Integrate Langfuse with CrewAI.
2. Define multiple tasks with async_execution=True so they run concurrently.
3. Kick off the crew and inspect the traces: the concurrent tasks generate trace_ids that differ from the crew's trace.
Langfuse Cloud or self-hosted?
Self-hosted
If self-hosted, what version are you running?
3.5.0
SDK and integration versions
Python SDK
Additional information
No response
Are you interested in contributing a fix for this bug?
No
Hi @moqiaaa! I'm Dosu and I’m helping the langfuse team.
This happens because OpenTelemetry context propagation is lost across async boundaries and thread pools, especially when using async_execution=True in CrewAI. Each async task may start without the parent trace context, so Langfuse creates a new trace for each task instead of grouping them under the crew's trace. This is a known limitation with Python's contextvars and async/threaded execution, not a bug in Langfuse or CrewAI specifically [source] [source].
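The contextvars behavior described here can be reproduced with the standard library alone: a value set in the main thread is not visible in a thread-pool worker, which is why spans started there end up in a fresh trace. In this sketch a plain ContextVar stands in for the real OpenTelemetry context:

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the OpenTelemetry trace context that Langfuse relies on
active_trace = contextvars.ContextVar("active_trace", default=None)

def start_span():
    # In real code a child span would attach to the parent found in the
    # context; with no context, a brand-new trace is started instead.
    return active_trace.get()

active_trace.set("crew-trace")

# Same thread: the context is visible, so a span would join the crew's trace
in_thread = start_span()

# Thread pool: worker threads begin with a fresh context, so it is lost
with ThreadPoolExecutor(max_workers=1) as pool:
    in_pool = pool.submit(start_span).result()

print(in_thread, in_pool)  # crew-trace None
```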
To work around this and ensure all async tasks share the same trace:
- Wrap the entire CrewAI workflow (including all async tasks) inside a single `start_as_current_span` context. All tool/agent calls must execute within this context for trace continuity [source].
- Avoid mixing manual Langfuse instrumentation (like `@observe` or direct `Langfuse()` calls) with OpenTelemetry-based tracing, as this can cause double instrumentation and multiple traces [source].
- If you must use thread pools or async tasks that break context, explicitly pass `langfuse_trace_id` and `langfuse_parent_observation_id` as keyword arguments to each async execution, or use manual context management (start the span in the parent, pass it to the child, and end it explicitly) [source].
- CrewAI's `kickoff_async` may use `asyncio.to_thread`, which can further break context propagation [source].
If you want a concrete example or more details on context propagation helpers, let me know!
To reply, just mention @dosu.
> a concrete example

I need a concrete example. Thanks @DosuBot, how can I get this concrete example?
Here's a concrete example for CrewAI + Langfuse async tracing:
To ensure all async CrewAI tasks share the same trace, wrap the entire workflow (including kickoff and all async tasks) inside a single Langfuse span context. This keeps the trace context intact across async boundaries and avoids separate trace_ids for each task. For example:
```python
import asyncio

from langfuse import get_client

langfuse = get_client()

with langfuse.start_as_current_span(name="Analysis Workflow") as span:
    # Set helpful attributes for grouping in the Langfuse UI
    span.update_trace(user_id="user_123", session_id="session_abc")

    # All CrewAI calls must happen inside this context.
    # kickoff_async returns a coroutine, so drive it to completion here;
    # async_execution=True is set on the individual Task objects.
    result = asyncio.run(crew.kickoff_async())
    print(result)

langfuse.flush()  # Ensure all events are sent in short-lived apps
```
If you spawn threads or use asyncio.to_thread (which CrewAI's kickoff_async may do), you must manually propagate the trace context. Pass langfuse_trace_id and langfuse_parent_observation_id as keyword arguments to each async task or thread. Example for manual propagation:
```python
from concurrent.futures import ThreadPoolExecutor

from langfuse import get_client, observe

@observe
def execute_task(*args, langfuse_trace_id=None, langfuse_parent_observation_id=None):
    # Your task logic here
    pass

@observe
def execute_groups(task_args):
    trace_id = get_client().get_current_trace_id()
    observation_id = get_client().get_current_observation_id()

    with ThreadPoolExecutor(3) as executor:
        futures = [
            executor.submit(
                execute_task,
                *task_arg,
                langfuse_trace_id=trace_id,
                langfuse_parent_observation_id=observation_id,
            )
            for task_arg in task_args
        ]
        for future in futures:
            future.result()
```
References and more details:
- Langfuse CrewAI integration guide
- Langfuse Python SDK advanced usage: context propagation
- Discussion: async CrewAI tasks and trace grouping
Best practices: always run all async tasks inside the same parent span context, avoid mixing manual SDK instrumentation with OpenTelemetry tracing, and flush events before your app exits. If you need more examples or want to see how to handle edge cases (like streaming or generator yields), let me know!
To reply, just mention @dosu.
Thanks for raising this! Since kickoff_async is using multithreading, could you please follow our docs here? https://langfuse.com/docs/observability/sdk/python/advanced-usage#using-threadpoolexecutors
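Since the OpenTelemetry context that Langfuse uses lives in Python's contextvars, one way to apply that ThreadPoolExecutor pattern is a small wrapper that copies the caller's context into every submitted task. This is only a sketch; `ContextPropagatingExecutor` is a hypothetical name, not part of the Langfuse API, and a plain ContextVar stands in for the trace context:

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

class ContextPropagatingExecutor(ThreadPoolExecutor):
    """ThreadPoolExecutor that runs each task in a copy of the caller's
    contextvars context, so values set in the submitting thread (such as
    the active OpenTelemetry span context) are visible in the worker."""

    def submit(self, fn, /, *args, **kwargs):
        ctx = contextvars.copy_context()  # snapshot the caller's context
        return super().submit(ctx.run, fn, *args, **kwargs)

# Demonstration with a plain ContextVar standing in for the trace context
current_trace = contextvars.ContextVar("current_trace", default="no-trace")

def worker():
    return current_trace.get()

current_trace.set("crew-trace-1")

with ThreadPoolExecutor(1) as plain, ContextPropagatingExecutor(1) as propagating:
    lost = plain.submit(worker).result()        # fresh context in the worker
    kept = propagating.submit(worker).result()  # caller's context copied in

print(lost, kept)  # no-trace crew-trace-1
```

With the wrapper, child spans started inside pool workers attach to the crew's trace instead of opening new ones, without threading `langfuse_trace_id` through every call signature.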
This issue is stale because it has been open for 30 days with no activity.
This issue was closed because it has been inactive for 14 days since being marked as stale. Please reopen if the issue persists.