valkey-py
valkey-py copied to clipboard
NOSUB error in PubSub class while it should be a RuntimeError
hi we are trying to write a valkey backend for celery: the PR the integration test (and some normal usage) error with the following traceback:
self = <t.integration.test_canvas.test_chain object at 0x105e79310>, manager = <celery.contrib.testing.manager.Manager object at 0x119174260>
def test_chain_child_replaced_with_chain_middle(self, manager):
orig_sig = chain(
identity.s(42), replace_with_chain.s(), identity.s()
)
res_obj = orig_sig.delay()
> assert res_obj.get(timeout=TIMEOUT) == 42
t/integration/test_canvas.py:803:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
celery/result.py:251: in get
return self.backend.wait_for_pending(
celery/backends/asynchronous.py:221: in wait_for_pending
for _ in self._wait_for_pending(result, **kwargs):
celery/backends/asynchronous.py:287: in _wait_for_pending
for _ in self.drain_events_until(
celery/backends/asynchronous.py:54: in drain_events_until
yield self.wait_for(p, wait, timeout=interval)
celery/backends/asynchronous.py:63: in wait_for
wait(timeout=timeout)
celery/backends/redis.py:161: in drain_events
message = self._pubsub.get_message(timeout=timeout)
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/client.py:1072: in get_message
response = self.parse_response(block=(timeout is None), timeout=timeout)
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/client.py:883: in parse_response
response = self._execute(conn, try_read)
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/client.py:859: in _execute
return conn.retry.call_with_retry(
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/retry.py:62: in call_with_retry
return do()
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/client.py:860: in <lambda>
lambda: command(*args, **kwargs),
.tox/3.12-integration-rabbitmq_valkey/lib/python3.12/site-packages/valkey/client.py:881: in try_read
return conn.read_response(disconnect_on_error=False, push_request=True)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = <valkey.connection.Connection(host=localhost,port=6380,db=0)>, disable_decoding = False
def read_response(
self,
disable_decoding=False,
*,
disconnect_on_error=True,
push_request=False,
):
"""Read the response from a previously sent command"""
host_error = self._host_error()
try:
if self.protocol in ["3", 3] and not LIBVALKEY_AVAILABLE:
response = self._parser.read_response(
disable_decoding=disable_decoding, push_request=push_request
)
else:
response = self._parser.read_response(disable_decoding=disable_decoding)
except socket.timeout:
if disconnect_on_error:
self.disconnect()
raise TimeoutError(f"Timeout reading from {host_error}")
except OSError as e:
if disconnect_on_error:
self.disconnect()
raise ConnectionError(
f"Error while reading from {host_error}" f" : {e.args}"
)
except BaseException:
# Also by default close in case of BaseException. A lot of code
# relies on this behaviour when doing Command/Response pairs.
# See #1128.
if disconnect_on_error:
self.disconnect()
raise
if self.health_check_interval:
self.next_health_check = time() + self.health_check_interval
if isinstance(response, ResponseError):
try:
> raise response
E valkey.exceptions.ResponseError: NOSUB 'unsubscribe' command executed not in subscribed mode
there are a couple of problems here:
-
the error shouldn't happen, at least not in this form as you can see in the traceback,
PubSub.get_message()is called which containes this check and even if this doesn't work, this method is callingPubSub.parse_response()which should raise an error as this shows -
this error is not consistent, which leads me to believe some sort of race condition is happening, i'm not sure if this is true yet, and if it is, what is causing it.
but the main issue is the first one, if you could help me understand why valkey itself is raising NOSUB, while valkey-py should have raised an error before valkey even gets envolved
important note: this problem is also true when using redis-py