Denis Otkidach
Denis Otkidach
This depends on your intent: `except UnknownProducerId` works only this specific error, while `except BrokerResponseError` catches exception for all error responses.
> Do we have any more committers on this project or can this be considered a dead project now that @tvoinarovskyi does not have the bandwidth? Do you mean ones...
Right. I have commit/merge rights and either @tvoinarovskyi or @asvetlov is hopefully still responsive to approve releases. But we need somebody competent and motivated enough to dive into current issues/PRs.
> I'm curious if there is anybody currently working on typings for the aiokafka library. > I see that black formatting is included in the list provided by @tvoinarovskyi. Could...
> We are still seeing this issue in 0.7.2. The example here doesn't look like working code, neither `AIOKafkaConsumer.start()` nor `AIOKafkaProducer.start()` return future. And it would be very helpful if...
> We think it could be the issue is in python 3.8 and 3.9 with using wait_for (https://bugs.python.org/issue42130). Could you then try it with current code in master please? It...
Messages are consumed for each topic+partition independently, offset is an index in queue for particular topic+partition. Where are going to get `starting_offset` from? Do you store somewhere the offset where...
Your code is right, but it's unreliable: `consumer.assignment()` may return empty or incomplete result shortly after the start or during rebalancing. A possible solution could be calling `await consumer.assign([tp])` explicitly.
Subscription to topics assumes automatic partition assignment, while you assign them manually. Just provide no topics on `AIOKafkaConsumer` initialisation. The for loop looks very strange. Why do you get `tp`...
BTW, kafka-python does support customisation via `socket_options` parameter: https://github.com/dpkp/kafka-python/blob/6fc008137c75c751a9fbea3e0ef36d2870119c7b/kafka/conn.py#L135-L137 https://github.com/dpkp/kafka-python/blob/6fc008137c75c751a9fbea3e0ef36d2870119c7b/kafka/conn.py#L373-L375 With asyncio this means we have to create socket ourselves instead of relying on what `loop.create_connection()` does.