feat(stream): add support of the CLAIM option for the XREADGROUP command
https://redis.io/docs/latest/commands/xreadgroup/
feat(stream): Add support for XREADGROUP CLAIM option
English / 中文
English
Overview
This PR implements the CLAIM option for the XREADGROUP command, enabling consumers to claim idle pending messages from other consumers in a consumer group. This feature is essential for handling failed or slow consumers in distributed stream processing scenarios.
Motivation
In distributed stream processing with consumer groups, consumers may fail or become slow, causing messages to remain in their Pending Entries List (PEL) indefinitely. The CLAIM option allows other consumers to take over these idle messages, improving fault tolerance and ensuring timely message processing.
Changes
Core Implementation
-
Command Parsing (
src/commands/cmd_stream.cc)- Added
CLAIM min-idle-timeparameter parsing with validation - Implemented smart response format selection (2-element vs 4-element)
- Updated
AddStreamEntriesToResponsefor conditional formatting
- Added
-
Stream Processing (
src/types/redis_stream.cc)- Implemented comprehensive claiming logic in
Stream::RangeWithPending - Added idle time calculation and message ownership transfer
- Implemented ordering guarantees (idle entries first, ordered by idle time)
- Optimized consumer metadata management
- Fixed PEL iteration to respect
exclude_startoption
- Implemented comprehensive claiming logic in
-
Data Structures (
src/types/redis_stream_base.h,src/types/redis_stream.h)- Extended
StreamEntrywithidle_msanddelivery_countfields - Created
StreamReadGroupReadOptionsstruct for CLAIM parameters - Updated
RangeWithPendingsignature
- Extended
Key Features
- ✅ Extended Reply Format: Claimed entries return
[id, fields, idle_ms, delivery_count] - ✅ Ordering Guarantees: Idle entries first (longest idle first), then new entries
- ✅ NOACK Interaction: Claimed entries still added to PEL (per Redis spec)
- ✅ Min-Idle-Time Filtering: Only claim messages exceeding threshold
- ✅ Delivery Count Tracking: Accurate increment and reporting
Test Coverage
Comprehensive test suite in tests/gocase/unit/type/stream/xreadgroup_test.go:
- Basic CLAIM: Message claiming and ownership transfer
- Ordering Guarantees: Validates idle-first ordering by idle time
- NOACK Interaction: Verifies claimed entries remain in PEL
- Min-Idle-Time Filtering: Tests threshold-based claiming
- Delivery Count: Validates accurate increment tracking
All tests passing ✅
Usage Example
# Consumer1 reads messages
XREADGROUP GROUP mygroup consumer1 COUNT 10 STREAMS mystream >
# Consumer2 claims messages idle for > 5000ms
XREADGROUP GROUP mygroup consumer2 COUNT 10 CLAIM 5000 STREAMS mystream >
# Response format:
# [[mystream, [
# [entry_id, [field, value, ...], idle_ms, delivery_count],
# ...
# ]]]
Redis Specification Compliance
This implementation follows the Redis protocol specification:
- CLAIM option accepts min-idle-time in milliseconds
- Returns entries in extended format with idle_ms and delivery_count
- Correctly updates PEL and consumer metadata
- Compatible with
>(latest) ID and NOACK option - Maintains ordering guarantees
Testing
# Run XREADGROUP CLAIM tests
./test.sh
# All tests passing:
# --- PASS: TestXReadGroup (3.31s)
# --- PASS: XREADGROUP_with_CLAIM_option (0.10s)
# --- PASS: XREADGROUP_CLAIM_ordering_guarantees (0.21s)
# --- PASS: XREADGROUP_CLAIM_with_NOACK (0.10s)
# --- PASS: XREADGROUP_CLAIM_min_idle_time_filter (0.05s)
# --- PASS: XREADGROUP_CLAIM_delivery_count_increment (0.10s)
Performance
- Time Complexity: O(N) where N is the COUNT parameter
- Space Complexity: O(N) for returned entries
- Database Operations: Single batched write for all PEL updates (atomic)
中文
概述
本PR为 XREADGROUP 命令实现了 CLAIM 选项,使消费者能够从消费者组中的其他消费者那里认领空闲的待处理消息。此功能对于处理分布式流处理场景中失败或缓慢的消费者至关重要。
动机
在使用消费者组进行分布式流处理时,消费者可能会失败或变慢,导致消息无限期地保留在其待处理条目列表(PEL)中。CLAIM 选项允许其他消费者接管这些空闲消息,提高容错性并确保及时处理消息。
更改内容
核心实现
-
命令解析 (
src/commands/cmd_stream.cc)- 添加了
CLAIM min-idle-time参数解析和验证 - 实现智能响应格式选择(2元素 vs 4元素)
- 更新
AddStreamEntriesToResponse以支持条件格式化
- 添加了
-
流处理 (
src/types/redis_stream.cc)- 在
Stream::RangeWithPending中实现了完整的认领逻辑 - 添加了空闲时间计算和消息所有权转移
- 实现了排序保证(空闲条目优先,按空闲时间排序)
- 优化了消费者元数据管理
- 修复了PEL迭代以遵守
exclude_start选项
- 在
-
数据结构 (
src/types/redis_stream_base.h,src/types/redis_stream.h)- 扩展
StreamEntry增加idle_ms和delivery_count字段 - 创建
StreamReadGroupReadOptions结构体用于传递CLAIM参数 - 更新
RangeWithPending函数签名
- 扩展
关键特性
- ✅ 扩展回复格式: 认领的条目返回
[id, fields, idle_ms, delivery_count] - ✅ 排序保证: 空闲条目优先(最长空闲时间优先),然后是新条目
- ✅ NOACK交互: 认领的条目仍然添加到PEL(符合Redis规范)
- ✅ 最小空闲时间过滤: 仅认领超过阈值的消息
- ✅ 投递计数跟踪: 准确的增量和报告
测试覆盖
tests/gocase/unit/type/stream/xreadgroup_test.go 中的完整测试套件:
- 基础CLAIM: 消息认领和所有权转移
- 排序保证: 验证按空闲时间的空闲优先排序
- NOACK交互: 验证认领的条目保留在PEL中
- 最小空闲时间过滤: 测试基于阈值的认领
- 投递计数: 验证准确的增量跟踪
所有测试通过 ✅
使用示例
# 消费者1读取消息
XREADGROUP GROUP mygroup consumer1 COUNT 10 STREAMS mystream >
# 消费者2认领空闲时间 > 5000ms 的消息
XREADGROUP GROUP mygroup consumer2 COUNT 10 CLAIM 5000 STREAMS mystream >
# 响应格式:
# [[mystream, [
# [entry_id, [field, value, ...], idle_ms, delivery_count],
# ...
# ]]]
Redis规范合规性
此实现遵循Redis协议规范:
- CLAIM选项接受以毫秒为单位的最小空闲时间
- 以扩展格式返回包含idle_ms和delivery_count的条目
- 正确更新PEL和消费者元数据
- 与
>(latest) ID和NOACK选项兼容 - 维护排序保证
测试
# 运行XREADGROUP CLAIM测试
./test.sh
# 所有测试通过:
# --- PASS: TestXReadGroup (3.31s)
# --- PASS: XREADGROUP_with_CLAIM_option (0.10s)
# --- PASS: XREADGROUP_CLAIM_ordering_guarantees (0.21s)
# --- PASS: XREADGROUP_CLAIM_with_NOACK (0.10s)
# --- PASS: XREADGROUP_CLAIM_min_idle_time_filter (0.05s)
# --- PASS: XREADGROUP_CLAIM_delivery_count_increment (0.10s)
性能
- 时间复杂度: O(N),其中N是COUNT参数
- 空间复杂度: O(N),用于返回的条目
- 数据库操作: 所有PEL更新的单次批量写入(原子操作)
redis XREADGROUP 有 CLAIM 的返回值会多出一些,官方文档没写 但是,含义和xpending类似, https://redis.io/docs/latest/commands/xpending/
> XREADGROUP GROUP send test COUNT 1 BLOCK 1000 CLAIM 1000 STREAMS smtp >
1) 1) "smtp"
2) 1) 1) "1764497928656-0"
2) 1) "key"
2) "value"
3) "287341" // The number of milliseconds that elapsed since the last time this message was delivered to this consumer.
4) "22" // The number of times this message was delivered.
我还需要继续加工下
redis XREADGROUP 有 CLAIM 的返回值会多出一些,官方文档没写 但是,含义和xpending类似, https://redis.io/docs/latest/commands/xpending/
> XREADGROUP GROUP send test COUNT 1 BLOCK 1000 CLAIM 1000 STREAMS smtp > 1) 1) "smtp" 2) 1) 1) "1764497928656-0" 2) 1) "key" 2) "value" 3) "287341" // The number of milliseconds that elapsed since the last time this message was delivered to this consumer. 4) "22" // The number of times this message was delivered.我还需要继续加工下
fixed