comlink icon indicating copy to clipboard operation
comlink copied to clipboard

Generator functions

Open egriff38 opened this issue 5 years ago • 11 comments

Hello!

I am a new user to comlink and am thrilled to begin integrating it into my projects as the primary way to communicate between the main thread and web workers. My question is, does comlink provide any way of working with generator functions?

My base example is as follows:

//////////// worker.js ///////////////
const fibGen = function*(max: number) {
  if(max > 0 ) yield 1
  if(max > 1 ) yield 1
  let a=1, b=1
  for(let i=2; i<max; i++) {
    const c = a
    a = b
    b = a+c
    yield b
  }
}
Comlink.expose({fibGen})

///////// main.js ////////////
const wrapped = Comlink.wrap(new Worker("./worker.js"))
// cannot serialize because the return value is a generator
const gen = await wrapped.fibGen(50)

// throws saying cannot use as an async generator
for await(i of wrapped.fibGen(50)) {
  console.log(i)
}

I know there are other API methods available with Comlink, I'm just a little green on the best way to go about using them in this case.

Thanks in advance! This is my fourth stopping point after workerize-loader, workerize, and comlink-loader, and has been by far the easiest implementation with my existing setup.

egriff38 avatar Feb 26 '20 17:02 egriff38

Hello, I could use generators by defining a function that returns a proxy of a generator.

// worker.ts

const fibGen = function* (max: number) {
  if (max > 0) yield 1
  if (max > 1) yield 1
  let a = 1, b = 1
  for (let i = 2; i < max; i++) {
    const c = a
    a = b
    b = a + c
    yield b
  }
}

// returns a proxy
const fibGen2 = (max: number) => Comlink.proxy(fibGen(max))

Comlink.expose({ fibGen: fibGen2 })
// main.js
const wrapped = Comlink.wrap(new Worker("./worker.js"))
const gen = await wrapped.fibGen(50)
// `for await ... of` statements implicity call the method `[Symbol.asyncIterator]()` of a object.
// However, `gen[Symbol.asyncIterator]()` returns not AsyncGenerator but Promise<AsyncGenerator>
// Therefore you should wrap gen in { [Symbol.asyncIterator]() { ... } }
const gen2 = { [Symbol.asyncIterator]() { return gen } }

for await (const i of gen2) {
  console.log(i) // 1, 2, 3, 5, ...
}

hiroto7 avatar Apr 11 '20 20:04 hiroto7

I ended up doing something similar to this but written as a handler:

// transferHandlers.ts
import {transferHandlers, proxy} from 'comlink';

transferHandlers.set("iterable", {
  canHandle(obj) {
    const isIterable = typeof obj === "object" && !Array.isArray(obj) && (Symbol.iterator in obj || Symbol.asyncIterator in obj)
    return isIterable
  },
  deserialize(obj) {
    return new Proxy(transferHandlers.get("proxy")!.deserialize(obj), {
      get(target, key) {
        if(key === Symbol.asyncIterator) return async function*() {
          let a;
          do {
            a = await target.next()
            
            yield a.value
          } while(!a.done)
        }
        else return target[key]
      },
      has(target, key) {
        if(key === Symbol.asyncIterator) return true
        else return key in target
      }
    })
  },
  serialize(obj) {
    // Is in main thread?
    return transferHandlers.get("proxy")!.serialize(proxy(obj))
  }
})

I just made sure to import this file in the main and worker contexts, and I could export generators without any additional boilerplate.

egriff38 avatar Apr 13 '20 15:04 egriff38

Older versions of Comlink supported generators by turning them into async generators, but support for async generators was lacking at the time. I should revisit this now.

surma avatar Apr 14 '20 10:04 surma

@surma, this was my first attempt at creating a transfer handler, do you see any issues with my implementation? Big fan of your work!

egriff38 avatar Apr 14 '20 12:04 egriff38

@xepher0coded I saw another approach by @samburnstone the other day.

I haven't compared and contrasted the approaches yet, but I'm probably going to be doing this myself soon and figured I'd link to it since others would probably like to see different approaches in-the-wild.

sebinsua avatar Apr 21 '20 15:04 sebinsua

Hi @sebinsua - well found! I meant to post here but ended up being distracted by other things.

Looking over the handler provided by @xepher0coded, that seems a much better solution as it's reusing the proxy transfer handler rather than essentially duplicating it like I do (although I found it a useful exercise to better understand how Comlink was using message channels to pass values back and forth). It also looks like it allows for other properties to be called directly on the method, whereas my solution simply supports the iterator properties (next, return, throw).

The only thing I spotted is when manually calling iterator.return() as in this example, my transfer handler would result in the for-await-of loop exiting immediately, whereas the transfer handler as suggested by @xepher0coded would iterate one more time before exiting. Not quite sure what exactly is the expected behaviour in this case and it's probably a bit of an edge case 😄.

samburnstone avatar Apr 21 '20 16:04 samburnstone

(previously @xepher0coded, changed my username) Thanks @samburnstone! Do you think assigning the generator to the async iterator instead of creating my own would fix that behavior, like what @hiroto7 had suggested?

[Symbol.asyncIterator]() { return gen }

egriff38 avatar Apr 22 '20 20:04 egriff38

I also had the issue @samburnstone mentioned with the proxied async generator iterating one additional time after it returns (with an undefined value).

@egriff38 I wasn't quite sure what you meant by setting the asyncIterator symbol - I tried to work out a solution along the lines of what you suggested, but haven't quite put it together.

However, I was able to fix the issue with the extra iteration by including an additional check for done in the next() result. According to the iterator protocol, the object returned by next() has "the value true if the iterator has completed its sequence. In this case, value optionally specifies the return value of the iterator." so I made a slight modification to check if done is true and if so, returns the value.

This is what I came up with:

// comlink-transfer-handler-iterable.js
import {transferHandlers, proxy} from 'comlink';

transferHandlers.set("iterable", {
    canHandle: (obj) => {
        const isIterable = typeof obj === "object" && !Array.isArray(obj) &&
              (Symbol.iterator in obj || Symbol.asyncIterator in obj);
        return isIterable;
    },
    deserialize: (obj) => {
        return new Proxy(transferHandlers.get("proxy").deserialize(obj), {
            get: (target, prop) => {
                if(prop === Symbol.asyncIterator) {
                    const gen = async function*() {
                        while (true) {
                            const nextObj = await target.next();
                            if (nextObj.done) {
                                return nextObj.value;
                            }
                            yield nextObj.value;
                        }
                    };
                    return gen;
                }
                else return Reflect.get(...arguments)
            },
            has: (target, prop) => {
                if(prop === Symbol.asyncIterator) return true
                else return prop in target
            }
        })
    },
    serialize: (obj) => {
        return transferHandlers.get("proxy").serialize(proxy(obj));
    },
})

jrandall avatar Apr 10 '21 12:04 jrandall

Hey! Is there any update on this issue? We have quite a big project, and it is not easy to change the syntax of functions. I believe our issue is similar, where the comlink cannot pass a function generator to a web worker via proxy. My function looks like this:

function queryParser(otherFn) {
  return (url, { qs, ...optionsWithoutQs }) => {
    const qsPart = queryString.stringify(qs) || '';
    const urlWithQS = `${url}${qsPart ? '?' : ''}${qsPart}`;
    return requestFunction(urlWithQS, optionsWithoutQs);
  };
}

function nonTransferableFn() {
   return {
     ...
     myOtherFunction: queryParser()
   }
}

const webWorkerProxy = wrap(new Worker(new URL('./web-worker.js', import.meta.url)));
const webWorkerInstance = await new webWorkerProxy();
webWorkerInstance.initialize(nonTransferableFn: proxy(nonTransferableFn));


Unable to run initialize due to queryParser. Is there any support for this in the nearest future?

vvasylkovskyi avatar Jun 07 '22 08:06 vvasylkovskyi

I think the transfer handler can be simplified.

// async-generator-transfer-handler.ts
import type { TransferHandler } from 'comlink';
import * as Comlink4 from 'comlink';
const proxyTransferHandler = Comlink4.transferHandlers.get('proxy');
export const asyncGeneratorTransferHandler: TransferHandler<AsyncGenerator<any>, any> = {
  canHandle: (value: any): value is AsyncGenerator<any> =>
    typeof value.next === 'function' && typeof value[Symbol.asyncIterator] === 'function',
  async *deserialize(obj) {
    const iterator = proxyTransferHandler.deserialize(obj) as AsyncIterator<any>;
    for (;;) {
      const { value, done } = await iterator.next();
      if (done) {
        break;
      } else {
        yield value;
      }
    }
  },
  serialize(obj) {
    return proxyTransferHandler.serialize(Comlink4.proxy(obj));
  },
};

// at both ends:
import * as Comlink4 from 'comlink';
import { asyncGeneratorTransferHandler } from '$lib/async-generator-transfer-handler';
Comlink4.transferHandlers.set('asyncGenerator', asyncGeneratorTransferHandler);

pauliojanpera avatar Apr 12 '23 00:04 pauliojanpera

use this lib

https://www.npmjs.com/package/comlink-async-generator

kyawswarthwin avatar Nov 10 '23 10:11 kyawswarthwin