Testing HTTP endpoints - Producer service not yet started
First of all, thank you so much to all the maintainers of this project, I have been using in production for 6 months now and it's really great !
Checklist
- [X] I have included information about relevant versions
- [X] I have verified that the issue persists when using the
masterbranch of Faust.
Steps to reproduce
I have a Faust HTTP endpoint that returns an object from a Faust table, using the id passed in the url, as defined below. This works perfectly fine but I am running into an error while writing unittests for it using pytest
# agents.py
from faust.web import Request, Response, View
from app import app
# table
devices = app.Table("devices", default=dict, partitions=8)
@app.page("/devices/{device_id}/")
class DeviceDetailView(View):
"""Retrieve a specific device"""
@app.table_route(table=devices, match_info="device_id")
async def get(self, request: Request, device_id) -> Response:
"""Retrieve an existing device"""
device = devices.get(device_id)
if device:
return self.json(device)
raise self.NotFound("Cannot find this device")
Here is the conftest:
# conftest.py
import pytest
from simple_settings import settings
from app import app
@pytest.mark.asyncio()
@pytest.fixture()
def event_loop():
"""Force pytest to use Faust's asyncio event loop as the default global loop"""
yield app.loop
@pytest.fixture()
def test_app(event_loop):
"""Passing in event_loop helps avoid 'attached to a different loop' error"""
app.conf.store = "memory://"
app.finalize()
app.flow_control.resume()
return app
@pytest.fixture()
def loop(event_loop):
"""Force pytest-aiohttp to use the same event loop"""
return event_loop
@pytest.fixture()
def web(test_app):
test_app.web.init_server()
return test_app.web
@pytest.fixture()
def web_client(loop, aiohttp_client, web):
try:
yield aiohttp_client(web.web_app)
finally:
# Cleanup threads started by loop.run_in_executor
# at shutdown.
if loop._default_executor is not None:
loop._default_executor.shutdown()
And the test itself:
@pytest.mark.asyncio
async def test_api_get_device(web_client, auth_header):
"""Retrieve device from API"""
device_id = "00000"
device = { "device_id": device_id, "name": "testdevice", "type": "testdevice"}
# manually add the device to the table
devices.data.data[device_id] = device
async with await web_client as client:
resp = await client.get(f"/devices/{device_id}/")
payload = await resp.json()
Expected behavior
We expect the test to pass
Actual behavior
A faust exception is thrown, saying that the "Producer service not yet started". I assume this is due to invalid configuration on my end, but there is nothing in the Faust docs regarding this scenario. Any help would be greatly appreciated ! :)
Full traceback
Traceback (most recent call last):
File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/aiohttp/web_protocol.py", line 422, in _handle_request
resp = await self._request_handler(request)
File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/aiohttp/web_app.py", line 499, in _handle
resp = await handler(request)
File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/web/drivers/aiohttp.py", line 247, in _dispatch
return await handler(request)
File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/web/views.py", line 82, in __call__
return await self.dispatch(request)
File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/web/views.py", line 98, in dispatch
response = await method(cast(Request, request), **kwargs)
File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/app/base.py", line 1348, in get
table.name, key, view.web, request
File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/app/router.py", line 66, in route_req
dest_url: URL = app.router.key_store(table_name, key)
File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/app/router.py", line 30, in key_store
return self._assignor.key_store(topic, k)
File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/assignor/partition_assignor.py", line 392, in key_store
return URL(self._tps_url[self.app.producer.key_partition(topic, key)])
File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 1376, in key_partition
producer = self._ensure_producer()
File "/home/tartieret/Dev/downtime-reporting-streaming/venv/lib/python3.7/site-packages/faust/transport/drivers/aiokafka.py", line 1257, in _ensure_producer
raise NotReady("Producer service not yet started")
faust.exceptions.NotReady: Producer service not yet started
Versions
- Python version: 3.7
- Faust version: 0.5.2
- Operating system: Ubuntu 20
- Kafka version:
- RocksDB version (if applicable)
@tartieret when I run this code none of my endpoints show up in the web object produced. have you modified this code anytime or is this what you use still?
Quick workaround
from faust.exceptions import SameNode
app.router.route_req = AsyncMock(side_effect=SameNode)