aiokafka
asyncio client for Kafka
**Describe the solution you'd like** Currently, there's no clear way to export metrics or traces directly from the running program. OpenTelemetry allows `auto instrumentation` with [this python package](https://github.com/open-telemetry/opentelemetry-python-contrib/blob/ebe6d1804bccff184edb379d7780469762c7b854/instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/__init__.py). If we...
I don't see an explanation of `TransactionManager`'s `transaction_timeout_ms` in the docs. Is that value a timeout for the context? For example, if you use ``` async for msg in consumer:...
Hi, the seek to committed function has a small bug. It does not work when the last committed offset of a TopicPartition is 0 (i.e., nothing was ever committed). This...
### Changes Fixes # ### Checklist - [ ] I think the code is well written - [ ] Unit tests for the changes exist - [ ] Documentation reflects...
Once Kafka starts, it takes a few seconds to become ready to receive messages from a producer. How can this readiness be checked from the aiokafka client?
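One possible approach to the readiness question above is to retry the client's startup until it succeeds. The following is only a sketch under stated assumptions: `wait_until_ready` and `start_producer` are hypothetical helper names (not part of aiokafka), and the sketch assumes that `AIOKafkaProducer.start()` raises `aiokafka.errors.KafkaConnectionError` while the broker is not yet reachable and that calling `stop()` after a failed `start()` is safe.

```python
import asyncio


async def wait_until_ready(start, *, attempts=10, delay=1.0, retry_on=(Exception,)):
    """Await the async callable `start` until it succeeds or attempts run out.

    Hypothetical helper: returns whatever `start()` returns on the first
    successful call, sleeping `delay` seconds between failed attempts.
    """
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            return await start()
        except retry_on as exc:
            last_error = exc
            if attempt < attempts:
                await asyncio.sleep(delay)
    raise TimeoutError(f"not ready after {attempts} attempts") from last_error


async def start_producer(bootstrap_servers="localhost:9092"):
    """Hypothetical wrapper: return a started producer once Kafka accepts connections."""
    # Imported lazily so wait_until_ready can be used without aiokafka installed.
    from aiokafka import AIOKafkaProducer
    from aiokafka.errors import KafkaConnectionError

    async def _start():
        producer = AIOKafkaProducer(bootstrap_servers=bootstrap_servers)
        try:
            await producer.start()
        except Exception:
            # Assumption: stop() releases resources after a failed start().
            await producer.stop()
            raise
        return producer

    return await wait_until_ready(_start, retry_on=(KafkaConnectionError,))
```

A caller would `await start_producer()` and later `await producer.stop()`. Note that a successful `start()` only shows that the client could bootstrap from the broker; whether that is a sufficient readiness signal for a given deployment is not confirmed by the issue text.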
### Changes Fixes # ### Checklist - [X] I think the code is well written - [ ] Unit tests for the changes exist - [ ] Documentation reflects the...
Of course, I can get the messages with consumer in real time ```python async def test(): consumer = AIOKafkaConsumer("test", bootstrap_servers="localhost:9092") await consumer.start() try: async for message in consumer: return message.value.decode()...
### Changes Add type hints to the protocol module ### Checklist - [x] I think the code is well written - [ ] Unit tests for the changes exist -...
Hi, I have a group of consumers that connect to Kafka on startup and consume messages indefinitely. I've encountered a problem: when the address of Kafka changes,...
**Describe the bug** I was able to connect to Azure Event Hub but when I run producer and consumer and connect to a topic I see an Incoming (Sum) and...