keda-connectors
Redis-backed queues do not consume further messages once a queue is empty
Fission/Kubernetes version
fission version
client:
  fission/core:
    BuildDate: "2023-11-30T15:47:54Z"
    GitCommit: 9b57f1f2
    Version: v1.20.0
server:
  fission/core:
    BuildDate: "2023-11-30T15:47:54Z"
    GitCommit: 9b57f1f2
    Version: v1.20.0

kubectl version
Client Version: v1.28.4
Kustomize Version: v5.0.4-0.20230601165947-6ce0bf390ce3
Server Version: v1.23.8
Kubernetes platform (e.g. Google Kubernetes Engine)
OpenStack
Describe the bug
Redis-backed queues appear to stop being listened to once the list becomes empty. Redis has no concept of an empty list, so the key is deleted as soon as the last element is popped.
To Reproduce
Prerequisites:
- Redis installed in the cluster (version 7.0.12)
- KEDA installed in the cluster (version 2.0)
Create a producer.py script:
import datetime

import redis
from flask import current_app

def main():
    # Push one timestamped message onto the list watched by the trigger.
    rdb = redis.StrictRedis(host='redis-headless.ot-operators.svc.cluster.local')
    rdb.lpush('redisbug-topic', f'Time: {datetime.datetime.now().isoformat()}')
    current_app.logger.info('Message sent')
    return 'Message sent\n'
Create a consumer.py script:
from flask import current_app, request

def main():
    # Log the body that the connector forwards from the Redis list.
    current_app.logger.info(f'Message consumed: {request.data}')
    return 'Message consumed\n'
Test setup:
fission function create --name producer --env python --code producer.py
fission function create --name consumer --env python --code consumer.py
fission route create --url /consumer --function consumer --createingress
In two separate shells, start following the function logs:
fission function log -f --name producer
fission function log -f --name consumer
Execute:
fission function test --name producer
fission function test --name producer
fission function test --name producer
fission function test --name producer
The producer function log should now show the messages being produced.
Let's create a trigger to consume the messages:
fission mqtrigger create --name redisbug \
  --function consumer \
  --mqtype redis \
  --mqtkind keda \
  --topic redisbug-topic \
  --resptopic redisbug-response-topic \
  --errortopic redisbug-error-topic \
  --maxretries 3 \
  --metadata address=redis-headless.ot-operators.svc.cluster.local:6379 \
  --metadata listLength=10 \
  --metadata listName=redisbug-topic
Now the consumer function log should show the messages being consumed.
Let's add some more messages:
fission function test --name producer
fission function test --name producer
fission function test --name producer
fission function test --name producer
The logs should show the messages being produced, but NO message is consumed.
Deleting the trigger and recreating it with the same mqtrigger create command as above works around the issue:
fission mqtrigger delete --name redisbug
Expected result
Messages should be consumed even after the Redis list has been emptied.
Actual result
No further messages are consumed.
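One way to confirm that messages are piling up in Redis rather than being consumed is to inspect the list directly. The following is only a minimal sketch: `list_backlog` is a hypothetical helper name, and `client` is assumed to be a redis-py style client exposing `exists` and `llen`.

```python
def list_backlog(client, list_name):
    """Report whether a Redis list key exists and how many items it holds.

    `client` is assumed to be a redis-py style client (hypothetical wiring);
    only its `exists` and `llen` methods are used.
    """
    return {
        # An emptied list has no key at all, so `exists` is False in that case.
        "exists": bool(client.exists(list_name)),
        # LLEN reports 0 for a missing key instead of raising an error.
        "length": client.llen(list_name),
    }


# Example wiring against the setup from this report (requires `pip install redis`):
#   import redis
#   rdb = redis.StrictRedis(host='redis-headless.ot-operators.svc.cluster.local')
#   print(list_backlog(rdb, 'redisbug-topic'))
```

If the reported length keeps growing after each producer call while the consumer log stays silent, the messages are reaching Redis and the problem lies on the consuming side.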
Screenshots/Dump file
Slack thread reference: https://fissionio.slack.com/archives/C3LUX6BBP/p1701415694875969
Hello! I have this exact issue and was wondering if you found the solution?
No. I just switched to Kafka-backed queues.
I have the same issue. Has there been a fix or a workaround in the meantime?
I think the issue is from https://github.com/fission/keda-connectors/blob/main/redis-http-connector/main.go#L42 as it doesn't wait for new messages. Could a BLPop solve this issue?
msg, err := conn.rdbConnection.BLPop(ctx, 0, conn.connectordata.Topic).Result()
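To illustrate the idea behind the blocking-pop suggestion, here is a small Python sketch of a consumer loop that waits on an empty list instead of giving up. It is not the connector's code (the connector is written in Go), and whether it fits the connector's lifecycle is an open question. The names `consume`, `handler`, `poll_timeout` and `should_stop` are hypothetical, and `client` is assumed to be a redis-py style client whose `blpop(keys, timeout)` returns a `(key, value)` pair, or `None` on timeout.

```python
def consume(client, list_name, handler, poll_timeout=5, should_stop=lambda: False):
    """Pass every message from a Redis list to `handler`, waiting while it is empty.

    Hypothetical sketch: `client` only needs a redis-py style `blpop` method.
    Returns the number of messages handled before `should_stop()` became true.
    """
    handled = 0
    while not should_stop():
        # blpop blocks for up to `poll_timeout` seconds and yields None on timeout,
        # so an empty (that is, missing) list leads to another wait, not an exit.
        item = client.blpop([list_name], timeout=poll_timeout)
        if item is None:
            continue
        _key, payload = item
        handler(payload)
        handled += 1
    return handled


# Example wiring (requires `pip install redis`):
#   import redis
#   rdb = redis.StrictRedis(host='redis-headless.ot-operators.svc.cluster.local')
#   consume(rdb, 'redisbug-topic', print)
```

A finite timeout is used here rather than 0 (block forever) so that the loop can periodically check a stop condition. With a timeout of 0, as in the Go line above, the call would only return once a message arrives.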