celery
Pass socket_keepalive_options to redis client for result_backend
Description
The Celery documentation for result_backend_transport_options says:
A dict of additional options passed to the underlying transport.
See your transport user manual for supported options (if any).
So after this change it will be possible to pass socket_keepalive_options, defined in result_backend_transport_options, to the result backend's Redis client. Also, setting max_connection to 0 is responsible for closing the connection to Redis after a task has been sent to it.
app.conf.result_backend_transport_options = {
    'socket_keepalive_options': {
        socket.TCP_KEEPIDLE: 300,
        socket.TCP_KEEPCNT: 5,
        socket.TCP_KEEPINTVL: 30
    }
}
app.conf.max_connections = 0
These options are supported by the redis-py client.
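As a rough sketch of the idea (not the actual patch), a backend could copy the keepalive entries from the transport options into the keyword arguments it hands to the Redis client. The helper name `keepalive_connparams` and the choice to switch `socket_keepalive` on whenever options are supplied are assumptions made for illustration; `socket_keepalive` and `socket_keepalive_options` are the keyword names redis-py accepts.

```python
import socket


def keepalive_connparams(transport_options):
    """Hypothetical helper: pick keepalive settings out of transport options."""
    params = {}
    options = transport_options.get('socket_keepalive_options')
    if options:
        # Assumption: keepalive is enabled whenever tuning options are given.
        params['socket_keepalive'] = True
        params['socket_keepalive_options'] = dict(options)
    return params


# TCP_KEEPIDLE and friends are platform-specific, hence the hasattr guard.
wanted = {'TCP_KEEPIDLE': 300, 'TCP_KEEPCNT': 5, 'TCP_KEEPINTVL': 30}
transport_options = {
    'socket_keepalive_options': {
        getattr(socket, name): value
        for name, value in wanted.items()
        if hasattr(socket, name)
    }
}
connparams = keepalive_connparams(transport_options)
```

The resulting `connparams` dict could then be merged into whatever arguments are already passed to the client; when no keepalive options are configured it stays empty, so existing behaviour is unchanged.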
What will this change achieve?
You will be able to control how TCP keepalive packets are sent to Redis from the Celery client. This is needed when you wish to keep the connection to the Redis server open while a proxy sits between them and that proxy enforces a short TTL on open connections.
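To make the effect of these options concrete, here is a minimal standard-library sketch of what applying them to a TCP socket looks like. The function name `enable_keepalive` is made up for this example, and it only approximates what a Redis client does on connect; it is not taken from redis-py or from this PR.

```python
import socket


def enable_keepalive(sock, options):
    """Turn on TCP keepalive and apply the per-socket tuning options."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    for option, value in options.items():
        sock.setsockopt(socket.IPPROTO_TCP, option, value)


# Same values as in the description; names missing on this platform are skipped.
wanted = {'TCP_KEEPIDLE': 300, 'TCP_KEEPCNT': 5, 'TCP_KEEPINTVL': 30}
options = {getattr(socket, n): v for n, v in wanted.items() if hasattr(socket, n)}

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
enable_keepalive(sock, options)
keepalive_on = sock.getsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE) != 0
sock.close()
```

With these values the first probe is sent after 300 seconds of idle time, further probes follow every 30 seconds, and the connection is considered dead after 5 unanswered probes. For this to help, the idle value has to be shorter than the proxy's timeout.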
Codecov Report
Attention: 1 lines in your changes are missing coverage. Please review.
Comparison is base (da1146a) 81.25% compared to head (5b6983d) 81.26%.
| Files | Patch % | Lines |
|---|---|---|
| celery/backends/redis.py | 88.88% | 0 Missing and 1 partial :warning: |
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #8297      +/-   ##
==========================================
+ Coverage   81.25%   81.26%   +0.01%
==========================================
  Files         149      149
  Lines       18553    18560       +7
  Branches     3166     3169       +3
==========================================
+ Hits        15075    15083       +8
+ Misses       3191     3190       -1
  Partials      287      287
| Flag | Coverage Δ | |
|---|---|---|
| unittests | 81.24% <88.88%> (+0.01%) | :arrow_up: |
@pawl since you use redis, can you please try / review this PR too?
@auvipy I was unable to start using redis as a broker in production due to this issue: https://github.com/celery/celery/discussions/7276#discussioncomment-6043547
Which version are you using?
@auvipy here are the versions I'm using:
- Celery 5.2.6
- Kombu 5.2.3
- redis-py 4.5.4
- billiard 3.6.4.0
I didn't see any fixes in the two most recent releases related to Celery workers getting stuck when using Redis as a broker.
I'm thinking the issues could be related to the worker_max_tasks_per_child setting which I had set to 1000.
I need to find some time to make progress on https://github.com/celery/kombu/pull/1734. However, Kombu's Redis transport code is very difficult to navigate without types and debug logging. I'm thinking it may be easiest to add types before proceeding.
At the moment, I'm not sure I'd recommend using Redis as a celery broker.
This change doesn't fix the connection-reset issue; it only makes it possible to configure keepalive on the connection to the Redis results pub/sub. I don't see any reason why this change is still open :) As for Kombu not re-establishing the connection: do you (@pawl @auvipy) have a test environment (Docker?) for reproducing that issue?
================================== FAILURES ===================================
______________________ test_WorkController.test_statedb _______________________
self = <t.unit.worker.test_worker.test_WorkController object at 0x0000019DC8CCD280>
    def test_statedb(self):
        from celery.worker import state
        Persistent = state.Persistent
        state.Persistent = Mock()
        try:
            worker = self.create_worker(statedb='statefilename')
t\unit\worker\test_worker.py:1027:
t\unit\worker\test_worker.py:717: in create_worker
    worker = self.app.WorkController(concurrency=1, loglevel=0, **kw)
celery\worker\worker.py:98: in __init__
    self.setup_instance(**self.prepare_args(**kwargs))
celery\worker\worker.py:138: in setup_instance
    self.blueprint.apply(self, **kwargs)
celery\bootsteps.py:211: in apply
    step.include(parent)
celery\bootsteps.py:339: in include
    return self._should_include(parent)[0]
celery\bootsteps.py:335: in _should_include
    return True, self.create(parent)
celery\worker\components.py:211: in create
    w._persistence = w.state.Persistent(w.state, w.statedb, w.app.clock)
celery\worker\state.py:211: in __init__
    self.merge()
celery\worker\state.py:219: in merge
    self._merge_with(self.db)
.tox\3.12-unit\Lib\site-packages\kombu\utils\objects.py:40: in __get__
    return super().__get__(instance, owner)
C:\hostedtoolcache\windows\Python\3.12.0\x64\Lib\functools.py:995: in __get__
    val = self.func(instance)
celery\worker\state.py:288: in db
    return self.open()
celery\worker\state.py:214: in open
    return self.storage.open(
C:\hostedtoolcache\windows\Python\3.12.0\x64\Lib\shelve.py:243: in open
    return DbfilenameShelf(filename, flag, protocol, writeback)
C:\hostedtoolcache\windows\Python\3.12.0\x64\Lib\shelve.py:227: in __init__
    Shelf.__init__(self, dbm.open(filename, flag), protocol, writeback)
file = 'statefilename', flag = 'c', mode = 438
???
E   dbm.error: db type is dbm.gnu, but the module is not available
C:\hostedtoolcache\windows\Python\3.12.0\x64\Lib\dbm\__init__.py:91: error
@auvipy can you rerun this one integration test? It looks like it is non-deterministic. I assume so because all the tests finished with positive results.
I reran it several times and it is passing this time!
@awmackowiak Hey there - is it possible to finalize this PR?
Thank you 🙏