The `asyncio` integration breaks Faust
Summary of problem
I'm trying to instrument workers for Faust, a Python stream-processing library originally developed at Robinhood by the author of Celery.
When the asyncio integration is enabled, the Faust worker fails to start up properly.
```
2024-08-21T16:07:35.278760 [info ] Configured ddtrace instrumentation for 1 integration(s). The following modules have been patched: asyncio [ddtrace._monkey] func_name=patch lineno=267 module=_monkey
starting➢ 2024-08-21T16:07:35.279820 [info ] [^Worker]: Starting... [faust.worker] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.364271 [info ] [^-SaladShooterFaustApp]: Starting... [saladshooter.streams.app] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.364537 [info ] [^--DatadogMonitor]: Starting... [faust.sensors.datadog] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.364807 [info ] [^--Producer]: Starting... [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.365019 [info ] [^---ProducerBuffer]: Starting... [faust.transport.producer] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.375441 [info ] [^--CacheBackend]: Starting... [faust.web.cache.backends.base] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.375681 [info ] [^--Web]: Starting... [faust.web.drivers.aiohttp] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.389261 [info ] [^---Server]: Starting... [faust.web.drivers.aiohttp] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.390258 [info ] [^--Consumer]: Starting... [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.391236 [info ] [^---AIOKafkaConsumerThread]: Starting... [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.398984 [info ] [^--LeaderAssignor]: Starting... [faust.assignor.leader_assignor] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.400007 [info ] [^--Producer]: Creating topic 'saladshooter-__assignor-__leader' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.403748 [info ] [^--ReplyConsumer]: Starting... [faust.agents.replies] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.403987 [info ] [^--AgentManager]: Starting... [faust.agents.manager] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.404264 [info ] [^---Agent: saladshooter.stre[.]ingest_event]: Starting... [faust.agents.agent] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.407193 [info ] [^---Agent: saladshoote[.]record_event_to_pg]: Starting... [faust.agents.agent] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.408952 [info ] [^---Conductor]: Starting... [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.409188 [info ] [^--TableManager]: Starting... [faust.tables.manager] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.409804 [info ] [^---Conductor]: Waiting for agents to start... [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.410273 [info ] [^---Conductor]: Waiting for tables to be registered... [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.411220 [info ] [^---Recovery]: Starting... [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.412044 [debug ] [^-SaladShooterFaustApp]: Started. [saladshooter.streams.app] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.413783 [info ] [^--Producer]: Creating topic 'saladshooter-__assignor-__leader' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.417529 [info ] [^--Producer]: Creating topic 'saladshooter_uiaction_ingestion' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
/Users/jag/.pyenv/versions/3.11.9/lib/python3.11/asyncio/base_events.py:1921: RuntimeWarning: coroutine 'flight_recorder._waiting' was never awaited
  handle = self._ready.popleft()
Object allocated at (most recent call last):
  File "/Users/jag/Development/salad-shooter/.direnv/python-3.11/lib/python3.11/site-packages/mode/utils/logging.py", lineno 732
    self._fut = asyncio.ensure_future(self._waiting(), loop=self.loop)
2024-08-21T16:07:36.452266 [info ] Executing _on_partitions_assigned [faust.app.base] func_name=_on_partitions_assigned lineno=1745 module=base
2024-08-21T16:07:36.454319 [info ] generation id 1 app consumers id 1 [faust.tables.recovery] func_name=on_rebalance lineno=254 module=recovery
2024-08-21T16:07:36.459239 [info ] [^---Recovery]: Seek stream partitions to committed offsets. [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.467772 [info ] [^---Recovery]: Resuming flow... [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.468228 [info ] [^---Fetcher]: Starting... [faust.transport.consumer] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.469210 [info ] [^---Recovery]: Worker ready [faust.tables.recovery] func_name=log lineno=284 module=logging
😊
/Users/jag/Development/salad-shooter/.direnv/python-3.11/lib/python3.11/site-packages/mode/services.py:726: RuntimeWarning: coroutine 'Event.wait' was never awaited
  pass
Object allocated at (most recent call last):
  File "/Users/jag/Development/salad-shooter/.direnv/python-3.11/lib/python3.11/site-packages/mode/services.py", lineno 723
    self._stopped.wait(), timeout=want_seconds(n)
```
Note the two `RuntimeWarning`s reporting that coroutines were never awaited. After this point the Faust process keeps running, but none of the loops that consume events are actually running.
Both warnings come from mode, the service/worker library by the same authors as Faust. The lines of code that trigger them are:
- https://github.com/faust-streaming/mode/blob/0.4.1/mode/utils/logging.py#L723-L732
- https://github.com/faust-streaming/mode/blob/0.4.1/mode/services.py#L719-L726
With the asyncio integration disabled, everything works fine and this is the output:
```
2024-08-21T16:12:43.720566 [info ] Configured ddtrace instrumentation for 0 integration(s). The following modules have been patched: [ddtrace._monkey] func_name=patch lineno=267 module=_monkey
starting➢ 2024-08-21T16:12:43.721608 [info ] [^Worker]: Starting... [faust.worker] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.803652 [info ] [^-SaladShooterFaustApp]: Starting... [saladshooter.streams.app] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.803908 [info ] [^--DatadogMonitor]: Starting... [faust.sensors.datadog] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.804161 [info ] [^--Producer]: Starting... [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.804359 [info ] [^---ProducerBuffer]: Starting... [faust.transport.producer] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.813663 [info ] [^--CacheBackend]: Starting... [faust.web.cache.backends.base] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.813904 [info ] [^--Web]: Starting... [faust.web.drivers.aiohttp] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.827304 [info ] [^---Server]: Starting... [faust.web.drivers.aiohttp] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.828269 [info ] [^--Consumer]: Starting... [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.829140 [info ] [^---AIOKafkaConsumerThread]: Starting... [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.836763 [info ] [^--LeaderAssignor]: Starting... [faust.assignor.leader_assignor] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.837944 [info ] [^--Producer]: Creating topic 'saladshooter-__assignor-__leader' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.842077 [info ] [^--ReplyConsumer]: Starting... [faust.agents.replies] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.842338 [info ] [^--AgentManager]: Starting... [faust.agents.manager] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.842608 [info ] [^---Agent: saladshooter.stre[.]ingest_event]: Starting... [faust.agents.agent] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.845350 [info ] [^---Agent: saladshoote[.]record_event_to_pg]: Starting... [faust.agents.agent] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.847106 [info ] [^---Conductor]: Starting... [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.847334 [info ] [^--TableManager]: Starting... [faust.tables.manager] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.847936 [info ] [^---Conductor]: Waiting for agents to start... [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.848366 [info ] [^---Conductor]: Waiting for tables to be registered... [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.849170 [info ] [^---Recovery]: Starting... [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.849724 [debug ] [^-SaladShooterFaustApp]: Started. [saladshooter.streams.app] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.850974 [info ] [^--Producer]: Creating topic 'saladshooter_uiaction_ingestion' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.853770 [info ] [^--Producer]: Creating topic 'saladshooter-__assignor-__leader' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.879137 [info ] Executing _on_partitions_assigned [faust.app.base] func_name=_on_partitions_assigned lineno=1745 module=base
2024-08-21T16:12:44.880802 [info ] generation id 3 app consumers id 3 [faust.tables.recovery] func_name=on_rebalance lineno=254 module=recovery
2024-08-21T16:12:44.883283 [info ] [^---Recovery]: Seek stream partitions to committed offsets. [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.891936 [info ] [^---Recovery]: Resuming flow... [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.892406 [info ] [^---Fetcher]: Starting... [faust.transport.consumer] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.893529 [info ] [^---Recovery]: Worker ready [faust.tables.recovery] func_name=log lineno=284 module=logging
```
Which version of dd-trace-py are you using?
dd-trace-py==2.11.1
Which version of pip are you using?
pip==24.2
Which libraries and their versions are you using?
`pip freeze`
```
aioconsole==0.7.1
aiodogstatsd==0.16.0.post0
aiohappyeyeballs==2.3.2
aiohttp==3.10.0
aiohttp-cors==0.7.0
aiokafka==0.10.0
aiomonitor==0.7.0
aiosignal==1.3.1
alembic==1.13.2
annotated-types==0.7.0
anyio==4.4.0
aredis==1.1.8
async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.2.0
build==1.2.1
bytecode==0.15.1
casefy==0.1.7
certifi==2024.7.4
charset-normalizer==3.3.2
click==8.1.7
colorlog==6.8.2
cramjam==2.8.3
croniter==2.0.7
dacite==1.8.1
dataclasses-avroschema==0.60.2
datadog==0.49.1
ddtrace==2.11.1
Deprecated==1.2.14
dill==0.3.8
dnspython==2.6.1
email_validator==2.2.0
envier==0.5.2
fastapi==0.111.1
fastapi-cli==0.0.4
fastavro==1.9.5
faust-streaming==0.11.2
faust-streaming-rocksdb==0.9.3
frozenlist==1.4.1
greenlet==3.0.3
h11==0.14.0
httpcore==1.0.5
httptools==0.6.1
httpx==0.27.0
humanize==4.10.0
idna==3.7
importlib_metadata==8.0.0
Inflector==3.1.1
iniconfig==2.0.0
intervaltree==3.1.0
janus==1.0.0
Jinja2==3.1.4
Mako==1.3.5
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mdurl==0.1.2
mode-streaming==0.4.1
molotov==2.6
multidict==6.0.5
multiprocess==0.70.16
mypy==1.11.1
mypy-extensions==1.0.0
opentelemetry-api==1.26.0
opentracing==2.4.0
orjson==3.10.6
packaging==24.1
pip-tools==7.4.1
pluggy==1.5.0
prompt_toolkit==3.0.47
protobuf==5.27.3
psycopg2-binary==2.9.9
pydantic==2.8.2
pydantic-settings==2.4.0
pydantic_core==2.20.1
Pygments==2.18.0
pyproject_hooks==1.1.0
pytest==8.3.2
pytest-asyncio==0.23.8
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
python-multipart==0.0.9
python-snappy==0.7.2
pytz==2024.1
PyYAML==5.3.1
redis==5.0.8
requests==2.32.3
rich==13.7.1
shellingham==1.5.4
six==1.16.0
sniffio==1.3.1
sortedcontainers==2.4.0
SQLAlchemy==2.0.31
starlette==0.37.2
structlog==24.4.0
terminaltables==3.1.10
trafaret==2.1.1
typer==0.12.3
typing_extensions==4.12.2
urllib3==2.2.2
uvicorn==0.30.3
uvloop==0.19.0
venusian==3.1.0
watchfiles==0.22.0
wcwidth==0.2.13
websockets==12.0
wrapt==1.16.0
xmltodict==0.13.0
yarl==1.9.4
zipp==3.20.0
```
How can we reproduce your problem?
You can set up a simple Kafka-compatible broker using Docker Compose:
```yaml
services:
  redpanda:
    command:
      - redpanda
      - start
      - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
      # Address the broker advertises to clients that connect to the Kafka API.
      # Use the internal addresses to connect to the Redpanda brokers
      # from inside the same Docker network.
      # Use the external addresses to connect to the Redpanda brokers
      # from outside the Docker network.
      - --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092
      - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
      # Address the broker advertises to clients that connect to the HTTP Proxy.
      - --advertise-pandaproxy-addr internal://redpanda:8082,external://localhost:18082
      - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
      # Redpanda brokers use the RPC API to communicate with each other internally.
      - --rpc-addr redpanda:33145
      - --advertise-rpc-addr redpanda:33145
      # Mode dev-container uses well-known configuration properties for development in containers.
      - --mode dev-container
      # Tells Seastar (the framework Redpanda uses under the hood) to use 1 core on the system.
      - --smp 1
      - --default-log-level=info
    image: docker.redpanda.com/redpandadata/redpanda:latest
    volumes:
      - redpanda:/var/lib/redpanda/data
    networks:
      - saladshooter
    ports:
      - 18081:18081
      - 18082:18082
      - 19092:19092
      - 19644:9644
  redpanda-console:
    image: docker.redpanda.com/redpandadata/console:latest
    networks:
      - saladshooter
    entrypoint: /bin/sh
    command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
    environment:
      CONFIG_FILEPATH: /tmp/config.yml
      CONSOLE_CONFIG_FILE: |
        kafka:
          brokers: ["redpanda:9092"]
          schemaRegistry:
            enabled: true
            urls: ["http://redpanda:8081"]
        redpanda:
          adminApi:
            enabled: true
            urls: ["http://redpanda:9644"]
    ports:
      - 8080:8080
    depends_on:
      - redpanda
volumes:
  redpanda:
    driver: local
networks:
  saladshooter:
    driver: bridge
```
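Before starting the worker, it can help to confirm that the external Kafka listener advertised above as `localhost:19092` actually accepts connections from the host. The helper below is a hypothetical convenience (not part of Redpanda, Faust, or ddtrace) and only checks TCP reachability, so it cannot tell you whether the advertised listener addresses are otherwise correct.

```python
import socket


def broker_reachable(host: str = "localhost", port: int = 19092, timeout: float = 2.0) -> bool:
    """Return True if something accepts TCP connections on host:port."""
    # Only opens and closes a TCP connection; it does not speak the Kafka
    # protocol, so True means "port is open", not "broker is healthy".
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # Default host/port match the external listener mapped in the compose file.
    print("Redpanda external listener reachable:", broker_reachable())
```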
Then create a minimal placeholder Faust application such as:
```python
from typing import Any

import faust

app = faust.App(
    "saladshooter",
    broker="kafka://localhost:19092/",
    store="memory://")

ingestion_topic = app.topic("saladshooter_uiaction_ingestion", internal=True)


@app.agent(ingestion_topic)
async def ingestor(event_stream: faust.StreamT[Any]) -> None:
    async for event in event_stream:
        print(event)


def main() -> None:
    # I set up structlog configuration here.
    import ddtrace
    ddtrace.patch(asyncio=True)
    faust.Worker(app, loglevel="info").execute_from_commandline()


if __name__ == "__main__":
    main()
```
Thank you for the detailed reproduction code @j00bar. We'll look into it.
cc @majorgreys
@emmettbutler Thanks for taking a look! It must be tough maintaining a wrapper library with endless use cases and so much domain knowledge to absorb, so I'm glad I could make this one easier to help with.
FYI: The same issue is happening on Python 3.12:
```
/.venv/lib/python3.12/site-packages/mode/services.py:726: RuntimeWarning: coroutine 'Event.wait' was never awaited
Coroutine created at (most recent call last)
  File "/.venv/lib/python3.12/site-packages/hypercorn/asyncio/run.py", line 230, in _run
    loop.run_until_complete(main(shutdown_trigger=shutdown_trigger))
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 673, in run_until_complete
    self.run_forever()
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 640, in run_forever
    self._run_once()
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 1984, in _run_once
    handle._run()
  File "/usr/local/lib/python3.12/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/dd_tracer/python/ddtrace/contrib/asyncio/patch.py", line 50, in traced_coro
    return await coro
  File "/.venv/lib/python3.12/site-packages/mode/services.py", line 873, in _execute_task
    await task
  File "/.venv/lib/python3.12/site-packages/faust/transport/consumer.py", line 899, in _commit_livelock_detector
    await self.verify_all_partitions_active()
  File "/.venv/lib/python3.12/site-packages/faust/transport/consumer.py", line 904, in verify_all_partitions_active
    await self.sleep(0)
  File "/.venv/lib/python3.12/site-packages/mode/services.py", line 723, in sleep
    self._stopped.wait(), timeout=want_seconds(n)
  pass
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
```
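The traceback above suggests one plausible way a coroutine-wrapping integration could produce exactly this warning. The sketch below is not ddtrace's code and is not a confirmed root cause: `wrapper` and `wrapping_task_factory` are hypothetical stand-ins for the `traced_coro` wrapper in the traceback, and `sleep_like_mode` mirrors the `wait_for(self._stopped.wait(), timeout=...)` call that mode's `sleep` appears to make (Faust calls it as `sleep(0)`). With `timeout=0`, `asyncio.wait_for` schedules a task and cancels it before it starts. Without a wrapper, the cancellation is thrown into `Event.wait()` itself; with a wrapper, it is thrown into the wrapper, so the inner `Event.wait()` coroutine is discarded without ever being awaited.

```python
import asyncio
import gc
import warnings


async def wrapper(coro):
    # Hypothetical stand-in for a tracing wrapper: the original coroutine is
    # only awaited once this wrapper actually starts running.
    return await coro


def wrapping_task_factory(loop, coro, **kwargs):
    # Hypothetical task factory that hides every scheduled coroutine inside
    # `wrapper`; extra keyword arguments (name/context) are passed through.
    return asyncio.Task(wrapper(coro), loop=loop, **kwargs)


async def sleep_like_mode(event: asyncio.Event, seconds: float) -> None:
    # Same shape as the call shown in the traceback: wait for the event and
    # treat a timeout as a normal wake-up.
    try:
        await asyncio.wait_for(event.wait(), timeout=seconds)
    except asyncio.TimeoutError:
        pass


async def main(wrap_tasks: bool) -> None:
    if wrap_tasks:
        asyncio.get_running_loop().set_task_factory(wrapping_task_factory)
    # verify_all_partitions_active() calls sleep(0), hence timeout=0 here.
    await sleep_like_mode(asyncio.Event(), 0)


def unawaited_warnings(wrap_tasks: bool) -> list[str]:
    # Collect every "was never awaited" RuntimeWarning emitted during the run.
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        asyncio.run(main(wrap_tasks))
        gc.collect()  # finalize discarded coroutine objects inside this block
    return [
        str(w.message)
        for w in caught
        if issubclass(w.category, RuntimeWarning) and "never awaited" in str(w.message)
    ]


if __name__ == "__main__":
    print("plain tasks:  ", unawaited_warnings(False))
    print("wrapped tasks:", unawaited_warnings(True))
```

Run as a script, the plain case should report no warnings, while the wrapped case should report `coroutine 'Event.wait' was never awaited`, the same message as in the logs. This only reproduces the warning; it does not show whether the same mechanism is what stalls the consumer loops.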
As a workaround, would it be possible to exclude the faust package from patching/tracing?
This issue has been automatically closed after a period of inactivity. If it's a feature request, it has been added to the maintainers' internal backlog and will be included in an upcoming round of feature prioritization. Please comment or reopen if you think this issue was closed in error.
@j00bar @konradr Hi, can you check whether you still see the issue on ddtrace==3.7.0? It looks similar to the problem described in #8907, which was fixed in #13326.
Hi @taegyunkim
Thank you for the update. I can confirm that upgrading to ddtrace==3.7.0 solves the issue for us.
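For deployments that cannot upgrade everywhere at once, one option is to enable the asyncio integration only when the installed ddtrace is at least the version confirmed in this thread. This is a hypothetical helper, not a ddtrace feature. It assumes 3.7.0 as the threshold only because that is the version reported to work here; earlier releases may or may not include the fix.

```python
import re
from importlib.metadata import PackageNotFoundError, version
from typing import Optional

# Assumption: 3.7.0 is the version this thread confirms works with Faust
# when the asyncio integration is enabled.
MIN_DDTRACE_FOR_ASYNCIO = (3, 7, 0)


def release_tuple(raw: str) -> tuple[int, ...]:
    # Keep only the leading numeric release segment, so "3.7.0rc1" -> (3, 7, 0).
    # Simplification: pre-release and local suffixes are ignored.
    match = re.match(r"\d+(?:\.\d+)*", raw)
    if match is None:
        raise ValueError(f"unrecognized version string: {raw!r}")
    return tuple(int(part) for part in match.group(0).split("."))


def should_patch_asyncio(ddtrace_version: str) -> bool:
    # Tuple comparison orders "3.10.0" after "3.7.0", unlike string comparison.
    return release_tuple(ddtrace_version) >= MIN_DDTRACE_FOR_ASYNCIO


def installed_ddtrace_version() -> Optional[str]:
    # Reads the installed distribution's metadata; None if ddtrace is absent.
    try:
        return version("ddtrace")
    except PackageNotFoundError:
        return None
```

In the reproduction's `main()`, the call could then become `ddtrace.patch(asyncio=should_patch_asyncio(installed_ddtrace_version() or "0"))`, leaving the integration off on older releases such as the 2.11.1 used in the original report.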