ZeroMQ with scheduler doesn't work since 0.11.8
Version 0.11.7 is the last working release for this code.
from taskiq import TaskiqScheduler, ZeroMQBroker
from taskiq.schedule_sources import LabelScheduleSource

broker = ZeroMQBroker()

scheduler = TaskiqScheduler(
    broker=broker,
    sources=[LabelScheduleSource(broker)],
)


@broker.task(schedule=[{"cron": "* * * * *"}])
async def hi() -> None:
    print("Hello")
Then run:
taskiq worker module:broker -w 1
taskiq scheduler module:scheduler
Expected output:
[2025-01-16 16:21:17,802][taskiq.worker][INFO ][MainProcess] Pid of a main process: 91204
[2025-01-16 16:21:17,803][taskiq.worker][INFO ][MainProcess] Starting 1 worker processes.
[2025-01-16 16:21:17,805][taskiq.process-manager][INFO ][MainProcess] Started process worker-0 with pid 91205
[2025-01-16 16:21:17,809][taskiq.receiver.receiver][INFO ][worker-0] Listening started.
[2025-01-16 16:22:00,041][taskiq.receiver.receiver][INFO ][worker-0] Executing task module:hi with ID: a529cfa06fa246e0b3181d7bf468746f
Hello
Reality:
The worker doesn't execute the task.
I guess it might be caused by this line. Try removing the with statement and leaving only this:

await self.socket.send_multipart(parts)

If that works, you can make a PR, but I'm not sure it's the correct solution.
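For reference, a rough sketch of what that suggestion could look like inside ZeroMQBroker.kick, assuming the socket gets connected (or bound) once during startup instead of on every call; that placement is my assumption, not a verified fix:

    async def kick(self, message: BrokerMessage) -> None:
        """Send the message over the already-connected socket."""
        # Split the payload into 100-byte chunks, as the current code does.
        part_len = 100
        parts = [
            message.message[idx * part_len : idx * part_len + part_len]
            for idx in range(math.ceil(len(message.message) / part_len))
        ]
        # No `with self.socket.connect(...)` here: the socket is assumed
        # to be connected once in startup(), so kick() only sends.
        await self.socket.send_multipart(parts)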
Actually, I guess this will work better.
import math
from logging import getLogger
from typing import AsyncGenerator, Callable, Optional, TypeVar

from taskiq.abc.broker import AsyncBroker
from taskiq.abc.result_backend import AsyncResultBackend
from taskiq.message import BrokerMessage

try:
    import zmq
    from zmq.asyncio import Context, Socket
except ImportError:
    zmq = None  # type: ignore

_T = TypeVar("_T")

logger = getLogger(__name__)

class ZeroMQBroker(AsyncBroker):
    """
    ZeroMQ broker.

    This broker binds a socket ON THE WORKER SIDE,
    and all publishers connect to this socket using pub_host.
    Because the worker owns the bound socket,
    only one worker can listen on it at a time.
    """
    def __init__(
        self,
        zmq_pub_host: str = "tcp://localhost:5555",
        zmq_sub_host: str = "tcp://0.0.0.0:5555",
        result_backend: "Optional[AsyncResultBackend[_T]]" = None,
        task_id_generator: Optional[Callable[[], str]] = None,
    ) -> None:
        if zmq is None:
            raise RuntimeError(
                "To use ZMQ broker please install pyzmq lib or taskiq[zmq].",
            )
        super().__init__(result_backend, task_id_generator)
        self.context = Context()
        self.pub_host = zmq_pub_host
        self.sub_host = zmq_sub_host
        self.socket: Socket
    async def startup(self) -> None:
        """
        Startup for zmq broker.

        This function creates the actual sockets.
        If the current process is a worker, it gets a PULL socket;
        otherwise it becomes a publisher with a PUSH socket.
        """
        if self.is_worker_process:
            self.socket = self.context.socket(zmq.PULL)
        else:
            self.socket = self.context.socket(zmq.PUSH)
        await super().startup()
    async def kick(self, message: BrokerMessage) -> None:
        """
        Kicking message.

        This method is used to publish a message
        via the socket.

        :param message: message to publish.
        """
        part_len = 100
        parts = [
            message.message[
                idx * part_len : min(idx * part_len + part_len, len(message.message))
            ]
            for idx in range(math.ceil(len(message.message) / part_len))
        ]
        with self.socket.connect(self.pub_host) as sock:
            await sock.send_multipart(parts)
    async def listen(self) -> AsyncGenerator[bytes, None]:
        """
        Start accepting new messages.

        :yields: incoming messages.
        """
        with self.socket.bind(self.sub_host) as sock:
            while True:
                data = await sock.recv_multipart()
                yield b"".join(data)
This ZMQ broker architecture is reversed compared to the previous version: the socket is bound on the worker side, and all publishers send to that socket. Which makes sense, but now you can only have one worker at a time.
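For illustration, a minimal plain-pyzmq sketch of that layout (hostnames and port are examples only): the single worker binds a PULL socket, and any number of publishers connect PUSH sockets to it.

    import zmq
    from zmq.asyncio import Context


    async def worker() -> None:
        # Only one process can bind this address at a time.
        sock = Context().socket(zmq.PULL)
        sock.bind("tcp://0.0.0.0:5555")
        while True:
            parts = await sock.recv_multipart()
            print(b"".join(parts))


    async def publisher(name: bytes) -> None:
        # Any number of publishers can connect to the bound socket.
        sock = Context().socket(zmq.PUSH)
        sock.connect("tcp://localhost:5555")
        await sock.send_multipart([name, b" says hi"])

    # Run worker() in one process (e.g. asyncio.run(worker())) and
    # publisher(b"client-1") in any number of other processes: all
    # messages fan in to the single bound socket.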
I could have made it possible to have multiple listeners by setting the SO_REUSEPORT socket option, but it seems ZeroMQ doesn't support it.
https://github.com/zeromq/libzmq/issues/1443
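For context, SO_REUSEPORT at the plain-socket level is what would let several listeners share one port, with the kernel balancing connections between them. A small sketch using Python's socket module (Linux/BSD only, nothing taskiq- or ZMQ-specific):

    import socket


    def reuseport_listener(port: int = 5555) -> socket.socket:
        """Open a TCP listener that lets other processes bind the same port."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # SO_REUSEPORT allows several sockets to bind the same address;
        # the kernel distributes incoming connections between them.
        # Not available on Windows.
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
        sock.bind(("0.0.0.0", port))
        sock.listen()
        return sock

Calling this in two worker processes succeeds, whereas a plain bind() would fail with "Address already in use"; per the libzmq issue linked above, ZeroMQ sockets don't seem to expose this option, so it can't be used here.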