aiokafka
feat: rebalance listener added to Consumer constructor
Changes
Fixes https://github.com/aio-libs/aiokafka/issues/842
- Now it is possible to set the `ConsumerRebalanceListener` when a `Consumer` instance is created.
- The validation is made in the `SubscriptionState._validate_rebalance_listener` method.
- The `RebalanceListenerCT` (`TypeVar`) was created. Using this, we can say that the listener should be a subtype of the abstract `ConsumerRebalanceListener`.
- The `sphinx_autodoc_typehints` extension was added in order to make the `TypeVar` documentation possible. If you prefer not to use `TypeVar`, we can remove this and the `RebalanceListenerCT`.
- Some typing added.
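The bound `TypeVar` and the validation step described above can be illustrated with a minimal, self-contained sketch. It does not reproduce the PR's code: `AbstractListener`, `ListenerT`, `validate_listener` and `PrintingListener` are hypothetical stand-ins for `ConsumerRebalanceListener`, `RebalanceListenerCT` and `SubscriptionState._validate_rebalance_listener`.

```python
from abc import ABC, abstractmethod
from typing import Optional, TypeVar


class AbstractListener(ABC):
    """Hypothetical stand-in for aiokafka.abc.ConsumerRebalanceListener."""

    @abstractmethod
    def on_partitions_revoked(self, revoked): ...

    @abstractmethod
    def on_partitions_assigned(self, assigned): ...


# Bound TypeVar: type checkers accept any subtype of the abstract listener.
ListenerT = TypeVar("ListenerT", bound=AbstractListener)


def validate_listener(listener: Optional[ListenerT]) -> Optional[ListenerT]:
    """Runtime check (assumed behavior): allow None or a listener subtype."""
    if listener is not None and not isinstance(listener, AbstractListener):
        raise TypeError("listener must be a subclass of AbstractListener")
    return listener


class PrintingListener(AbstractListener):
    """Concrete listener used only for illustration."""

    def on_partitions_revoked(self, revoked):
        print("revoked", revoked)

    def on_partitions_assigned(self, assigned):
        print("assigned", assigned)
```

The `TypeVar` only helps static type checkers and documentation tooling; the `isinstance` check is what rejects a wrong object at runtime.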
Checklist
- [x] I think the code is well written
- [x] Unit tests for the changes exist
- [x] Documentation reflects the changes
- [x] Add a new news fragment into the `CHANGES` folder
This pull request introduces 1 alert when merging 38d896c553367c303a265a8d264d393a714a221a into 6360747c4305d1d096d5e55037d60a12c4c9ab15 - view on LGTM.com
new alerts:
- 1 for Missing call to `__init__` during object initialization
This pull request introduces 1 alert when merging 126a30728684174e5c269d3b4670f7ae858a1865 into 6360747c4305d1d096d5e55037d60a12c4c9ab15 - view on LGTM.com
new alerts:
- 1 for Missing call to `__init__` during object initialization
Hi @marcosschroh, thank you for the contribution. Could you elaborate on why it would be essential to have a listener at creation time? As you can guess, most listeners would require the back link to the Consumer class, so, following Java's API, we modeled the subscribe method to be the only place where this can be passed. I feel like changing the API to set "consumer" is not completely backward compatible, and I would like to have a good reason to include it. Thanks!
For example, there can be code where listener subclasses have a "consumer" property that is not settable.
Hi @tvoinarovskyi
Thanks for taking a look. I see 2 problems here:
- Why do I always need to use the `Consumer.subscribe` method if I want to use a `RebalanceListener`? It is a bit weird because I can consume immediately after creating a `Consumer`. In fact, the `Consumer.__init__` method calls `_subscription.subscribe(...)`, which accepts a `RebalanceListener`, so it should be possible to set a `listener` at consumer creation time.
As you said:
most listeners would require the back link to Consumer class
I could see that when I looked at the examples and tests. In most cases (probably always) you need a link to the consumer from the RebalanceListener instance, which made me realize that it is only possible to set a RebalanceListener through subscribe, because the consumer instance is required first (chicken/egg). This leads to the second problem/question:
- It looks like the class `aiokafka.abc.ConsumerRebalanceListener` should have a `def __init__(Optional[Consumer])` method. In all the test cases and examples the same code is repeated:
class RebalanceListener(ConsumerRebalanceListener):
    def __init__(self, consumer):  # Always repeat the same
        self.consumer = consumer
    ...
There we can see that the end user always has to include the __init__(consumer) method.
Another problem (out of scope here) is that the interface defines sync methods, but the examples and test cases always define async methods (async def on_partitions_revoked and async def on_partitions_assigned).
This PR adds def __init__(Optional[consumer] = None): ... to the class aiokafka.abc.ConsumerRebalanceListener, so end users won't have to define RebalanceListener.__init__(consumer): ... every time. Second, it becomes possible to set a listener at consumer creation time, because a reference to self (the Consumer instance) is available there.
This change shouldn't affect current users because they already define RebalanceListener.__init__(consumer), and it will make things easier for new users who want to use a RebalanceListener. The consumer property will always be present, regardless of whether the listener was set at consumer creation time or through the subscribe method.
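To make the proposal concrete, here is a minimal sketch of the idea, not the PR's actual code. `BaseListener`, `SketchConsumer` and `LegacyListener` are hypothetical names, and the rule "only fill in the back link when it is still `None`" is an assumption made for illustration.

```python
from typing import Any, Optional


class BaseListener:
    """Hypothetical base class with the proposed optional consumer argument."""

    def __init__(self, consumer: Optional[Any] = None) -> None:
        self.consumer = consumer

    async def on_partitions_revoked(self, revoked) -> None: ...

    async def on_partitions_assigned(self, assigned) -> None: ...


class SketchConsumer:
    """Hypothetical consumer; NOT the real AIOKafkaConsumer."""

    def __init__(self, *topics: str, listener: Optional[BaseListener] = None) -> None:
        self.topics = topics
        self.listener = listener
        # The consumer now exists, so the back link can be filled in here.
        # Assumption: an already-set link is left untouched.
        if listener is not None and listener.consumer is None:
            listener.consumer = self


class LegacyListener(BaseListener):
    """Existing user code that still defines its own __init__(consumer)."""

    def __init__(self, consumer):
        self.consumer = consumer
```

With this shape, `SketchConsumer("my_topic", listener=BaseListener())` leaves the listener holding a reference to the consumer, while a `LegacyListener` built the old way keeps working. It does not address the reviewer's concern about subclasses whose `consumer` property is read-only; assigning to such a property would still fail.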
Another alternative could be that the callbacks receive the consumer together with the TopicPartitions:
class RebalanceListener(ConsumerRebalanceListener):
    async def on_partitions_assigned(self, consumer, tp) -> None:
        ...
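A runnable sketch of this alternative, under the assumption that the consumer passes itself as the first callback argument. `CallbackListener`, `PassingConsumer` and `notify_assigned` are hypothetical names and do not exist in aiokafka.

```python
import asyncio


class CallbackListener:
    """Hypothetical listener that needs no stored back link."""

    def __init__(self) -> None:
        self.seen = []

    async def on_partitions_assigned(self, consumer, assigned) -> None:
        # The consumer arrives with the call, so __init__ needs no argument.
        self.seen.append((consumer, tuple(assigned)))


class PassingConsumer:
    """Hypothetical consumer that hands itself to the listener callbacks."""

    def __init__(self, listener: CallbackListener) -> None:
        self._listener = listener

    async def notify_assigned(self, assigned) -> None:
        # Stand-in for the point where a rebalance finishes.
        await self._listener.on_partitions_assigned(self, assigned)


async def demo() -> CallbackListener:
    listener = CallbackListener()
    consumer = PassingConsumer(listener)
    await consumer.notify_assigned(["my_topic-0"])
    return listener
```

Calling `asyncio.run(demo())` returns a listener whose `seen` list holds one entry. The trade-off is that this changes the callback signature, so existing listeners would need updating.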
I hope my explanation makes sense.