ZeroMQ with scheduler doesn't work since 0.11.8

Open vunhatchuong opened this issue 11 months ago • 2 comments

Version 0.11.7 is the last working release for this code.

from taskiq import TaskiqScheduler, ZeroMQBroker
from taskiq.schedule_sources import LabelScheduleSource

broker = ZeroMQBroker()
scheduler = TaskiqScheduler(
    broker=broker,
    sources=[LabelScheduleSource(broker)],
)

@broker.task(schedule=[{"cron": "* * * * *"}])
async def hi() -> None:
    print("Hello")

Then run:

taskiq worker module:broker -w 1

taskiq scheduler module:scheduler

Expected output:

[2025-01-16 16:21:17,802][taskiq.worker][INFO   ][MainProcess] Pid of a main process: 91204
[2025-01-16 16:21:17,803][taskiq.worker][INFO   ][MainProcess] Starting 1 worker processes.
[2025-01-16 16:21:17,805][taskiq.process-manager][INFO   ][MainProcess] Started process worker-0 with pid 91205
[2025-01-16 16:21:17,809][taskiq.receiver.receiver][INFO   ][worker-0] Listening started.
[2025-01-16 16:22:00,041][taskiq.receiver.receiver][INFO   ][worker-0] Executing task module:hi with ID: a529cfa06fa246e0b3181d7bf468746f
Hello

Actual output:

The worker never executes the task.

vunhatchuong, Jan 16 '25 09:01

I guess it might be caused by this line. Try removing the with statement and leaving only this call:

		await self.socket.send_multipart(parts)

If it works, you can make a PR, but I'm not sure if it's a correct solution.

s3rius, Jan 16 '25 10:01

Actually, I guess this will work better.

import math
from logging import getLogger
from typing import AsyncGenerator, Callable, Optional, TypeVar

from taskiq.abc.broker import AsyncBroker
from taskiq.abc.result_backend import AsyncResultBackend
from taskiq.message import BrokerMessage

try:
    import zmq
    from zmq.asyncio import Context, Socket
except ImportError:
    zmq = None  # type: ignore

_T = TypeVar("_T")

logger = getLogger(__name__)


class ZeroMQBroker(AsyncBroker):
    """
    ZeroMQ broker.

    The worker binds a PULL socket to sub_host,
    and every client connects a PUSH socket to pub_host.

    Only one process can bind sub_host,
    so run a single worker at a time.
    """

    def __init__(
        self,
        zmq_pub_host: str = "tcp://localhost:5555",
        zmq_sub_host: str = "tcp://0.0.0.0:5555",
        result_backend: "Optional[AsyncResultBackend[_T]]" = None,
        task_id_generator: Optional[Callable[[], str]] = None,
    ) -> None:
        if zmq is None:
            raise RuntimeError(
                "To use ZMQ broker please install pyzmq lib or taskiq[zmq].",
            )
        super().__init__(result_backend, task_id_generator)
        self.context = Context()
        self.pub_host = zmq_pub_host
        self.sub_host = zmq_sub_host
        self.socket: Socket

    async def startup(self) -> None:
        """
        Startup for zmq broker.

        This function creates the actual socket.
        If the current process is a worker, it creates
        a PULL socket, otherwise it creates a PUSH socket.
        """
        if self.is_worker_process:
            self.socket = self.context.socket(zmq.PULL)
        else:
            self.socket = self.context.socket(zmq.PUSH)

        await super().startup()

    async def kick(self, message: BrokerMessage) -> None:
        """
        Kicking message.

        This method is used to publish message
        via socket.

        :param message: message to publish.
        """
        part_len = 100
        parts = [
            message.message[
                idx * part_len : min(idx * part_len + part_len, len(message.message))
            ]
            for idx in range(math.ceil(len(message.message) / part_len))
        ]
        with self.socket.connect(self.pub_host) as sock:
            await sock.send_multipart(parts)

    async def listen(self) -> AsyncGenerator[bytes, None]:
        """
        Start accepting new messages.

        :yields: incoming messages.
        """
        with self.socket.bind(self.sub_host) as sock:
            while True:
                data = await sock.recv_multipart()
                yield b"".join(data)
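
The chunking in kick and the reassembly in listen can be checked in isolation, without any sockets. This is only a sketch of that round trip: split_message and join_message are hypothetical helper names that do not exist in taskiq, and the 100-byte part size is copied from the code above.

```python
import math
from typing import List

PART_LEN = 100  # same part size as in the kick method above


def split_message(payload: bytes, part_len: int = PART_LEN) -> List[bytes]:
    """Split payload into consecutive parts of at most part_len bytes."""
    # Slicing past the end of a bytes object is clamped automatically,
    # so no explicit min() is needed for the last part.
    return [
        payload[idx * part_len : (idx + 1) * part_len]
        for idx in range(math.ceil(len(payload) / part_len))
    ]


def join_message(parts: List[bytes]) -> bytes:
    """Reassemble the parts the same way listen does."""
    return b"".join(parts)


payload = bytes(range(250))
parts = split_message(payload)
print(len(parts), [len(p) for p in parts], join_message(parts) == payload)
# prints: 3 [100, 100, 50] True
```

Note that an empty payload produces an empty list of parts, so a caller would have to decide how to handle that case before sending.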

This reverses the ZMQ broker architecture compared to the previous version: the worker now binds the socket, and all publishers send to it. That makes sense, but it means you can only run one worker at a time.
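
The one-worker limit comes from the bind itself: a second process cannot bind an address that is already taken. The sketch below shows this with plain TCP sockets from the standard library rather than ZMQ. It is an illustration only, on the assumption that a ZMQ bind on a tcp:// endpoint hits the same operating-system restriction; second_bind_fails is a hypothetical helper name.

```python
import socket


def second_bind_fails(host: str = "127.0.0.1") -> bool:
    """Return True if binding an already-bound TCP address raises OSError."""
    first = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    second = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        first.bind((host, 0))  # port 0: let the OS pick a free port
        first.listen()
        port = first.getsockname()[1]
        try:
            # Plays the role of a second worker binding the same address.
            second.bind((host, port))
        except OSError as exc:
            print(f"second bind failed: {exc}")
            return True
        return False
    finally:
        second.close()
        first.close()


if __name__ == "__main__":
    print(second_bind_fails())
```

The exact error message depends on the platform, so the sketch only reports whether the second bind was rejected.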

I could have allowed multiple listeners on the same address by setting the SO_REUSEPORT socket option, but it seems ZMQ doesn't support it:

https://github.com/zeromq/libzmq/issues/1443

s3rius, Jan 16 '25 11:01