Propose a pub/sub contract for resolvers and subscriptions

Open patrys opened this issue 6 years ago • 3 comments

I propose that we define a contract/interface that can be implemented over different transports, to help application authors use pub/sub and observe changes.

I imagine the common pattern would be similar to the one below:

from graphql.pyutils import EventEmitter, EventEmitterAsyncIterator


class PubSub:
    def __init__(self):
        self.emitter = EventEmitter()

    def subscribe(self, event_type):
        return EventEmitterAsyncIterator(self.emitter, event_type)

    async def publish(self, event_type, message):
        raise NotImplementedError()

A dummy implementation (useful for local development) could hook the publish method right into the emitter:

class DummyPubSub(PubSub):
    async def publish(self, event_type, message):
        self.emitter.emit(event_type, message)
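
To try the subscribe/publish flow without depending on `graphql.pyutils`, here is a rough standalone sketch of the same contract built on `asyncio.Queue`. It is an illustration only, not part of the proposal: `QueuePubSub` and `demo` are hypothetical names, and a real implementation might well keep using the emitter shown above.

```python
import asyncio


class QueuePubSub:
    """Hypothetical in-memory stand-in for the proposed PubSub contract."""

    def __init__(self):
        # Maps event_type -> list of subscriber queues.
        self._queues = {}

    def subscribe(self, event_type):
        # Register the queue eagerly, so messages published before the
        # consumer starts iterating are buffered rather than lost.
        queue = asyncio.Queue()
        self._queues.setdefault(event_type, []).append(queue)
        return self._iterate(event_type, queue)

    async def _iterate(self, event_type, queue):
        try:
            while True:
                yield await queue.get()
        finally:
            # Runs when the subscription is closed: drop the queue.
            self._queues[event_type].remove(queue)

    async def publish(self, event_type, message):
        # Fan the message out to every subscriber of this event type.
        for queue in self._queues.get(event_type, []):
            queue.put_nowait(message)


async def demo():
    pubsub = QueuePubSub()
    subscription = pubsub.subscribe("product_updated")
    await pubsub.publish("product_updated", 1)
    await pubsub.publish("product_updated", 2)
    received = [await subscription.__anext__(), await subscription.__anext__()]
    await subscription.aclose()
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

In a subscription generator the consumer side would normally be an `async for message in pubsub.subscribe(...)` loop; the explicit `__anext__()` calls are only there to keep the demo finite.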

Another implementation could hook it to a Redis server:

import asyncio

import aioredis


async def start_listening(redis, channel, emitter):
    listener = await redis.subscribe(channel)
    while (await listener.wait_message()):
        data = await listener.get_json()
        event_type, message = data
        emitter.emit(event_type, message)


async def stop_listening(redis, channel):
    await redis.unsubscribe(channel)


class RedisPubSub(PubSub):
    def __init__(self, redis, channel):
        super().__init__()
        self.redis = redis
        self.channel = channel
        asyncio.ensure_future(start_listening(self.redis, self.channel, self.emitter))

    def __del__(self):
        asyncio.ensure_future(stop_listening(self.redis, self.channel))

    async def publish(self, event_type, message):
        await self.redis.publish_json(self.channel, [event_type, message])

Similar implementations could be written for AWS SNS+SQS, Google Cloud Pub/Sub, etc.

The tricky part is how to help with passing the object between resolvers and subscriptions. I think the most natural way would be to add it to the context. If we can come up with a standard name then it's easy to write a decorator that automatically unpacks it into a keyword argument:


@mutation.field("updateProduct")
@with_pubsub
async def resolve_update_product(parent, info, pubsub):
    ...
    # publish() is a coroutine, so the resolver is async and awaits it
    await pubsub.publish("product_updated", product.id)
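
The decorator itself is not spelled out here, so the following is only one possible sketch of it. It assumes the context is a dict, that the object is stored under the key `"pubsub"` (the "standard name" is still undecided), and that resolvers are coroutines because `publish` is one. `RecordingPubSub` and `demo` are hypothetical test helpers, and `SimpleNamespace` stands in for the real `info` object.

```python
import asyncio
import functools
from types import SimpleNamespace


def with_pubsub(resolver):
    """Pass info.context["pubsub"] to the resolver as the `pubsub` kwarg."""

    @functools.wraps(resolver)
    async def wrapper(parent, info, **kwargs):
        # Assumption: the context is a dict and the key is "pubsub".
        return await resolver(parent, info, pubsub=info.context["pubsub"], **kwargs)

    return wrapper


class RecordingPubSub:
    """Hypothetical stub that records what was published."""

    def __init__(self):
        self.published = []

    async def publish(self, event_type, message):
        self.published.append((event_type, message))


@with_pubsub
async def resolve_update_product(parent, info, pubsub, **kwargs):
    # Field arguments arrive as keyword arguments next to `pubsub`.
    await pubsub.publish("product_updated", kwargs["id"])
    return True


async def demo():
    pubsub = RecordingPubSub()
    info = SimpleNamespace(context={"pubsub": pubsub})
    await resolve_update_product(None, info, id=42)
    return pubsub.published


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the wrapper only reads from `info.context`, the same decorator would work for subscription generators as long as they receive the same context.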

patrys, May 08 '19 16:05

Note that I am not currently sure whether the __init__/__del__ lifecycle is what we want. Maybe it would make more sense to use async with SomePubSub(...) as pubsub: instead. That way the implementation could disconnect from the server when there are no clients listening.
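
A minimal sketch of what that lifecycle could look like, assuming the transport is connected in `__aenter__` and released in `__aexit__`. `ManagedPubSub`, its `connected` flag, and `demo` are hypothetical; a real transport would open and close its connection where the comments indicate.

```python
import asyncio


class ManagedPubSub:
    """Hypothetical pub/sub object with an explicit async lifecycle."""

    def __init__(self):
        self.connected = False

    async def __aenter__(self):
        # A real transport would connect / subscribe to its channel here.
        self.connected = True
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # A real transport would unsubscribe / disconnect here.
        self.connected = False


async def demo():
    pubsub = ManagedPubSub()
    states = [pubsub.connected]
    async with pubsub as active:
        states.append(active.connected)
    states.append(pubsub.connected)
    return states


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Unlike `__del__`, `__aexit__` runs at a well-defined point and can await the cleanup, which avoids scheduling a coroutine during garbage collection.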

patrys, May 08 '19 16:05

Yes, this is a great idea, and I was just looking to see if anyone had done it yet. Using a context manager feels the most natural to me and in line with similar interfaces.

I think we're quite used to the pattern of injecting things into kwargs for this purpose, and this is how I pass other contextual objects (the current user object, etc.) in with decorators. Stylistically I'm not a huge fan of beginning a function name with with_, but that's just me. For myself I tend to use @inject_service, @provide_service, etc. Maybe that's so I don't mistake them for the with keyword, or maybe it just reminds me too much of React HOCs. I think pubsub is straightforward for the argument name.

notdaniel, May 08 '19 17:05

Looks like @tomchristie has started work on a pub/sub library: Broadcaster

It looks promising, and would potentially allow us to limit our implementation to just an abstraction layer for resolvers and subscription generators.

rafalp, Mar 21 '20 21:03