apollo-feature-requests

Add an AsyncIterable implementation to an Observable

Open JWo1F opened this issue 2 years ago • 0 comments

When dealing with complex links, it becomes necessary to retrieve data from forward(operation). At the moment, the only way to get that data is to subscribe to the Observable and read the values in the observer callbacks. However, ECMAScript already has a well-suited built-in interface for this kind of data: AsyncIterable.

First, it is convenient to work with from an async environment. Second, built-in language constructs are generally preferable to library-specific ones.
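For comparison, here is a minimal sketch of the subscription-based approach described above. `ObservableLike`, `collect`, and `demoSource` are hypothetical names introduced only so the example runs without `@apollo/client`; the real `Observable` exposes a comparable `subscribe(observer)` method.

```typescript
// Minimal stand-in for the subset of the Observable API used here
// (an assumption, so the sketch runs without @apollo/client installed).
type Observer<T> = {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
};
type Subscription = { unsubscribe(): void };
interface ObservableLike<T> {
  subscribe(observer: Observer<T>): Subscription;
}

// Hypothetical helper: gather every emitted value through the observer callbacks.
function collect<T>(source: ObservableLike<T>): Promise<T[]> {
  return new Promise<T[]>((resolve, reject) => {
    const values: T[] = [];
    source.subscribe({
      next: (value) => { values.push(value); },
      error: reject,
      complete: () => resolve(values),
    });
  });
}

// Hypothetical source that emits two values synchronously and then completes.
const demoSource: ObservableLike<number> = {
  subscribe(observer) {
    observer.next?.(1);
    observer.next?.(2);
    observer.complete?.();
    return { unsubscribe() {} };
  },
};
```

Every consumer has to wire up `next`, `error`, and `complete` by hand, which is the boilerplate an AsyncIterable would remove.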

I have implemented a small function that turns an Observable into an AsyncIterable:

import { Observable } from '@apollo/client';

const nextSymbol = Symbol('@@next');
const doneSymbol = Symbol('@@done');
const errorSymbol = Symbol('@@error');

type NextType<T> = { __type: typeof nextSymbol, value: T };
type DoneType = { __type: typeof doneSymbol, value: void };
type ErrorType = { __type: typeof errorSymbol, value: any };
type QueueItem<T> = NextType<T> | ErrorType | DoneType;

export const observableToIterator = <T>(observable: Observable<T>): AsyncIterable<T> => {
  return {
    [Symbol.asyncIterator]: () => {
      const queue = createQueue<QueueItem<T>>();

      const subscription = observable.subscribe({
        next: (val) => queue.add({ __type: nextSymbol, value: val }),
        error: (err) => queue.add({ __type: errorSymbol, value: err }),
        complete: () => queue.add({ __type: doneSymbol, value: undefined })
      });

      return {
        next: async () => {
          const next = await queue.receive();

          switch (next.__type) {
            case nextSymbol:
              return { done: false, value: next.value };
            case errorSymbol:
              throw next.value;
            case doneSymbol:
              return { done: true, value: undefined };
            default:
              throw new Error('Unexpected status here');
          }
        },
        return: async val => {
          subscription.unsubscribe();
          return { done: true, value: val };
        }
      };
    }
  };
};

function createQueue<T>() {
  type Listener = (val: T) => void;
  const list: T[] = [];
  const listeners: Listener[] = [];

  return {
    add(val: T) {
      const listener = listeners.shift();
      if (listener) return listener(val);

      list.push(val);
    },
    async receive() {
      // Check the length, not the item's truthiness, so falsy values (0, '', null) are not lost.
      if (list.length > 0) return list.shift() as T;
      return new Promise<T>((res) => listeners.push(res));
    }
  };
}

However, I would like Observable to implement AsyncIterable itself, so that no additional code is needed.
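One way such built-in support might look is sketched below. This is not how `@apollo/client` or `zen-observable` is implemented; `IterableObservable` is a hypothetical stand-in class. It uses an async generator with `try`/`finally` rather than a hand-written `next`/`return` pair, so cleanup lives in one place. One difference to note: the subscription starts on the first `next()` call rather than when the iterator is created.

```typescript
type Observer<T> = {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
};
type Subscription = { unsubscribe(): void };
type Subscriber<T> = (observer: Observer<T>) => Subscription;

// Hypothetical class (not part of @apollo/client): an observable that is also AsyncIterable.
class IterableObservable<T> implements AsyncIterable<T> {
  private readonly subscriber: Subscriber<T>;

  constructor(subscriber: Subscriber<T>) {
    this.subscriber = subscriber;
  }

  subscribe(observer: Observer<T>): Subscription {
    return this.subscriber(observer);
  }

  async *[Symbol.asyncIterator](): AsyncGenerator<T, void, undefined> {
    type Signal =
      | { kind: 'next'; value: T }
      | { kind: 'error'; error: unknown }
      | { kind: 'done' };
    const buffer: Signal[] = [];
    let wake: (() => void) | undefined;

    // Buffer the signal and resume the generator if it is waiting.
    const push = (signal: Signal) => {
      buffer.push(signal);
      const resume = wake;
      wake = undefined;
      resume?.();
    };

    const subscription = this.subscribe({
      next: (value) => push({ kind: 'next', value }),
      error: (error) => push({ kind: 'error', error }),
      complete: () => push({ kind: 'done' }),
    });

    try {
      while (true) {
        if (buffer.length === 0) {
          await new Promise<void>((resolve) => {
            wake = () => resolve();
          });
        }
        const signal = buffer.shift()!;
        if (signal.kind === 'done') return;
        if (signal.kind === 'error') throw signal.error;
        yield signal.value;
      }
    } finally {
      // Runs on completion, on error, and when the consumer exits the loop early.
      subscription.unsubscribe();
    }
  }
}
```

With a class like this, an instance can be passed straight to `for await...of` without a conversion helper.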

Working with responses from the server would then be very convenient, for example:

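// Assumes this runs inside an async function (needed for `for await` and `return`)
// where `forward` and `operation` are in scope.
const observable = forward(operation);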
const iterator = observableToIterator(observable);

try {
  for await (const value of iterator) {
    console.log('NextValue:', value);
    if (Math.random() > 0.9) break; // unsubscribes from observable
    if (Math.random() > 0.8) return; // unsubscribes from observable too
    if (Math.random() > 0.7) throw new Error(); // unsubscribes from observable too
  }
  console.log('Done');
} catch(err: any) {
  console.log('ErrorValue:', err);
}
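The unsubscribe comments above rely on a language rule: when a `for await...of` loop exits early through `break`, `return`, or a thrown error, the runtime calls the iterator's `return()` method if it has one. A small sketch makes this visible; the `ticking` iterable and the `takeTwo` function are hypothetical and not part of the proposal.

```typescript
// Counts how often the iterator's cleanup hook runs.
let cleanups = 0;

// Hypothetical endless iterable that yields 0, 1, 2, ...
const ticking: AsyncIterable<number> = {
  [Symbol.asyncIterator]() {
    let n = 0;
    return {
      next: async () => ({ done: false, value: n++ }),
      // Called by for await...of when the loop body exits early.
      return: async () => {
        cleanups += 1;
        return { done: true, value: undefined };
      },
    };
  },
};

async function takeTwo(): Promise<number[]> {
  const out: number[] = [];
  for await (const value of ticking) {
    out.push(value);
    if (out.length === 2) break; // triggers return() on the iterator
  }
  return out;
}
```

In `observableToIterator`, that same `return()` hook is where `subscription.unsubscribe()` is called.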

JWo1F, Jan 27 '23 06:01