[LLMObs] Span client flush doesn't work well with Celery+gevent pools
Summary of problem
We're using Celery with a gevent pool (500 workers) for LLM batch processing, and roughly 4/5 of our LLM calls are missing on Datadog.
Continued from #10212, since I can't reopen that issue. Back then it looked like the DD backend couldn't handle the volume; now I think it's an issue in the library.
Hi, I recently tested version 2.12 and noticed that the error log no longer appears in the logs. However, we are still only receiving the most recent LLM calls. We should be receiving at least 400 LLM call spans, but we are only getting around 30, and they all appear to be the latest ones. It's possible that ddtrace is not compatible with the gevent pool through Celery. (We are using a gevent pool with 500 workers for the LLM calls, as they are essentially just send-and-wait operations.)
Which version of dd-trace-py are you using?
2.12.0
Which version of pip are you using?
pip 24.1.2 from .venv/lib/python3.11/site-packages/pip (python 3.11)
Which libraries and their versions are you using?
`pip freeze`
```
celery==5.4.0
ddtrace==2.12.0
gevent==24.2.1
greenlet==3.0.3 ; platform_python_implementation == 'CPython'
```
How can we reproduce your problem?
You need a really bulky set of LLM calls. (It's just a bunch of calls: around 5K tokens for input + output, and the payloads are non-ASCII, so they're a little bigger than English LLM calls.)
What is the result that you get?
Only the latest calls show up in DD.
What is the result that you expected?
All of the LLMObs data reaches Datadog LLM Observability.
Hi @kimwonj77, thanks for reaching out again. I suspect this might have something to do with gevent compatibility with the overall ddtrace library. Would you be able to give us a small code snippet that we can use as a reproduction case?
I made a simple script to reproduce the issue:
# pip install "ddtrace>=2.12.0" celery[redis,gevent] python-dotenv tqdm
from dotenv import load_dotenv
load_dotenv()
from celery import Celery, signature, group
from ddtrace import patch_all
from ddtrace.llmobs import LLMObs
from ddtrace.llmobs.decorators import task, llm
from time import sleep
from random import randint
from tqdm import tqdm
patch_all()
LLMObs.enable(
ml_app="test-ddtrace-reproduce",
integrations_enabled=True,
# agentless_enabled=True,
)
app = Celery(
"ddtrace-reproduce",
broker="redis://localhost:6379/1",
backend="redis://localhost:6379/1",
)
@llm(model_name="mock_llm", name="mock_llm")
def mock_llm(input_data):
# this should be openai messages in my (production) case
result = "A_VERY_LONG_TOKEN_TEST" * 250
# sleep for a random time for mocking the model
sleep(randint(50, 100) / 10)
LLMObs.annotate(
input_data=[{"role": "user", "data": input_data}],
output_data=[{"role": "model", "data": result}],
metrics={"input_tokens": len(input_data), "output_tokens": len(result)},
tags={"test": True},
)
return result
@app.task
@task
def llmTask(input_idx):
input_data = f"+{input_idx}+input_data-" + "TEST" * 100
result = mock_llm(input_data)
LLMObs.annotate(
input_data=input_data,
output_data=result,
tags={"test": True},
)
return result
if __name__ == "__main__":
# make 1000 tasks
length = 1000
tasks = [signature("reproduce.llmTask", args=(i,)) for i in range(length)]
# group the tasks by 100
gs = [group(tasks[i : i + 100]) for i in range(0, length, 100)]
# run the groups
results = [g.apply_async() for g in gs]
# get the results
gots = [r.get() for r in tqdm(results)]
# merge into a single list
result = [item for sublist in gots for item in sublist]
assert len(result) == length
# Run commands:
# python -m celery --app=reproduce worker --pool=gevent -l INFO -E -c 500
# python ./reproduce.py
DD should be receiving 1000 LLMObs spans, but only 266 spans are received.
Here's my .env (API key is stripped):
```
DD_SITE=datadoghq.com
DD_LLMOBS_ENABLED=1
DD_LLMOBS_ML_APP=ddtrace-reproduce
DD_AGENT_HOST=localhost
DD_SERVICE=ddtrace-reproduce
DD_TRACE_SAMPLING_RULES='[{"sample_rate":1}]'
```
I tried setting DD_TRACE_SAMPLING_RULES='[{"sample_rate":1}]' and it seems to produce a few more spans (it's unclear), but I'm still not receiving the full 1000 spans. (I don't even know whether DD_TRACE_SAMPLING_RULES affects the LLMObs integration at all; I assumed it didn't.)
Also, agentless mode sometimes prints messages like "5 additional messages skipped".
Edit: I got this message in agent mode too:
sent 288.82KB in 1.68592s to http://localhost:8126/evp_proxy/v2/api/v2/llmobs, 1 additional messages skipped
P.S. The Agent is 7.57.0 on a Windows host.
Thank you @kimwonj77 for the thorough response, we'll try reproducing this issue and get back to you as soon as possible!
Hey @kimwonj77 thanks for your patience here and the thorough reproduction script! Just wanted to let you know that I was able to reproduce your issue and I am working on a fix.
Hey @kimwonj77, we'll investigate how to make flushing large spans more performant. And you're right - DD_TRACE_SAMPLING_RULES should not affect LLM Obs in any way.
As a temporary work-around, could you try force flushing traces using LLMObs.flush like so?
```python
@task
def llmTask(input_idx):
    input_data = f"+{input_idx}+input_data-" + "TEST" * 100
    result = mock_llm(input_data)
    LLMObs.annotate(
        input_data=input_data,
        output_data=result,
        tags={"test": True},
    )
    return result


@app.task
def wrappedLlmTask(input_idx):
    result = llmTask(input_idx)
    LLMObs.flush()  # force flushes all finished spans to dd
    return result

...
tasks = [signature("reproduce.wrappedLlmTask", args=(i,)) for i in range(length)]
```
This worked for me in getting all 1000 traces to show up, though it will add some overhead to your llm tasks.
Hi, I tried wrapping the task with an LLMObs flush like this:
```python
def llmTask(input_idx):
    result = _llmTask(input_idx)
    LLMObs.flush()  # force flushes all finished spans to dd
    return result
```
But as soon as I do this, the call hangs indefinitely. It seems like flush locks up when there is nothing to flush out? This slows down the whole process considerably and it always ends up locked. (I can see via the Celery API that active tasks keep hanging, and after commenting out LLMObs.flush there's no issue.)
ddtrace was 2.14.2 for this test.
Edit: It was not just 'overhead'. I thought it would add up to 20 seconds to each call, but it ended up hitting the soft_time_limit for one or two calls every time and kept hanging.
Edit 3: I ended up with the code below, because Celery doesn't work very well with gevent.Timeout.
```python
import logging

from celery.exceptions import SoftTimeLimitExceeded

logger = logging.getLogger(__name__)


@app.task(soft_time_limit=15)
def llmobs_flush():
    """
    Flush the LLM Observability spans to Datadog

    Returns:
        None
    """
    try:
        LLMObs.flush()
    except SoftTimeLimitExceeded:
        # Log the timeout event
        logger.warning("report_that_sometime_hangs timed out after 15 seconds")
    except Exception as e:
        # Handle other exceptions if necessary
        logger.error(f"An error occurred: {e}")


@app.task
def llmTask(input_idx):
    result = _llmTask(input_idx)
    llmobs_flush.delay()
    return result
```
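For reference, the gevent.Timeout variant I gave up on looked roughly like the sketch below. This is a reconstruction rather than the exact code; the name `flush_with_timeout` and the 15-second value are illustrative, mirroring the soft_time_limit above.

```python
# Reconstructed sketch of the abandoned gevent.Timeout approach (not the
# original code): bound LLMObs.flush() with a greenlet-level timeout instead
# of a Celery time limit.
import logging

import gevent

from ddtrace.llmobs import LLMObs

logger = logging.getLogger(__name__)


def flush_with_timeout(seconds=15):
    """Flush LLMObs spans, giving up after `seconds` instead of hanging."""
    try:
        # gevent.Timeout raises in the current greenlet if flush blocks too long
        with gevent.Timeout(seconds):
            LLMObs.flush()
    except gevent.Timeout:
        # Give up on this flush attempt rather than hang the worker greenlet
        logger.warning("LLMObs.flush timed out after %s seconds", seconds)
```

In practice this didn't play nicely with Celery's gevent pool, which is why the separate llmobs_flush task above became the fallback.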
Hey @kimwonj77, is your solution above enough to get all of the spans to show up (except in cases where flush times out)?
> It seems like flush locks up when there is nothing to flush out?
Let me investigate any potential deadlock issues and get back to you
We have indeed switched to Celery's hard time limit (time_limit=15) to ensure a definite timeout, and it has resolved the primary issue. However, there's a minor issue: occasionally it causes a complete worker crash, especially after extended periods (around 30 minutes or more) when multiple deadlocks have occurred (an AMQP issue in Celery triggered by the deadlocks?). This is not critical for us, since we're able to restart without any data loss, though it might well be critical for someone else.
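Concretely, the change to the flush task was roughly the following. This is a sketch reconstructed from the description above, not the exact code, and it reuses `app`, `LLMObs`, and `logger` from the earlier snippet; with only a hard time_limit set there is no SoftTimeLimitExceeded to catch inside the task.

```python
# Rough sketch of the current setup: the flush task now relies on Celery's
# hard time limit, so a hung flush hits the hard limit instead of raising
# SoftTimeLimitExceeded inside the task body.
@app.task(time_limit=15)
def llmobs_flush():
    """Flush LLM Observability spans to Datadog; subject to a 15s hard limit."""
    try:
        LLMObs.flush()
    except Exception as e:
        # Only ordinary errors are handled here; the hard limit is enforced
        # by the worker rather than raised as a catchable soft-limit exception.
        logger.error(f"An error occurred: {e}")
```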
Just in case, I've attached the log below.
Celery Crash
```
[2024-10-25 11:28:35,342: CRITICAL/MainProcess] Unrecoverable error: PreconditionFailed(406, 'PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more', (0, 0), '')
Traceback (most recent call last):
  File "/[redacted]/.venv/lib/python3.11/site-packages/celery/worker/worker.py", line 202, in start
    self.blueprint.start(self)
  File "/[redacted]/.venv/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/[redacted]/.venv/lib/python3.11/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
           ^^^^^^^^^^^^^^^^
  File "/[redacted]/.venv/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 340, in start
    blueprint.start(self)
  File "/[redacted]/.venv/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/[redacted]/.venv/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 746, in start
    c.loop(*c.loop_args())
  File "/[redacted]/.venv/lib/python3.11/site-packages/celery/worker/loops.py", line 130, in synloop
    connection.drain_events(timeout=2.0)
  File "/[redacted]/.venv/lib/python3.11/site-packages/kombu/connection.py", line 341, in drain_events
    return self.transport.drain_events(self.connection, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/[redacted]/.venv/lib/python3.11/site-packages/kombu/transport/pyamqp.py", line 171, in drain_events
    return connection.drain_events(**kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/[redacted]/.venv/lib/python3.11/site-packages/amqp/connection.py", line 526, in drain_events
    while not self.blocking_read(timeout):
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/[redacted]/.venv/lib/python3.11/site-packages/amqp/connection.py", line 532, in blocking_read
    return self.on_inbound_frame(frame)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/[redacted]/.venv/lib/python3.11/site-packages/amqp/method_framing.py", line 53, in on_frame
    callback(channel, method_sig, buf, None)
  File "/[redacted]/.venv/lib/python3.11/site-packages/amqp/connection.py", line 538, in on_inbound_method
    return self.channels[channel_id].dispatch_method(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/[redacted]/.venv/lib/python3.11/site-packages/amqp/abstract_channel.py", line 156, in dispatch_method
    listener(*args)
  File "/[redacted]/.venv/lib/python3.11/site-packages/amqp/channel.py", line 293, in _on_close
    raise error_for_code(
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more
```
Hey @kimwonj77, closing this since the work-around mentioned resolves your primary issue.
I was also not able to reproduce any deadlocks, but the team will investigate a better solution for flushing large LLM spans and we'll keep an eye out for deadlock scenarios then. As always, feel free to open another issue and tag me if you run into similar LLM Obs problems!