Reuse Kafka consumer for polling next batch

Open skmatti opened this issue 5 years ago • 4 comments

I would like to propose caching consumers in the FromKafkaBatched class and reusing them to fetch the next batch.

The current FromKafkaBatched class creates a new Kafka consumer for every new batch, which may introduce a lot of overhead. I have looked into the Spark integration with Kafka and want to quote the following text from its documentation (https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html#locationstrategies):

The new Kafka consumer API will pre-fetch messages into buffers. Therefore it is important for performance reasons that the Spark integration keep cached consumers on executors (rather than recreating them for each batch), and prefer to schedule partitions on the host locations that have the appropriate consumers.

This means the FromKafkaBatched class would hold references to the consumers it creates initially and reuse them to poll the next batch. This also makes sense from Kafka's point of view, since a topic can be read by as many concurrent consumers as it has partitions.
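
A minimal sketch of the caching idea, not the streamz implementation: `ConsumerCache` and its `factory` argument are hypothetical names, and `FakeConsumer` only stands in for a real Kafka consumer so the example runs without a broker. In practice the factory would build a consumer already assigned to the given topic partition.

```python
from typing import Callable, Dict, Tuple


class ConsumerCache:
    """Keep one consumer per (topic, partition) and reuse it across batches."""

    def __init__(self, factory: Callable[[str, int], object]):
        # `factory` is a hypothetical callable that returns a consumer for
        # one topic partition; it is called at most once per partition.
        self._factory = factory
        self._consumers: Dict[Tuple[str, int], object] = {}

    def get(self, topic: str, partition: int):
        key = (topic, partition)
        if key not in self._consumers:
            # First request for this partition: pay the creation cost once.
            self._consumers[key] = self._factory(topic, partition)
        return self._consumers[key]

    def close(self):
        # Close every cached consumer that exposes a close() method.
        for consumer in self._consumers.values():
            close = getattr(consumer, "close", None)
            if callable(close):
                close()
        self._consumers.clear()


class FakeConsumer:
    """Stand-in for a real consumer; counts how many were created."""

    created = 0

    def __init__(self, topic, partition):
        FakeConsumer.created += 1
        self.topic, self.partition = topic, partition
        self.closed = False

    def close(self):
        self.closed = True


cache = ConsumerCache(FakeConsumer)
first = cache.get("events", 0)
second = cache.get("events", 0)   # same partition: reuses the cached consumer
other = cache.get("events", 1)    # different partition: creates a new one
```

Keying the cache by partition matches the observation above that the useful number of concurrent consumers is bounded by the number of partitions.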

But I am not sure how this can be handled when the stream is running in Dask mode. The Dask scheduler would have to be able to redirect each request for a new batch to the Dask worker that holds the appropriate consumer.

skmatti avatar Feb 28 '19 01:02 skmatti

Sure, caching recent consumers seems reasonable to me. Is this a large performance cost today? Roughly how much of our time do we spend creating a new consumer?
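
One rough way to put a number on that is to time consecutive `poll` calls on the same consumer and compare the first against the rest. The sketch below is an assumption-laden illustration: `time_polls` is a hypothetical helper, and `SlowStartConsumer` simulates a setup cost on the first poll so the example runs without Kafka. Any object with a `poll(timeout)` method could be passed instead.

```python
import time


def time_polls(consumer, n_polls=5, timeout=1.0):
    """Return the wall-clock duration, in seconds, of each poll call."""
    durations = []
    for _ in range(n_polls):
        start = time.perf_counter()
        consumer.poll(timeout)
        durations.append(time.perf_counter() - start)
    return durations


class SlowStartConsumer:
    """Stand-in consumer: only the first poll pays a simulated setup cost."""

    def __init__(self, setup_seconds=0.05):
        self._setup_seconds = setup_seconds
        self._ready = False

    def poll(self, timeout):
        if not self._ready:
            # Simulates assignment, seeking, and connection setup.
            time.sleep(self._setup_seconds)
            self._ready = True
        return b"message"


durations = time_polls(SlowStartConsumer())
first_poll, later_polls = durations[0], durations[1:]
```

If the first duration dominates the later ones with a real consumer as well, that gap approximates what is paid again for every batch when consumers are not reused.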

mrocklin avatar Feb 28 '19 01:02 mrocklin

Creating the consumer itself may not account for much of the cost, but assigning it to a particular partition and seeking to an offset may. I have not tested this in a streamz setup; when I tested outside streamz, it took a few seconds to poll the first message and only a few milliseconds to poll subsequent messages. The trick here is to schedule the jobs for a given partition on the same Dask worker, because the consumer on that worker is already assigned to the partition and positioned at the last polled offset.
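
A minimal sketch of such sticky placement, assuming a fixed set of workers: `worker_for_partition` is a hypothetical helper and the addresses are made up. The chosen address could then be passed through the `workers=` keyword of `Client.submit` in `dask.distributed`, which restricts a task to the named workers; whether that is the right hook for streamz is an open question here.

```python
def worker_for_partition(partition, workers):
    """Map a partition number to one worker address, deterministically."""
    if not workers:
        raise ValueError("at least one worker address is required")
    # Sort so the mapping does not depend on the order workers are listed in.
    ordered = sorted(workers)
    return ordered[partition % len(ordered)]


# Hypothetical worker addresses, for illustration only.
workers = ["tcp://10.0.0.2:8786", "tcp://10.0.0.1:8786"]
placement = {p: worker_for_partition(p, workers) for p in range(4)}
```

Every batch for a given partition maps to the same address as long as the worker set is unchanged. If workers join or leave, the mapping shifts and the affected partitions pay the assign-and-seek cost again on their new worker.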

skmatti avatar Feb 28 '19 19:02 skmatti

@jsmaupin I would be happy to hear your feedback on this.

skmatti avatar Feb 28 '19 19:02 skmatti

Every time a client is created, be it a producer or a consumer, it fetches the topic metadata and then initiates new connections to each node using the information in that metadata. If this is what we're talking about here, then creating a consumer comes with a fair amount of overhead.

jsmaupin avatar Mar 01 '19 21:03 jsmaupin