rele
Add native async support for subscriptions
Currently, async functions can't be used with the @sub decorator.
from rele import sub

@sub(topic='my-topic')
async def async_handler(data, **kwargs):
    print('data', data)
If you try, you'll get the following error.
Configuring worker with 1 subscription(s)...
dicom_uploads - handle_upload
/usr/local/lib/python3.9/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py:126: RuntimeWarning: coroutine 'handle_upload' was never awaited
callback(message)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
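The warning appears because calling an async function only creates a coroutine object; a synchronous caller that never awaits it leaves the function body unexecuted. Here is a minimal sketch of that behavior without rele or Pub/Sub. `invoke_callback` is a hypothetical stand-in for the library code that calls `callback(message)`, not the actual implementation.

```python
import asyncio
import inspect

calls = []  # records which messages the handler actually processed


async def handle_upload(data, **kwargs):
    calls.append(data)


def invoke_callback(callback, message):
    # Hypothetical stand-in for a synchronous caller: it simply calls the
    # callback and does not know how to await a coroutine.
    return callback(message)


result = invoke_callback(handle_upload, {"id": 1})
is_coroutine = inspect.iscoroutine(result)  # a coroutine object came back
ran_before_await = list(calls)              # the handler body has not run yet

# Only driving the coroutine on an event loop executes the body.
asyncio.run(result)
```

If the final `asyncio.run(result)` line were omitted, Python would report that the coroutine was never awaited once the object is garbage collected.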
There are some simple workarounds, but it would be nice if this was supported natively.
import asyncio

@sub(topic='my-topic')
def sync_handler(data, **kwargs):
    # Bridge to the async handler; blocks until the coroutine finishes
    return asyncio.run(async_handler(data, **kwargs))

async def async_handler(data, **kwargs):
    print('data', data)
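If several handlers need the same bridge, the workaround could be factored into a small decorator. This is only a sketch: `run_sync` is a hypothetical helper name, not part of rele, and it assumes the callback runs in a thread with no event loop already running (otherwise `asyncio.run` raises `RuntimeError`).

```python
import asyncio
import functools


def run_sync(async_func):
    """Wrap an async function so that synchronous callers can invoke it."""

    @functools.wraps(async_func)
    def wrapper(*args, **kwargs):
        # asyncio.run creates a new event loop, runs the coroutine to
        # completion, and closes the loop before returning the result.
        return asyncio.run(async_func(*args, **kwargs))

    return wrapper


async def async_handler(data, **kwargs):
    await asyncio.sleep(0)  # placeholder for real async work
    return data


# A plain function that a synchronous worker can call directly.
sync_handler = run_sync(async_handler)
```

Stacking `@sub(topic='my-topic')` on top of `@run_sync` would presumably register the wrapped function as before, though that combination is untested here.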
Curious about the scope of this. Is this just making the sub async compatible, or the worker as well?
I haven't delved into it, but do you know if the Google Cloud Pub/Sub library supports async as well?
It looks like the GCP Pub/Sub library doesn't support async yet: https://github.com/googleapis/python-pubsub/issues/389
What are the tradeoffs of making the worker async-compatible vs just the sub?
It's probably more complicated than this, but I was thinking you could just check whether self.func is async and use asyncio.run (or similar) here: https://github.com/mercadona/rele/blob/5d05758ede1e9f2b68a9e752e4f075762bb17351/rele/subscription.py#L90
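The check described above might look roughly like this. `CallbackWrapper` is a hypothetical stand-in written for illustration; it is not rele's `Subscription` class and does not reflect what the linked line actually does.

```python
import asyncio
import inspect


class CallbackWrapper:
    """Hypothetical stand-in for a subscription object wrapping a handler."""

    def __init__(self, func):
        self._func = func

    def __call__(self, data, **kwargs):
        if inspect.iscoroutinefunction(self._func):
            # Async handler: drive the coroutine to completion on a new loop.
            return asyncio.run(self._func(data, **kwargs))
        # Sync handler: call it directly, as before.
        return self._func(data, **kwargs)


def sync_handler(data, **kwargs):
    return ("sync", data)


async def async_handler(data, **kwargs):
    await asyncio.sleep(0)  # placeholder for real async work
    return ("async", data)
```

Either kind of handler can then be invoked the same way, for example `CallbackWrapper(async_handler)("x")`.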
What are the tradeoffs of making the worker async-compatible vs just the sub?
The subscriptions are essentially callbacks that are run within the worker. As such, a naive approach could be something like what you suggest with asyncio.run.
Unfortunately, if the GCP client does not support async, it will be nearly impossible to support async at a higher level.
Out of curiosity, and not being super familiar with async Python, what's the benefit of having an async function in a background worker? I also assume that if you call asyncio.run, it runs the async function synchronously anyway.
I also assume that if you call asyncio.run, it runs the async function synchronously anyway.
Basically.
What's the benefit of having an async function in a background worker?
If you want to run tons of concurrent callbacks that do exclusively I/O, I think you'd need native support for async.
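To illustrate the difference within a single thread, the sketch below compares one `asyncio.run` call per callback against running all callbacks on one event loop with `asyncio.gather`. `io_callback` and its 0.1 second sleep are made-up stand-ins for real I/O, and the timings say nothing about how the actual worker schedules callbacks.

```python
import asyncio
import time


async def io_callback(i):
    await asyncio.sleep(0.1)  # simulated I/O wait
    return i


def run_each_separately(n):
    # One asyncio.run per callback: each call blocks until its coroutine
    # finishes, so the waits add up.
    return [asyncio.run(io_callback(i)) for i in range(n)]


async def run_concurrently(n):
    # All callbacks share one event loop, so their waits overlap.
    return await asyncio.gather(*(io_callback(i) for i in range(n)))


start = time.perf_counter()
sequential = run_each_separately(5)
sequential_secs = time.perf_counter() - start

start = time.perf_counter()
concurrent = asyncio.run(run_concurrently(5))
concurrent_secs = time.perf_counter() - start

print(f"separate asyncio.run calls: {sequential_secs:.2f}s")
print(f"single loop with gather:    {concurrent_secs:.2f}s")
```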
For us, we're less concerned with the performance implications and just want to be able to inter-operate with async functions in our codebase.
Understandable if this isn't a priority right now, but wanted to file an issue since it seems like this library is mostly about ergonomics.
Not too sure if it'll work, but one thing you can try is subclassing the Subscription class like this, but with asyncio.run in the __call__ method.
import asyncio

from rele import Subscription

class DoSomethingSub(Subscription):
    topic = 'photo-uploaded'

    def __init__(self):
        self._func = self.callback_func
        super().__init__(self._func, self.topic)

    async def callback_func(self, data, **kwargs):
        return await asyncio.sleep(2)

    def __call__(self, data, **kwargs):
        # Run the async callback to completion before returning
        return asyncio.run(self._func(data, **kwargs))
Makes sense. I think I'll stick with the workaround for now. It's actually not so bad.
Appreciate all the help ❤️