aiokafka
[QUESTION] How do I use a topic like a database table and query it as fast as I possibly can?
Using offsets_for_times, I'd like to be able to query all messages within a certain time range, as fast as I can.
Using the confluent-kafka-python client, I was able to do this at about 1415.25348 messages/second, with a message size of about 1.8 MB.
I would like to run my Kafka consumer like a short-lived database query.
I understand I can run multiple instances of my consumer as a consumer group to increase parallelism, but I would like to run a single instance in a multi-threaded fashion. If my topic has 3 partitions, I would like to consume messages using three consumers. I'm not quite sure how to do this from a single instance. I'm trying to create a library that another engineer can use to query a topic over a time range, as mentioned above.
Please note that Kafka is not designed for random access to topics, so old messages will not be in a cache on the broker. You will get a laggy start no matter the solution (including parallel consumption). If you plan to do a lot of such queries it will be bad for the brokers; at the very least make sure they are running on SSDs and are tuned for such a use case.
Apart from the broker side, what you want is a fairly straightforward consume over a range of offsets, which is not hard, but all Kafka consumers are designed to have only one consuming position at a time. I.e. you will have to either consume one range at a time or create several consumers to get more concurrency (say 4 clients to get up to 4 parallel queries). Something like this should do the trick for one consumer:
import asyncio
from typing import AsyncIterator
from datetime import datetime, timedelta

from aiokafka import AIOKafkaConsumer, ConsumerRecord, TopicPartition

loop = asyncio.get_event_loop()

TOPIC = "test-topic"
POLL_TIMEOUT_MS = 500


async def consumer_for_time(
        consumer: AIOKafkaConsumer,
        start_time: datetime,
        end_time: datetime
) -> AsyncIterator[ConsumerRecord]:
    # Kafka works with millisecond timestamps.
    start_ts_ms = int(start_time.timestamp() * 1000)
    end_ts_ms = int(end_time.timestamp() * 1000 + 1)

    partitions = set(
        TopicPartition(TOPIC, part)
        for part in consumer.partitions_for_topic(TOPIC)
    )
    start_offsets = await consumer.offsets_for_times(
        {tp: start_ts_ms for tp in partitions}
    )
    print("Start offsets", start_offsets)
    end_offsets = await consumer.offsets_for_times(
        {tp: end_ts_ms for tp in partitions}
    )
    print("End offsets", end_offsets)

    for tp, offset_meta in start_offsets.items():
        # Will be None if the topic was already truncated past that period
        if offset_meta is None:
            await consumer.seek_to_beginning(tp)
        else:
            consumer.seek(tp, offset_meta.offset)

    # `async for` would be more pythonic, but getmany is faster
    while partitions:
        messages = await consumer.getmany(*partitions, timeout_ms=POLL_TIMEOUT_MS)
        for tp, batch in messages.items():
            for msg in batch:
                if msg.timestamp <= end_ts_ms:
                    yield msg
                else:
                    # We already found all messages for this partition - stop searching
                    partitions.remove(tp)
                    break
        # One case we still need to cover is a partition that did not receive
        # any new messages after `end_time`. We detect it by comparing the
        # consume position with the highwater mark. Iterate over a copy, since
        # we mutate the set inside the loop.
        for tp in list(partitions):
            if consumer.highwater(tp) == await consumer.position(tp):
                partitions.remove(tp)


async def main():
    # Init the consumer once on start.
    consumer = AIOKafkaConsumer(
        TOPIC, loop=loop, bootstrap_servers='localhost:9092')
    await consumer.start()
    try:
        async for msg in consumer_for_time(
                consumer,
                datetime.now() - timedelta(minutes=1000),
                datetime.now() - timedelta(minutes=20)):
            print(msg)
    finally:
        # Don't forget to close it after you are done.
        await consumer.stop()


loop.run_until_complete(main())
While confluent-kafka-python is way faster for small messages, since it's written entirely in C, I do believe you should be able to get more or less the same throughput from aiokafka for large messages like 1.8 MB. If you have issues with the speed, please let me know - there may be some timing issues, as the use case is quite unique.
@tvoinarovskyi thank you for that code. I ran some benchmarks and achieved 939.6426457 records/sec with your code. Sadly, the confluent-kafka-python client was able to achieve 1118.175027 records/sec.
I'm still not clear on how to divide the work between multiple consumers from a single Python entry point. Would I need to create multiple consumers within the consumer_for_time function?
@zzztimbo No, you would need to write a simple ConsumerPool that hands out a consumer for a job and takes it back once the job is finished; you then call consumer_for_time on the consumer you got, similar to how you would use a ConnectionPool for a database. A sketch is below.
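This is not an aiokafka API, just a minimal sketch of what such a pool could look like, built on an asyncio.Queue and reusing the consumer_for_time coroutine from the snippet above. The ConsumerPool class, its acquire/release/start/stop methods, the pool size and the query_range helper are made up for illustration:

import asyncio

from aiokafka import AIOKafkaConsumer


class ConsumerPool:
    # Illustrative only: hands out already-started consumers,
    # like a database connection pool.

    def __init__(self, topic, bootstrap_servers, size=4):
        self._topic = topic
        self._bootstrap_servers = bootstrap_servers
        self._size = size
        self._free = asyncio.Queue()

    async def start(self):
        # Start all consumers up front; each one keeps its own fetch
        # position, so up to `size` range queries can run in parallel.
        for _ in range(self._size):
            consumer = AIOKafkaConsumer(
                self._topic, bootstrap_servers=self._bootstrap_servers)
            await consumer.start()
            self._free.put_nowait(consumer)

    async def acquire(self) -> AIOKafkaConsumer:
        # Waits until a consumer is free.
        return await self._free.get()

    def release(self, consumer: AIOKafkaConsumer):
        self._free.put_nowait(consumer)

    async def stop(self):
        # Assumes every consumer has been released back to the pool.
        for _ in range(self._size):
            consumer = await self._free.get()
            await consumer.stop()


async def query_range(pool: ConsumerPool, start_time, end_time):
    # One "database query": borrow a consumer, stream the range, give it back.
    consumer = await pool.acquire()
    try:
        return [msg async for msg in consumer_for_time(consumer, start_time, end_time)]
    finally:
        pool.release(consumer)

With that in place you can asyncio.gather several query_range calls from one entry point and they will run concurrently, bounded by the pool size.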
The speed difference seems ok, I would expect Confluent to process the messages a bit faster. Still, while 1000 is not a large number, it seems like you are bottlenecking on socket configuration at this point. What network connection does your cluster provide? 10GB?
You could try tuning the socket.nagle.disable, socket.send.buffer.bytes and socket.receive.buffer.bytes configuration in confluent_kafka_python; you should be able to push this benchmark up to your bandwidth limit. See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md.
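For reference, those settings just go into the configuration dict when constructing the confluent-kafka-python Consumer. The values below are arbitrary starting points rather than tuned recommendations, and the group.id is a placeholder:

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "range-query",                        # placeholder
    "socket.nagle.disable": True,                     # disable Nagle's algorithm
    "socket.send.buffer.bytes": 1024 * 1024,          # 1 MiB; 0 means OS default
    "socket.receive.buffer.bytes": 8 * 1024 * 1024,   # 8 MiB; 0 means OS default
})

With 1.8 MB messages the receive buffer is the setting most likely to matter, since each fetch response is large.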