Add native async support for subscriptions

Open • csaroff opened this issue 2 years ago • 6 comments

Currently, async functions can't be used with the @sub decorator.

from rele import sub

@sub(topic='my-topic')
async def async_handler(data, **kwargs):
    print('data', data)

If you try, the handler never actually runs, and you get the following warning:

Configuring worker with 1 subscription(s)...
  dicom_uploads - handle_upload
/usr/local/lib/python3.9/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py:126: RuntimeWarning: coroutine 'handle_upload' was never awaited
  callback(message)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
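The warning follows from how coroutines work in plain Python: calling an `async def` function only creates a coroutine object, and its body does not run until something awaits it or hands it to an event loop. A minimal sketch, where `dispatch` is a hypothetical stand-in for the synchronous `callback(message)` call shown in the traceback:

```python
import inspect

calls = []

async def async_handler(data, **kwargs):
    # This body only executes if the coroutine is awaited or run on a loop.
    calls.append(data)

def dispatch(callback, message):
    # Hypothetical stand-in for a synchronous caller such as the Pub/Sub
    # streaming pull manager: it simply calls the function it was given.
    return callback(message)

result = dispatch(async_handler, "hello")
print(inspect.iscoroutine(result))  # True: a coroutine object, not a result
print(calls)                        # []: the handler body never ran

# Closing the unstarted coroutine explicitly means it is not garbage-collected
# unawaited, which is what triggers the "never awaited" RuntimeWarning.
result.close()
```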

There are some simple workarounds, but it would be nice if this were supported natively. For example:

import asyncio

from rele import sub

@sub(topic='my-topic')
def sync_handler(data, **kwargs):
    # Run the async handler to completion on a fresh event loop.
    return asyncio.run(async_handler(data, **kwargs))

async def async_handler(data, **kwargs):
    print('data', data)
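If there are many async handlers, the same workaround can be factored into a small decorator instead of a hand-written sync wrapper per handler. This is only a sketch: `run_sync` is a hypothetical helper, not part of rele, and it uses `asyncio.run` exactly as the workaround above does.

```python
import asyncio
import functools
import inspect

def run_sync(async_func):
    """Hypothetical helper: wrap a coroutine function in a plain sync function."""
    @functools.wraps(async_func)  # keep __name__, __doc__, etc. of the handler
    def wrapper(*args, **kwargs):
        # Each call creates a fresh event loop, runs the coroutine to
        # completion, then closes the loop.
        return asyncio.run(async_func(*args, **kwargs))
    return wrapper

@run_sync
async def async_handler(data, **kwargs):
    await asyncio.sleep(0)  # stand-in for real async I/O
    return f"handled {data}"

print(async_handler("photo-1"))                    # handled photo-1
print(inspect.iscoroutinefunction(async_handler))  # False
```

With rele, the decorators would be stacked as `@sub(topic='my-topic')` on top of `@run_sync`, so that `sub` receives the synchronous wrapper, assuming `sub` accepts any ordinary callable.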

csaroff, Sep 30 '22

Curious about the scope of this. Is this just about making the sub async-compatible, or the worker as well?

I haven't delved into it, but do you know if the Google Cloud Pub/Sub library supports async as well?

andrewgy8, Sep 30 '22

It looks like the gcp pub/sub library doesn't support async yet - https://github.com/googleapis/python-pubsub/issues/389

What are the tradeoffs of making the worker async-compatible vs just the sub?

It's probably more complicated than this, but I was thinking you could just check whether self.func is a coroutine function and use asyncio.run (or similar) here: https://github.com/mercadona/rele/blob/5d05758ede1e9f2b68a9e752e4f075762bb17351/rele/subscription.py#L90
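The suggestion above could look roughly like the following. This is not rele's actual `Subscription` class: `CallbackWrapper` is a hypothetical stand-in that only illustrates the check, using `inspect.iscoroutinefunction` to decide whether the stored callback needs an event loop.

```python
import asyncio
import inspect

class CallbackWrapper:
    """Hypothetical stand-in for a subscription that stores a callback."""

    def __init__(self, func, topic):
        self._func = func
        self.topic = topic

    def __call__(self, data, **kwargs):
        if inspect.iscoroutinefunction(self._func):
            # Async callback: run it to completion on a new event loop.
            return asyncio.run(self._func(data, **kwargs))
        # Sync callback: call it directly, as before.
        return self._func(data, **kwargs)

async def async_handler(data, **kwargs):
    await asyncio.sleep(0)  # stand-in for real async I/O
    return ("async", data)

def sync_handler(data, **kwargs):
    return ("sync", data)

print(CallbackWrapper(async_handler, "my-topic")({"id": 1}))  # ('async', {'id': 1})
print(CallbackWrapper(sync_handler, "my-topic")({"id": 2}))   # ('sync', {'id': 2})
```

This only makes async callbacks usable: each message still blocks the worker thread until its coroutine finishes, and `asyncio.run` raises `RuntimeError` if the calling thread already has a running event loop.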

csaroff, Sep 30 '22

> What are the tradeoffs of making the worker async-compatible vs just the sub?

The subscriptions are essentially callbacks that run within the worker. As such, a naive approach could be what you suggest, wrapping the call in asyncio.run.
Unfortunately, if the GCP client does not support async, it will be nearly impossible to support async at a higher level.
Out of curiosity, and not being super familiar with async Python, what's the benefit of having an async function in a background worker? I also assume that if you're going to use asyncio.run, it runs the async function synchronously anyway.

andrewgy8, Oct 04 '22

> I also assume that if you're going to use asyncio.run, it runs the async function synchronously anyway.

Basically, yes.

> What's the benefit of having an async function in a background worker?

If you want to be able to run TONS of concurrent callbacks that do nothing but I/O, I think you'd need native async support.
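To make the difference concrete: wrapping each message in its own `asyncio.run` call (as in the workarounds in this thread) finishes one handler before the next starts, while handlers that share one event loop can interleave at every `await`. A minimal sketch with a hypothetical `handle` coroutine that records when it starts and finishes:

```python
import asyncio

def make_handler(log):
    async def handle(msg):
        log.append(f"start {msg}")
        await asyncio.sleep(0)  # yield to the event loop, like awaiting I/O
        log.append(f"end {msg}")
    return handle

# One asyncio.run per message: each handler runs to completion in turn.
sequential = []
handle = make_handler(sequential)
for msg in ("a", "b"):
    asyncio.run(handle(msg))

# One shared loop: both handlers are scheduled together and interleave.
async def run_on_shared_loop(messages, log):
    handler = make_handler(log)
    await asyncio.gather(*(handler(m) for m in messages))

shared_loop = []
asyncio.run(run_on_shared_loop(("a", "b"), shared_loop))

print(sequential)   # ['start a', 'end a', 'start b', 'end b']
print(shared_loop)  # ['start a', 'start b', 'end a', 'end b']
```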

For us, we're less concerned with the performance implications and just want to be able to interoperate with async functions in our codebase.

Understandable if this isn't a priority right now, but I wanted to file an issue since this library seems to be mostly about ergonomics.
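One caveat for the interoperability case: each `asyncio.run` call creates and then closes a brand-new event loop, so async objects tied to a loop (a client session created once at startup, for example) cannot be reused across messages. A common alternative, sketched here under the assumption that callbacks are invoked from ordinary threads with no running loop, is to keep one long-lived loop in a background thread and submit each coroutine to it with `asyncio.run_coroutine_threadsafe`. `BackgroundLoop` is a hypothetical helper, not part of rele or the Pub/Sub client.

```python
import asyncio
import threading

class BackgroundLoop:
    """Hypothetical helper: one event loop running forever in a daemon thread."""

    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self._thread = threading.Thread(target=self.loop.run_forever, daemon=True)
        self._thread.start()

    def run(self, coro, timeout=None):
        # Schedule the coroutine on the shared loop from another thread and
        # block the calling thread until it finishes (or the timeout expires).
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        return future.result(timeout)

    def close(self):
        # Stop the loop from outside its thread, wait for the thread, clean up.
        self.loop.call_soon_threadsafe(self.loop.stop)
        self._thread.join()
        self.loop.close()

background = BackgroundLoop()

async def async_handler(data, **kwargs):
    await asyncio.sleep(0)  # stand-in for real async I/O
    return data, asyncio.get_running_loop()

def sync_handler(data, **kwargs):
    # Synchronous entry point that a sync-only caller could invoke.
    return background.run(async_handler(data, **kwargs))

first = sync_handler("a")
second = sync_handler("b")
print(first[1] is second[1] is background.loop)  # True: same loop for both
background.close()
```

Each call still blocks the calling thread until the coroutine finishes, so this buys a stable loop for loop-bound async resources rather than extra concurrency.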

csaroff, Oct 04 '22

Not too sure if it'll work, but one thing you could try is subclassing the Subscription class like here, but calling asyncio.run in the __call__ method.

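import asyncio

from rele import Subscription

class DoSomethingSub(Subscription):
    topic = 'photo-uploaded'

    def __init__(self):
        # Register the async method as this subscription's callback.
        self._func = self.callback_func
        super().__init__(self._func, self.topic)

    async def callback_func(self, data, **kwargs):
        return await asyncio.sleep(2)

    def __call__(self, data, **kwargs):
        # The worker calls subscriptions synchronously, so run the
        # coroutine to completion on a fresh event loop.
        return asyncio.run(self._func(data, **kwargs))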

andrewgy8, Oct 05 '22

Makes sense. I think I'll stick with the workaround for now. It's actually not so bad.

Appreciate all the help ❤️

csaroff, Oct 05 '22