StackExchange.Redis icon indicating copy to clipboard operation
StackExchange.Redis copied to clipboard

When xreadgroup block reads data after a redistimeout, the first message cannot be consumed

Open punisher1 opened this issue 2 years ago • 2 comments

  • windows10
  • redis 5.0.10
  • dotnet any version
  • StackExchange.Redis 2.6.111
using StackExchange.Redis;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

class Program
{

    static void Main()
    {

        var config = ConfigurationOptions.Parse("127.0.0.1:6379");
        config.CommandMap = CommandMap.Create(new HashSet<string> { "SUBSCRIBE" }, false);
        var redisRead = ConnectionMultiplexer.Connect(config);
        var redisReadDb = redisRead.GetDatabase(1);

        try
        {
            redisReadDb.StreamCreateConsumerGroup("test", "default", StreamPosition.NewMessages, true);
        }
        catch { }


        var redisWrite = ConnectionMultiplexer.Connect(config);
        var redisWriteDb = redisWrite.GetDatabase(1);

        var writeTask = Task.Run(async () =>
        {
            while (true)
            {
                var id = await redisWriteDb.StreamAddAsync("test", "time", DateTime.Now.ToString());
                var id2 = await redisWriteDb.StreamAddAsync("test", "time", DateTime.Now.ToString());

                Console.WriteLine($"{DateTime.Now:O} write message:{id},{id2}");
                Thread.Sleep(10000);
            }
        });

        var readTask = Task.Run(async () =>
        {
            var arguments = new List<object>
                {
                    "GROUP",
                    "default",
                    $"dotnet",
                    "COUNT",
                    10,
                    "BLOCK",
                    "0",
                    "STREAMS",
                    "test",
                    ">"
                };

            while (true)
            {
                try
                {
                    Console.WriteLine($"{DateTime.Now:O} redis xreadgroup block read ...");

                    var result = await redisReadDb.ExecuteAsync("xreadgroup", arguments).ConfigureAwait(false);
                    if (result.IsNull)
                    {
                        Console.WriteLine($"{DateTime.Now:O} redis xreadgroup isnull");
                    }
                    else
                    {
                        foreach (RedisResult[] subresults in (RedisResult[])result)
                        {
                            var streamName = (RedisValue)subresults[0];

                            foreach (RedisResult[] messages in (RedisResult[])subresults[1])
                            {
                                var id = (RedisValue)messages[0];
                                Console.WriteLine($"{DateTime.Now:O} read message:{id}");
                                await redisReadDb.StreamAcknowledgeAsync("test", "default", id);
                            }
                        }
                    }
                }
                catch (RedisTimeoutException)
                {
                    Console.WriteLine($"{DateTime.Now:O} redis timeout ...");
                }
            }
        });


        Console.ReadLine();
    }
}

2023-05-31T14:25:27.4733370+08:00 redis xreadgroup block read ... 2023-05-31T14:25:27.4976884+08:00 read message:1685514327495-0 2023-05-31T14:25:27.4994336+08:00 write message:1685514327495-0,1685514327498-0 2023-05-31T14:25:27.5082905+08:00 redis xreadgroup block read ... 2023-05-31T14:25:27.5101550+08:00 read message:1685514327498-0 2023-05-31T14:25:27.5107652+08:00 redis xreadgroup block read ... 2023-05-31T14:25:33.4848775+08:00 redis timeout ... 2023-05-31T14:25:33.4851831+08:00 redis xreadgroup block read ... 2023-05-31T14:25:37.5174767+08:00 write message:1685514337514-0,1685514337515-0 2023-05-31T14:25:37.5175269+08:00 read message:1685514337515-0 2023-05-31T14:25:37.5189520+08:00 redis xreadgroup block read ... 2023-05-31T14:25:43.4835083+08:00 redis timeout ... 2023-05-31T14:25:43.4837146+08:00 redis xreadgroup block read ... 2023-05-31T14:25:47.5262835+08:00 write message:1685514347525-0,1685514347526-0 2023-05-31T14:25:47.5262841+08:00 read message:1685514347526-0 2023-05-31T14:25:47.5276668+08:00 redis xreadgroup block read ... 2023-05-31T14:25:53.4743876+08:00 redis timeout ... 2023-05-31T14:25:53.4745574+08:00 redis xreadgroup block read ... 2023-05-31T14:25:57.5292349+08:00 write message:1685514357528-0,1685514357529-0 2023-05-31T14:25:57.5292381+08:00 read message:1685514357529-0 2023-05-31T14:25:57.5314306+08:00 redis xreadgroup block read ...

punisher1 avatar May 31 '23 06:05 punisher1

This may be because the "xreadgroup" command has been executed on the server before receiving the "timeout" result? It should never time out when reading is blocked.

haoma2514 avatar Dec 14 '23 01:12 haoma2514

Maybe you can refer to it https://github.com/StackExchange/StackExchange.Redis/issues/2135

haoma2514 avatar Dec 14 '23 07:12 haoma2514