pulsar-client-python icon indicating copy to clipboard operation
pulsar-client-python copied to clipboard

Python3 AsyncIO implementation

Open houyingkun opened this issue 2 years ago • 13 comments
trafficstars

Copy from: https://github.com/apache/pulsar/issues/6308

Is your feature request related to a problem? Please describe. Implement python3 await/ async and using the event loop in order to save IO waiting time

Describe the solution you'd like Using async, await and execute via the event loop

Describe alternatives you've considered Using threads although the GIL preventing pure multi threading

houyingkun avatar Nov 28 '22 03:11 houyingkun

This package already provides asynchronicity (Producer.send_async and the message_listener argument to the client).

While those don't provide async or await syntactic constructs, they are asynchronous (nonblocking) and can be integrated with Python's asyncio (or trio etc.) futures to provide awaitable tokens.

How would async/await syntax save on IO waiting time if this client is already asynchronous? Something would still have to wait on I/O as is already being done.

zbentley avatar May 05 '23 13:05 zbentley

I think a larger point to possibly be made here is the difficulty of using pulsar with other libraries without these features. For example, consider a user trying to write a simple FastAPI server that simply exposes a REST endpoint for a client that gives that latest value in a topic.

If they were using Kafka, they could use broadcaster (like suggested here), or they could use aiokafka (an example of this would be here and here). These third party libraries that allow background processing generally require coroutines to be passed in and don't support synchronous functions because that's obviously a nightmare with python and GIL.

I'm not even sure on how to nicely implement the FastAPI example with Pulsar client (apart from implementing blocking threads), or if the increased complexity means that for many Python developers they might simply see the provided kafka solutions and be swayed.

Samreay avatar May 09 '23 04:05 Samreay

@zbentley Integrating this client with asyncio is frustrating, because it's not entirely clear which thread things are running in and much of asyncio is not threadsafe. In order to get these callbacks to work with asyncio you have to resort to low level threadsafe facilities of asyncio, making it far less trivial to safely integrate this client with asyncio than you portray.

For example if you want to use send_async. Intuition might lead you to just pass a future into that callback, and set the result of that future when you're done. You cannot do that, because Future is not threadsafe. You instead need to call future.set_result using loop.call_soon_threadsafe(future.set_result, res) to ensure that set_future is called from the main thread. This is actually a simpler example, whereas wrapping message_listener callbacks to support async callbacks is even more frustrating. Basically you're always forced to use low-level threadsafe facilities to schedule into the main threads event loop.

I think the argument that this is strictly syntactical is simply not true. These concurrency models are fundamentally different. Pulsar is using a threaded blocking network model, where threads handle blocking operation as to not block the main thread, these threads are put to sleep by the OS when they make a blocking syscall. Asyncio is driven by an event loop and non-blocking operations running on a single thread. I understand if an asyncio wrapper is outside the scope of this project, but these concurrency models diverge on more than just syntax, and integrating with asyncio is not trivial or straightforward especially for most python developers who may not understand the low level details of how asyncio works.

tsturzl avatar Oct 12 '23 16:10 tsturzl

Assign this issue to myself. I wrote a demo locally and I'm going to start this work.

image

BewareMyPower avatar Dec 28 '23 13:12 BewareMyPower

I see the asyncio module in the reference docs (https://pulsar.apache.org/api/python/3.5.x/pulsar.asyncio.html). However, I get ModuleNotFoundError: No module named 'pulsar.asyncio' when I try to import. pip installing "pulsar-client[all]==3.5.0" doesn't help.

Can someone suggest the proper way to access these classes using the Python Pulsar Client?. Info @BewareMyPower . Does it require a custom build from src?

daveqs avatar Aug 24 '24 00:08 daveqs

It might be something wrong with the release process, let me check it next week.

BewareMyPower avatar Aug 24 '24 06:08 BewareMyPower

I can find this module when I installed it:

>>> pulsar.__file__
'/opt/homebrew/lib/python3.12/site-packages/pulsar/__init__.py'
>>> import pulsar.asyncio
>>> pulsar.__version__
'3.5.0'

Could you check if you installed an old version that is not removed? @daveqs

BewareMyPower avatar Aug 29 '24 11:08 BewareMyPower

Is there any plan to support asyncio for consumer?

NiuBlibing avatar Sep 18 '24 06:09 NiuBlibing

No plan for now

BewareMyPower avatar Sep 18 '24 10:09 BewareMyPower

No plan for now

sounds a bad news.

NiuBlibing avatar Sep 19 '24 04:09 NiuBlibing

Hi guys, i am currently implementing the consumer using asyncio in my fork and would like to open a pull request as soon as its finished. I used the c++ clients Async methods in c++ and futures in python (in the same way as the producer) to archive nonblocking methods. As soon as the code is ready help and feedback would be greatly appreciated.

Nictec avatar Sep 19 '24 17:09 Nictec