documentation on (deeply?) proxying objects like Request/Response
Here's the code I think I want to be able to write:
main.ts
import { serve } from "https://deno.land/[email protected]/http/server.ts";
import * as Comlink from "https://cdn.skypack.dev/[email protected]?dts";
// Desired usage (this is the code that produces the error shown further down):
// handle every HTTP request by handing it to a worker through Comlink.
serve(async function handler(request) {
// Start a new module worker from ./worker.ts for this request.
const worker = new Worker(new URL("./worker.ts", import.meta.url).href, {
type: "module",
});
// Treat the worker as a remote `(request) => Promise<Response>` function.
// Note: this local `handler` shadows the enclosing function's own name.
const handler = Comlink.wrap<(request: Request) => Promise<Response>>(worker);
// Intent: hand the worker a Comlink proxy of the request instead of a copy,
// and return whatever Response the worker's handler resolves to.
return await handler(Comlink.proxy(request));
});
worker.ts
/// <reference no-default-lib="true"/>
/// <reference lib="deno.worker" />
import * as Comlink from "https://cdn.skypack.dev/[email protected]?dts";
// Desired worker side: expose a single request handler that reads the request
// body as text and answers with a Response echoing the URL and the body.
// NOTE(review): main.ts passes `Comlink.proxy(request)`, so `request` here is
// presumably a Comlink proxy rather than a real Request; interpolating
// `request.url` into the template string is a likely source of the
// "Cannot convert object to primitive value" error — confirm.
Comlink.expose(async (request: Request): Promise<Response> => {
const body = await request.text();
// Intent: return the Response by proxy rather than by structured clone.
return Comlink.proxy(
new Response(`Hello to ${request.url}\n\nReceived:\n\n${body}\n`)
);
});
But when I run my server and send it a request (e.g. curl -X POST --data '{"answer":42}' http://localhost:8000/foo/bar), I get an error:
% deno run --allow-net --allow-read main.ts
Check file:///path/to/main.ts
Check file:///path/to/worker.ts
error: Uncaught (in worker "") (in promise) TypeError: Cannot read properties of undefined (reading 'apply')
TypeError: Cannot convert object to primitive value
at AsyncFunction.<anonymous> (file:///path/to/worker.ts:9:38)
error: Uncaught (in promise) Error: Unhandled error event in child worker.
at Worker.#pollControl (deno:runtime/js/11_workers.js:168:21)
I tried implementing a transfer handler based on some async-iterable examples, but I couldn't get it working and ended up with a less-than-ideal implementation. I figure there's bound to be an easier way. I think some additional documentation on how to implement transfer handlers would be helpful. Could this be added to Comlink's README? It would be very beneficial to include an example that reuses the existing proxy transfer handler to transfer/proxy a more complex object than an Event (e.g. a Request and/or Response object).
For reference, the code I ended up writing to get things working is included below; it is also what is in my current answer to the Stack Overflow question "multithreading - Deno on multi-core machines".
// main.ts
//
// HTTP entry point: every incoming request is forwarded to a worker through
// ComlinkRequestHandler, and the worker's Response is returned to the client.
// NOTE(review): the "[email protected]" segment in the URL below looks like a
// version pin mangled by e-mail obfuscation — restore the real version.
import { serve } from "https://deno.land/[email protected]/http/server.ts";
import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";

serve(async function handler(request) {
  // A new module worker is started for each request.
  // NOTE(review): the worker is never terminated here; confirm whether it
  // should be shut down (or reused) once the response body has been sent.
  const worker = new Worker(new URL("./worker.ts", import.meta.url).href, {
    type: "module",
  });
  // `wrap` yields a plain `(request: Request) => Promise<Response>` function
  // (it shadows the enclosing function's own name, `handler`).
  const handler = ComlinkRequestHandler.wrap(worker);
  return await handler(request);
});
// worker.ts
/// <reference no-default-lib="true"/>
/// <reference lib="deno.worker" />
import ComlinkRequestHandler from "./ComlinkRequestHandler.ts";

// Worker side: register an ordinary Request -> Response handler. It reads the
// whole request body as text and echoes the request URL and body back.
ComlinkRequestHandler.expose(async (request) => {
  const body = await request.text();
  return new Response(`Hello to ${request.url}\n\nReceived:\n\n${body}\n`);
});
// ComlinkRequestHandler.ts
// NOTE(review): the "[email protected]" segment in the URL below looks like a
// version pin mangled by e-mail obfuscation — restore the real version.
import * as Comlink from "https://cdn.skypack.dev/[email protected]?dts";

/**
 * Description of a Request without its body, sent to the worker as a message.
 * Headers travel as an entry list so that repeated header names are kept.
 */
interface RequestMessage extends Omit<RequestInit, "body" | "signal"> {
  url: string;
  headers: [string, string][];
  hasBody: boolean;
}

/**
 * Description of a Response without its body, sent back from the worker.
 * Headers travel as an entry list so that repeated header names (for example
 * several `Set-Cookie` headers) are kept.
 */
interface ResponseMessage extends ResponseInit {
  headers: [string, string][];
  hasBody: boolean;
}

/** Fetches the next body chunk; resolves to `undefined` when the body is done. */
type NextChunk = () => Promise<Uint8Array | undefined>;

/**
 * Builds a byte stream that obtains its chunks by calling `nextChunk`.
 *
 * `pull` is used instead of an eager loop in `start`, so chunks are fetched
 * only when the consumer wants more, and a rejection from `nextChunk` errors
 * the stream instead of becoming an unhandled rejection.
 */
function streamFromChunks(nextChunk: NextChunk): ReadableStream<Uint8Array> {
  return new ReadableStream<Uint8Array>({
    async pull(controller) {
      const chunk = await nextChunk();
      if (chunk === undefined) {
        controller.close();
        return;
      }
      controller.enqueue(chunk);
    },
  });
}

/** Reads one chunk from `reader`; `undefined` means "no body" or "finished". */
async function readChunk(
  reader: ReadableStreamDefaultReader<Uint8Array> | undefined
): Promise<Uint8Array | undefined> {
  if (reader === undefined) return undefined;
  const { value } = await reader.read();
  return value;
}

/**
 * Bridges a `(request: Request) => Promise<Response>` handler across a worker
 * boundary using Comlink. Request/response metadata is sent as plain
 * messages; bodies are pulled chunk by chunk through function calls.
 *
 * An instance keeps a single response-body reader, so it serves one response
 * body at a time.
 */
export default class ComlinkRequestHandler {
  #handler: (request: Request) => Promise<Response>;
  #responseBodyReader: ReadableStreamDefaultReader<Uint8Array> | undefined;

  /**
   * Worker side: exposes `handler` to the thread that created the worker.
   *
   * @param handler Function that turns a Request into a Response.
   */
  static expose(handler: (request: Request) => Promise<Response>) {
    Comlink.expose(new ComlinkRequestHandler(handler));
  }

  /**
   * Main-thread side: wraps a worker that called `expose`.
   *
   * @param worker Worker running a script that called `expose`.
   * @returns A function that forwards a Request to the worker and resolves to
   *   the Response rebuilt from the worker's reply.
   */
  static wrap(worker: Worker) {
    const { handleRequest, nextResponseBodyChunk } =
      Comlink.wrap<ComlinkRequestHandler>(worker);

    return async (request: Request): Promise<Response> => {
      const requestBodyReader = request.body?.getReader();
      const requestMessage: RequestMessage = {
        url: request.url,
        hasBody: requestBodyReader !== undefined,
        cache: request.cache,
        credentials: request.credentials,
        headers: [...request.headers.entries()],
        integrity: request.integrity,
        keepalive: request.keepalive,
        method: request.method,
        mode: request.mode,
        redirect: request.redirect,
        referrer: request.referrer,
        referrerPolicy: request.referrerPolicy,
      };

      // Proxied callback the worker calls to pull request-body chunks.
      const nextRequestBodyChunk = Comlink.proxy(() =>
        readChunk(requestBodyReader)
      );

      const { hasBody: responseHasBody, ...responseInit } = await handleRequest(
        requestMessage,
        nextRequestBodyChunk
      );

      const responseBodyInit: BodyInit | null = responseHasBody
        ? streamFromChunks(() => nextResponseBodyChunk())
        : null;
      return new Response(responseBodyInit, responseInit);
    };
  }

  /**
   * @param handler Function that turns a Request into a Response.
   */
  constructor(handler: (request: Request) => Promise<Response>) {
    this.#handler = handler;
  }

  /**
   * Rebuilds a Request from `RequestMessage`, runs the handler, and returns
   * the response metadata. The response body reader is stored on the instance
   * for later `nextResponseBodyChunk` calls.
   *
   * @param nextRequestBodyChunk Called to fetch request-body chunks; resolves
   *   to `undefined` once the body is finished.
   * @returns Status, status text, headers, and whether a body follows.
   */
  async handleRequest(
    { url, hasBody, ...init }: RequestMessage,
    nextRequestBodyChunk: () => Promise<Uint8Array | undefined>
  ): Promise<ResponseMessage> {
    const request = new Request(
      url,
      hasBody ? { ...init, body: streamFromChunks(nextRequestBodyChunk) } : init
    );
    const response = await this.#handler(request);
    this.#responseBodyReader = response.body?.getReader();
    return {
      hasBody: this.#responseBodyReader !== undefined,
      headers: [...response.headers.entries()],
      status: response.status,
      statusText: response.statusText,
    };
  }

  /**
   * @returns The next chunk of the stored response body, or `undefined` when
   *   there is no body or it has been fully read.
   */
  async nextResponseBodyChunk(): Promise<Uint8Array | undefined> {
    return await readChunk(this.#responseBodyReader);
  }
}
// Example usage:
% deno run --allow-net --allow-read main.ts
% curl -X POST --data '{"answer":42}' http://localhost:8000/foo/bar
Hello to http://localhost:8000/foo/bar

Received:

{"answer":42}