workerd
TCP ingress worker support
Implements the connect handler and TCP ingress for a worker.
See samples/tcp-ingress for an example.
Note that this only enables the feature for local dev in workerd. A lot more work will need to be done internally to support TCP ingress in general, but this at least lays the groundwork. It also gives us a basic mechanism we can use to test the Socket API and node:net implementation without relying on the internal production tests.
The proposed experimental API here is simple:
- The worker exports a connect(event, env, ctx) handler.
- The event argument has two properties:
  - inbound -- A ReadableStream that provides access to the inbound data stream.
  - cf -- The JSON-parsed cf structure.
- The return value is expected to be a ReadableStream or a Promise<ReadableStream>. Internally, this will be piped back to the client in the same way as the fetch() handler's Response.
interface ConnectEvent {
  inbound: ReadableStream;
  cf: object;
}
connect(event: ConnectEvent, env: WorkersEnv, ctx: WorkersContext): Promise<ReadableStream> | ReadableStream
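To make the data flow concrete, here is a minimal sketch of how a runtime might drive a handler of this shape. It is not workerd's implementation: `dispatchConnect`, `clientWritable`, and `echoWorker` are hypothetical names used only for illustration, and the sketch assumes nothing beyond the standard Streams API.

```javascript
// Hypothetical model of the dispatch path; not workerd's actual code.
// `inbound` carries bytes from the client, `clientWritable` carries bytes back.
async function dispatchConnect(worker, inbound, clientWritable, cf = {}, env = {}, ctx = {}) {
  const event = { inbound, cf };
  // The handler may return a ReadableStream or a Promise of one.
  const outbound = await worker.connect(event, env, ctx);
  if (!(outbound instanceof ReadableStream)) {
    throw new TypeError('connect() must return a ReadableStream');
  }
  // Pipe the returned stream back to the client, much like a Response body.
  await outbound.pipeTo(clientWritable);
}

// Simplest possible handler: echo the inbound bytes straight back.
const echoWorker = {
  connect({ inbound }) { return inbound; },
};
```

In a real worker the handler object would be the module's default export; it is a plain constant here so the sketch can be exercised outside the runtime.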
This is essentially the same model as the fetch() handler with the Request and Response objects stripped away.
See this comment for more discussion of the API: https://github.com/cloudflare/workerd/pull/1429#issuecomment-2197791012
Labeling this with the nodejs compat label only because this is a prereq for having proper tests for the node:net implementation.
We should verify what the Cloudflare stack does with HTTP CONNECT requests today. Hopefully they don't make it through to the edge runtime. If they do, we need to think carefully about this change since such requests will now run the Worker's connect handler. (Not necessarily a bad thing long-term but maybe something we don't intend to enable right now.)
For the time being I can have the globalScope->connect(...) handler throw if the experimental compat flag is not enabled.
This really needs a design doc to discuss the API. It's hard for people to give API feedback if they first must read the code to figure out what the API looks like.
It looks like the connect() handler is like:
export default {
  async connect(event, env, ctx) {
    // event.inbound is the socket
    // event.cf is the CF blob
  }
}
I'm concerned that, unlike the fetch() API, this is very asymmetrical. The asymmetry makes it awkward to implement proxying: You can't just do return connect("upstream-host"). You end up having to set up a pair of pumps, like:
export default {
  async connect(event, env, ctx) {
    let up = connect("upstream-host");
    await Promise.all([
      event.inbound.readable.pipeTo(up.writable),
      up.readable.pipeTo(event.inbound.writable),
    ]);
  }
}
This is kind of ugly, but there's a bigger problem than the aesthetics: the above code cannot use the deferred proxying optimization, since JavaScript has to await the two pipes. How do we avoid that? Seems tricky.
I would argue instead that the connect() handler should have the same signature as the global connect():
export default {
  async connect(addr, env, ctx) {
    return connect("upstream-host");
  }
}
Now proxying is simple and the pump can be deferred in an obvious way.
To allow people to directly respond, we would probably want to offer a notion of a SocketPair, which works a lot like WebSocketPair. They end up with similar code to handling a WebSocket request in fetch().
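For readers trying to picture the idea, here is a rough sketch of what such a pair could look like when built from two identity TransformStreams. No SocketPair exists in the runtime today; `createSocketPair`, `pairWorker`, and the handler signature are all assumptions made for illustration, not a proposed implementation.

```javascript
// Hypothetical: no SocketPair exists in the Workers runtime today.
// Each end is a { readable, writable } pair; bytes written to one end's
// writable come out of the other end's readable.
function createSocketPair() {
  const aToB = new TransformStream();
  const bToA = new TransformStream();
  return [
    { readable: bToA.readable, writable: aToB.writable },
    { readable: aToB.readable, writable: bToA.writable },
  ];
}

// Shape of a handler under this idea (the signature is an assumption).
const pairWorker = {
  async connect(addr, env, ctx) {
    const [client, server] = createSocketPair();
    // Serve the connection on one end; here, a trivial echo.
    ctx.waitUntil(server.readable.pipeTo(server.writable));
    // The runtime would splice the other end to the remote peer.
    return client;
  },
};
```

This mirrors the WebSocketPair pattern: the worker keeps one end to serve the connection and hands the other end back to the runtime.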
Sorry that this feedback comes after so much delay, but this is a basic problem with the format of "feature proposal as PR": reviewers will tend to look at the code, and will have a hard time seeing the high-level design, because that's what you're presenting them. We really need feature requests like this to start out with a design doc explaining the high-level design, especially API details. It doesn't have to be long, it's just to prompt discussion of the API before spending time on code.
@kentonv :
It looks like the connect() handler is like ...
Not quite... The event.inbound is a ReadableStream not a Socket. The return value is a Promise<ReadableStream> ... So it would be...
export default {
  async connect(event, env, ctx) {
    const up = connect("upstream-host");
    // will use an optimized pipe since both are internal streams
    ctx.waitUntil(event.inbound.pipeTo(up.writable));
    return up.readable;
  }
}
// Would be a bit nicer here if the Socket subrequest `connect(...)` API had a way of receiving the
// inbound readable such that it automatically wired up the pipe... e.g. `connect('upstream-host', { outbound: event.inbound })`
// But that is a limitation of the Socket API not the TCP ingress API.
We could also support a return value of Promise<ReadableWritablePair> and automatically wire up the necessary pipes, allowing something like...
export default {
  async connect(event, env, ctx) {
    // The internal handler logic would detect this case and wire event.inbound to socket.writable,
    // and respond with the socket.readable... This would just be a syntactic sugar alternative to
    // the above example, though.
    return connect('upstream-host');
  }
}
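The detection step described in that comment could be sketched roughly as follows. This is only an illustration: `normalizeConnectResult` is a hypothetical helper, the duck-typing rule is an assumption, and the caller is assumed to have already awaited the handler's return value.

```javascript
// Hypothetical helper; the name and the detection rule are assumptions.
function normalizeConnectResult(result, inbound, ctx) {
  if (result instanceof ReadableStream) {
    return result; // already the outbound stream
  }
  if (result &&
      result.readable instanceof ReadableStream &&
      result.writable instanceof WritableStream) {
    // ReadableWritablePair (e.g. a Socket): feed the inbound data into it
    // and answer with whatever it produces.
    ctx.waitUntil(inbound.pipeTo(result.writable));
    return result.readable;
  }
  throw new TypeError('connect() must return a ReadableStream or a ReadableWritablePair');
}
```

Either way the runtime ends up with a single ReadableStream to pipe back to the client, so the pair form would be sugar over the explicit version.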
I've expanded the PR description with the brief description of the simple experimental API implemented here.
To allow people to directly respond, we would probably want to offer a notion of a SocketPair, which works a lot like WebSocketPair. They end up with similar code to handling a WebSocket request in fetch().
I disagree here. I originally started down that path on the implementation here and there's really no reason to pass the writable side of this around. Let's take the simplest possible example, an echo service:
export default {
  // Uses deferred proxy...
  connect({ inbound }) { return inbound; }
}
Or a slightly more complicated case that responds to individual inputs...
async function processInbound(readable, writable) {
  // A WritableStream is written to through a writer
  const writer = writable.getWriter();
  for await (const chunk of readable) {
    // Process the chunk...
    const result = processChunk(chunk);
    // Write the result
    await writer.write(result);
  }
  // Close so the readable side ends when the inbound stream ends
  await writer.close();
}
export default {
  connect({ inbound }, env, ctx) {
    const { writable, readable } = new IdentityTransformStream();
    ctx.waitUntil(processInbound(inbound, writable));
    return readable; // Can use deferred proxy but limited by waitUntil...
  }
}
This could also be implemented fairly easily with a TransformStream, with good symmetry. (However, because this uses a JS-backed transform, deferred proxying isn't used, but that's not specific to connect.)
export default {
  connect({ inbound }, env, ctx) {
    return inbound.pipeThrough(new TransformStream({
      transform(chunk, controller) {
        controller.enqueue(processChunk(chunk));
      }
    }));
  }
}
Or, proxy a fetch subrequest:
export default {
  async connect({ inbound }, env, ctx) {
    const { writable, readable } = new IdentityTransformStream();
    // A request body requires a method other than GET or HEAD
    const request = new Request('http://example.org', { method: 'POST', body: readable });
    ctx.waitUntil(inbound.pipeTo(writable));
    const resp = await fetch(request);
    return resp.body; // body is a ReadableStream... uses deferred proxy
  }
}
I would also argue that we do not need to solve any API issues right now. This is an experimental mechanism that is not currently intended for production use beyond supporting a handful of tests for CI.