dd-trace-py

The `asyncio` integration breaks Faust

Open j00bar opened this issue 1 year ago • 2 comments

Summary of problem

I'm trying to instrument workers for Faust, the Python stream-processing successor to Celery developed at Robinhood.

When the asyncio integration is enabled, the Faust worker fails to start up properly.

2024-08-21T16:07:35.278760 [info     ] Configured ddtrace instrumentation for 1 integration(s). The following modules have been patched: asyncio [ddtrace._monkey] func_name=patch lineno=267 module=_monkey
starting➢ 2024-08-21T16:07:35.279820 [info     ] [^Worker]: Starting...         [faust.worker] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.364271 [info     ] [^-SaladShooterFaustApp]: Starting... [saladshooter.streams.app] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.364537 [info     ] [^--DatadogMonitor]: Starting... [faust.sensors.datadog] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.364807 [info     ] [^--Producer]: Starting...     [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.365019 [info     ] [^---ProducerBuffer]: Starting... [faust.transport.producer] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.375441 [info     ] [^--CacheBackend]: Starting... [faust.web.cache.backends.base] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.375681 [info     ] [^--Web]: Starting...          [faust.web.drivers.aiohttp] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.389261 [info     ] [^---Server]: Starting...      [faust.web.drivers.aiohttp] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.390258 [info     ] [^--Consumer]: Starting...     [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.391236 [info     ] [^---AIOKafkaConsumerThread]: Starting... [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.398984 [info     ] [^--LeaderAssignor]: Starting... [faust.assignor.leader_assignor] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.400007 [info     ] [^--Producer]: Creating topic 'saladshooter-__assignor-__leader' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.403748 [info     ] [^--ReplyConsumer]: Starting... [faust.agents.replies] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.403987 [info     ] [^--AgentManager]: Starting... [faust.agents.manager] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.404264 [info     ] [^---Agent: saladshooter.stre[.]ingest_event]: Starting... [faust.agents.agent] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.407193 [info     ] [^---Agent: saladshoote[.]record_event_to_pg]: Starting... [faust.agents.agent] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.408952 [info     ] [^---Conductor]: Starting...   [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.409188 [info     ] [^--TableManager]: Starting... [faust.tables.manager] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.409804 [info     ] [^---Conductor]: Waiting for agents to start... [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:07:35.410273 [info     ] [^---Conductor]: Waiting for tables to be registered... [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.411220 [info     ] [^---Recovery]: Starting...    [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.412044 [debug    ] [^-SaladShooterFaustApp]: Started. [saladshooter.streams.app] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.413783 [info     ] [^--Producer]: Creating topic 'saladshooter-__assignor-__leader' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.417529 [info     ] [^--Producer]: Creating topic 'saladshooter_uiaction_ingestion' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
/Users/jag/.pyenv/versions/3.11.9/lib/python3.11/asyncio/base_events.py:1921: RuntimeWarning: coroutine 'flight_recorder._waiting' was never awaited
  handle = self._ready.popleft()
Object allocated at (most recent call last):
  File "/Users/jag/Development/salad-shooter/.direnv/python-3.11/lib/python3.11/site-packages/mode/utils/logging.py", lineno 732
    self._fut = asyncio.ensure_future(self._waiting(), loop=self.loop)
2024-08-21T16:07:36.452266 [info     ] Executing _on_partitions_assigned [faust.app.base] func_name=_on_partitions_assigned lineno=1745 module=base
2024-08-21T16:07:36.454319 [info     ] generation id 1 app consumers id 1 [faust.tables.recovery] func_name=on_rebalance lineno=254 module=recovery
2024-08-21T16:07:36.459239 [info     ] [^---Recovery]: Seek stream partitions to committed offsets. [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.467772 [info     ] [^---Recovery]: Resuming flow... [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.468228 [info     ] [^---Fetcher]: Starting...     [faust.transport.consumer] func_name=log lineno=284 module=logging
2024-08-21T16:07:36.469210 [info     ] [^---Recovery]: Worker ready   [faust.tables.recovery] func_name=log lineno=284 module=logging
 😊
/Users/jag/Development/salad-shooter/.direnv/python-3.11/lib/python3.11/site-packages/mode/services.py:726: RuntimeWarning: coroutine 'Event.wait' was never awaited
  pass
Object allocated at (most recent call last):
  File "/Users/jag/Development/salad-shooter/.direnv/python-3.11/lib/python3.11/site-packages/mode/services.py", lineno 723
    self._stopped.wait(), timeout=want_seconds(n)

Note the two RuntimeWarnings reporting that coroutines were never awaited. At this point the Faust process is still running, but none of the loops that consume events are actually running.

The two warnings come from mode, a related service/worker library by the same authors as Faust. The lines that trigger the warnings are:

  • https://github.com/faust-streaming/mode/blob/0.4.1/mode/utils/logging.py#L723-L732
  • https://github.com/faust-streaming/mode/blob/0.4.1/mode/services.py#L719-L726
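For anyone unfamiliar with this warning, here is a minimal sketch of what Python is reporting. It is not the mode or ddtrace code path: `wait_for_stop` is a hypothetical stand-in for a coroutine such as `Event.wait`. It only shows that the warning is raised when a coroutine object is created and then discarded without ever being awaited or scheduled.

```python
import gc
import warnings


async def wait_for_stop() -> None:
    """Hypothetical stand-in for a coroutine such as Event.wait()."""


def create_and_drop() -> list[str]:
    """Create a coroutine object, drop it unawaited, and collect the warnings."""
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        coro = wait_for_stop()  # calling the function only builds the coroutine object
        del coro                # discarded without `await` or being scheduled as a task
        gc.collect()            # make sure the object is finalized right away
    return [str(w.message) for w in caught if issubclass(w.category, RuntimeWarning)]


messages = create_and_drop()
for message in messages:
    print(message)
```

In the logs above, the same kind of warning means that something between the coroutine's creation in mode and the event loop dropped it before it could run.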

Without the asyncio integration, this is the output, and everything works fine:

2024-08-21T16:12:43.720566 [info     ] Configured ddtrace instrumentation for 0 integration(s). The following modules have been patched:  [ddtrace._monkey] func_name=patch lineno=267 module=_monkey
starting➢ 2024-08-21T16:12:43.721608 [info     ] [^Worker]: Starting...         [faust.worker] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.803652 [info     ] [^-SaladShooterFaustApp]: Starting... [saladshooter.streams.app] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.803908 [info     ] [^--DatadogMonitor]: Starting... [faust.sensors.datadog] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.804161 [info     ] [^--Producer]: Starting...     [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.804359 [info     ] [^---ProducerBuffer]: Starting... [faust.transport.producer] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.813663 [info     ] [^--CacheBackend]: Starting... [faust.web.cache.backends.base] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.813904 [info     ] [^--Web]: Starting...          [faust.web.drivers.aiohttp] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.827304 [info     ] [^---Server]: Starting...      [faust.web.drivers.aiohttp] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.828269 [info     ] [^--Consumer]: Starting...     [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.829140 [info     ] [^---AIOKafkaConsumerThread]: Starting... [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.836763 [info     ] [^--LeaderAssignor]: Starting... [faust.assignor.leader_assignor] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.837944 [info     ] [^--Producer]: Creating topic 'saladshooter-__assignor-__leader' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.842077 [info     ] [^--ReplyConsumer]: Starting... [faust.agents.replies] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.842338 [info     ] [^--AgentManager]: Starting... [faust.agents.manager] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.842608 [info     ] [^---Agent: saladshooter.stre[.]ingest_event]: Starting... [faust.agents.agent] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.845350 [info     ] [^---Agent: saladshoote[.]record_event_to_pg]: Starting... [faust.agents.agent] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.847106 [info     ] [^---Conductor]: Starting...   [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.847334 [info     ] [^--TableManager]: Starting... [faust.tables.manager] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.847936 [info     ] [^---Conductor]: Waiting for agents to start... [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:12:43.848366 [info     ] [^---Conductor]: Waiting for tables to be registered... [faust.transport.conductor] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.849170 [info     ] [^---Recovery]: Starting...    [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.849724 [debug    ] [^-SaladShooterFaustApp]: Started. [saladshooter.streams.app] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.850974 [info     ] [^--Producer]: Creating topic 'saladshooter_uiaction_ingestion' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.853770 [info     ] [^--Producer]: Creating topic 'saladshooter-__assignor-__leader' [faust.transport.drivers.aiokafka] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.879137 [info     ] Executing _on_partitions_assigned [faust.app.base] func_name=_on_partitions_assigned lineno=1745 module=base
2024-08-21T16:12:44.880802 [info     ] generation id 3 app consumers id 3 [faust.tables.recovery] func_name=on_rebalance lineno=254 module=recovery
2024-08-21T16:12:44.883283 [info     ] [^---Recovery]: Seek stream partitions to committed offsets. [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.891936 [info     ] [^---Recovery]: Resuming flow... [faust.tables.recovery] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.892406 [info     ] [^---Fetcher]: Starting...     [faust.transport.consumer] func_name=log lineno=284 module=logging
2024-08-21T16:12:44.893529 [info     ] [^---Recovery]: Worker ready   [faust.tables.recovery] func_name=log lineno=284 module=logging

Which version of dd-trace-py are you using?

dd-trace-py==2.11.1

Which version of pip are you using?

pip==24.2

Which libraries and their versions are you using?

`pip freeze`

aioconsole==0.7.1
aiodogstatsd==0.16.0.post0
aiohappyeyeballs==2.3.2
aiohttp==3.10.0
aiohttp-cors==0.7.0
aiokafka==0.10.0
aiomonitor==0.7.0
aiosignal==1.3.1
alembic==1.13.2
annotated-types==0.7.0
anyio==4.4.0
aredis==1.1.8
async-timeout==4.0.3
asyncpg==0.29.0
attrs==23.2.0
build==1.2.1
bytecode==0.15.1
casefy==0.1.7
certifi==2024.7.4
charset-normalizer==3.3.2
click==8.1.7
colorlog==6.8.2
cramjam==2.8.3
croniter==2.0.7
dacite==1.8.1
dataclasses-avroschema==0.60.2
datadog==0.49.1
ddtrace==2.11.1
Deprecated==1.2.14
dill==0.3.8
dnspython==2.6.1
email_validator==2.2.0
envier==0.5.2
fastapi==0.111.1
fastapi-cli==0.0.4
fastavro==1.9.5
faust-streaming==0.11.2
faust-streaming-rocksdb==0.9.3
frozenlist==1.4.1
greenlet==3.0.3
h11==0.14.0
httpcore==1.0.5
httptools==0.6.1
httpx==0.27.0
humanize==4.10.0
idna==3.7
importlib_metadata==8.0.0
Inflector==3.1.1
iniconfig==2.0.0
intervaltree==3.1.0
janus==1.0.0
Jinja2==3.1.4
Mako==1.3.5
markdown-it-py==3.0.0
MarkupSafe==2.1.5
mdurl==0.1.2
mode-streaming==0.4.1
molotov==2.6
multidict==6.0.5
multiprocess==0.70.16
mypy==1.11.1
mypy-extensions==1.0.0
opentelemetry-api==1.26.0
opentracing==2.4.0
orjson==3.10.6
packaging==24.1
pip-tools==7.4.1
pluggy==1.5.0
prompt_toolkit==3.0.47
protobuf==5.27.3
psycopg2-binary==2.9.9
pydantic==2.8.2
pydantic-settings==2.4.0
pydantic_core==2.20.1
Pygments==2.18.0
pyproject_hooks==1.1.0
pytest==8.3.2
pytest-asyncio==0.23.8
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
python-multipart==0.0.9
python-snappy==0.7.2
pytz==2024.1
PyYAML==5.3.1
redis==5.0.8
requests==2.32.3
rich==13.7.1
shellingham==1.5.4
six==1.16.0
sniffio==1.3.1
sortedcontainers==2.4.0
SQLAlchemy==2.0.31
starlette==0.37.2
structlog==24.4.0
terminaltables==3.1.10
trafaret==2.1.1
typer==0.12.3
typing_extensions==4.12.2
urllib3==2.2.2
uvicorn==0.30.3
uvloop==0.19.0
venusian==3.1.0
watchfiles==0.22.0
wcwidth==0.2.13
websockets==12.0
wrapt==1.16.0
xmltodict==0.13.0
yarl==1.9.4
zipp==3.20.0

How can we reproduce your problem?

You can set up a simple Kafka-compatible broker using Docker Compose:

services:
  redpanda:
    command:
      - redpanda
      - start
      - --kafka-addr internal://0.0.0.0:9092,external://0.0.0.0:19092
      # Address the broker advertises to clients that connect to the Kafka API.
      # Use the internal addresses to connect to the Redpanda brokers
      # from inside the same Docker network.
      # Use the external addresses to connect to the Redpanda brokers
      # from outside the Docker network.
      - --advertise-kafka-addr internal://redpanda:9092,external://localhost:19092
      - --pandaproxy-addr internal://0.0.0.0:8082,external://0.0.0.0:18082
      # Address the broker advertises to clients that connect to the HTTP Proxy.
      - --advertise-pandaproxy-addr internal://redpanda:8082,external://localhost:18082
      - --schema-registry-addr internal://0.0.0.0:8081,external://0.0.0.0:18081
      # Redpanda brokers use the RPC API to communicate with each other internally.
      - --rpc-addr redpanda:33145
      - --advertise-rpc-addr redpanda:33145
      # Mode dev-container uses well-known configuration properties for development in containers.
      - --mode dev-container
      # Tells Seastar (the framework Redpanda uses under the hood) to use 1 core on the system.
      - --smp 1
      - --default-log-level=info
    image: docker.redpanda.com/redpandadata/redpanda:latest
    volumes:
      - redpanda:/var/lib/redpanda/data
    networks:
      - saladshooter
    ports:
      - 18081:18081
      - 18082:18082
      - 19092:19092
      - 19644:9644
  redpanda-console:
    image: docker.redpanda.com/redpandadata/console:latest
    networks:
      - saladshooter
    entrypoint: /bin/sh
    command: -c 'echo "$$CONSOLE_CONFIG_FILE" > /tmp/config.yml; /app/console'
    environment:
      CONFIG_FILEPATH: /tmp/config.yml
      CONSOLE_CONFIG_FILE: |
        kafka:
          brokers: ["redpanda:9092"]
          schemaRegistry:
            enabled: true
            urls: ["http://redpanda:8081"]
        redpanda:
          adminApi:
            enabled: true
            urls: ["http://redpanda:9644"]
    ports:
      - 8080:8080
    depends_on:
      - redpanda
volumes:
  redpanda:
    driver: local
networks:
  saladshooter:
    driver: bridge
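
Before starting the worker, it may help to confirm that the broker's external Kafka listener is reachable from the host. The following is a rough sketch, not part of Faust or Redpanda: `broker_reachable` is a hypothetical helper, and the default host and port are taken from the external listener (`localhost:19092`) in the Compose file above. It only checks that a TCP connection can be opened, not that the Kafka API is healthy.

```python
import socket


def broker_reachable(host: str = "localhost", port: int = 19092, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


if __name__ == "__main__":
    print("broker reachable:", broker_reachable())
```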

Then you can make a placeholder Faust application like:


from typing import Any
import faust

app = faust.App(
    "saladshooter",
    broker="kafka://localhost:19092/",
    store="memory://")

ingestion_topic = app.topic("saladshooter_uiaction_ingestion", internal=True)

@app.agent(ingestion_topic)
async def ingestor(event_stream: faust.StreamT[Any]) -> None:
    async for event in event_stream:
        print(event)

def main() -> None:
    # I set up structlog configuration here.
    
    import ddtrace
    ddtrace.patch(asyncio=True)
    
    faust.Worker(app, loglevel="info").execute_from_commandline()

if __name__ == "__main__":
    main()

j00bar, Aug 21 '24 20:08

Thank you for the detailed replication code @j00bar. We'll look into it.

cc @majorgreys

emmettbutler, Aug 22 '24 17:08

@emmettbutler Grateful to you for taking a look - and yeah, it's gotta be tough maintaining a wrapper library with literally infinite use cases and breadth of domain knowledge you'd have to assimilate, so glad I can make it easier to help us in this case!

j00bar, Aug 22 '24 19:08

FYI: The same issue is happening on Python 3.12:

/.venv/lib/python3.12/site-packages/mode/services.py:726: RuntimeWarning: coroutine 'Event.wait' was never awaited
Coroutine created at (most recent call last)
  File "/.venv/lib/python3.12/site-packages/hypercorn/asyncio/run.py", line 230, in _run
    loop.run_until_complete(main(shutdown_trigger=shutdown_trigger))
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 673, in run_until_complete
    self.run_forever()
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 640, in run_forever
    self._run_once()
  File "/usr/local/lib/python3.12/asyncio/base_events.py", line 1984, in _run_once
    handle._run()
  File "/usr/local/lib/python3.12/asyncio/events.py", line 88, in _run
    self._context.run(self._callback, *self._args)
  File "/dd_tracer/python/ddtrace/contrib/asyncio/patch.py", line 50, in traced_coro
    return await coro
  File "/.venv/lib/python3.12/site-packages/mode/services.py", line 873, in _execute_task
    await task
  File "/.venv/lib/python3.12/site-packages/faust/transport/consumer.py", line 899, in _commit_livelock_detector
    await self.verify_all_partitions_active()
  File "/.venv/lib/python3.12/site-packages/faust/transport/consumer.py", line 904, in verify_all_partitions_active
    await self.sleep(0)
  File "/.venv/lib/python3.12/site-packages/mode/services.py", line 723, in sleep
    self._stopped.wait(), timeout=want_seconds(n)
  pass
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

As a workaround, would it be possible to exclude the faust package from patching/tracing?
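
As far as I can tell the asyncio integration is applied process-wide rather than per package, so the closest workaround is probably to switch that one integration off. Below is a sketch under that assumption. It relies on the `DD_PATCH_MODULES` environment variable, which ddtrace documents as a `module:boolean,module:boolean` list; `patch_modules_value` and `worker_env` are hypothetical helpers, and `worker.py` in the comment is a placeholder for the real entry point.

```python
import os
from typing import Optional


def patch_modules_value(overrides: dict[str, bool]) -> str:
    """Render integration overrides in the `module:boolean,module:boolean` form."""
    return ",".join(
        f"{name}:{'true' if enabled else 'false'}"
        for name, enabled in sorted(overrides.items())
    )


def worker_env(base: Optional[dict[str, str]] = None) -> dict[str, str]:
    """Copy an environment mapping and disable only the asyncio integration in it."""
    env = dict(os.environ if base is None else base)
    env["DD_PATCH_MODULES"] = patch_modules_value({"asyncio": False})
    return env


if __name__ == "__main__":
    # The variable has to be set before ddtrace patches anything, e.g. (placeholder command):
    #   subprocess.run(["ddtrace-run", "python", "worker.py"], env=worker_env())
    print(worker_env({})["DD_PATCH_MODULES"])
```

If patching is done explicitly in code, as in the reproduction above, the equivalent is simply not calling `ddtrace.patch(asyncio=True)`.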

konradr, Dec 12 '24 09:12

This issue has been automatically closed after a period of inactivity. If it's a feature request, it has been added to the maintainers' internal backlog and will be included in an upcoming round of feature prioritization. Please comment or reopen if you think this issue was closed in error.

github-actions[bot], May 13 '25 00:05

@j00bar @konradr Hi, can you check whether you still see the issue on ddtrace==3.7.0? It looks similar to what's described in #8907, which was fixed in #13326.

taegyunkim, May 13 '25 00:05

Hi @taegyunkim, thank you for the update. I can confirm that using ddtrace==3.7.0 does solve the issue for us.
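
For anyone else landing here, a quick way to check whether an environment already has that version is sketched below. The 3.7.0 threshold is an assumption taken from this thread, not from a changelog; earlier releases may or may not contain the fix. The helper names are hypothetical, and the parser only looks at the leading numeric parts of the version string.

```python
import itertools
from importlib import metadata
from typing import Optional

# Assumption from this thread: 3.7.0 is the version confirmed to work.
CONFIRMED_WORKING = (3, 7, 0)


def parse_version(text: str) -> tuple[int, ...]:
    """Parse the first three dot-separated numeric components, ignoring suffixes like rc1."""
    parts = []
    for piece in text.split(".")[:3]:
        digits = "".join(itertools.takewhile(str.isdigit, piece))
        parts.append(int(digits or 0))
    while len(parts) < 3:
        parts.append(0)
    return tuple(parts)


def installed_ddtrace() -> Optional[str]:
    """Return the installed ddtrace version string, or None if it is not installed."""
    try:
        return metadata.version("ddtrace")
    except metadata.PackageNotFoundError:
        return None


def at_least_confirmed(version_text: str) -> bool:
    """True if the version is at or above the one confirmed working in this thread."""
    return parse_version(version_text) >= CONFIRMED_WORKING


if __name__ == "__main__":
    version = installed_ddtrace()
    if version is None:
        print("ddtrace is not installed")
    else:
        print(version, "ok" if at_least_confirmed(version) else "older than 3.7.0")
```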

konradr, May 19 '25 07:05