
Redis: the pending messages targeted by `schedulePending` keep increasing

Open sun617 opened this issue 3 years ago • 1 comment

Example

  • Create the stream (taskq:{test}:stream) and the consumer group (taskq)
127.0.0.1:6379> XGROUP CREATE taskq:{test}:stream taskq $ MKSTREAM
OK
  • Add 3 messages
127.0.0.1:6379> XADD taskq:{test}:stream * body apple
"1634099225543-0"
127.0.0.1:6379> XADD taskq:{test}:stream * body orange
"1634099233517-0"
127.0.0.1:6379> XADD taskq:{test}:stream * body banana
"1634099237506-0"
127.0.0.1:6379> XLEN taskq:{test}:stream
(integer) 3
  • At this point the pending list is empty
127.0.0.1:6379> XPENDING taskq:{test}:stream taskq - + 5
(empty array)
  • Consumer foo reads the stream and is assigned all 3 messages, which now appear in the pending list
127.0.0.1:6379> XREADGROUP GROUP taskq foo STREAMS taskq:{test}:stream >
1) 1) "taskq:{test}:stream"
   2) 1) 1) "1634099225543-0"
         2) 1) "body"
            2) "apple"
      2) 1) "1634099233517-0"
         2) 1) "body"
            2) "orange"
      3) 1) "1634099237506-0"
         2) 1) "body"
            2) "banana"
127.0.0.1:6379> XPENDING taskq:{test}:stream taskq - + 5
1) 1) "1634099225543-0"
   2) "foo"
   3) (integer) 71364
   4) (integer) 1
2) 1) "1634099233517-0"
   2) "foo"
   3) (integer) 71364
   4) (integer) 1
3) 1) "1634099237506-0"
   2) "foo"
   3) (integer) 71364
   4) (integer) 1
  • When a message is processed successfully it is deleted with XDEL, but the pending list still contains all 3 messages
127.0.0.1:6379> XDEL taskq:{test}:stream 1634099225543-0
(integer) 1
127.0.0.1:6379> XPENDING taskq:{test}:stream taskq - + 5
1) 1) "1634099225543-0"
   2) "foo"
   3) (integer) 122716
   4) (integer) 1
2) 1) "1634099233517-0"
   2) "foo"
   3) (integer) 122716
   4) (integer) 1
3) 1) "1634099237506-0"
   2) "foo"
   3) (integer) 122716
   4) (integer) 1

Every message therefore stays in the pending list, so the schedulePending method gets an error when it tries to release a message that has already been deleted. Related issue: https://github.com/vmihailenco/taskq/issues/143

In Delete and Release, we should execute XACK first and then XDEL: https://github.com/vmihailenco/taskq/blob/v3/redisq/queue.go#L225 https://github.com/vmihailenco/taskq/blob/v3/redisq/queue.go#L245

127.0.0.1:6379> XACK taskq:{test}:stream taskq 1634099233517-0
(integer) 1
127.0.0.1:6379> XDEL taskq:{test}:stream 1634099233517-0
(integer) 1
127.0.0.1:6379> XPENDING taskq:{test}:stream taskq - + 5
1) 1) "1634099225543-0"
   2) "foo"
   3) (integer) 199531
   4) (integer) 1
2) 1) "1634099237506-0"
   2) "foo"
   3) (integer) 199531
   4) (integer) 1

sun617, Oct 13 '21 05:10

Hello, is this still an issue?

lzap, Jan 02 '23 14:01