taskq
taskq copied to clipboard
redis `schedulePending` target messages continue to increase
Example
- Create stream(
taskq:{test}:stream
) and consumer group(taskq
)
127.0.0.1:6379> XGROUP CREATE taskq:{test}:stream taskq $ MKSTREAM
OK
- Add 3 messages
127.0.0.1:6379> XADD taskq:{test}:stream * body apple
"1634099225543-0"
127.0.0.1:6379> XADD taskq:{test}:stream * body orange
"1634099233517-0"
127.0.0.1:6379> XADD taskq:{test}:stream * body banana
"1634099237506-0"
127.0.0.1:6379> XLEN taskq:{test}:stream
(integer) 3
- This time pending list is empty
127.0.0.1:6379> XPENDING taskq:{test}:stream taskq - + 5
(empty array)
- Consumer
foo
allocated 3 messages
127.0.0.1:6379> XREADGROUP GROUP taskq foo STREAMS taskq:{test}:stream >
1) 1) "taskq:{test}:stream"
2) 1) 1) "1634099225543-0"
2) 1) "body"
2) "apple"
2) 1) "1634099233517-0"
2) 1) "body"
2) "orange"
3) 1) "1634099237506-0"
2) 1) "body"
2) "banana"
127.0.0.1:6379> XPENDING taskq:{test}:stream taskq - + 5
1) 1) "1634099225543-0"
2) "foo"
3) (integer) 71364
4) (integer) 1
2) 1) "1634099233517-0"
2) "foo"
3) (integer) 71364
4) (integer) 1
3) 1) "1634099237506-0"
2) "foo"
3) (integer) 71364
4) (integer) 1
- When message processed successfully the message will be deleted, but pending list remains 3 messages
127.0.0.1:6379> XDEL taskq:{test}:stream 1634099225543-0
(integer) 1
127.0.0.1:6379> XPENDING taskq:{test}:stream taskq - + 5
1) 1) "1634099225543-0"
2) "foo"
3) (integer) 122716
4) (integer) 1
2) 1) "1634099233517-0"
2) "foo"
3) (integer) 122716
4) (integer) 1
3) 1) "1634099237506-0"
2) "foo"
3) (integer) 122716
4) (integer) 1
All messages will leaves in pending list that schedulePending
method will got an error trying to release a deleted messages
issue: https://github.com/vmihailenco/taskq/issues/143
When Delete
and Release
do we should execute XACK
and then execute XDEL
https://github.com/vmihailenco/taskq/blob/v3/redisq/queue.go#L225
https://github.com/vmihailenco/taskq/blob/v3/redisq/queue.go#L245
127.0.0.1:6379> XACK taskq:{test}:stream taskq 1634099233517-0
(integer) 1
127.0.0.1:6379> XDEL taskq:{test}:stream 1634099233517-0
(integer) 1
127.0.0.1:6379> XPENDING taskq:{test}:stream taskq - + 5
1) 1) "1634099225543-0"
2) "foo"
3) (integer) 199531
4) (integer) 1
2) 1) "1634099237506-0"
2) "foo"
3) (integer) 199531
4) (integer) 1
Hello, is this still an issue?