Worker crashes with "invalid file descriptor" error when used with celery and gevent
Environment
os: Ubuntu 20.04 LTS
broker: redis 4.0.9
python: 3.8
celery: 5.1.2
billiard: 3.6.4.0
gevent: 21.1.2
Celery startup config
supervisor
[program:celery_worker_email]
environment = C_FORCE_ROOT="true"
command = /var/www/backend/venv/bin/celery -A run_celery.celery worker -Q "email" -c 20 -n email@%%h
directory = /var/www/backend/
stdout_logfile = /var/log/backend/worker_email.log
redirect_stderr = true
stopasgroup = true
stopwaitsecs = 30
run_celery.py
import gevent.monkey; gevent.monkey.patch_all()
from app import create_app, celery
app = create_app()
Log
[2022-01-05 01:20:00,086: ERROR/ForkPoolWorker-21] Thread 'ResultHandler' crashed: ValueError('invalid file descriptor 20')
Traceback (most recent call last):
File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/pool.py", line 504, in run
return self.body()
File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/pool.py", line 899, in body
for _ in self._process_result(1.0): # blocking
File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/pool.py", line 864, in _process_result
ready, task = poll(timeout)
File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/pool.py", line 1370, in _poll_result
if self._outqueue._reader.poll(timeout):
File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/connection.py", line 292, in poll
return self._poll(timeout)
File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/connection.py", line 470, in _poll
r = wait([self], timeout)
File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/connection.py", line 1003, in wait
return _poll(object_list, timeout)
File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/connection.py", line 983, in _poll
raise ValueError('invalid file descriptor %i' % fd)
ValueError: invalid file descriptor 20
[2022-01-05 01:20:00,550: ERROR/MainProcess] Process 'ForkPoolWorker-21' pid:155045 exited with 'exitcode 1'
[2022-01-05 01:20:10,978: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: exitcode 1 Job: 74.')
Traceback (most recent call last):
File "/var/www/backend/venv/lib/python3.8/site-packages/billiard/pool.py", line 1265, in mark_as_worker_lost
raise WorkerLostError(
billiard.exceptions.WorkerLostError: Worker exited prematurely: exitcode 1 Job: 74.
How to reproduce
- Use the above configs to start celery (gevent.monkey.patch_all() must be used).
- Use kill <pid> to kill some worker processes. The main process will create new worker processes to replace the killed ones.
- Execute xxx_task.delay() in a Python shell to send some tasks.
- Once any newly created worker process receives a task, it triggers the invalid file descriptor error and crashes.
I’m not sure if the above steps will definitely reproduce the issue, but I’ve reproduced it several times.
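For concreteness, the last two steps might look like this (the import path app.tasks is a placeholder for wherever xxx_task is actually defined):

# kill one ForkPoolWorker; the main process forks a replacement
kill <pid>

# then, in a Python shell inside the same virtualenv
>>> from app.tasks import xxx_task   # placeholder import path
>>> xxx_task.delay()                 # the replacement worker crashes on receipt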
Analysis
ResultHandler is the thread the main process uses to read results from the worker processes. It should only run in the main process, yet the worker processes ended up executing it.

billiard/pool.py:
class Pool(object):
    def __init__(self, ...):
        # ...
        self._setup_queues()
        # ...
        for i in range(self._processes):
            self._create_worker_process(i)
        # ...
        # Thread processing results in the outqueue.
        self._result_handler = self.create_result_handler()
        self.handle_result_event = self._result_handler.handle_event
        if threads:
            self._result_handler.start()
        # ...

    def create_result_handler(self, **extra_kwargs):
        return self.ResultHandler(
            self._outqueue, self._quick_get, self._cache,
            self._poll_result, self._join_exited_workers,
            self._putlock, self.restart_state, self.check_timeouts,
            self.on_job_ready, on_ready_counters=self._on_ready_counters,
            **extra_kwargs
        )

    # ...

    def _setup_queues(self):
        self._inqueue = self._ctx.SimpleQueue()
        self._outqueue = self._ctx.SimpleQueue()
        self._quick_put = self._inqueue._writer.send
        self._quick_get = self._outqueue._reader.recv

        def _poll_result(timeout):
            if self._outqueue._reader.poll(timeout):
                return True, self._quick_get()
            return False, None
        self._poll_result = _poll_result

    # ...

    def get_process_queues(self):
        return self._inqueue, self._outqueue, None

    def _create_worker_process(self, i):
        sentinel = self._ctx.Event() if self.allow_restart else None
        inq, outq, synq = self.get_process_queues()
        on_ready_counter = self._ctx.Value('i')
        w = self.WorkerProcess(self.Worker(
            inq, outq, synq, self._initializer, self._initargs,
            self._maxtasksperchild, sentinel, self._on_process_exit,
            # Need to handle all signals if using the ipc semaphore,
            # to make sure the semaphore is released.
            sigprotection=self.threads,
            wrap_exception=self._wrap_exception,
            max_memory_per_child=self._max_memory_per_child,
            on_ready_counter=on_ready_counter,
        ))
        self._pool.append(w)
        # ...

# ...

class Worker(object):
    def after_fork(self):
        if hasattr(self.inq, '_writer'):
            self.inq._writer.close()
        if hasattr(self.outq, '_reader'):
            self.outq._reader.close()
        # ...
As the code above shows, on startup the main process creates _outqueue and then forks the worker processes. After the fork, each worker process closes _outqueue._reader.

The main process then starts a thread to run ResultHandler. ResultHandler calls _poll_result, which polls _outqueue._reader.
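To make the intended layout concrete, here is a small standalone sketch (my own code, not billiard's) of the same pattern: the parent owns the read end of the result channel and polls it from a handler thread, while the forked worker closes the read end and only writes.

import os
import threading
from multiprocessing import Pipe

reader, writer = Pipe(duplex=False)

def result_handler():
    # stands in for billiard's ResultHandler: it should only ever run in the parent
    while True:
        if reader.poll(1.0):
            item = reader.recv()
            if item is None:
                return
            print('parent received:', item)

handler = threading.Thread(target=result_handler)
handler.start()

pid = os.fork()
if pid == 0:
    reader.close()                      # same idea as Worker.after_fork()
    writer.send('result-from-worker')   # workers only ever write
    os._exit(0)

os.waitpid(pid, 0)
writer.send(None)                       # stop this sketch's handler thread
handler.join()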
If a worker process crashes, the main process forks a new worker process to replace it.

As the fork(2) Linux man page says, after fork the child process keeps only the thread that called fork(); all other threads disappear. So there is no ResultHandler thread in the newly forked worker processes.
The child process is created with a single thread--the one that called fork(). The entire virtual address space of the parent is replicated in the child, including the states of mutexes, condition variables, and other pthreads objects; the use of pthread_atfork(3) may be helpful for dealing with problems that this can cause.
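A quick standalone illustration of that behavior in plain CPython (no billiard or gevent involved):

import os
import threading
import time

def background():
    while True:
        time.sleep(0.1)

# a second thread in the parent, standing in for ResultHandler
threading.Thread(target=background, daemon=True).start()

pid = os.fork()
if pid == 0:
    # only the thread that called fork() survives in the child
    print('child threads:', threading.active_count())    # prints 1
    os._exit(0)

os.waitpid(pid, 0)
print('parent threads:', threading.active_count())       # prints 2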
When gevent.monkey.patch_all() is used, threads are replaced by greenlets (I believe, though I am not completely sure), so ResultHandler is not running in a real new thread.
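A small standalone check of that assumption (not part of the project code):

import gevent.monkey
gevent.monkey.patch_all()

import threading
import gevent

def probe():
    # under patch_all() this "thread" body actually runs inside a greenlet,
    # cooperatively scheduled in the single main OS thread
    print('probe runs in:', gevent.getcurrent())

t = threading.Thread(target=probe)
t.start()
t.join()

print('threading patched:', gevent.monkey.is_module_patched('threading'))  # True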
The fork copies the in-flight _poll_result call: the new worker process ends up running _poll_result on _outqueue._reader even though it has already closed _outqueue._reader in after_fork().

Polling a closed file descriptor is invalid, so _poll_result raises the invalid file descriptor exception.
billiard/connection.py:
if hasattr(select, 'poll'):
    def _poll(fds, timeout):
        if timeout is not None:
            timeout = int(timeout * 1000)  # timeout is in milliseconds
        fd_map = {}
        pollster = select.poll()
        for fd in fds:
            pollster.register(fd, select.POLLIN)
            if hasattr(fd, 'fileno'):
                fd_map[fd.fileno()] = fd
            else:
                fd_map[fd] = fd
        ls = []
        for fd, event in pollster.poll(timeout):
            if event & select.POLLNVAL:
                raise ValueError('invalid file descriptor %i' % fd)
            ls.append(fd_map[fd])
        return ls
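The same POLLNVAL path can be reproduced in isolation with select.poll and a deliberately closed descriptor (standalone sketch, not billiard code):

import os
import select

r, w = os.pipe()
os.close(r)          # simulate the worker having already closed the outqueue reader

pollster = select.poll()
pollster.register(r, select.POLLIN)
for fd, event in pollster.poll(1000):
    if event & select.POLLNVAL:
        # this is the branch where billiard's _poll() raises ValueError
        print('invalid file descriptor %i' % fd)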
Thanks for digging this out. I had the same issue on celery==4.1.1.

This was breaking my --pool=prefork workers when I used --max-tasks-per-child. When I removed the gevent monkey patching from my celery entry point, the prefork workers restarted correctly.

Good luck!
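In case it helps anyone else, one possible shape of that workaround is to make the monkey patching conditional in the entry point so that prefork workers keep real threads. This is only a sketch; the argument check below is an assumption about how the worker is launched, not something from the original setup.

# run_celery.py -- hypothetical variant of the entry point shown above
import sys

# only monkey-patch when a gevent pool is actually requested
if any(arg in ('--pool=gevent', 'gevent') for arg in sys.argv):
    import gevent.monkey
    gevent.monkey.patch_all()

from app import create_app, celery

app = create_app()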