
bug: Async Tasks in CrewAI Create Separate Traces from Crew in Langfuse

Open moqiaaa opened this issue 3 months ago • 5 comments

Describe the bug

When I integrate Langfuse with CrewAI and set async_execution=True on CrewAI tasks, tasks that run concurrently generate different trace_ids, which do not match the current crew's trace.

langfuse 3.5.0 crewai 0.193.2

Steps to reproduce

Integrate Langfuse with CrewAI, set async_execution=True on multiple CrewAI tasks, and run the crew. The concurrently executed tasks generate different trace_ids that do not match the current crew's trace.

Langfuse Cloud or self-hosted?

Self-hosted

If self-hosted, what version are you running?

3.5.0

SDK and integration versions

python sdk

Additional information

No response

Are you interested in contributing a fix for this bug?

No

moqiaaa avatar Sep 25 '25 05:09 moqiaaa

Hi @moqiaaa! I'm Dosu and I’m helping the langfuse team.

This happens because OpenTelemetry context propagation is lost across async boundaries and thread pools, especially when using async_execution=True in CrewAI. Each async task may start without the parent trace context, so Langfuse creates a new trace for each task instead of grouping them under the crew's trace. This is a known limitation of Python's contextvars with async/threaded execution, not a bug in Langfuse or CrewAI specifically.

To work around this and ensure all async tasks share the same trace:

  • Wrap the entire CrewAI workflow (including all async tasks) inside a single start_as_current_span context. All tool/agent calls must execute within this context for trace continuity.
  • Avoid mixing manual Langfuse instrumentation (like @observe or direct Langfuse() calls) with OpenTelemetry-based tracing, as this can cause double instrumentation and multiple traces.
  • If you must use thread pools or async tasks that break context, explicitly pass langfuse_trace_id and langfuse_parent_observation_id as keyword arguments to each async execution, or use manual context management (start the span in the parent, pass it to the child, and end it explicitly).
  • Note that CrewAI's kickoff_async may use asyncio.to_thread, which can further break context propagation.
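To illustrate the root cause behind these points, here is a minimal, Langfuse-agnostic sketch of how Python's contextvars behave across a thread pool: a plain worker thread starts from an empty context (so the "parent" value is lost), while explicitly copying and re-running the parent's context carries the value over. The ContextVar name here is purely illustrative; OpenTelemetry stores the active span in a ContextVar internally.

```python
import contextvars
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the ContextVar that tracing libraries use to hold
# the currently active span/trace.
current_trace = contextvars.ContextVar("current_trace", default=None)

def child():
    # A plain worker thread starts with a fresh context,
    # so the parent's value is not visible here.
    return current_trace.get()

def child_with_ctx(ctx: contextvars.Context):
    # Run the lookup inside a copy of the parent's context.
    return ctx.run(current_trace.get)

current_trace.set("trace-123")  # set in the "parent" (main thread)

with ThreadPoolExecutor(max_workers=1) as pool:
    lost = pool.submit(child).result()  # context not propagated
    kept = pool.submit(child_with_ctx, contextvars.copy_context()).result()

print(lost, kept)  # None trace-123
```

This is exactly why spans started inside thread-pool workers end up in fresh traces unless the caller propagates the context (or the trace/observation IDs) explicitly.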

If you want a concrete example or more details on context propagation helpers, let me know!

To reply, just mention @dosu.



dosubot[bot] avatar Sep 25 '25 05:09 dosubot[bot]

a concrete example

I need a concrete example. Thanks @DosuBot, how can I get this concrete example?

moqiaaa avatar Sep 25 '25 10:09 moqiaaa

Here's a concrete example for CrewAI + Langfuse async tracing:

To ensure all async CrewAI tasks share the same trace, wrap the entire workflow (including kickoff and all async tasks) inside a single Langfuse span context. This keeps the trace context intact across async boundaries and avoids separate trace_ids for each task. For example:

import asyncio

from langfuse import get_client

langfuse = get_client()

async def main():
    with langfuse.start_as_current_span(name="Analysis Workflow") as span:
        # Set helpful attributes for grouping in the Langfuse UI
        span.update_trace(user_id="user_123", session_id="session_abc")

        # All CrewAI calls must happen inside this context.
        # Note: async_execution=True is set on the Task definitions,
        # not passed to kickoff_async, and kickoff_async is a coroutine
        # that must be awaited.
        result = await crew.kickoff_async()
        print(result)

    langfuse.flush()  # Ensure all events are sent in short-lived apps

asyncio.run(main())

If you spawn threads or use asyncio.to_thread (which CrewAI's kickoff_async may do), you must manually propagate the trace context. Pass langfuse_trace_id and langfuse_parent_observation_id as keyword arguments to each async task or thread. Example for manual propagation:

from concurrent.futures import ThreadPoolExecutor
from langfuse import get_client, observe

@observe
def execute_task(*args, langfuse_trace_id=None, langfuse_parent_observation_id=None):
    # @observe picks up these special keyword arguments and attaches this
    # observation to the given trace/parent instead of starting a new trace.
    # Your task logic here
    pass

@observe
def execute_groups(task_args):
    trace_id = get_client().get_current_trace_id()
    observation_id = get_client().get_current_observation_id()

    with ThreadPoolExecutor(3) as executor:
        futures = [
            executor.submit(
                execute_task,
                *task_arg,
                langfuse_trace_id=trace_id,
                langfuse_parent_observation_id=observation_id,
            )
            for task_arg in task_args
        ]
        for future in futures:
            future.result()


Best practices: always run all async tasks inside the same parent span context, avoid mixing manual SDK instrumentation with OpenTelemetry tracing, and flush events before your app exits. If you need more examples or want to see how to handle edge cases (like streaming or generator yields), let me know!

To reply, just mention @dosu.



dosubot[bot] avatar Sep 25 '25 10:09 dosubot[bot]

Thanks for raising this! Since kickoff_async is using multithreading, could you please follow our docs here? https://langfuse.com/docs/observability/sdk/python/advanced-usage#using-threadpoolexecutors

hassiebp avatar Oct 13 '25 09:10 hassiebp

This issue is stale because it has been open for 30 days with no activity.

github-actions[bot] avatar Nov 15 '25 02:11 github-actions[bot]

This issue was closed because it has been inactive for 14 days since being marked as stale. Please reopen if the issue persists.

github-actions[bot] avatar Nov 29 '25 02:11 github-actions[bot]