
Write to Kafka from Dask Stream

jsmaupin opened this issue 4 years ago · 0 comments

It seems like the existing PR for this needs improvements. I'm creating this issue to invite discussion on how to properly solve this problem.

I would like to reuse the backpressure handling already implemented in the core to_kafka function. I'm thinking we can just call core.to_kafka.update from Dask. It would look something like this:

from distributed.client import default_client
from streamz import core
from streamz.dask import DaskStream

@DaskStream.register_api()
class to_kafka(DaskStream, core.to_kafka):
    def update(self, x, who=None, metadata=None):
        client = default_client()
        # Submit the core implementation; plain to_kafka.update would
        # resolve to this very method and recurse indefinitely
        result = client.submit(core.to_kafka.update, self, x, who, metadata)
        return self._emit(result, metadata=metadata)
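If the registration above worked as intended, usage would look something like this (the topic name and broker address are placeholders, and this assumes a running Dask cluster):

```python
from dask.distributed import Client
from streamz import Stream

client = Client()  # connect to (or start) a Dask cluster

source = Stream()
# scatter() moves the stream onto Dask; to_kafka then runs per-element
# updates on the workers
source.scatter().to_kafka(
    'my-topic', {'bootstrap.servers': 'localhost:9092'})

source.emit(b'message')
```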

However, there's still the issue of the Kafka connection. It seems Dask attempts to pickle the Kafka producer and push it to the worker. This fails because the Confluent producer uses some sort of lazy instantiation. In my previous attempt to make this work, I was able to create the producer using the getattr workaround below. It also attaches the producer to the worker in order to maintain the connection.

from distributed import get_worker

def get_producer(config):
    # Cache the producer on the Dask worker so the connection is reused
    # across tasks instead of being pickled and shipped from the client
    w = get_worker()
    if hasattr(w, 'producer'):
        return w.producer

    # Import and construct the producer on the worker itself, since the
    # Confluent producer cannot be pickled
    import confluent_kafka as ck
    w.producer = getattr(ck, 'Producer')(config)
    return w.producer

This seems very much like a hack to me, and I would prefer a better solution. If anyone has more expertise with Dask, any help would be appreciated.
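For context, a minimal sketch of how the two pieces might fit together under the assumptions above, with a worker-side task doing the actual produce call. The `produce_one` helper, topic, and broker address are placeholders, not part of streamz:

```python
from distributed.client import default_client

def produce_one(config, topic, value):
    # Hypothetical worker-side task: look up the producer cached by
    # get_producer (defined above) and send one message. Only picklable
    # arguments cross the wire; the producer itself never leaves the worker.
    producer = get_producer(config)
    producer.produce(topic, value)
    producer.poll(0)  # service delivery callbacks without blocking

future = default_client().submit(
    produce_one, {'bootstrap.servers': 'localhost:9092'},
    'my-topic', b'payload')
```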

jsmaupin · Mar 19 '20 19:03