Multiple consumer processes are not handling messages in parallel
I am trying to use NATS for job queuing, basically as an alternative to Celery + RabbitMQ for my use case. I have two scripts.
- Producer script that populates the queue:
producer.py
import asyncio
import json

import nats
from nats.js.api import RetentionPolicy

NATS_HOST = '0.0.0.0'
NATS_PORT = '4222'
DOCUMENT_EXT_SUBJECT = "file_process"

async def main():
    nc = await nats.connect(servers=f"nats://{NATS_HOST}:{NATS_PORT}")
    js = nc.jetstream()
    # Work-queue retention: each message is removed once a consumer acks it.
    await js.add_stream(
        name="sample-stream",
        subjects=[DOCUMENT_EXT_SUBJECT],
        retention=RetentionPolicy.WORK_QUEUE,
    )
    for i in range(20):
        ack = await js.publish(
            subject=DOCUMENT_EXT_SUBJECT,
            payload=json.dumps(
                {
                    "some_load": "lsome_load",
                }).encode()
        )
    await nc.close()

if __name__ == '__main__':
    asyncio.run(main())
- Consumer script:
consumer.py
import asyncio
import logging
import signal
import sys

import nats
from nats.js.api import AckPolicy, ConsumerConfig, RetentionPolicy

logging.basicConfig(level=logging.INFO)

consumer_config = ConsumerConfig(
    ack_wait=900,
    max_deliver=1,
    max_ack_pending=1,
    ack_policy=AckPolicy.EXPLICIT,
)

# NATS
NATS_HOST = '0.0.0.0'
NATS_PORT = '4222'
DOCUMENT_EXT_SUBJECT = "file_process"
MAX_RECONNECT_ATTEMPTS = 10

# A CPU-bound task (simulated here with a blocking sleep).
def time_consuming_task():
    import time; time.sleep(50)
    return

async def task_cb(msg):
    time_consuming_task()
    await msg.ack()
    print("acknowledged document-extraction !")

async def run():
    logging.info("Started document processing consumer ...")

    async def error_cb(e):
        sys.exit()

    async def disconnected_cb():
        logging.info("Got disconnected from NATS server .. retrying.")

    async def reconnected_cb():
        logging.info("Got reconnected ...")

    nc = await nats.connect(
        servers=f"nats://{NATS_HOST}:{NATS_PORT}",
        error_cb=error_cb,
        reconnected_cb=reconnected_cb,
        disconnected_cb=disconnected_cb,
        max_reconnect_attempts=MAX_RECONNECT_ATTEMPTS,
    )

    # Create JetStream context.
    js = nc.jetstream()

    # Persist messages on this subject.
    await js.add_stream(
        subjects=[DOCUMENT_EXT_SUBJECT],
        name="sample-stream",
        retention=RetentionPolicy.WORK_QUEUE,
    )

    await js.subscribe(
        DOCUMENT_EXT_SUBJECT,
        stream="sample-stream",
        queue="worker_queue",  # also known as a "deliver group"; in core NATS it's called a "queue group"
        cb=task_cb,
        manual_ack=True,
        config=consumer_config,
    )

    def signal_handler():
        sys.exit()

    for sig in ('SIGINT', 'SIGTERM'):
        asyncio.get_running_loop().add_signal_handler(getattr(signal, sig), signal_handler)

    await nc.flush()
    logging.info("Done ... ?")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(run())
        loop.run_forever()
    except Exception as e:
        print("Got error : ", e)
    finally:
        loop.close()
MY PROBLEM:
I populate the queue using producer.py. Then I run the consumer.py script in multiple terminals as separate processes.
What's happening is that things don't run in parallel, the way they did with Celery. Although there are 3 processes available to consume messages, while a task is being processed inside one consumer process, the NATS server doesn't push any task to the other 2 consumers. Everything happens one after another.
Am I doing something wrong? Could it be because my use case is a CPU-bound task?
You might be interested in the FastStream project, and in this discussion to understand NATS consumer limits: https://github.com/airtai/faststream/issues/1114
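Two things in the consumer stand out. First, max_ack_pending=1: all members of a queue group bind to the same JetStream consumer, so this setting caps the entire group at one unacknowledged message in flight, which serializes delivery across every worker process. Second, the blocking time.sleep(50) stalls the asyncio event loop of whichever process received the message. Below is a minimal sketch of a workaround, reusing the names from the question; the max_ack_pending value and the process pool are illustrative assumptions, not a definitive fix:

import asyncio
from concurrent.futures import ProcessPoolExecutor

from nats.js.api import AckPolicy, ConsumerConfig

def time_consuming_task():
    import time; time.sleep(50)  # stand-in for the CPU-bound work from the question
    return

# Allow several unacknowledged messages in flight at once. With
# max_ack_pending=1, only one message at a time is delivered across
# the whole queue group.
consumer_config = ConsumerConfig(
    ack_wait=900,
    max_ack_pending=10,  # illustrative: at least the number of worker processes
    ack_policy=AckPolicy.EXPLICIT,
)

pool = ProcessPoolExecutor()  # CPU-bound work belongs in another process

async def task_cb(msg):
    loop = asyncio.get_running_loop()
    # Run the task off the event loop so this process can keep handling
    # heartbeats and acks while the work runs.
    await loop.run_in_executor(pool, time_consuming_task)
    await msg.ack()

With a setup along these lines, each consumer process can hold a message while the others keep receiving; whether you still want max_deliver=1 depends on how you intend to handle failed tasks, since it disables redelivery.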