When a blocking `XREADGROUP` finally returns data after a `RedisTimeoutException` has been thrown, the first message is never delivered to the consumer
- windows10
- redis 5.0.10
- dotnet any version
- StackExchange.Redis 2.6.111
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Minimal reproduction program: a writer task appends two entries to the
/// stream "test" every 10 seconds while a reader task consumes them through a
/// raw <c>XREADGROUP ... BLOCK 0</c> command issued via
/// <c>ExecuteAsync</c>. Every read, write, and timeout is logged to the
/// console with a round-trip timestamp.
/// </summary>
class Program
{
    /// <summary>
    /// Connects to a local Redis instance, ensures the consumer group
    /// "default" exists on stream "test", starts the writer and reader loops,
    /// and keeps the process alive until a line is entered on stdin.
    /// </summary>
    static void Main()
    {
        var config = ConfigurationOptions.Parse("127.0.0.1:6379");
        // SUBSCRIBE is passed with available: false.
        config.CommandMap = CommandMap.Create(new HashSet<string> { "SUBSCRIBE" }, false);

        // Separate multiplexers for reading and writing, so the blocking read
        // does not share a connection with the writer.
        using (var redisRead = ConnectionMultiplexer.Connect(config))
        using (var redisWrite = ConnectionMultiplexer.Connect(config))
        {
            var redisReadDb = redisRead.GetDatabase(1);
            var redisWriteDb = redisWrite.GetDatabase(1);

            try
            {
                redisReadDb.StreamCreateConsumerGroup("test", "default", StreamPosition.NewMessages, true);
            }
            catch (RedisServerException ex) when (ex.Message.Contains("BUSYGROUP"))
            {
                // The group already exists from a previous run; that is fine.
                // Any other failure is allowed to propagate instead of being
                // silently swallowed.
            }

            // Writer: two entries back-to-back, then a 10 second pause.
            Task.Run(async () =>
            {
                while (true)
                {
                    var id = await redisWriteDb.StreamAddAsync("test", "time", DateTime.Now.ToString());
                    var id2 = await redisWriteDb.StreamAddAsync("test", "time", DateTime.Now.ToString());
                    Console.WriteLine($"{DateTime.Now:O} write message:{id},{id2}");
                    // Asynchronous delay: do not block a thread-pool thread.
                    await Task.Delay(TimeSpan.FromSeconds(10));
                }
            });

            // Reader: XREADGROUP GROUP default dotnet COUNT 10 BLOCK 0 STREAMS test >
            Task.Run(async () =>
            {
                // NOTE(review): BLOCK 0 asks the server to wait indefinitely.
                // The client still applies its own command timeout, so a
                // RedisTimeoutException can be raised while the command is
                // still pending on the server - confirm against the
                // StackExchange.Redis guidance on blocking commands.
                var arguments = new List<object>
                {
                    "GROUP",
                    "default",
                    "dotnet",
                    "COUNT",
                    10,
                    "BLOCK",
                    "0",
                    "STREAMS",
                    "test",
                    ">"
                };

                while (true)
                {
                    try
                    {
                        Console.WriteLine($"{DateTime.Now:O} redis xreadgroup block read ...");
                        var result = await redisReadDb.ExecuteAsync("xreadgroup", arguments).ConfigureAwait(false);
                        if (result.IsNull)
                        {
                            Console.WriteLine($"{DateTime.Now:O} redis xreadgroup isnull");
                            continue;
                        }

                        // Reply shape: [ [streamName, [ [id, fields], ... ]], ... ]
                        foreach (RedisResult[] subresults in (RedisResult[])result)
                        {
                            foreach (RedisResult[] messages in (RedisResult[])subresults[1])
                            {
                                var id = (RedisValue)messages[0];
                                Console.WriteLine($"{DateTime.Now:O} read message:{id}");
                                await redisReadDb.StreamAcknowledgeAsync("test", "default", id);
                            }
                        }
                    }
                    catch (RedisTimeoutException)
                    {
                        // Log and retry; this is the condition under investigation.
                        Console.WriteLine($"{DateTime.Now:O} redis timeout ...");
                    }
                }
            });

            // Keep the process (and both connections) alive until Enter is pressed.
            Console.ReadLine();
        }
    }
}
2023-05-31T14:25:27.4733370+08:00 redis xreadgroup block read ...
2023-05-31T14:25:27.4976884+08:00 read message:1685514327495-0
2023-05-31T14:25:27.4994336+08:00 write message:1685514327495-0,1685514327498-0
2023-05-31T14:25:27.5082905+08:00 redis xreadgroup block read ...
2023-05-31T14:25:27.5101550+08:00 read message:1685514327498-0
2023-05-31T14:25:27.5107652+08:00 redis xreadgroup block read ...
2023-05-31T14:25:33.4848775+08:00 redis timeout ...
2023-05-31T14:25:33.4851831+08:00 redis xreadgroup block read ...
2023-05-31T14:25:37.5174767+08:00 write message:1685514337514-0,1685514337515-0
2023-05-31T14:25:37.5175269+08:00 read message:1685514337515-0
2023-05-31T14:25:37.5189520+08:00 redis xreadgroup block read ...
2023-05-31T14:25:43.4835083+08:00 redis timeout ...
2023-05-31T14:25:43.4837146+08:00 redis xreadgroup block read ...
2023-05-31T14:25:47.5262835+08:00 write message:1685514347525-0,1685514347526-0
2023-05-31T14:25:47.5262841+08:00 read message:1685514347526-0
2023-05-31T14:25:47.5276668+08:00 redis xreadgroup block read ...
2023-05-31T14:25:53.4743876+08:00 redis timeout ...
2023-05-31T14:25:53.4745574+08:00 redis xreadgroup block read ...
2023-05-31T14:25:57.5292349+08:00 write message:1685514357528-0,1685514357529-0
2023-05-31T14:25:57.5292381+08:00 read message:1685514357529-0
2023-05-31T14:25:57.5314306+08:00 redis xreadgroup block read ...
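Could this be because the earlier `XREADGROUP` command is still pending on the server when the client reports the timeout, so that the first new message is delivered to that abandoned call rather than to the retry? In the log above, after each timeout only the second of the two written messages is read. A blocking read should never time out on the client side.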
This may be related to https://github.com/StackExchange/StackExchange.Redis/issues/2135