ariadne
Propose a pub/sub contract for resolvers and subscriptions
I propose that we define a contract/interface that could be implemented over different transports, to help application authors use pub/sub and observe changes.
I imagine the common pattern would be similar to the one below:
from graphql.pyutils import EventEmitter, EventEmitterAsyncIterator


class PubSub:
    def __init__(self):
        self.emitter = EventEmitter()

    def subscribe(self, event_type):
        return EventEmitterAsyncIterator(self.emitter, event_type)

    async def publish(self, event_type, message):
        raise NotImplementedError()
A dummy implementation (useful for local development) could hook the publish method right into the emitter:
class DummyPubSub(PubSub):
    async def publish(self, event_type, message):
        self.emitter.emit(event_type, message)
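To try the subscribe/publish contract end to end without depending on graphql.pyutils, here is a minimal sketch that swaps the emitter for plain asyncio queues. QueuePubSub and demo are hypothetical names used only for illustration, and this is an assumption about how a local-only stand-in could look, not part of the proposal itself:

```python
import asyncio
from collections import defaultdict


class QueuePubSub:
    """Hypothetical in-memory stand-in with the same subscribe/publish shape."""

    def __init__(self):
        # Maps each event type to the set of subscriber queues.
        self._queues = defaultdict(set)

    def subscribe(self, event_type):
        # Register the queue eagerly so that messages published before the
        # first iteration step are not lost.
        queue = asyncio.Queue()
        self._queues[event_type].add(queue)

        async def iterate():
            try:
                while True:
                    yield await queue.get()
            finally:
                # Runs when the iterator is closed; drops the subscription.
                self._queues[event_type].discard(queue)

        return iterate()

    async def publish(self, event_type, message):
        for queue in list(self._queues[event_type]):
            queue.put_nowait(message)


async def demo():
    pubsub = QueuePubSub()
    events = pubsub.subscribe("product_updated")
    await pubsub.publish("product_updated", 1)
    await pubsub.publish("product_updated", 2)
    received = [await events.__anext__(), await events.__anext__()]
    await events.aclose()
    return received
```

Calling asyncio.run(demo()) collects the two published messages in order. A subscription generator would normally consume the iterator with async for instead of calling __anext__ by hand.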
Another implementation could hook it to a Redis server:
import asyncio

import aioredis


async def start_listening(redis, channel, emitter):
    # aioredis 1.x returns a list of channels, one per subscribed name
    (listener,) = await redis.subscribe(channel)
    while await listener.wait_message():
        data = await listener.get_json()
        event_type, message = data
        emitter.emit(event_type, message)


async def stop_listening(redis, channel):
    await redis.unsubscribe(channel)


class RedisPubSub(PubSub):
    def __init__(self, redis, channel):
        super().__init__()
        self.redis = redis
        self.channel = channel
        # Forward messages from Redis to the local emitter in the background
        asyncio.ensure_future(start_listening(self.redis, self.channel, self.emitter))

    def __del__(self):
        asyncio.ensure_future(stop_listening(self.redis, self.channel))

    async def publish(self, event_type, message):
        await self.redis.publish_json(self.channel, [event_type, message])
Similar implementations could be written for AWS SNS+SQS, Google Cloud Pub/Sub, etc.
The tricky part is how to help with passing the object between resolvers and subscriptions. I think the most natural way would be to add it to the context. If we can come up with a standard name then it's easy to write a decorator that automatically unpacks it into a keyword argument:
@mutation.field("updateProduct")
@with_pubsub
async def resolve_update_product(parent, info, pubsub):
    ...
    # publish() is a coroutine, so the resolver has to be async and await it
    await pubsub.publish("product_updated", product.id)
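For illustration, the decorator could be sketched roughly as below. This assumes the context is a dict and that the agreed name is "pubsub"; PUBSUB_CONTEXT_KEY, RecordingPubSub, the product_id argument and demo are all hypothetical and exist only to make the example runnable:

```python
import asyncio
import functools
from types import SimpleNamespace

PUBSUB_CONTEXT_KEY = "pubsub"  # hypothetical standard name in the context


def with_pubsub(resolver):
    """Pass info.context["pubsub"] to the resolver as the pubsub keyword."""

    @functools.wraps(resolver)
    def wrapper(parent, info, *args, **kwargs):
        kwargs["pubsub"] = info.context[PUBSUB_CONTEXT_KEY]
        # Works for sync and async resolvers: for an async resolver this
        # returns the coroutine, which the caller awaits.
        return resolver(parent, info, *args, **kwargs)

    return wrapper


class RecordingPubSub:
    """Test double that records what was published."""

    def __init__(self):
        self.published = []

    async def publish(self, event_type, message):
        self.published.append((event_type, message))


@with_pubsub
async def resolve_update_product(parent, info, pubsub, product_id):
    await pubsub.publish("product_updated", product_id)
    return True


async def demo():
    pubsub = RecordingPubSub()
    # Stand-in for the GraphQL info object; only .context is used here.
    info = SimpleNamespace(context={PUBSUB_CONTEXT_KEY: pubsub})
    result = await resolve_update_product(None, info, product_id=7)
    return result, pubsub.published
```

Keeping the wrapper synchronous means the same decorator can wrap both plain and async resolvers. The downside is that a missing context key only fails at call time with a KeyError.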
Note that I am not currently sure whether the __init__/__del__ lifecycle is what we want. Maybe it would make more sense to use async with SomePubSub(...) as pubsub: so that the implementation could disconnect from the server when there are no clients listening.
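A rough sketch of what the async with lifecycle could look like, assuming each transport exposes connect and disconnect coroutines. ManagedPubSub, connect, disconnect and demo are hypothetical names, and the connected flag only stands in for a real connection:

```python
import asyncio


class ManagedPubSub:
    """Hypothetical base class: connect on enter, disconnect on exit."""

    def __init__(self):
        self.connected = False

    async def connect(self):
        # A real transport would open its connection and start listening here.
        self.connected = True

    async def disconnect(self):
        # A real transport would unsubscribe and close its connection here.
        self.connected = False

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc, tb):
        # Always disconnect; returning None lets any exception propagate.
        await self.disconnect()


async def demo():
    pubsub = ManagedPubSub()
    states = [pubsub.connected]
    async with pubsub as active:
        states.append(active.connected)
    states.append(pubsub.connected)
    return states
```

Compared with __del__, the exit hook runs at a predictable point and inside a running event loop, so the cleanup can be awaited instead of scheduled with ensure_future.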
Yes, this is a great idea, and I was just looking to see if anyone had done it yet. Using a context manager feels the most natural to me and in line with similar interfaces.
I think we're quite used to the pattern of injecting things into kwargs for this purpose, and this is how I pass other objects (current user object, etc.) in contextually with decorators. Stylistically I'm not a huge fan of beginning a function name with with_ but that's just me. For myself I tend to use @inject_service, @provide_service, etc. It could be so that I don't confuse them with the with keyword, or maybe it just reminds me too much of React HOCs. I think pubsub is straightforward for the argument name.
Looks like @tomchristie has started work on a pub/sub library: Broadcaster
It looks promising and could let us limit our implementation to just an abstraction layer for resolvers and subscription generators.