
Dramatiq workers hang very often; nothing appears in the logs, even in verbose mode.

Open Harinib-Kore opened this issue 1 year ago • 12 comments

Issues

GitHub issues are for bugs. If you have questions, please ask them on the mailing list.

Checklist

  • [x] Does your title concisely summarize the problem?
  • [x] Did you include a minimal, reproducible example?
  • [x] What OS are you using?
  • [x] What version of Dramatiq are you using?
  • [x] What did you do?
  • [x] What did you expect would happen?
  • [x] What happened?

What OS are you using?

Centos

What version of Dramatiq are you using?

Dramatiq version 1.14.2

What did you do?

Processing parallel async requests using dramatiq

What did you expect would happen?

It should work properly: the workers should keep receiving messages and tasks, but instead they hang.

What happened?

We are processing parallel async requests. Everything works fine for about 3 days, and then the same issue occurs again. If we restart the workers, it works again. How can we fix this?

dramatiq dramatiq_app.BatchOperationsDramatiqTasks --processes 1 --verbose

I can't see anything in the log files. Even the logging hangs.

Error that I captured when the logs hung:

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/data/py3.9.7/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/data/py3.9.7/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/data/py3.9.7/Findly/lib/python3.9/site-packages/dramatiq/cli.py", line 328, in watch_logs
    data = event.recv_bytes()
  File "/data/py3.9.7/lib/python3.9/multiprocessing/connection.py", line 221, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/data/py3.9.7/lib/python3.9/multiprocessing/connection.py", line 426, in _recv_bytes
    return self._recv(size)
  File "/data/py3.9.7/lib/python3.9/multiprocessing/connection.py", line 384, in _recv
    chunk = read(handle, remaining)
MemoryError

Harinib-Kore avatar Sep 13 '23 09:09 Harinib-Kore

We got something similar with --processes 1 --threads 2, when a worker consumes from one queue and produces messages to another. We reproduced it yesterday and will investigate what actually happened.

A quick fix was --processes 1 --threads 1.

UPD: we produce messages to another queue through kombu, and kombu.Connection is not thread-safe. In that case you get an infinite wait on _read, because both threads call _read and each reads a different part of the incoming data. Neither can construct a complete frame, so both wait forever for more data.
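One common way to avoid sharing a non-thread-safe connection between worker threads is to give each thread its own connection. The sketch below shows that pattern with `threading.local` from the standard library. `FakeConnection` and `get_connection` are hypothetical names used for illustration only; `FakeConnection` stands in for a real client such as `kombu.Connection`, and this is not how dramatiq-kombu-broker or any other library is necessarily implemented.

```python
import threading


class FakeConnection:
    """Hypothetical stand-in for a non-thread-safe client (e.g. kombu.Connection)."""

    def __init__(self):
        self.sent = []

    def publish(self, body):
        # A real connection would write a frame to its socket here.
        self.sent.append(body)


# Each thread sees its own independent attributes on this object.
_local = threading.local()


def get_connection(factory=FakeConnection):
    """Return the calling thread's own connection, creating it on first use."""
    conn = getattr(_local, "conn", None)
    if conn is None:
        conn = factory()
        _local.conn = conn
    return conn


def worker(results, index):
    conn = get_connection()
    conn.publish(f"message from worker {index}")
    # Keep a reference so the main thread can inspect the connection afterwards.
    results[index] = conn


results = {}
threads = [threading.Thread(target=worker, args=(results, i)) for i in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

With this pattern, two threads never read from the same socket, so neither can consume part of a frame that belongs to the other. The trade-off is one open connection per worker thread.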

spumer avatar Sep 22 '23 07:09 spumer

I've observed similar behavior when the CPU reaches 100%.

Alsheh avatar Oct 16 '23 10:10 Alsheh

I have the same issue with 1.15 (Python 3.11) using the AsyncIO middleware. Using --processes 1 --threads 1 works.

menezes- avatar Oct 27 '23 22:10 menezes-

I am seeing this as well, running with --processes 4 --threads 1 through django_dramatiq. I'll try switching to 1 process and see how that goes. Also using dramatiq 1.5 (Python 3.10).

agamrp avatar Nov 10 '23 00:11 agamrp

I've released a thread-safe broker implementation: https://github.com/spumer/dramatiq-kombu-broker/ We don't have hangs anymore with it.

spumer avatar Jun 23 '24 11:06 spumer

Hey, which combination works well: dramatiq-kombu-broker with Redis as the backend, or should I use Kombu for both?

Harinib-Kore avatar Jul 02 '24 16:07 Harinib-Kore

It works well with Kombu + RabbitMQ. Right now dramatiq-kombu-broker supports only RabbitMQ.

spumer avatar Jul 03 '24 13:07 spumer

Is there a way to check the current status of a Dramatiq task by its jobId? Specifically, to determine if it is still running or has completed? Also, is it possible to assign a custom name to the job, similar to a functionality available in Celery?

In Celery, you can achieve this with the following code:

celery_task = celery.AsyncResult(id=job_id, app=celery)
if celery_task.state != 'REVOKED':

My primary intention is to manage long-running tasks by being able to terminate them if necessary. How can I remove such tasks from the queue in Dramatiq?

Harinib-Kore avatar Jul 03 '24 13:07 Harinib-Kore

Is there a way to check the current status of a Dramatiq task by its jobId?

We solved it with https://github.com/Bogdanp/django_dramatiq; it has a Task model and a middleware that records message status.

Also, is it possible to assign a custom name to the job, similar to a functionality available in Celery?

You mean display_name in Celery? Where would you expect to see that in Dramatiq? :) I think the actor name is enough.

spumer avatar Jul 04 '24 11:07 spumer

  1. How can I get the process ID of a task from its message ID or any other parameter in Dramatiq?

  2. I want to know how to revoke a task, like in Celery:

result.revoke()

AsyncResult(id).revoke()

app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')

app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed', terminate=True)

app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed', terminate=True, signal='SIGKILL')

Harinib-Kore avatar Jul 04 '24 14:07 Harinib-Kore

You need to implement this manually. Dramatiq has no built-in functionality to revoke a task.

I think it can be done the way Celery does it: create a queue for events, and have each worker dispatch events when it completes an action and listen for events from others. Then you can implement a "revoke" event and discard the message when a worker receives it.
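The idea above can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not Dramatiq's or Celery's implementation: `RevocationRegistry`, `run_worker`, and the event dictionary shape are hypothetical, and in-memory `queue.Queue` objects stand in for the task queue and the events queue. In Dramatiq itself, the equivalent check would probably live in a custom middleware's `before_process_message` hook, which can raise `dramatiq.middleware.SkipMessage` to drop a message.

```python
import queue


class RevocationRegistry:
    """Tracks message IDs that should be discarded (hypothetical helper)."""

    def __init__(self):
        self._revoked = set()

    def handle_event(self, event):
        # Events are plain dicts here; a real system would read them
        # from a dedicated events queue shared by all workers.
        if event.get("type") == "revoke":
            self._revoked.add(event["message_id"])

    def is_revoked(self, message_id):
        return message_id in self._revoked


def run_worker(tasks, events, registry, handler):
    """Drain `tasks`, skipping any message revoked via `events`."""
    processed, skipped = [], []
    while True:
        # Apply all pending control events before taking the next message.
        while True:
            try:
                registry.handle_event(events.get_nowait())
            except queue.Empty:
                break
        try:
            message = tasks.get_nowait()
        except queue.Empty:
            break
        if registry.is_revoked(message["id"]):
            skipped.append(message["id"])
            continue
        handler(message)
        processed.append(message["id"])
    return processed, skipped


tasks, events = queue.Queue(), queue.Queue()
for message_id in ("a", "b", "c"):
    tasks.put({"id": message_id, "payload": message_id.upper()})
events.put({"type": "revoke", "message_id": "b"})

processed, skipped = run_worker(
    tasks, events, RevocationRegistry(), handler=lambda message: None
)
```

Note that this only discards messages that have not started yet. Stopping a task that is already running (the `terminate=True` case in Celery) would need a separate mechanism, such as interrupting the worker thread or process.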

spumer avatar Jul 08 '24 09:07 spumer

Hi, I am facing an issue. Can you tell me how to solve it? @agamrp @spumer @Bogdanp

I'm issuing a ticket for the following Dramatiq configuration and error:

Actor Configuration

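import traceback

import dramatiq

# sa_gateway and debug_logger are defined elsewhere in the application.


@dramatiq.actor(queue_name="batch_index_operations_queue", store_results=True)
def batch_index(request_payload):
    try:
        sa_gateway.batch_index_documents(request_payload)
        return True
    except Exception:
        # Log the failure and report it through the stored result.
        debug_logger.error('Error in creating an async task for Batch Index')
        debug_logger.error(traceback.format_exc())
        return False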

Dramatiq Setup

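import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend


def setup_dramatiq():
    # get_redis_connection_url is defined elsewhere in the application.
    redis_url = get_redis_connection_url()
    redis_broker = RedisBroker(url=redis_url)
    dramatiq.set_broker(redis_broker)
    # Store actor results in the same Redis instance.
    backend = RedisBackend(url=redis_url)
    redis_broker.add_middleware(Results(backend=backend))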

CrawlerDramatiq Configuration

{
  "name": "CrawlerDramatiq",
  "script": "/bin/bash",
  "args": "-c 'cd /data/www/Findly/search && source /data/sa_py3.9.7/Findly/bin/activate && dramatiq dramatiq_app.CrawlDramatiqTasks --processes 4'",
  "autorestart": true,
  "log_file": "/data/logs/findly/crawler_dramatiq.log",
  "watch": false
}

Error

Exception in thread Thread-1:
Traceback (most recent call last):
  File "/data/sa_py3.9.7/lib/python3.9/threading.py", line 973, in _bootstrap_inner
    self.run()
  File "/data/sa_py3.9.7/lib/python3.9/threading.py", line 910, in run
    self._target(*self._args, **self._kwargs)
  File "/data/sa_py3.9.7/Findly/lib/python3.9/site-packages/dramatiq/cli.py", line 337, in watch_logs
    log_file.write(data + "\n")
MemoryError

Issue

I am encountering a MemoryError when running the Dramatiq task. How can this be resolved?

Harinib-Kore avatar Jul 30 '24 09:07 Harinib-Kore