dd-trace-py icon indicating copy to clipboard operation
dd-trace-py copied to clipboard

[LLMObs] Span client flush doesn't work well with Celery+gevent pools

Open kimwonj77 opened this issue 1 year ago • 6 comments

Summary of problem

We're using celery + gevent pool (with 500 workers) for LLM batch processing. And we're missing 4/5 of llm calls on Datadog.

Continued from #10212 as I can't open issue again. Also, when that time, it was just DD server can't handle it. and now, it's libraries issue, I think.

Hi, I recently tested version 2.12 and noticed that the 'error log' is no more on logs. However, we are still only receiving the most recent LLM calls. We should be receiving at least 400 LLM calls span, but we are only getting around 30, and they all appear to be the latest ones. It’s possible that ddtrace may not be compatible with the gevent pool through Celery. (We are using a gevent pool with 500 workers for the LLM calls, as they are essentially just send-and-wait operations.)

Which version of dd-trace-py are you using?

2.12.0

Which version of pip are you using?

pip 24.1.2 from .venv/lib/python3.11/site-packages/pip (python 3.11)    

Which libraries and their versions are you using?

`pip freeze` celery==5.4.0 ddtrace==2.12.0 gevent==24.2.1 greenlet==3.0.3 ; platform_python_implementation == 'CPython'

How can we reproduce your problem?

Need really bulky LLM Calls. (it's just bunch of calls. tokens are 5K for input + output, and also non-ascii calls so it little bit bigger than english llm calls)

What is the result that you get?

Only latest calls are on DD.

What is the result that you expected?

All of llmobs datas goes to Datadog LLMObs.

kimwonj77 avatar Sep 10 '24 07:09 kimwonj77

Hi @kimwonj77, thanks for reaching out again. I suspect this might have something to do with gevent compatibility with the overall ddtrace library. Would you be able to give us a small code snippet that we can use as a reproduction case?

Yun-Kim avatar Sep 11 '24 13:09 Yun-Kim

I made simple code to reproduce the issue

# pip install "ddtrace>=2.12.0" celery[redis,gevent] python-dotenv tqdm
from dotenv import load_dotenv
load_dotenv()
from celery import Celery, signature, group
from ddtrace import patch_all
from ddtrace.llmobs import LLMObs
from ddtrace.llmobs.decorators import task, llm
from time import sleep
from random import randint
from tqdm import tqdm

patch_all()
LLMObs.enable(
    ml_app="test-ddtrace-reproduce",
    integrations_enabled=True,
    # agentless_enabled=True,
)

app = Celery(
    "ddtrace-reproduce",
    broker="redis://localhost:6379/1",
    backend="redis://localhost:6379/1",
)


@llm(model_name="mock_llm", name="mock_llm")
def mock_llm(input_data):
    # this should be openai messages in my (production) case
    result = "A_VERY_LONG_TOKEN_TEST" * 250
    # sleep for a random time for mocking the model
    sleep(randint(50, 100) / 10)
    LLMObs.annotate(
        input_data=[{"role": "user", "data": input_data}],
        output_data=[{"role": "model", "data": result}],
        metrics={"input_tokens": len(input_data), "output_tokens": len(result)},
        tags={"test": True},
    )
    return result


@app.task
@task
def llmTask(input_idx):
    input_data = f"+{input_idx}+input_data-" + "TEST" * 100
    result = mock_llm(input_data)

    LLMObs.annotate(
        input_data=input_data,
        output_data=result,
        tags={"test": True},
    )

    return result


if __name__ == "__main__":
    # make 1000 tasks
    length = 1000
    tasks = [signature("reproduce.llmTask", args=(i,)) for i in range(length)]
    # group the tasks by 100
    gs = [group(tasks[i : i + 100]) for i in range(0, length, 100)]
    # run the groups
    results = [g.apply_async() for g in gs]
    # get the results
    gots = [r.get() for r in tqdm(results)]
    # merge into a single list
    result = [item for sublist in gots for item in sublist]
    assert len(result) == length

# Run commands:
# python -m celery --app=reproduce worker --pool=gevent -l INFO -E -c 500
# python ./reproduce.py

The DD Should receiving 1000 of llmobs metrics. only 266 spans are received. image

Here's my .env (api key is stripped)

DD_SITE=datadoghq.com
DD_LLMOBS_ENABLED=1
DD_LLMOBS_ML_APP=ddtrace-reproduce
DD_AGENT_HOST=localhost
DD_SERVICE=ddtrace-reproduce
DD_TRACE_SAMPLING_RULES='[{"sample_rate":1}]'

I tried set DD_TRACE_SAMPLING_RULES='[{"sample_rate":1}]' and it seems little more spans? (It's unclear) but still not receiving full 1000 spans. (I don't even know how DD_TRACE_SAMPLING_RULES affects LLMObs integration... I thought it isn't by the way.)

Also, Agent-less mode also sometime prints skip additional 5 messages like messages. Edit: got this message on agent-mode too. sent 288.82KB in 1.68592s to http://localhost:8126/evp_proxy/v2/api/v2/llmobs, 1 additional messages skipped

Ps. Agent is 7.57.0 with windows host

kimwonj77 avatar Sep 12 '24 03:09 kimwonj77

Thank you @kimwonj77 for the thorough response, we'll try reproducing this issue and get back to you as soon as possible!

Yun-Kim avatar Sep 13 '24 20:09 Yun-Kim

Hey @kimwonj77 thanks for your patience here and the thorough reproduction script! Just wanted to let you know that I was able to reproduce your issue and I am working on a fix.

lievan avatar Sep 25 '24 20:09 lievan

Hey @kimwonj77, we'll investigate how to make flushing large spans more performant. And you're right - DD_TRACE_SAMPLING_RULES should not affect LLM Obs in any way.

As a temporary work-around, could you try force flushing traces using LLMObs.flush like so?

@task
def llmTask(input_idx):
    input_data = f"+{input_idx}+input_data-" + "TEST" * 100
    result = mock_llm(input_data)

    LLMObs.annotate(
        input_data=input_data,
        output_data=result,
        tags={"test": True},
    )
    return result

@app.task
def wrappedLlmTask(input_inx):
      result = llmTask(input_idx)
      LLMObs.flush() # force flushes all finished spans to dd
      return result

...

tasks = [signature("reproduce.wrappedLlmTask", args=(i,)) for i in range(length)]

This worked for me in getting all 1000 traces to show up, though it will add some overhead to your llm tasks.

lievan avatar Oct 02 '24 15:10 lievan

Hi, I tried warp the llmobs flush like:

def llmTask(input_idx):
      result = _llmTask(input_idx)
      LLMObs.flush() # force flushes all finished spans to dd
      return result

But as soon as do this, the call hangs indefinently. seems like flush locked up when there's no flush out things? This quite slow down the whole process and always locked up. (I can check active task keeps somewhat hangs via celery api, and after commenting out LLMObs.flush there's no issue) ddtrace was 2.14.2 for test this out.

Edit: It was not just 'overhead'. I thought this will be like add up to 20 seconds to each call but it ended up keep reaching out soft_time_limit for one or two calls every time and keep hangs.

Edit3: I ended up again with this code due to celery doesn't work very well with gevent.Timeout.

@app.task(soft_time_limit=15)
def llmobs_flush():
    """
    Flush the LLM Observability spans to Datadog

    Returns:
        None
    """

    try:
        LLMObs.flush()
    except SoftTimeLimitExceeded:
        # Log the timeout event
        logger.warning("report_that_sometime_hangs timed out after 15 seconds")
    except Exception as e:
        # Handle other exceptions if necessary
        logger.error(f"An error occurred: {e}")

@app.task
def llmTask(input_inx):
      result = _llmTask(input_idx)
      llmobs_flush.delay()
      return result

kimwonj77 avatar Oct 21 '24 02:10 kimwonj77

Hey @kimwonj77, is your solution above enough to get all of the spans to show up (except in cases where flush times out)?

seems like flush locked up when there's no flush out things?

Let me investigate any potential deadlock issues and get back to you

lievan avatar Oct 29 '24 15:10 lievan

We have indeed switched to Celery’s hard time limit(time_limit=15) to ensure a definite timeout, and it has resolved the primary issue. However, there's a minor issue: occasionally, it causes a complete worker crash, especially after extended periods, around 30 minutes or more, when multiple deadlocks have occurred (Due to AMPQ issue with celery that caused by deadlock?) . But is not critical for us. (well. It is critical for someone by the way...) As we’re able to restart without any data loss.

Just in case. I attached a log

Celery Crash ``` [2024-10-25 11:28:35,342: CRITICAL/MainProcess] Unrecoverable error: PreconditionFailed(406, 'PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more', (0, 0), '') Traceback (most recent call last): File "/[redacted]/.venv/lib/python3.11/site-packages/celery/worker/worker.py", line 202, in start self.blueprint.start(self) File "/[redacted]/.venv/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start step.start(parent) File "/[redacted]/.venv/lib/python3.11/site-packages/celery/bootsteps.py", line 365, in start return self.obj.start() ^^^^^^^^^^^^^^^^ File "/[redacted]/.venv/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 340, in start blueprint.start(self) File "/[redacted]/.venv/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start step.start(parent) File "/[redacted]/.venv/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 746, in start c.loop(*c.loop_args()) File "/[redacted]/.venv/lib/python3.11/site-packages/celery/worker/loops.py", line 130, in synloop connection.drain_events(timeout=2.0) File "/[redacted]/.venv/lib/python3.11/site-packages/kombu/connection.py", line 341, in drain_events return self.transport.drain_events(self.connection, **kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/[redacted]/.venv/lib/python3.11/site-packages/kombu/transport/pyamqp.py", line 171, in drain_events return connection.drain_events(**kwargs) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/[redacted]/.venv/lib/python3.11/site-packages/amqp/connection.py", line 526, in drain_events while not self.blocking_read(timeout): ^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/[redacted]/.venv/lib/python3.11/site-packages/amqp/connection.py", line 532, in blocking_read return self.on_inbound_frame(frame) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/[redacted]/.venv/lib/python3.11/site-packages/amqp/method_framing.py", line 53, in on_frame callback(channel, method_sig, buf, None) File "/[redacted]/.venv/lib/python3.11/site-packages/amqp/connection.py", line 538, in on_inbound_method return self.channels[channel_id].dispatch_method( ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "/[redacted]/.venv/lib/python3.11/site-packages/amqp/abstract_channel.py", line 156, in dispatch_method listener(*args) File "/[redacted]/.venv/lib/python3.11/site-packages/amqp/channel.py", line 293, in _on_close raise error_for_code( amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more ```

kimwonj77 avatar Oct 30 '24 05:10 kimwonj77

Hey @kimwonj77, closing this since the work-around mentioned resolves your primary issue.

I was also not able to reproduce any deadlocks, but the team will investigate a better solution for flushing large LLM spans and we'll keep an eye out for deadlock scenarios then. As always, feel free open another issue and tag me if you run into similar LLM Obs problems!

lievan avatar Nov 13 '24 22:11 lievan