
documentation on (deeply?) proxying objects like Request/Response

Open mfulton26 opened this issue 3 years ago • 0 comments

Here's the code I think I want to be able to write:

main.ts

import { serve } from "https://deno.land/[email protected]/http/server.ts";
import * as Comlink from "https://cdn.skypack.dev/[email protected]?dts";

serve(async function handler(request) {
  const worker = new Worker(new URL("./worker.ts", import.meta.url).href, {
    type: "module",
  });

  const handler = Comlink.wrap<(request: Request) => Promise<Response>>(worker);

  return await handler(Comlink.proxy(request));
});

worker.ts

/// <reference no-default-lib="true"/>
/// <reference lib="deno.worker" />

import * as Comlink from "https://cdn.skypack.dev/[email protected]?dts";

Comlink.expose(async (request: Request): Promise<Response> => {
  const body = await request.text();
  return Comlink.proxy(
    new Response(`Hello to ${request.url}\n\nReceived:\n\n${body}\n`)
  );
});

But when I run my server and send it a request (e.g. curl -X POST --data '{"answer":42}' http://localhost:8000/foo/bar), I get an error:

% deno run --allow-net --allow-read main.ts
Check file:///path/to/main.ts
Check file:///path/to/worker.ts
error: Uncaught (in worker "") (in promise) TypeError: Cannot read properties of undefined (reading 'apply')
TypeError: Cannot convert object to primitive value
    at AsyncFunction.<anonymous> (file:///path/to/worker.ts:9:38)
error: Uncaught (in promise) Error: Unhandled error event in child worker.
    at Worker.#pollControl (deno:runtime/js/11_workers.js:168:21)

I tried implementing a transfer handler based on some async-iterable examples, but I couldn't get it working and ended up with a less-than-ideal implementation. I figure there's bound to be an easier way. I think some additional documentation on how to implement transfer handlers would be helpful. Could it be added to Comlink's README? It would be very beneficial to include an example that reuses the existing proxy transfer handler to transfer/proxy an object more complex than an Event (e.g. a Request and/or Response object).
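
To make the request concrete, here is a minimal sketch of the kind of transfer handler example that would help. It is not taken from Comlink's documentation: the `TransferHandlerLike` interface only mirrors the canHandle/serialize/deserialize shape of Comlink's transfer handlers and is declared locally so the snippet has no imports, and `SerializedRequest` and `requestTransferHandler` are hypothetical names. The body is deliberately left out, since serialize is synchronous and a streamed body would need a separate channel.

```typescript
// Local stand-in mirroring the shape of a Comlink transfer handler
// (assumption: canHandle / serialize / deserialize, serialize being synchronous).
interface TransferHandlerLike<T, S> {
  canHandle(value: unknown): value is T;
  serialize(value: T): [S, unknown[]];
  deserialize(value: S): T;
}

// Hypothetical wire format holding only structured-cloneable fields.
interface SerializedRequest {
  url: string;
  method: string;
  headers: Record<string, string>;
}

const requestTransferHandler: TransferHandlerLike<Request, SerializedRequest> = {
  canHandle: (value: unknown): value is Request => value instanceof Request,

  // Copy the plain fields; the second tuple element lists transferables (none here).
  serialize(request) {
    const headers: Record<string, string> = {};
    request.headers.forEach((value, key) => {
      headers[key] = value;
    });
    return [{ url: request.url, method: request.method, headers }, []];
  },

  // Rebuild a body-less Request on the receiving side.
  deserialize({ url, method, headers }) {
    return new Request(url, { method, headers });
  },
};

// With Comlink imported, registration on both sides would presumably be:
// Comlink.transferHandlers.set("REQUEST", requestTransferHandler);
```

A handler like this would let url, method, and headers cross the worker boundary as plain values instead of proxies, which is what the template literal in worker.ts needs.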

For reference, the code I ended up writing to get things to work is included below. It is also what is in my current answer to the Stack Overflow question "multithreading - Deno on multi-core machines".

main.ts

import { serve } from "https://deno.land/[email protected]/http/server.ts";

import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";

serve(async function handler(request) {
  const worker = new Worker(new URL("./worker.ts", import.meta.url).href, {
    type: "module",
  });

  const handler = ComlinkRequestHandler.wrap(worker);

  return await handler(request);
});

worker.ts

/// <reference no-default-lib="true"/>
/// <reference lib="deno.worker" />

import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";

ComlinkRequestHandler.expose(async (request) => {
  const body = await request.text();
  return new Response(`Hello to ${request.url}\n\nReceived:\n\n${body}\n`);
});

ComlinkRequestHandler.ts

import * as Comlink from "https://cdn.skypack.dev/[email protected]?dts";

interface RequestMessage extends Omit<RequestInit, "body" | "signal"> {
  url: string;
  headers: Record<string, string>;
  hasBody: boolean;
}

interface ResponseMessage extends ResponseInit {
  headers: Record<string, string>;
  hasBody: boolean;
}

export default class ComlinkRequestHandler {
  #handler: (request: Request) => Promise<Response>;
  #responseBodyReader: ReadableStreamDefaultReader<Uint8Array> | undefined;

  static expose(handler: (request: Request) => Promise<Response>) {
    Comlink.expose(new ComlinkRequestHandler(handler));
  }

  static wrap(worker: Worker) {
    const { handleRequest, nextResponseBodyChunk } =
      Comlink.wrap<ComlinkRequestHandler>(worker);

    return async (request: Request): Promise<Response> => {
      const requestBodyReader = request.body?.getReader();

      const requestMessage: RequestMessage = {
        url: request.url,
        hasBody: requestBodyReader !== undefined,
        cache: request.cache,
        credentials: request.credentials,
        headers: Object.fromEntries(request.headers.entries()),
        integrity: request.integrity,
        keepalive: request.keepalive,
        method: request.method,
        mode: request.mode,
        redirect: request.redirect,
        referrer: request.referrer,
        referrerPolicy: request.referrerPolicy,
      };

      const nextRequestBodyChunk = Comlink.proxy(async () => {
        if (requestBodyReader === undefined) return undefined;
        const { value } = await requestBodyReader.read();
        return value;
      });

      const { hasBody: responseHasBody, ...responseInit } = await handleRequest(
        requestMessage,
        nextRequestBodyChunk
      );

      const responseBodyInit: BodyInit | null = responseHasBody
        ? new ReadableStream({
            start(controller) {
              async function push() {
                const value = await nextResponseBodyChunk();
                if (value === undefined) {
                  controller.close();
                  return;
                }
                controller.enqueue(value);
                push();
              }

              push();
            },
          })
        : null;

      return new Response(responseBodyInit, responseInit);
    };
  }

  constructor(handler: (request: Request) => Promise<Response>) {
    this.#handler = handler;
  }

  async handleRequest(
    { url, hasBody, ...init }: RequestMessage,
    nextRequestBodyChunk: () => Promise<Uint8Array | undefined>
  ): Promise<ResponseMessage> {
    const request = new Request(
      url,
      hasBody
        ? {
            ...init,
            body: new ReadableStream({
              start(controller) {
                async function push() {
                  const value = await nextRequestBodyChunk();
                  if (value === undefined) {
                    controller.close();
                    return;
                  }
                  controller.enqueue(value);
                  push();
                }

                push();
              },
            }),
          }
        : init
    );
    const response = await this.#handler(request);
    this.#responseBodyReader = response.body?.getReader();
    return {
      hasBody: this.#responseBodyReader !== undefined,
      headers: Object.fromEntries(response.headers.entries()),
      status: response.status,
      statusText: response.statusText,
    };
  }

  async nextResponseBodyChunk(): Promise<Uint8Array | undefined> {
    if (this.#responseBodyReader === undefined) return undefined;
    const { value } = await this.#responseBodyReader.read();
    return value;
  }
}
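
The two ReadableStream blocks above repeat the same chunk-forwarding loop. A possible way to factor it out is sketched below, assuming the same convention that an undefined chunk means end of stream. `streamFromChunks` is a hypothetical helper name, not part of Comlink or Deno.

```typescript
// Hypothetical helper: wraps a "next chunk" function in a ReadableStream.
// A resolved value of `undefined` signals the end of the stream.
function streamFromChunks(
  nextChunk: () => Promise<Uint8Array | undefined>,
): ReadableStream<Uint8Array> {
  return new ReadableStream<Uint8Array>({
    // pull() is invoked by the stream whenever its internal queue has room,
    // so chunks are only requested as fast as the consumer reads them.
    async pull(controller) {
      const value = await nextChunk();
      if (value === undefined) {
        controller.close();
      } else {
        controller.enqueue(value);
      }
    },
  });
}
```

Compared with the recursive push() inside start(), this variant respects backpressure, and a rejected nextChunk() call errors the stream instead of surfacing as an unhandled promise rejection. Both call sites could then pass streamFromChunks(nextRequestBodyChunk) or streamFromChunks(nextResponseBodyChunk) as the body.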

Example usage:

% deno run --allow-net --allow-read main.ts
% curl -X POST --data '{"answer":42}' http://localhost:8000/foo/bar
Hello to http://localhost:8000/foo/bar

Received:

{"answer":42}

mfulton26, Apr 13 '22 13:04