Testing HTTP endpoints - Producer service not yet started

Open tartieret opened this issue 4 years ago • 2 comments

First of all, thank you so much to all the maintainers of this project. I have been using it in production for 6 months now and it's really great!

Checklist

  • [X] I have included information about relevant versions
  • [X] I have verified that the issue persists when using the master branch of Faust.

Steps to reproduce

I have a Faust HTTP endpoint that returns an object from a Faust table, using the id passed in the URL, as defined below. This works perfectly fine in production, but I am running into an error while writing unit tests for it with pytest.

# agents.py
from faust.web import Request, Response, View

from app import app

# table
devices = app.Table("devices", default=dict, partitions=8)

@app.page("/devices/{device_id}/")
class DeviceDetailView(View):
    """Retrieve a specific device"""

    @app.table_route(table=devices, match_info="device_id")
    async def get(self, request: Request, device_id) -> Response:
        """Retrieve an existing device"""
        device = devices.get(device_id)
        if device:
            return self.json(device)

        raise self.NotFound("Cannot find this device")

Here is the conftest:

# conftest.py

import pytest
from simple_settings import settings
from app import app

@pytest.mark.asyncio()
@pytest.fixture()
def event_loop():
    """Force pytest to use Faust's asyncio event loop as the default global loop"""
    yield app.loop

@pytest.fixture()
def test_app(event_loop):
    """Passing in event_loop helps avoid 'attached to a different loop' error"""
    app.conf.store = "memory://"
    app.finalize()
    app.flow_control.resume()
    return app

@pytest.fixture()
def loop(event_loop):
    """Force pytest-aiohttp to use the same event loop"""
    return event_loop

@pytest.fixture()
def web(test_app):
    test_app.web.init_server()
    return test_app.web

@pytest.fixture()
def web_client(loop, aiohttp_client, web):
    try:
        yield aiohttp_client(web.web_app)
    finally:
        # Cleanup threads started by loop.run_in_executor
        # at shutdown.
        if loop._default_executor is not None:
            loop._default_executor.shutdown()

And the test itself:

import pytest

from agents import devices

@pytest.mark.asyncio
async def test_api_get_device(web_client, auth_header):
    """Retrieve device from API"""
    device_id = "00000"
    device = {"device_id": device_id, "name": "testdevice", "type": "testdevice"}
    # manually add the device to the table
    devices.data.data[device_id] = device
    async with await web_client as client:
        resp = await client.get(f"/devices/{device_id}/")
        payload = await resp.json()

Expected behavior

The test should pass.

Actual behavior

Faust raises a NotReady exception with the message "Producer service not yet started". I assume this is due to invalid configuration on my end, but there is nothing in the Faust docs regarding this scenario. Any help would be greatly appreciated! :)

Full traceback

Traceback (most recent call last):
  File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/aiohttp/web_protocol.py", line 422, in _handle_request
    resp = await self._request_handler(request)
  File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/aiohttp/web_app.py", line 499, in _handle
    resp = await handler(request)
  File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/web/drivers/aiohttp.py", line 247, in _dispatch
    return await handler(request)
  File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/web/views.py", line 82, in __call__
    return await self.dispatch(request)
  File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/web/views.py", line 98, in dispatch
    response = await method(cast(Request, request), **kwargs)
  File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/app/base.py", line 1348, in get
    table.name, key, view.web, request
  File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/app/router.py", line 66, in route_req
    dest_url: URL = app.router.key_store(table_name, key)
  File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/app/router.py", line 30, in key_store
    return self._assignor.key_store(topic, k)
  File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/assignor/partition_assignor.py", line 392, in key_store
    return URL(self._tps_url[self.app.producer.key_partition(topic, key)])
  File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 1376, in key_partition
    producer = self._ensure_producer()
  File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 1257, in _ensure_producer
    raise NotReady("Producer service not yet started")
faust.exceptions.NotReady: Producer service not yet started

Versions

  • Python version: 3.7
  • Faust version: 0.5.2
  • Operating system: Ubuntu 20
  • Kafka version:
  • RocksDB version (if applicable)

tartieret • Feb 23 '21 01:02

@tartieret When I run this code, none of my endpoints show up in the web object produced. Have you modified this code since, or is this what you still use?

kurnal • Mar 04 '21 04:03

Quick workaround

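from unittest.mock import AsyncMock  # Python 3.8+; on 3.7 use the `mock` backport
from faust.exceptions import SameNode
# Raising SameNode makes table_route fall back to calling the view locally
app.router.route_req = AsyncMock(side_effect=SameNode)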

dzlabs • Jul 15 '21 21:07
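
For anyone wondering why the workaround helps: judging from the traceback, the table_route wrapper first asks the router which worker owns the key, and that lookup needs a started producer. Raising SameNode from route_req appears to tell the wrapper that the key lives on the current worker, so it calls the view directly. Below is a minimal, stdlib-only sketch of that mechanism. Everything in it (FakeRouter, the simplified table_route, the local NotReady and SameNode classes, the plain dict used as a table) is a hypothetical stand-in written for illustration, not Faust's actual code.

```python
import asyncio
from unittest.mock import AsyncMock  # Python 3.8+


class NotReady(Exception):
    """Stand-in for faust.exceptions.NotReady."""


class SameNode(Exception):
    """Stand-in for faust.exceptions.SameNode."""


class FakeRouter:
    """Hypothetical router whose key lookup needs a started producer."""

    async def route_req(self, table_name, key):
        # Mirrors the traceback: no producer, so the lookup fails.
        raise NotReady("Producer service not yet started")


def table_route(router, table_name):
    """Simplified stand-in for the app.table_route decorator."""

    def decorator(handler):
        async def wrapper(key):
            try:
                # Try to forward the request to the worker owning the key.
                return await router.route_req(table_name, key)
            except SameNode:
                # The key is owned by this worker: run the handler locally.
                return await handler(key)

        return wrapper

    return decorator


# A plain dict plays the role of the Faust table.
devices = {"00000": {"device_id": "00000", "name": "testdevice"}}
router = FakeRouter()


@table_route(router, "devices")
async def get_device(device_id):
    return devices.get(device_id)


async def call_without_patch():
    # Without the workaround, the routing error reaches the caller.
    try:
        await get_device("00000")
    except NotReady as exc:
        return str(exc)


async def call_with_patch():
    # The workaround: every routing attempt reports "same node".
    router.route_req = AsyncMock(side_effect=SameNode)
    return await get_device("00000")


error_message = asyncio.run(call_without_patch())
patched_result = asyncio.run(call_with_patch())
```

In a real test suite the same assignment would probably live in a fixture (for example via pytest's monkeypatch) so that route_req is restored after each test. Note that this bypasses routing entirely, so it only exercises the view logic, not the partition lookup.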