
Task routing to specific worker

Open cofob opened this issue 2 years ago • 6 comments

Hello!

I need to run tasks on specific nodes from the main application. How can this be implemented?

For example, this kind of worker code:

@broker.task
def example_task(node_name: str):
    print(f"Node {node_name}")

And this application code:

await example_task.kiq("node1", taskiq_queue="node.1")
await example_task.kiq("node2", taskiq_queue="node.2")

In other words, I want to call kiq and have the task delivered to a specific node, where it is received and executed. This could be done by giving each worker its own queue, but at the moment the broker is bound to a single queue, which makes this difficult to implement.
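The one-queue-per-worker idea can be made concrete with a toy in-memory model that uses only asyncio. Each node owns a queue, the sender picks the queue by node name, and each worker drains only its own queue. This does not use taskiq. `NodeRouter`, `send`, `worker` and the `node.<n>` queue names are hypothetical, chosen to mirror the `taskiq_queue="node.1"` idea above.

```python
import asyncio


class NodeRouter:
    """Hypothetical helper: one asyncio.Queue per node name (not a taskiq API)."""

    def __init__(self, nodes: list[str]) -> None:
        self.queues = {node: asyncio.Queue() for node in nodes}

    async def send(self, node: str, payload: str) -> None:
        # The sender picks the destination; only that node's queue receives it.
        await self.queues[node].put(payload)

    async def worker(self, node: str, results: list[str]) -> None:
        # A worker drains only its own queue, so it never sees other nodes' tasks.
        queue = self.queues[node]
        while True:
            payload = await queue.get()
            results.append(f"{node}: Node {payload}")
            queue.task_done()


async def main() -> list[str]:
    router = NodeRouter(["node.1", "node.2"])
    results: list[str] = []
    workers = [
        asyncio.create_task(router.worker(node, results)) for node in router.queues
    ]
    await router.send("node.1", "node1")
    await router.send("node.2", "node2")
    # Wait until every queue is drained, then stop the workers.
    await asyncio.gather(*(q.join() for q in router.queues.values()))
    for task in workers:
        task.cancel()
    await asyncio.gather(*workers, return_exceptions=True)
    return results


if __name__ == "__main__":
    print(sorted(asyncio.run(main())))
```

Each entry in the result is produced by the worker that owns the target queue. With a real broker the per-node queues would live in RabbitMQ, Redis or similar rather than in process memory.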

cofob, Jul 22 '23 04:07

That's an interesting question. I think this use case is pretty common when building distributed queues. Luckily, I found a solution. Here's a gist that solves your problem:

https://gist.github.com/s3rius/bb334fbafc7131629115fc96aaf3f473

It uses some tricks that are poorly documented. I need to add information about custom receivers in the next documentation iterations.

Also, I found it a bit frustrating that you can replace the class used for decorated tasks, but the type hints still won't change. I'll try to add more type hints so that you can define a custom kiq method that takes the node as its parameter, and your favorite text editor will be able to autocomplete it.

s3rius, Jul 23 '23 10:07

And of course, you can change it however you like to fit your needs and publish it as a taskiq library. I really enjoy seeing the taskiq community grow and extend taskiq's functionality so that it fits everybody.

s3rius, Jul 23 '23 10:07

Thanks for the example! It helped me in solving the problem.

But there are a few flaws in the example:

  1. All 3 queues are opened on every worker node, even when they are not needed. As a consequence, this puts a heavy load on the application and on the distributed queue.
  2. It is impossible to send a task to a specific node; you can only send it to a group of nodes.

I have rewritten the code for my needs and published it here; please take a look.

I also have a question about this line in taskiq_aio_pika: why is the task name used as the routing key here instead of the self._routing_key variable that the user sets in __init__? This breaks routing with an exchange and setting queue_name on the worker.
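To see why the routing key matters, here is a toy model of an AMQP direct exchange, which delivers a message to every queue bound with exactly the message's routing key and, by default, drops it if none matches. It does not use aio-pika or taskiq_aio_pika. The `DirectExchange` class, the queue name `node.1` and the task name are made up for illustration. It shows that if the worker's queue is bound with a user-chosen key but the publisher uses the task name as the routing key, as described above, the message never reaches that queue.

```python
from collections import defaultdict


class DirectExchange:
    """Toy model of AMQP direct-exchange routing (exact key match only)."""

    def __init__(self) -> None:
        self.bindings: dict[str, set[str]] = defaultdict(set)
        self.queues: dict[str, list[str]] = defaultdict(list)

    def bind(self, queue: str, routing_key: str) -> None:
        self.bindings[routing_key].add(queue)

    def publish(self, body: str, routing_key: str) -> int:
        # Deliver to every queue bound with exactly this key; return the count.
        targets = self.bindings.get(routing_key, set())
        for queue in targets:
            self.queues[queue].append(body)
        return len(targets)


exchange = DirectExchange()
# Worker side (hypothetical names): a per-node queue bound with a chosen key.
exchange.bind("node.1", routing_key="node.1")

# Task name used as the routing key: no binding matches, the message is lost.
lost = exchange.publish("example_task(node1)", routing_key="example_task")
# Configured routing key used instead: the node's queue receives the message.
delivered = exchange.publish("example_task(node1)", routing_key="node.1")
print(lost, delivered, exchange.queues["node.1"])  # prints: 0 1 ['example_task(node1)']
```

In real RabbitMQ an unroutable message is discarded unless the publisher sets the `mandatory` flag or the exchange has an alternate exchange configured.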

cofob, Jul 24 '23 07:07

I think this functionality should be part of the main code rather than a separate library, since it is a basic feature of any distributed queue.

cofob, Jul 24 '23 07:07