cog icon indicating copy to clipboard operation
cog copied to clipboard

Websockets worker

Open palp opened this issue 3 years ago • 6 comments
trafficstars

This adds a websocket queue worker based heavily on the existing Redis queue worker - they share so much code that it's likely they could and should be refactored to use a base class, but I'm not really a python dev so I didn't attempt it.

The worker takes 3 positional arguments: websocket queue URL, upload URL, and model ID. It will connect to the websocket and wait for a message to be sent; it simply expects a raw JSON payload that includes an ID and input, any other messages sent over the socket would throw an error at the moment - I'm not sure if it should really be designed otherwise. Responses are streamed back over the websocket connection live, with the ID included for correlation if multithreading is used. I have not done exhaustive testing as my server is still in the early stages, but it seems functional at a basic level.

palp avatar Jul 06 '22 01:07 palp

Neat! We're definitely up for multiple drivers for the queue worker. You're right that this could use refactoring into a base class. Having a second driver would give us something to focus that work.

I've got a question about the behaviour here: judging by the on_message method, this will try to run multiple predictions at once, if it receives a new message before an existing prediction is finished. Is that right? If so, the prediction runner is only set up to handle a single prediction at a time, so I think it'll break in unexpected ways. For it to work, I think you'd need to handle queueing inside the worker – taking messages in on_message and adding them to a list, and pulling the next job off that list when ready for a new one.

I think that changes the feel of the whole thing, and makes it more like the HTTP API than the queue worker – one of the aims of the queueing worker is to have multiple Cog models coordinate work against a single queue.

How are you planning to use this worker? What's sitting on the other end of the connection, making the requests? Do you have queueing logic on that side at all?

Some related issues to this:

  • #443
  • #475
  • #521

evilstreak avatar Jul 06 '22 09:07 evilstreak

How are you planning to use this worker? What's sitting on the other end of the connection, making the requests? Do you have queueing logic on that side at all?

Yeah, this is undefined behavior right now on the client; I'm preventing it from happening on the server side and it feels like that's where the actual queuing belongs with this setup. The main reason I'm doing this versus just calling the HTTP API is to simplify networking and deployment, including stuff like supporting use cases behind NAT, etc - like people at home contributing to a worker pool with their GPUs during downtime.

I think that changes the feel of the whole thing, and makes it more like the HTTP API than the queue worker – one of the aims of the queueing worker is to have multiple Cog models coordinate work against a single queue.

I expected the naming to come up, even, and the current name is such only because I refactored the redis queue code. Since the websocket connection is a 1:1 bidirectional pipe, it doesn't really make sense to use it explicitly like a queue. I thought cog.server.websocket by itself was a little confusing, though, since it's not a socket server, it's a client, and websocket support on the HTTP API could happen at some point leading to extra confusion.

Edit: Come to think of it, there's no reason this class couldn't act as a client or server, once connected it's the same thing.

palp avatar Jul 06 '22 09:07 palp

I think maybe the right term for what both of them do (different from the HTTP API) is "subscription" or something along those lines, versus "queue"?

palp avatar Jul 06 '22 09:07 palp

Though just temporary logic, this bit of code is where I actually assign jobs to workers; the idea is if a worker has a running job it's not considered a valid target for a new job - I figured it would help show how I'm planning to use it.

https://github.com/NightmareAI/cogflare/blob/8752c0cb86e665f3b4071f4c62aca70d18c58b91/src/queue.ts#L33

      case "/predict": {
        let req = JSON.parse(await request.text());
        var session: Session = this.sessions.filter(member => !member.quit && !member.request)[0];
        if (!session)
          return new Response("No workers available", { status: 400 });
        session.request = req;
        session.webSocket.send(JSON.stringify(req));
        return new Response("OK");

one of the aims of the queueing worker is to have multiple Cog models coordinate work against a single queue.

I should point out this is absolutely how the end result will work for me, effectively a websocket URL represents a queue (for a specific model is what I'm thinking makes the most sense) and every client that connects to it is a worker that's given jobs in a load balanced fashion - very much how a good workqueue should function but with almost all of the logic happening server-side transparently to "dumb" clients. I'm hopeful that since Cloudflare Workers are (apparently) just ES6 web workers with some extra functionality added on, most of the server code will be fairly reusable elsewhere.

palp avatar Jul 06 '22 09:07 palp

I've refactored the name and fixed some bugs, while switching to the asyncio-based websockets library. I plan to adjust the parameters with the idea of allowing it to act as a server or client in the future. It's likely drifting far enough from the Redis worker now that merging them into a base class is moot, but there's probably logic that could be moved into the already shared runner class.

palp avatar Jul 10 '22 02:07 palp

This is fantastic, thank you @palp.

We also intend to support websockets on Replicate and we want to have the same API as Cog. So, just as a reminder to future reviewers, we'll want review this keeping in mind we want to support the same API on Replicate.

Alternatively, we could mark it as an experimental feature with no guarantee of backwards compatibility, then finalize it when we've successfully added it to Replicate's API too.

bfirsh avatar Aug 04 '22 17:08 bfirsh