Fixes #1412 with `TestKafkaBroker` behaviour where Consumer Groups weren't being respected

Open sifex opened this issue 1 year ago • 2 comments

Description

This fixes bug #1412, where a message published to a topic is not delivered to each consumer group when running the TestKafkaBroker.

Fixes #1412

Example:
In this example, one published message should be received once by each of the 2 subscribers, because they belong to different consumer groups.

@test_broker.subscriber(queue, group_id="group1")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="group2")
async def subscriber2(): ...

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 1

Whereas if both subscribers share the same group_id, we only expect the message to be received once.

@test_broker.subscriber(queue, group_id="same_group")
async def subscriber1(): ...

@test_broker.subscriber(queue, group_id="same_group")
async def subscriber2(): ...

assert subscriber1.mock.call_count == 1
assert subscriber2.mock.call_count == 0
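
For illustration, the delivery rule described above can be sketched in plain Python without any broker. This is a toy model, not the TestKafkaBroker implementation: `ToyTopic` and `deliveries` are hypothetical names, and picking the first registered handler in each group is an assumption chosen to match the assertions in the examples.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

Handler = Callable[[str], None]


class ToyTopic:
    """In-memory stand-in for a topic (hypothetical, for illustration only)."""

    def __init__(self) -> None:
        # group_id -> handlers registered under that consumer group
        self._groups: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, handler: Handler, group_id: str) -> None:
        self._groups[group_id].append(handler)

    def publish(self, message: str) -> None:
        # Every consumer group sees the message, but only one member per group.
        # Assumption: the first registered member of the group consumes it.
        for handlers in self._groups.values():
            handlers[0](message)


def deliveries(group_ids: List[str]) -> List[int]:
    """Publish one message and return the call count of each subscriber."""
    topic = ToyTopic()
    counts = [0] * len(group_ids)

    def make_handler(index: int) -> Handler:
        def handler(message: str) -> None:
            counts[index] += 1

        return handler

    for index, group_id in enumerate(group_ids):
        topic.subscribe(make_handler(index), group_id)
    topic.publish("hello")
    return counts
```

With this model, `deliveries(["group1", "group2"])` gives one call per subscriber, while `deliveries(["same_group", "same_group"])` gives one call to the first subscriber and none to the second.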

Other fixes

  • It also fixes a small bug in DOCKER_COMPOSE_PROJECT project name generation when the user's username contains a . character (which Docker rejects as an invalid name).
  • Typo fix in .github/PULL_REQUEST_TEMPLATE.md
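
As a rough sketch of the kind of normalization involved in the project name fix: Docker Compose project names may contain only lowercase letters, digits, dashes, and underscores, and must start with a lowercase letter or digit. The helper below is hypothetical (`sanitize_project_name` is not taken from this PR, and the actual change may be done differently, for example in a shell script).

```python
import re


def sanitize_project_name(raw: str) -> str:
    """Map an arbitrary string to a valid Compose project name (illustrative only)."""
    # Replace every character outside [a-z0-9_-] (such as ".") with a dash.
    name = re.sub(r"[^a-z0-9_-]", "-", raw.lower())
    # The first character must be a lowercase letter or digit.
    return name.lstrip("_-")
```

For example, a username such as `john.doe` would be turned into `john-doe` before being used in the project name.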

Type of change

  • [x] Bug fix (a non-breaking change that resolves an issue)

Checklist

  • [x] My code adheres to the style guidelines of this project (scripts/lint.sh shows no errors)
  • [x] I have conducted a self-review of my own code
  • [x] I have made the necessary changes to the documentation
  • [x] My changes do not generate any new warnings
  • [x] I have added tests to validate the effectiveness of my fix or the functionality of my new feature
  • [ ] Both new and existing unit tests pass successfully on my local environment by running scripts/test-cov.sh (No. Currently Segfaults on Local M1 MBP, will rely on GHA)
  • [x] I have ensured that static analysis tests are passing by running scripts/static-anaylysis.sh
  • [x] I have included code examples to illustrate the modifications

sifex avatar May 01 '24 10:05 sifex

Currently facing an issue where the RPC test assumes you can get a response back from a publisher? Not sure why this assumption was made.

    @pytest.mark.asyncio()
    async def test_rpc(self, queue: str, rpc_broker: BrokerUsecase):
        @rpc_broker.subscriber(queue)
        async def m(m):  # pragma: no cover
            return "1"
    
        async with rpc_broker:
            await rpc_broker.start()
            r = await rpc_broker.publish("hello", queue, rpc_timeout=3, rpc=True)
    
>       assert r == "1"
E       AssertionError

IMO publishers by definition can't return anything other than a ReceiveAck or similar response from the message queue. I may also be misunderstanding the use case for RPC in this context.

sifex avatar May 01 '24 10:05 sifex

RPC in TestClient lets you validate your handler's result without creating a special publisher. I think we should keep this behavior. In your case, returning only the last result to the user should be fine for most cases (the following pseudocode shows the assumed behavior):

result = None
for sub in subscribers:
    result = call_subscriber(...)
return result
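
A runnable version of that idea might look like the sketch below. It is only an illustration under stated assumptions: `publish_rpc`, `first`, and `second` are hypothetical names, the mapping of group ids to handlers stands in for the broker's subscriber registry, and one handler per group (the first registered) is assumed to consume the message.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

AsyncHandler = Callable[[str], Awaitable[Any]]


async def publish_rpc(groups: Dict[str, List[AsyncHandler]], message: str) -> Any:
    """Call one handler per consumer group and return the last handler's result."""
    result = None
    for handlers in groups.values():
        # Assumption: the first handler registered in each group consumes the message.
        result = await handlers[0](message)
    return result


async def first(message: str) -> str:
    return "1"


async def second(message: str) -> str:
    return "2"
```

With a single group, `asyncio.run(publish_rpc({"group1": [first]}, "hello"))` returns that handler's result, which is what the RPC test expects; with several groups, only the result from the last group is returned.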

Lancetnik avatar May 02 '24 19:05 Lancetnik

Should be good to go, applied ruff formatting + linting.

sifex avatar May 18 '24 22:05 sifex

I'll check it and solve conflicts with main soon. Thank you for the work!

Lancetnik avatar May 19 '24 06:05 Lancetnik