go-redis
XClaim parse error when pending messages are trimmed
When using the Redis 5.0 stream XREADGROUP feature, if a consumer crashes without acking its messages and some of those messages are trimmed afterwards, XPENDING still lists all of them. Running XCLAIM on those IDs should return an array in which the trimmed messages appear as "nil", but XClaim fails to parse that result.
It can be reproduced with the code below:
var maxLen int64 = 2
client.Del("mystream")
client.XGroupCreateMkStream("mystream", "mygroup", "$")
// add 2 entries
client.XAdd(&redis.XAddArgs{
    Stream: "mystream",
    MaxLen: maxLen,
    Values: map[string]interface{}{"value": 1},
})
client.XAdd(&redis.XAddArgs{
    Stream: "mystream",
    MaxLen: maxLen,
    Values: map[string]interface{}{"value": 2},
})
// read by alice
client.XReadGroup(&redis.XReadGroupArgs{
    Group:    "mygroup",
    Consumer: "alice",
    Streams:  []string{"mystream", ">"},
    Count:    10,
    Block:    0,
    NoAck:    false,
})
// add 2 more entries
client.XAdd(&redis.XAddArgs{
    Stream: "mystream",
    MaxLen: maxLen,
    Values: map[string]interface{}{"value": 3},
})
client.XAdd(&redis.XAddArgs{
    Stream: "mystream",
    MaxLen: maxLen,
    Values: map[string]interface{}{"value": 4},
})
// read by alice again; alice now has 4 pending messages, 2 of which were trimmed
client.XReadGroup(&redis.XReadGroupArgs{
    Group:    "mygroup",
    Consumer: "alice",
    Streams:  []string{"mystream", ">"},
    Count:    10,
    Block:    0,
    NoAck:    false,
}).Result()
// sleep for a while
time.Sleep(3 * time.Second)
// get all pending messages
pending, _ := client.XPendingExt(&redis.XPendingExtArgs{
    Stream: "mystream",
    Group:  "mygroup",
    Start:  "-",
    End:    "+",
    Count:  10,
}).Result()
claimIds := make([]string, 0)
for _, p := range pending {
    claimIds = append(claimIds, p.Id)
}
// bob claims the messages; in redis-cli the result is:
// 1) (nil)
// 2) (nil)
// 3) 1) "1575368938045-0"
//    2) 1) "value"
//       2) "3"
// 4) 1) "1575371014539-0"
//    2) 1) "value"
//       2) "4"
rlt, err := client.XClaim(&redis.XClaimArgs{
    Stream:   "mystream",
    Group:    "mygroup",
    Consumer: "bob",
    MinIdle:  time.Second,
    Messages: claimIds,
}).Result()
// rlt will be an empty array and err will be redis.Nil
log.Info(rlt, " ", err)
for i := 0; i < 4; i++ {
    // subsequent redis calls fail several times
    n, err := client.XLen("mystream").Result()
    log.Info(i, " ", n, " ", err)
}
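For reference, here is a minimal sketch of the behavior I would expect from the parser: nil entries in the XCLAIM reply are skipped (or could be surfaced as empty messages) instead of aborting the whole parse. This is not go-redis code. `Message` and `parseClaimReply` are hypothetical names, and the reply is assumed to be already decoded into nested `[]interface{}` values.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for a decoded stream entry.
// It is not the go-redis XMessage type.
type Message struct {
	ID     string
	Values map[string]string
}

// parseClaimReply converts an already-decoded XCLAIM reply into messages.
// Each element is assumed to be either nil (the entry was trimmed) or a
// two-element array: [id, [field, value, field, value, ...]].
func parseClaimReply(reply []interface{}) ([]Message, error) {
	msgs := make([]Message, 0, len(reply))
	for i, item := range reply {
		if item == nil {
			// Trimmed entry: skip it instead of failing the whole reply.
			continue
		}
		entry, ok := item.([]interface{})
		if !ok || len(entry) != 2 {
			return nil, fmt.Errorf("entry %d: unexpected shape %T", i, item)
		}
		id, ok := entry[0].(string)
		if !ok {
			return nil, fmt.Errorf("entry %d: id is %T, want string", i, entry[0])
		}
		fields, ok := entry[1].([]interface{})
		if !ok || len(fields)%2 != 0 {
			return nil, fmt.Errorf("entry %d: malformed field list", i)
		}
		values := make(map[string]string, len(fields)/2)
		for j := 0; j < len(fields); j += 2 {
			k, okKey := fields[j].(string)
			v, okVal := fields[j+1].(string)
			if !okKey || !okVal {
				return nil, fmt.Errorf("entry %d: non-string field or value", i)
			}
			values[k] = v
		}
		msgs = append(msgs, Message{ID: id, Values: values})
	}
	return msgs, nil
}

func main() {
	// Same shape as the redis-cli output above: two trimmed entries, two live ones.
	reply := []interface{}{
		nil,
		nil,
		[]interface{}{"1575368938045-0", []interface{}{"value", "3"}},
		[]interface{}{"1575371014539-0", []interface{}{"value", "4"}},
	}
	msgs, err := parseClaimReply(reply)
	fmt.Println(len(msgs), err)
	for _, m := range msgs {
		fmt.Println(m.ID, m.Values)
	}
}
```

Skipping is only one option; returning a placeholder per nil entry would keep the result aligned with the requested IDs, which may be more useful if the caller wants to XACK the trimmed ones.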
I'm facing the same problem with Redis 6.0/6.2. Any news? There are also errors like this one (on both v8 and v9; this example is from v8): pool.go:356: Conn has unread data
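A plausible explanation for the "Conn has unread data" message, offered as a guess rather than a confirmed diagnosis: if the reader stops at the first nil entry and returns an error, the rest of the XCLAIM reply stays in the connection buffer and confuses later commands. The toy RESP2 reader below illustrates the effect. It is not go-redis's reader; `readReply`, `unreadAfter`, `errNil`, and the sample `wire` bytes are made up for illustration, and the sketch assumes trimmed entries arrive as a RESP2 null (`*-1` or `$-1`).

```go
package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"strconv"
	"strings"
)

// errNil plays the role of a "nil reply" error in this sketch.
var errNil = errors.New("nil reply")

// wire is a hand-written RESP2 reply: an array holding one null entry
// followed by one stream entry ["1-0", ["value", "3"]].
const wire = "*2\r\n*-1\r\n*2\r\n$3\r\n1-0\r\n*2\r\n$5\r\nvalue\r\n$1\r\n3\r\n"

// readReply reads one RESP2 value (bulk string or array, possibly null).
// With nilIsError set, a null anywhere aborts the read immediately.
func readReply(rd *bufio.Reader, nilIsError bool) (interface{}, error) {
	line, err := rd.ReadString('\n')
	if err != nil {
		return nil, err
	}
	line = strings.TrimRight(line, "\r\n")
	if len(line) < 2 || (line[0] != '$' && line[0] != '*') {
		return nil, fmt.Errorf("unsupported reply line %q", line)
	}
	n, err := strconv.Atoi(line[1:])
	if err != nil {
		return nil, err
	}
	if n < 0 {
		if nilIsError {
			return nil, errNil // aborts mid-reply, the rest is never consumed
		}
		return nil, nil
	}
	if line[0] == '$' {
		buf := make([]byte, n+2) // payload plus trailing CRLF
		if _, err := io.ReadFull(rd, buf); err != nil {
			return nil, err
		}
		return string(buf[:n]), nil
	}
	items := make([]interface{}, n)
	for i := range items {
		if items[i], err = readReply(rd, nilIsError); err != nil {
			return nil, err
		}
	}
	return items, nil
}

// unreadAfter parses data once and reports how many bytes remain buffered.
func unreadAfter(data string, nilIsError bool) (int, error) {
	rd := bufio.NewReader(strings.NewReader(data))
	_, err := readReply(rd, nilIsError)
	return rd.Buffered(), err
}

func main() {
	left, err := unreadAfter(wire, true)
	fmt.Println("abort on nil: unread bytes =", left, "err =", err)

	left, err = unreadAfter(wire, false)
	fmt.Println("tolerate nil: unread bytes =", left, "err =", err)
}
```

In the aborting mode the reader returns after the null entry with bytes still buffered, while the tolerant mode consumes the whole reply. If the real client behaves similarly, that would also match the repeated failures of the calls that follow XClaim in the reproduction.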
This issue is marked stale. It will be closed in 30 days if it is not updated.