celery icon indicating copy to clipboard operation
celery copied to clipboard

Pass socket_keepalive_options to redis client for result_backend

Open awmackowiak opened this issue 2 years ago • 12 comments

Note: Before submitting this pull request, please review our contributing guidelines.

Description

In result_backend_transport_options documentation celery have description:

A dict of additional options passed to the underlying transport.
See your transport user manual for supported options (if any).

so after this change we will have possibility to pass socket_keepalive_options defined in result_backend_transport_options to result backend redis client. Also set up max_connection to 0 with is responsible to close conenction to redis after sending task to it.

    app.conf.result_backend_transport_options = {
        'socket_keepalive_options': {
            socket.TCP_KEEPIDLE: 300,
            socket.TCP_KEEPCNT: 5,
            socket.TCP_KEEPINTVL: 30
        }
    }
    app.conf.max_commections = 0

This options are supported in redis-py client.

awmackowiak avatar Jun 05 '23 07:06 awmackowiak

what this change will let to achieve?

auvipy avatar Jun 05 '23 08:06 auvipy

You will have possibility to manipulate sending tcp keepalive packet to redis from celery client. It needed when you wist to keep connection open do redis server if you have some proxy between them and on that proxy are defined shorten ttl to keep open connection.

awmackowiak avatar Jun 05 '23 08:06 awmackowiak

Codecov Report

Attention: 1 lines in your changes are missing coverage. Please review.

Comparison is base (da1146a) 81.25% compared to head (5b6983d) 81.26%.

Files Patch % Lines
celery/backends/redis.py 88.88% 0 Missing and 1 partial :warning:
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #8297      +/-   ##
==========================================
+ Coverage   81.25%   81.26%   +0.01%     
==========================================
  Files         149      149              
  Lines       18553    18560       +7     
  Branches     3166     3169       +3     
==========================================
+ Hits        15075    15083       +8     
+ Misses       3191     3190       -1     
  Partials      287      287              
Flag Coverage Δ
unittests 81.24% <88.88%> (+0.01%) :arrow_up:

Flags with carried forward coverage won't be shown. Click here to find out more.

:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.

codecov[bot] avatar Jun 14 '23 11:06 codecov[bot]

@pawl since you use redis, can you please try / review this PR too?

auvipy avatar Jun 14 '23 13:06 auvipy

@auvipy I was unable to start using redis as a broker in production due to this issue: https://github.com/celery/celery/discussions/7276#discussioncomment-6043547

pawl avatar Jun 20 '23 16:06 pawl

Which version you are using?

auvipy avatar Jun 21 '23 05:06 auvipy

Which version you are using?

@auvipy here are the versions I'm using:

  • Celery 5.2.6
  • Kombu 5.2.3
  • redis-py 4.5.4
  • billiard 3.6.4.0

I wasn't seeing any fixes related to celery workers using Redis as a broker getting stuck in the 2 more recent releases.

I'm thinking the issues could be related to the worker_max_tasks_per_child setting which I had set to 1000.

I need to find some time to make progress on https://github.com/celery/kombu/pull/1734 However, Kombu's redis transport code is very difficult to navigate without types and debug logging. I'm thinking it may be easiest to add types before proceeding.

At the moment, I'm not sure I'd recommend using Redis as a celery broker.

pawl avatar Jun 22 '23 03:06 pawl

This change doesn't fix reset connection but possibility to configure keep_alive connection to results pub/sub redis. I didn't see any reason why this change is still open :) But about kombu and not connection reestablish have you (@pawl @auvipy) some test environments (dockers?) for recreation that issue?

awmackowiak avatar Jul 13 '23 04:07 awmackowiak

================================== FAILURES =================================== ______________________ test_WorkController.test_statedb _______________________

self = <t.unit.worker.test_worker.test_WorkController object at 0x0000019DC8CCD280>

  def test_statedb(self):
      from celery.worker import state
      Persistent = state.Persistent
  
      state.Persistent = Mock()
      try:
      worker = self.create_worker(statedb='statefilename')

t\unit\worker\test_worker.py:1027:


t\unit\worker\test_worker.py:717: in create_worker worker = self.app.WorkController(concurrency=1, loglevel=0, **kw) celery\worker\worker.py:98: in init self.setup_instance(**self.prepare_args(**kwargs)) celery\worker\worker.py:138: in setup_instance self.blueprint.apply(self, **kwargs) celery\bootsteps.py:211: in apply step.include(parent) celery\bootsteps.py:339: in include return self._should_include(parent)[0] celery\bootsteps.py:335: in _should_include return True, self.create(parent) celery\worker\components.py:211: in create w._persistence = w.state.Persistent(w.state, w.statedb, w.app.clock) celery\worker\state.py:211: in init self.merge() celery\worker\state.py:219: in merge self._merge_with(self.db) .tox\3.12-unit\Lib\site-packages\kombu\utils\objects.py:40: in get return super().get(instance, owner) C:\hostedtoolcache\windows\Python\3.12.0\x64\Lib\functools.py:995: in get val = self.func(instance) celery\worker\state.py:288: in db return self.open() celery\worker\state.py:214: in open return self.storage.open( C:\hostedtoolcache\windows\Python\3.12.0\x64\Lib\shelve.py:243: in open return DbfilenameShelf(filename, flag, protocol, writeback) C:\hostedtoolcache\windows\Python\3.12.0\x64\Lib\shelve.py:227: in init Shelf.init(self, dbm.open(filename, flag), protocol, writeback)


file = 'statefilename', flag = 'c', mode = 438

??? E dbm.error: db type is dbm.gnu, but the module is not available

C:\hostedtoolcache\windows\Python\3.12.0\x64\Lib\dbm_init_.py:91: error

auvipy avatar Nov 22 '23 09:11 auvipy

@auvipy can you rerun this one integration test. It looks like is non-deterministic. I assume this because all tests are finished with positive results.

awmackowiak avatar Nov 22 '23 16:11 awmackowiak

I rerun several time and it is passing this time!

auvipy avatar Nov 23 '23 13:11 auvipy

@awmackowiak Hey there - is it possible to finalize this PR?

Thank you 🙏

Nusnus avatar Feb 19 '24 14:02 Nusnus