Twisted interface

Open glyph opened this issue 9 years ago • 15 comments

Kafka-Python claims to be "async" but this seems to just mean "uses lots of threads to put your I/O into the background" and not actually "event-driven". It would be nice to have an interface which avoids the overhead (thread stacks, GIL switching, etc) and risks (sharing too much state, deadlocks, difficult to debug differing behavior at scale) of threads, but presents a similar interface.

I filed the ticket with the summary "twisted" because of my particular biases, but this should be done in a way which is as framework-neutral as possible, since supporting the asyncio stdlib module at some point in the future would probably be interesting as well, not to mention Tornado and Vanilla and so on.

Given that one would be consuming a queue, perhaps this would be a good landmark project for https://github.com/twisted/tubes which, despite its github URL, has an (almost) framework-neutral architecture and lots of good abstractions around flow control and propagating backpressure.

glyph avatar Feb 20 '16 02:02 glyph

I have no idea what the implications for the internals would be yet (I haven't looked too closely at the code) but in terms of the external interface, it would seem that there would be two core changes necessary:

  1. an interface to KafkaConsumer where the values are delivered to a callable rather than as the results of an iterable, like

    consumer = KafkaConsumer(value_deserializer=msgpack.loads)
    @consumer.consume_into
    def a_message_arrived(message):
      "process it"
    
  2. an interface to KafkaProducer where send produces a Deferred (or asyncio or Tornado Future)
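
To make the first proposed change concrete, here is a minimal sketch of callback delivery layered over a plain iterable. `CallbackConsumer` and `deliver_available` are hypothetical names used only for illustration (they are not part of kafka-python), and an in-memory list stands in for the broker.

```python
from typing import Callable, Iterable, List, Optional


class CallbackConsumer:
    """Hypothetical wrapper: pushes messages to a callable instead of being iterated."""

    def __init__(self, source: Iterable[bytes],
                 value_deserializer: Callable[[bytes], object]):
        self._source = iter(source)
        self._deserialize = value_deserializer
        self._handler: Optional[Callable[[object], None]] = None

    def consume_into(self, handler):
        """Register the handler; returns it so this works as a decorator."""
        self._handler = handler
        return handler

    def deliver_available(self) -> int:
        """Deliver everything currently available to the handler.

        An event loop would call this whenever the transport reports new data.
        """
        if self._handler is None:
            raise RuntimeError("no handler registered")
        count = 0
        for raw in self._source:
            self._handler(self._deserialize(raw))
            count += 1
        return count


received: List[str] = []
# The list of byte strings is a stand-in for messages fetched from a broker.
consumer = CallbackConsumer([b"a", b"b"], value_deserializer=bytes.decode)


@consumer.consume_into
def a_message_arrived(message):
    received.append(message)


delivered = consumer.deliver_available()
```

The point of the shape is that the caller never blocks inside an iterator; whoever owns the event loop decides when `deliver_available` runs.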

glyph avatar Feb 20 '16 02:02 glyph

This sounds great; I believe it would let people use Kafka along with the upcoming Django channels work!

paulproteus avatar Feb 20 '16 02:02 paulproteus

The goal of this project is to stay closely aligned with the official apache client. I don't have plans to implement a different architecture or make significant changes to the interface. The reason for that is not that I think one model is better than another. It is because maintaining a Kafka client that provides all the features that users expect is difficult. There are not a lot of unofficial drivers that come close. Staying in step with the Apache project is, in my opinion, the best approach until Kafka itself is fully matured and you won't need to constantly keep up with protocol changes, client state issues, and requirements necessary to provide ordering guarantees, coordination, etc.

I welcome an exploration of what it would take to deliver a viable kafka client for asyncio or twisted (or any other event loop framework). I am happy to comment on PRs / forks / POCs / etc. If you can provide unobtrusive wrapper classes to bridge the gaps, I would consider merging. Otherwise I think a deeper architectural change should be published as a separate client (perhaps txkafka).

dpkp avatar Feb 20 '16 08:02 dpkp

Follow up: obviously I'm a big twisted fan. And I'd love to help on txkafka. I just don't think this project (me) has capacity to take that on right now.

dpkp avatar Feb 20 '16 08:02 dpkp

@dpkp Understood! Believe me I know all about not having the bandwidth to take something on :). If I had the bandwidth to start up a new txkafka, after all, I probably wouldn't have filed this bug :). Might PyKafka be a better fit for this work?

glyph avatar Feb 20 '16 09:02 glyph

Some additional info: kafka-python mirrors the Apache (Java) client. The low-level client returns futures on send (which look a lot like Deferred), processes network recv, and runs callbacks / errbacks inside a custom poll() method that the producer and consumer classes call into. When I started out I looked at using twisted or asyncio to manage this event loop, but could not come up with a good approach. Do you think there's a way to support a twisted interface without changing this core design? My main concern is keeping up with changes to the official client, so that's why I'm hesitant to change the core loop.
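
As a rough illustration of how a callback/errback future could be surfaced to an event loop without touching the core loop, the sketch below bridges one to an `asyncio.Future`. `CallbackFuture` is a stand-in written for this example, not kafka-python's own class, and `to_asyncio` is a hypothetical helper name.

```python
import asyncio


class CallbackFuture:
    """Stand-in for a callback/errback style future (not kafka-python's class)."""

    def __init__(self):
        self._callbacks = []
        self._errbacks = []

    def add_callback(self, fn):
        self._callbacks.append(fn)
        return self

    def add_errback(self, fn):
        self._errbacks.append(fn)
        return self

    def success(self, value):
        for fn in self._callbacks:
            fn(value)

    def failure(self, exc):
        for fn in self._errbacks:
            fn(exc)


def to_asyncio(cb_future, loop):
    """Return an asyncio.Future that resolves when cb_future fires."""
    aio_future = loop.create_future()

    def resolve(value):
        if not aio_future.done():
            aio_future.set_result(value)

    def reject(exc):
        if not aio_future.done():
            aio_future.set_exception(exc)

    # call_soon_threadsafe allows the callbacks to fire from whichever
    # thread happens to be running poll().
    cb_future.add_callback(lambda value: loop.call_soon_threadsafe(resolve, value))
    cb_future.add_errback(lambda exc: loop.call_soon_threadsafe(reject, exc))
    return aio_future


async def demo():
    loop = asyncio.get_running_loop()
    ok, bad = CallbackFuture(), CallbackFuture()
    ok_result, bad_result = to_asyncio(ok, loop), to_asyncio(bad, loop)
    ok.success("offset 42")            # what poll() would do on a response
    bad.failure(RuntimeError("boom"))  # what poll() would do on an error
    value = await ok_result
    try:
        await bad_result
    except RuntimeError as exc:
        error = str(exc)
    return value, error


result = asyncio.run(demo())
```

This only adapts the result type; something still has to call poll(), which is the harder part of the question.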

dpkp avatar Feb 20 '16 18:02 dpkp

Do you think there's a way to support twisted interface without changing this core design? My main concern is keeping up w changes to official client so that's why I'm hesitant to change the core loop.

Almost.

The big change you have to make is not to adopt Twisted APIs like Deferred. The change you need to make is to change the I/O interface from one which calls recv to one which feeds in some data. The best example of this in a totally platform-agnostic library is the https://github.com/python-hyper/hyper-h2 library from @Lukasa.

Luckily, since recv can always return a short read – and if your code isn't dealing with that, it's buggy – there is a simple way to do this with your existing code: replace all real socket usage with a fake object whose recv raises EAGAIN if no data is available, and returns whatever data is available. Then, have an external API which feeds the data in and cranks the loop manually.
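
A minimal sketch of that idea, under stated assumptions: `FedSocket` and `FrameReader` are hypothetical names, and the parser only handles 4-byte big-endian length-prefixed frames as a stand-in for the client's real protocol handling.

```python
import errno
import struct


class FedSocket:
    """Fake socket: recv() never blocks; data arrives only via feed()."""

    def __init__(self):
        self._buffer = bytearray()

    def feed(self, data: bytes) -> None:
        """Called by the external event loop when bytes arrive."""
        self._buffer += data

    def recv(self, bufsize: int) -> bytes:
        if not self._buffer:
            # Same signal a non-blocking socket gives when nothing is ready.
            raise BlockingIOError(errno.EAGAIN, "no data available")
        chunk = bytes(self._buffer[:bufsize])
        del self._buffer[:bufsize]
        return chunk


class FrameReader:
    """Parses length-prefixed frames and tolerates short reads."""

    def __init__(self, sock):
        self._sock = sock
        self._pending = bytearray()

    def poll(self):
        """Crank the loop once: drain the socket, return complete frames."""
        try:
            while True:
                self._pending += self._sock.recv(4096)
        except BlockingIOError:
            pass  # EAGAIN: nothing more to read right now
        frames = []
        while len(self._pending) >= 4:
            (size,) = struct.unpack(">i", self._pending[:4])
            if len(self._pending) < 4 + size:
                break  # frame still incomplete; wait for the next feed()
            frames.append(bytes(self._pending[4:4 + size]))
            del self._pending[:4 + size]
        return frames


sock = FedSocket()
reader = FrameReader(sock)
frame = struct.pack(">i", 5) + b"hello"
sock.feed(frame[:3])                      # short read: only part of the header
first = reader.poll()
sock.feed(frame[3:] + struct.pack(">i", 2) + b"ok")
second = reader.poll()
```

Because the parser never touches a real socket, the same code can be driven by a blocking loop, Twisted, or asyncio.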

This does create a situation where calling those APIs directly can lead to the EAGAIN exception bubbling up to the caller. But you can address this in two ways:

  1. provide two implementations of I/O, one which blocks like right now, and one which pulls data in from the main loop to push it into a fake socket, turning the crank on a kafka-python object in Twisted's dataReceived method, for example, or
  2. use the async implementation of I/O by providing facade objects for use on non-main threads, and use something like Twisted's blockingCallFromThread to implement Future.get and your kafka subscriber iterator's next (py3: __next__) method.
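
The second option can be sketched with the standard library alone. Note the substitution: this uses asyncio's `run_coroutine_threadsafe` where the comment above names Twisted's blockingCallFromThread, purely so the example has no third-party dependency. `BlockingFacade` is a hypothetical name, and `None` is an end-of-stream sentinel invented for this sketch.

```python
import asyncio


class BlockingFacade:
    """Hypothetical facade: a blocking iterator for use on non-loop threads."""

    def __init__(self, loop, queue):
        self._loop = loop
        self._queue = queue

    def __iter__(self):
        return self

    def __next__(self):
        # Schedule the get() on the loop thread, then block this worker
        # thread until the loop produces a message.
        future = asyncio.run_coroutine_threadsafe(self._queue.get(), self._loop)
        message = future.result(timeout=5)
        if message is None:  # sentinel marking end of stream in this sketch
            raise StopIteration
        return message


def worker(facade):
    """Ordinary blocking code, run off the event-loop thread."""
    return [message.upper() for message in facade]


async def main():
    loop = asyncio.get_running_loop()
    queue = asyncio.Queue()
    facade = BlockingFacade(loop, queue)
    # The worker runs in the default thread pool; the loop stays free.
    task = loop.run_in_executor(None, worker, facade)
    for message in ("a", "b", None):
        await queue.put(message)
    return await task


collected = asyncio.run(main())
```

The facade must never be used from the loop thread itself, since blocking there would stall the very loop that is supposed to deliver the result.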

Does this make sense?

glyph avatar Feb 20 '16 20:02 glyph

It does - thanks for the notes. I'll take a closer look.

dpkp avatar Feb 21 '16 01:02 dpkp

I wrote 'afkak' (A twisted up kafka) a while back and got permission to open source it: https://github.com/ciena/afkak. It still needs docs, and there are probably other warts, but at least it's a start.

rthille avatar Mar 14 '16 22:03 rthille

@rthille - thanks so much for the pointer!

glyph avatar Mar 15 '16 04:03 glyph

@glyph I'd love feedback on how to improve it. We've got internal customers using it now, so I have to keep the interface fairly stable, but can evolve it to be better. It was my first "real" Twisted project.

rthille avatar Mar 15 '16 17:03 rthille

@glyph @dpkp Async hooks that could be Twisted-friendly would be really great. Writing a dedicated "txKafka" seems destined to always lag behind the current Kafka protocol (as too often happens with Twisted drivers for things like Cassandra), whereas hooking into an async-friendly pattern for kafka-python to provide just the Twisted-specific transport seems a lot more maintainable.

williamsjj avatar Apr 19 '16 08:04 williamsjj

Hello! A week ago we released the initial version of aiokafka: https://github.com/aio-libs/aiokafka. Read the docs at: http://aiokafka.readthedocs.org/en/stable/. It is currently tested against, and fully supports, Kafka 0.9.0. Feedback greatly appreciated.

TarasLevelUp avatar Apr 22 '16 14:04 TarasLevelUp

Hello there, I know this is a bit of an old issue. Still, I have just noticed that SQLAlchemy is releasing an asyncio interface (this November 2021) that uses the same synchronous codebase, reuses the test suite and offers the same functionality [1].

According to their announcement: "The approach involves a single greenlet context switch per await call, and the extension which makes it possible is less than 20 lines of code" [1]. So they are using greenlet (not gevent or eventlet) to make it work. Digging a bit more, I have found the solution was initially discussed here where you can find this snippet to play and experiment with. I think the snippet is very illustrative and helps to properly understand how it works.

Using the green_spawn and green_await of the snippet above, the main idea would be as follows. Create an async interface (e.g., AsyncKafkaConsumer) that calls the sync interface (the current one) using the green_spawn. Create an async interface for the I/O operations (one that uses async def functions). As the synchronous code can't call async functions, wrap the calls using green_await (only when the AsyncKafkaConsumer is in use, otherwise the current sync I/O code should be called). And "that's all". The async I/O interface could be based on the native asyncio-stream or 3rd party solutions.

[1] SQLAlchemy initial feature announcement
[2] SQLAlchemy Asynchronous I/O (asyncio)

emarbo avatar Nov 18 '21 09:11 emarbo

I don't think this project is the right place to add Twisted support; afkak and aiokafka have their own ways of being integrated with Twisted.

wbarnha avatar Aug 05 '23 03:08 wbarnha