go-redis

XClaim parse error when pending messages are trimmed

Open kkkbird opened this issue 5 years ago • 2 comments

When using the Redis 5.0 stream XREADGROUP feature, if a consumer crashes without acking its messages and some of those messages are trimmed afterwards, XPENDING still lists all of them. XCLAIM on those messages should return an array with "nil" in place of each trimmed message, but XClaim fails to parse that result.

It can be reproduced with the code below:

var maxLen int64 = 2

client.Del("mystream")
client.XGroupCreateMkStream("mystream", "mygroup", "$")

// add 2 entries
client.XAdd(&redis.XAddArgs{
	Stream: "mystream",
	MaxLen: maxLen,
	Values: map[string]interface{}{"value": 1},
})
client.XAdd(&redis.XAddArgs{
	Stream: "mystream",
	MaxLen: maxLen,
	Values: map[string]interface{}{"value": 2},
})

// read by alice
client.XReadGroup(&redis.XReadGroupArgs{
	Group:    "mygroup",
	Consumer: "alice",
	Streams:  []string{"mystream", ">"},
	Count:    10,
	Block:    0,
	NoAck:    false,
})

// add 2 more entries
client.XAdd(&redis.XAddArgs{
	Stream: "mystream",
	MaxLen: maxLen,
	Values: map[string]interface{}{"value": 3},
})
client.XAdd(&redis.XAddArgs{
	Stream: "mystream",
	MaxLen: maxLen,
	Values: map[string]interface{}{"value": 4},
})

// read by alice again; alice now has 4 pending messages, 2 of which were trimmed
client.XReadGroup(&redis.XReadGroupArgs{
	Group:    "mygroup",
	Consumer: "alice",
	Streams:  []string{"mystream", ">"},
	Count:    10,
	Block:    0,
	NoAck:    false,
}).Result()

// sleep for a while
time.Sleep(3 * time.Second)

// get all pending messages
pending, _ := client.XPendingExt(&redis.XPendingExtArgs{
	Stream: "mystream",
	Group:  "mygroup",
	Start:  "-",
	End:    "+",
	Count:  10,
}).Result()

claimIds := make([]string, 0)
for _, p := range pending {
	claimIds = append(claimIds, p.Id)
}

// bob claims the messages; in redis-cli the result is:
// 1) (nil)
// 2) (nil)
// 3) 1) "1575368938045-0"
//    2) 1) "value"
//       2) "3"
// 4) 1) "1575371014539-0"
//    2) 1) "value"
//       2) "4"
rlt, err := client.XClaim(&redis.XClaimArgs{
	Stream:   "mystream",
	Group:    "mygroup",
	Consumer: "bob",
	MinIdle:  time.Second,
	Messages: claimIds,
}).Result()

// rlt is an empty slice and err is redis.Nil
log.Info(rlt, " ", err)

for i := 0; i < 4; i++ {
	n, err := client.XLen("mystream").Result() // this call fails several times after the XClaim above
	log.Info(i, " ", n, " ", err)
}
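
To illustrate the reply shape from the redis-cli comment above, here is a minimal sketch of a parser that tolerates nil entries. It is not the go-redis implementation: the `Message` type and the `parseClaimReply` function are hypothetical names, and the sketch assumes the reply has already been decoded into a `[]interface{}` in which a trimmed message appears as `nil`.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for a parsed stream entry
// (it is not the go-redis type).
type Message struct {
	ID     string
	Values map[string]string
}

// parseClaimReply converts an already decoded XCLAIM reply into messages.
// Assumption: each element is either nil (the entry was trimmed) or a
// two-element array of the form [id, [field1, value1, field2, value2, ...]].
// It returns the surviving messages and the number of nil entries skipped.
func parseClaimReply(reply []interface{}) ([]Message, int, error) {
	msgs := make([]Message, 0, len(reply))
	trimmed := 0
	for i, item := range reply {
		if item == nil {
			// Trimmed entry: count it and keep reading the rest of the reply.
			trimmed++
			continue
		}
		entry, ok := item.([]interface{})
		if !ok || len(entry) != 2 {
			return nil, 0, fmt.Errorf("entry %d: unexpected shape %T", i, item)
		}
		id, ok := entry[0].(string)
		if !ok {
			return nil, 0, fmt.Errorf("entry %d: id is %T, want string", i, entry[0])
		}
		fields, ok := entry[1].([]interface{})
		if !ok || len(fields)%2 != 0 {
			return nil, 0, fmt.Errorf("entry %d: malformed field list", i)
		}
		values := make(map[string]string, len(fields)/2)
		for j := 0; j < len(fields); j += 2 {
			key, keyOK := fields[j].(string)
			val, valOK := fields[j+1].(string)
			if !keyOK || !valOK {
				return nil, 0, fmt.Errorf("entry %d: non-string field or value", i)
			}
			values[key] = val
		}
		msgs = append(msgs, Message{ID: id, Values: values})
	}
	return msgs, trimmed, nil
}
```

The point of the sketch is that a nil element is consumed and skipped rather than treated as the end of the reply, so no unread data would be left behind for the next command on the same connection.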

kkkbird Dec 03 '19 11:12

I ran into the same problem with Redis 6.0/6.2. Any news? There are also errors like this (on both v8 and v9; this example is from v8): pool.go:356: Conn has unread data

nshibalov Dec 29 '22 13:12

This issue is marked stale. It will be closed in 30 days if it is not updated.

github-actions[bot] Feb 26 '24 00:02