
[BUG] sw::redis::AsyncRedisCluster xread does not work

Open nqf opened this issue 4 months ago • 2 comments

22222:too many moved error

                sw::redis::AsyncRedisCluster rr{redis_uri};
                std::vector<std::string> cmd{"XREAD", "COUNT", "100", "STREAMS", "test_stream", "1765430590633-0"};
                rr.template command<std::unordered_map<std::string, ItemStream>>(
                    cmd.begin(), cmd.end(),
                    [](auto&& fut)
                    {
                        try
                        {
                            auto ret = fut.get();
                            REQUIRE(ret.size() == 1);
                        }
                        catch (const std::exception& e)
                        {
                            std::cout << "22222:" << e.what() << std::endl;
                        }
                    });

22222:too many moved error

nqf avatar Dec 11 '25 05:12 nqf

Looks like your cluster is quite unstable.

The client tries to access a key whose slot has been migrated to a new node, so it is redirected to the new node (i.e. it receives a MOVED error). However, when the client tries to access the key on the new node, it receives another MOVED error, i.e. the slot has been migrated again.

To avoid being redirected in an infinite loop, the client library throws a "too many moved error" exception.
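A minimal sketch of that redirect limit, to make the failure mode concrete. This is not the library's code: `Reply`, `SendFn`, `send_with_redirects` and the default limit of 3 hops are all hypothetical names and values chosen for illustration; only the error text mirrors the message reported above.

```cpp
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical reply type: either a normal value or a MOVED redirection.
struct Reply {
    bool moved;           // true if the node answered with MOVED
    std::string payload;  // the value if !moved, else the address of the target node
};

// Hypothetical transport: sends the command to `node` and returns its reply.
using SendFn = std::function<Reply(const std::string &node)>;

// Follow MOVED redirections, but give up after `max_redirects` hops
// instead of bouncing between nodes forever. The limit is an assumption.
std::string send_with_redirects(const SendFn &send,
                                std::string node,
                                std::size_t max_redirects = 3) {
    for (std::size_t hop = 0; hop <= max_redirects; ++hop) {
        Reply reply = send(node);
        if (!reply.moved) {
            return reply.payload;  // the node owns the slot: done
        }
        node = reply.payload;      // retry on the node named by MOVED
    }
    throw std::runtime_error("too many moved error");
}
```

With a transport that redirects once and then answers, the call returns the value; with one that always answers MOVED, it throws after the limit is reached. The second case is also what happens if the client keeps routing by the wrong key, not only when slots are really migrating.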

Regards

sewenew avatar Dec 13 '25 06:12 sewenew

int main(int argc, char **argv) {
  std::string addr = "tcp://"
      "127.0.0.1:7000?socket_timeout=50ms&connect_timeout=1s&pool_size=10&pool_"
      "wait_timeout=1s&pool_connection_lifetime=60s&pool_connection_idle_time="
      "20s";
  sw::redis::RedisCluster redis(addr);

  using Attrs = std::vector<std::pair<std::string, std::string>>;
  Attrs attrs = {{"f1", "v1"}, {"f2", "v2"}};
  auto id = redis.xadd("test_stream", "*", attrs.begin(), attrs.end());
  auto id1 = redis.xadd("test_stream", "*", attrs.begin(), attrs.end());
  std::cout << id << std::endl;

  std::unordered_map<std::string, ItemStream> result;
  redis.xread("test_stream", id, 10, std::inserter(result, result.end()));
  for (auto [x, Item] : result) {
    for (auto [id_a, z] : Item) {
      std::cout << "sw::redis::RedisCluster xread:" << id_a << std::endl;
    }
  }

  sw::redis::AsyncRedisCluster rr{
      addr};
  std::vector<std::string> cmd{"XREAD",   "COUNT",       "100",
                               "STREAMS", "test_stream", id};
  rr.template command<std::unordered_map<std::string, ItemStream>>(
      cmd.begin(), cmd.end(), [](auto &&fut) {
        try {
          auto ret = fut.get();
        } catch (const std::exception &e) {
          std::cout << "22222:" << e.what() << std::endl;
        }
      });
  std::cout << "start wait" << std::endl;
  sleep(10);

  return 0;
}

➜  redis-plus-plus git:(master) ✗ ./a.out
1765610707120-0
sw::redis::RedisCluster xread:1765610707121-0
start wait
22222:too many moved error

127.0.0.1:7002> monitor
OK
1765610949.499480 [0 172.17.0.1:54551] "XADD" "test_stream" "*" "f1" "v1" "f2" "v2"
1765610949.499979 [0 172.17.0.1:54551] "XADD" "test_stream" "*" "f1" "v1" "f2" "v2"
1765610949.500526 [0 172.17.0.1:54551] "XREAD" "COUNT" "10" "STREAMS" "test_stream" "1765610949499-0"

Against the same cluster, sw::redis::RedisCluster works, but sw::redis::AsyncRedisCluster fails with "22222:too many moved error". So I think this is a bug in the asynchronous interface.

nqf avatar Dec 13 '25 07:12 nqf

So sorry, I forgot to reply. My bad!!!

You cannot use AsyncRedisCluster::command to send commands that are too complicated, such as xread. I'll add built-in support for it ASAP.

Regards

sewenew avatar Dec 22 '25 15:12 sewenew

Maybe you could consider adding a generic command overload that lets the caller specify the key. The issue with xread is that the key is not recognized correctly.

    template <typename Result, typename Input, typename Callback>
    auto command(std::string_view key, Input first, Input last, Callback &&cb)
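To illustrate why the key matters, here is a rough sketch of slot routing. The CRC16 variant (XMODEM) and the `{...}` hash-tag rule follow the Redis Cluster specification; `naive_key` and `xread_key` are hypothetical helpers, and the assumption that a generic command path treats the argument right after the command name as the key is mine, not confirmed in this thread.

```cpp
#include <algorithm>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// CRC16/XMODEM: polynomial 0x1021, initial value 0.
std::uint16_t crc16(const std::string &data) {
    std::uint16_t crc = 0;
    for (unsigned char byte : data) {
        crc ^= static_cast<std::uint16_t>(byte << 8);
        for (int bit = 0; bit < 8; ++bit) {
            crc = (crc & 0x8000)
                      ? static_cast<std::uint16_t>((crc << 1) ^ 0x1021)
                      : static_cast<std::uint16_t>(crc << 1);
        }
    }
    return crc;
}

// Hash slot of a key: if the key contains a non-empty {tag}, only the tag is hashed.
std::uint16_t key_slot(const std::string &key) {
    auto open = key.find('{');
    if (open != std::string::npos) {
        auto close = key.find('}', open + 1);
        if (close != std::string::npos && close > open + 1) {
            return static_cast<std::uint16_t>(
                crc16(key.substr(open + 1, close - open - 1)) % 16384);
        }
    }
    return static_cast<std::uint16_t>(crc16(key) % 16384);
}

// Assumed generic behaviour: the argument after the command name is the key.
const std::string &naive_key(const std::vector<std::string> &cmd) {
    return cmd.at(1);
}

// XREAD-aware lookup: the first key follows the STREAMS keyword
// (this sketch matches the keyword case-sensitively).
const std::string &xread_key(const std::vector<std::string> &cmd) {
    auto it = std::find(cmd.begin(), cmd.end(), "STREAMS");
    if (it == cmd.end() || it + 1 == cmd.end()) {
        throw std::invalid_argument("XREAD without STREAMS <key>");
    }
    return *(it + 1);
}
```

For the command in this issue, `naive_key` returns `"COUNT"` while `xread_key` returns `"test_stream"`. These two strings generally hash to different slots, so a request routed by the first one can land on a node that does not own the stream and be answered with MOVED.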

nqf avatar Dec 23 '25 01:12 nqf

@nqf xread for the async interface has been merged into the master branch. Sorry for the delay.

Regards

sewenew avatar Dec 24 '25 13:12 sewenew

Thanks, but it seems it does not support setting BLOCK milliseconds. I still need the BLOCK parameter, and there is currently no way to set it.

nqf avatar Dec 24 '25 14:12 nqf

@sewenew could you check the MR above

nqf avatar Dec 30 '25 05:12 nqf

@nqf I did not merge that MR, because it's not a good idea to call a blocking operation through the async interface. You should be very careful with this usage, since commands that share the same underlying connection with the blocking operation might be blocked until it finishes.

I checked the other APIs of the async interface. Unfortunately, some of them already support blocking operations, so I'll merge the MR after the author makes some minor changes. However, be careful with this kind of usage.
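A toy model of that effect, assuming replies on one connection come back in request order. `Pending` and `completion_times` are hypothetical names, and the durations are made-up inputs, not measurements.

```cpp
#include <chrono>
#include <string>
#include <vector>

// One command queued on a shared connection, with the time the
// server spends before replying (hypothetical input to the model).
struct Pending {
    std::string name;
    std::chrono::milliseconds server_time;
};

// Replies arrive in request order, so a command completes only after
// every command queued before it on the same connection has completed.
std::vector<std::chrono::milliseconds>
completion_times(const std::vector<Pending> &queue) {
    std::vector<std::chrono::milliseconds> done;
    std::chrono::milliseconds elapsed{0};
    for (const auto &cmd : queue) {
        elapsed += cmd.server_time;
        done.push_back(elapsed);
    }
    return done;
}
```

If an `XREAD` that blocks for 5000 ms is queued ahead of a 1 ms `GET`, the model puts the `GET` reply at 5001 ms, which is why a blocking call is best kept on a connection that nothing else uses.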

Regards

sewenew avatar Dec 31 '25 10:12 sewenew

Thanks, I need this kind of usage to work with my coroutines. For this, I will create a separate client that only sends async blocking XREAD requests.

fantasy-peak avatar Dec 31 '25 11:12 fantasy-peak

Closing the issue, since the code has been merged.

Regards

sewenew avatar Jan 03 '26 07:01 sewenew