graphql-subscriptions
graphql-subscriptions copied to clipboard
iterable.return() is not sufficient to clean up subscriptions
Hey there 👋 This is perhaps a bug report and a bit of a question about how ya;'ll have implemented this.
We've been working on our own semi-internal implementation of subscriptions for graphql and come across an issue handling the pub-sub case, one that seems like ya'll suffer from as well (unless i'm missing something)
The crux of the issue is that iterable.return() doesn't get called when an iterable hasn't started. which means once you've used language level constructs to map or filter an iterable it'll never clean up unless it's already had a value pulled
Following will run in Chrome if you paste it into a console:
let f = (()=> {
let i = 0
return {
next() {
return Promise.resolve({ value: i++, done: i >= 4 })
},
return() {
console.log('return!')
return Promise.resolve({ value: undefined, done: true });
},
throw(error) {
return Promise.reject(error);
},
[Symbol.asyncIterator]() {
return this
}
}
})()
async function* addOne() {
for await (const val of f) yield val + 1;
}
await addOne().return();
The practical problem is that if you disconnect or unsubscribe from a subscription before it pushes a value, the underlying pup-sub will never clean itself up.
I don't believe this is a problem for subscriptions Apollo because I don't think it uses for await anywhere under the hood.
Looking at subscriptions-transport-ws, I believe it calls an AsyncIterator's return method here: https://github.com/apollographql/subscriptions-transport-ws/blob/master/src/server.ts#L211
But keep in mind: no AsyncIterables anyone is using for GraphQL subscriptions should do anything that necessitates cleanup until their Symbol.asyncIterator method is called. As long as they follow that contract then they will work safely in for await statements without leaving any dangling resources.
However, this package's eventEmitterToAsyncIterator method does commit the sin of adding event listeners before its Symbol.asyncIterator method is called. I think subscriptions-transport-ws will always call its return method, so I don't believe any danger exists in that context, but still, eventEmitterToAsyncIterator not adding event listeners in the safest place in general, so I think @jquense was right to bring this up.
See https://github.com/davidyaha/graphql-redis-subscriptions/blob/v1.4.0/src/pubsub-async-iterator.ts, for example, which, while not exactly first-party, also doesn't quite do the right thing.
cc @davidyaha
Actually this is a very important issue because if one writes a valid concat function for async iterables and then concats two pubsub.asyncIterators, one or both of those could leave dangling listeners after the top-level async iterable has been returned.
concat is maybe a bad example, but I've been meaning to write a utility function for prepending the current value at the start of a subscription (since there can be a window of time in between the query result and first subscription result during which updates get lost 😱 ). That function would suffer equally from this problem.
Additionally, withFilter also suffers from this danger and needs to be redesigned to operate on AsyncIterables rather than AsyncIterators themselves.
@jquense I think the title of this issue isn't really correct, return() is perfectly sufficient to clean up subscriptions, the async iterables just have to be written correctly.
Thanks for taking look! I'm not sure tho how the iterable/iterator distinction helps the the above example with the generator does return an iterable, main issue is return() doesn't do anything unless the first yield has been hit. Passing the iterable to a valid map or filter method will still break the chain unless I'm misunderstanding what your proposing? Thanks again for jumping on this !
Sorry by "above examples" I meant if you used an async generator you'd hit the same problem
@jquense Okay let me clarify, cause yeah this stuff is not obvious.
In for await (let val of foo), foo must be an AsyncIterable, but not necessarily AsyncIterator. When the loop starts it calls foo[Symbol.asyncIterator] to create the actual AsyncIterator, which can be a completely different object than foo (and ought to be a different object in most use cases). The loop then repeatedly awaits next() on the iterator until it is done, in which case it calls the iterator's return() method, or an error is thrown, in which case it calls the iterator's throw() method.
Here's the consequence: if the for await loop never runs (like in your example) it never calls the [Symbol.asyncIterator] method. Which means if you only perform listener registration when [Symbol.asyncIterator]() runs, you won't register listeners unless the loop is actually going to run (in which case you're guaranteed to get a return() or throw() call in which you can unregister the listeners).
So here's a safe way to rewrite your original example:
const iterable = {
[Symbol.asyncIterator]() {
let i = 0
console.log('adding listener!')
// below is the actual iterator -- this creates a new iterator with each call
return {
next() {
return Promise.resolve({ value: i++, done: i >= 4 })
},
return() {
console.log('removing listener!')
return Promise.resolve({ value: undefined, done: true });
},
throw(error) {
console.log('removing listener!')
return Promise.reject(error);
},
}
}
}
async function* addOne(iterable) {
for await (const val of iterable) yield val + 1;
}
Now if I call the generator but make its iterator return() before I ever call next(), it never registers the "listener":
> let iterator = addOne(iterable)
undefined
> await iterator.return()
{value: undefined, done: true}
However, the first time I call next(), it actually runs the for await statement, and the first thing that does is call iterable[Symbol.asyncIterator](), which is what actually adds the listener. And when I call return() in the middle of the for await, it does pass through the return call, removing the listener:
> iterator = addOne(iterable)
addOne {<suspended>}
> await iterator.next()
adding listener!
{value: 1, done: false}
> await iterator.return()
removing listener!
{value: undefined, done: true}
hmm i'm surprised that suffer the same problem but if next() doesn't resolve before return is called? The situation were we ran into this wasn't that next was never called, it was immediately, but until the first event was received, return didn't get called through. The other downside here is that you not can't use syntax to create the iterable, since you need to customize the iterator method, but maybe that's ok since it's just the consumer
which is to say this is still a failure condition:
const iterable = {
[Symbol.asyncIterator]() {
let i = 0
console.log('adding listener!')
return {
next() {
return Promise.resolve({ value: i++, done: i >= 4 })
},
return() {
console.log('removing listener!')
return Promise.resolve({ value: undefined, done: true });
},
throw(error) {
console.log('removing listener!')
return Promise.reject(error);
},
}
}
}
const iter = iterable[Symbol.asyncIterator]]() // adding listeners
iterable.return() // nothing
Maybe that's ok, but it's not intuitive to me that getting the iterator has a bunch of side effects that may or may not have to be cleaned up
huh maybe i've been over complicating this. It's pretty straightforward to handle this with a generator.
var addListeners = () => console.log('adding listener!');
async function* create() {
let i = 0;
try {
await addListeners();
while (i < 5) yield ++i;
} finally {
console.log('remove listeners');
}
}
async function* addOne(iterable) {
for await (const val of iterable) yield val + 1;
}
var iter = addOne(create());
iter.return();
perhaps rather than doing the logic in the iterator method, its best to move it to inside the first next invocation?
@jquense that should be:
const iterator = iterable[Symbol.asyncIterator]() // adding listeners
iterator.return() // removing listeners
No problem in this case.
if
next()doesn't resolve beforereturn()is called
I could be wrong about this, but I think when using async iterators you basically are responsible for calling return after all next calls are resolved.
In any case adding the listeners when the iterator is created is less vulnerable to an early return call than adding the listeners in the first call to next.
Is it possible to yield events coming from a pubsub system using a generator though? Seems like it should, I just can't think of it at the moment...
No problem in this case.
It doesn't work that way tho? I've tried locally and it was failing that test case, but maybe i'm doing something else wrong (totally possible)
Is it possible to yield events coming from a pubsub system using a generator though?
Yeah it is but with some annoyance, we use a queue abstraction: https://github.com/4Catalyzer/graphql-subscription-server/blob/master/src/AsyncUtils.js#L31
In any case the thing graphql-subscriptions does wrong is, it doesn't currently provide a method to create an iterable that waits to add the listeners until the iterable actually gets used. Which is what I added in my PR.
Maybe you are doing something with your original combined iterator/iterable in the OP? I noticed you wrote iterable.return() instead of iter.return().
> const iterable = {
[Symbol.asyncIterator]() {
let i = 0
console.log('adding listener!')
return {
next() {
return Promise.resolve({ value: i++, done: i >= 4 })
},
return() {
console.log('removing listener!')
return Promise.resolve({ value: undefined, done: true });
},
throw(error) {
console.log('removing listener!')
return Promise.reject(error);
},
}
}
}
undefined
> var iterator = iterable[Symbol.asyncIterator]()
VM59:4 adding listener!
undefined
> await iterator.return()
VM59:10 removing listener!
{value: undefined, done: true}
It may be okay in this lib to move the subscription inside the first next() invocation, but:
- It's unnecessarily wasteful to check that with each
nextinvocation [Symbol.asyncIterator]() { return this }is kind of bad, it violates the expectation that the method returns a fresh new iterator every time.
Symbol.asyncIterator { return this } is kind of bad, it violates the expectation that the method returns a fresh new iterator every time.
I don't think that's the correct expectation though. native behavior doesn't return a fresh iterator:
async function* create() {
let i = 0;
while (i < 5) yield ++i;
}
var iterable = create()
iterable[Symbol.asyncIterator]() === iterable[Symbol.asyncIterator]() // true
await iterable[Symbol.asyncIterator]().next() // 1
await iterable[Symbol.asyncIterator]().next() // 2
@jquense I see.
As far as I understand generator functions are mainly intended to be called every time you want to create an iterator, and calling [Symbol.asyncIterator] multiple times on its return value is not really intended. I guess they made the created iterator return itself in its Symbol.asyncIterator method so that it would be more convenient to do things like for await (let val of create()).
It's worth keeping in mind FWIW that the issue isn't generator functions per se. It's about adapting external event sources (something like an observable) to an async iterable.
Right now these options all seem pretty mediocre.
@taion yes, it is a bit tricky. I do think GraphQL was wise to use async iterables because they are the first built-in feature added to JS that is equivalent to observables. What are your thoughts on my PR?
I think using a generator that adds listeners before it yields anything (like @jquense suggested) would be a pretty decent approach. Only thing I don't know is, does TypeScript support async generators? If not I could just move the listener registration inside the first call to next() in eventEmitterAsyncIterator.
this is where we landed btw: https://github.com/4Catalyzer/graphql-subscription-server/blob/master/src/AsyncUtils.ts#L26 it works well (so far), but yeah it's a bit easier to do with syntax than a hand made one, tho that could be wrapped up in a helper pretty easily
From the other thread, this is much worse than previously understood given https://github.com/tc39/proposal-async-iteration/issues/126#issuecomment-403454433.
At the current point, iterator.return() is just not going to work for tearing down and cleaning up resources.
@taion I understand you are frustrated and would like to see a sea change here, and I agree that we need to figure out a better solution for this, but please be technically honest: iterator.return() will work as long as people don't use async generators. "Just not going to work" will mislead people into thinking it will not work in any case.
I think it's important to be clear about this because solving this issue by switching to observables will be a far-reaching change requiring coordination between many different parts of the ecosystem and change in everyone's application code, not necessarily something I'm opposed to, but if it even happens it will take a lot of time. So until that day, it's important that we spread the word that people should avoid using async generators for GraphQL subscriptions and try to clearly explain why.