faststream
faststream copied to clipboard
Fixes #1412 with `TestKafkaBroker` behaviour where Consumer Groups weren't being respected
Description
This fixes bug #1412 where distribution of messages on a topic does not get distributed to each consumer group when running the TestKafkaBroker.
Fixes #1412
Example:
This example should have 2 messages received to 2 subscribers.
@test_broker.subscriber(queue, group_id="group1")
async def subscriber1(): ...
@test_broker.subscriber(queue, group_id="group2")
async def subscriber2(): ...
assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 1
Where as if we have the same group_id, we only expect to receive it once.
@test_broker.subscriber(queue, group_id="same_group")
async def subscriber1(): ...
@test_broker.subscriber(queue, group_id="same_group")
async def subscriber2(): ...
assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 0
Other fixes
- It also fixes a small bug in
DOCKER_COMPOSE_PROJECTproject name generation where the users' username field contains a.character (which docker believes to be invalid naming). - Typo fix in
.github/PULL_REQUEST_TEMPLATE.md
Type of change
- [x] Bug fix (a non-breaking change that resolves an issue)
Checklist
- [x] My code adheres to the style guidelines of this project (
scripts/lint.shshows no errors) - [x] I have conducted a self-review of my own code
- [x] I have made the necessary changes to the documentation
- [x] My changes do not generate any new warnings
- [x] I have added tests to validate the effectiveness of my fix or the functionality of my new feature
- [ ] Both new and existing unit tests pass successfully on my local environment by running
scripts/test-cov.sh(No. Currently Segfaults on Local M1 MBP, will rely on GHA) - [x] I have ensured that static analysis tests are passing by running
scripts/static-anaylysis.sh - [x] I have included code examples to illustrate the modifications
Currently facing an issue where the RPC test assumes you can get a response back from a publisher? Not sure why this assumption was made.
@pytest.mark.asyncio()
async def test_rpc(self, queue: str, rpc_broker: BrokerUsecase):
@rpc_broker.subscriber(queue)
async def m(m): # pragma: no cover
return "1"
async with rpc_broker:
await rpc_broker.start()
r = await rpc_broker.publish("hello", queue, rpc_timeout=3, rpc=True)
> assert r == "1"
E AssertionError
imo I think publishers by definition can't return anything other than a ReceiveAck / similar response from the message queue – also not sure if I misunderstand the use case for RPC in this context.
Currently facing an issue where the RPC test assumes you can get a response back from a publisher? Not sure why this assumption was made.
@pytest.mark.asyncio() async def test_rpc(self, queue: str, rpc_broker: BrokerUsecase): @rpc_broker.subscriber(queue) async def m(m): # pragma: no cover return "1" async with rpc_broker: await rpc_broker.start() r = await rpc_broker.publish("hello", queue, rpc_timeout=3, rpc=True) > assert r == "1" E AssertionErrorimo I think publishers by definition can't return anything other than a ReceiveAck / similar response from the message queue – also not sure if I misunderstand the use case for RPC in this context.
RPC in TestClient allows you to validate your handler result without special publisher creation. I think, we should save this behavior. In you case, if user has the only last result - it should be OK for most cases: (the following pseudocode represents assuming bahavior)
result = None
for sub in subscribers:
result = call_subscriber(...)
return result
Should be good to go, applied ruff formatting + linting.
I'll check it and solve conflicts with main soon. Thank you for the work!