apollo-feature-requests
apollo-feature-requests copied to clipboard
Add an AsyncIterable implementation to an Observable
When dealing with complex links, it becomes necessary to retrieve data from forward(opearation). At the moment there is only a mechanism for getting data by subscribing to updates and getting data from there. However, ES has an excellent type for this type of data: AsyncIterable.
First, it is convenient to work with it from async environment. Second, it is always better to use the built-in language constructs.
I have implemented a small function that turns an Observable into an AsyncIterator:
import { Observable } from '@apollo/client';
const nextSymbol = Symbol('@@next');
const doneSymbol = Symbol('@@done');
const errorSymbol = Symbol('@@error');
type NextType<T> = { __type: typeof nextSymbol, value: T };
type DoneType = { __type: typeof doneSymbol, value: void };
type ErrorType = { __type: typeof errorSymbol, value: any };
type QueueItem<T> = NextType<T> | ErrorType | DoneType;
export const observableToIterator = <T>(observable: Observable<T>): AsyncIterable<T> => {
return {
[Symbol.asyncIterator]: () => {
const queue = createQueue<QueueItem<T>>();
const subscription = observable.subscribe({
next: (val) => queue.add({ __type: nextSymbol, value: val }),
error: (err) => queue.add({ __type: errorSymbol, value: err }),
complete: () => queue.add({ __type: doneSymbol, value: undefined })
});
return {
next: async () => {
const next = await queue.receive();
switch (next.__type) {
case nextSymbol:
return { done: false, value: next.value };
case errorSymbol:
throw next.value;
case doneSymbol:
return { done: true, value: undefined };
default:
throw new Error('Unexpected status here');
}
},
return: async val => {
subscription.unsubscribe();
return { done: true, value: val };
}
};
}
};
};
function createQueue<T>() {
type Listener = (val: T) => void;
const list: T[] = [];
const listeners: Listener[] = [];
return {
add(val: T) {
const listener = listeners.shift();
if (listener) return listener(val);
list.push(val);
},
async receive() {
const item = list.shift();
if (item) return item;
return new Promise<T>((res) => listeners.push(res));
}
};
}
However, I would like Observable to also implement Iterable without writing additional code.
In the future it will be very convenient to work with responses from the server, for example, like this:
const observable = forward(operation);
const iterator = observableToIterator(observable);
try {
for await (const value of iterator) {
console.log('NextValue:', value);
if (Math.random() > 0.9) break; // unsubscribes from observable
if (Math.random() > 0.8) return; // unsubscribes from observable too
if (Math.random() > 0.7) throw new Error(); // unsubscribes from observable too
}
console.log('Done');
} catch(err: any) {
console.log('ErrorValue:', err);
}