[BUG] sw::redis::AsyncRedisCluster xread does not work
22222:too many moved error
sw::redis::AsyncRedisCluster rr{redis_uri};
std::vector<std::string> cmd{"XREAD", "COUNT", "100", "STREAMS", "test_stream", "1765430590633-0"};
rr.template command<std::unordered_map<std::string, ItemStream>>(
    cmd.begin(), cmd.end(),
    [](auto&& fut)
    {
        try
        {
            auto ret = fut.get();
            REQUIRE(ret.size() == 1);
        }
        catch (const std::exception& e)
        {
            std::cout << "22222:" << e.what() << std::endl;
        }
    });
22222:too many moved error
It looks like your cluster is quite unstable.
The client tries to access a key whose slot has been migrated to a new node, so it is redirected to that node (i.e. it receives a MOVED error). However, when the client tries to access the key on the new node, it receives another MOVED error, i.e. the slot has been migrated again.
To avoid being redirected in an infinite loop, the client library throws a "too many moved error" exception.
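To illustrate the bounded-redirect idea described above, here is a minimal sketch in C++17. It is not the library's actual implementation: `MovedTarget`, `parse_moved`, `SendFn`, `send_with_redirects`, and the default limit of 5 are hypothetical names and values chosen for this example. The only thing taken from Redis itself is the shape of the error text, `MOVED <slot> <host>:<port>`.

```cpp
#include <cstddef>
#include <functional>
#include <optional>
#include <sstream>
#include <stdexcept>
#include <string>

// Target of a MOVED redirection, e.g. "MOVED 3999 127.0.0.1:6381".
struct MovedTarget {
    int slot;
    std::string host;
    int port;
};

// Parse a MOVED error message; returns std::nullopt for any other text.
std::optional<MovedTarget> parse_moved(const std::string &err) {
    std::istringstream in(err);
    std::string tag, addr;
    int slot = 0;
    if (!(in >> tag >> slot >> addr) || tag != "MOVED") return std::nullopt;
    auto colon = addr.rfind(':');
    if (colon == std::string::npos) return std::nullopt;
    try {
        return MovedTarget{slot, addr.substr(0, colon),
                           std::stoi(addr.substr(colon + 1))};
    } catch (const std::exception &) {
        return std::nullopt;  // port was not a number
    }
}

// Hypothetical transport: sends one fixed command to `node` ("host:port")
// and returns either the reply or the error text.
using SendFn = std::function<std::string(const std::string &node)>;

// Follow MOVED redirections, but give up after `max_redirects` hops
// instead of looping forever. The limit is an assumption of this sketch.
std::string send_with_redirects(const SendFn &send, std::string node,
                                std::size_t max_redirects = 5) {
    for (std::size_t i = 0; i <= max_redirects; ++i) {
        std::string reply = send(node);
        auto moved = parse_moved(reply);
        if (!moved) return reply;  // normal reply or a non-MOVED error
        node = moved->host + ":" + std::to_string(moved->port);
    }
    throw std::runtime_error("too many moved error");
}
```

If every node keeps answering with MOVED, the loop ends with the exception rather than spinning, which matches the symptom reported here. Note that the same symptom would also appear if the client computed the slot from the wrong argument, since every node it tries would redirect it again.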
Regards
#include <unistd.h>

#include <iostream>
#include <iterator>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include <sw/redis++/async_redis++.h>
#include <sw/redis++/redis++.h>

// Note: ItemStream is the reporter's own type; its definition is not shown here.

int main(int argc, char **argv) {
  std::string addr = "tcp://"
                     "127.0.0.1:7000?socket_timeout=50ms&connect_timeout=1s&pool_size=10&pool_"
                     "wait_timeout=1s&pool_connection_lifetime=60s&pool_connection_idle_time="
                     "20s";
  // Synchronous client: XADD two entries, then XREAD everything after the first id.
  sw::redis::RedisCluster redis(addr);
  using Attrs = std::vector<std::pair<std::string, std::string>>;
  Attrs attrs = {{"f1", "v1"}, {"f2", "v2"}};
  auto id = redis.xadd("test_stream", "*", attrs.begin(), attrs.end());
  auto id1 = redis.xadd("test_stream", "*", attrs.begin(), attrs.end());
  std::cout << id << std::endl;
  std::unordered_map<std::string, ItemStream> result;
  redis.xread("test_stream", id, 10, std::inserter(result, result.end()));
  for (auto [x, Item] : result) {
    for (auto [id_a, z] : Item) {
      std::cout << "sw::redis::RedisCluster xread:" << id_a << std::endl;
    }
  }
  // Asynchronous client: the same XREAD sent through the generic command interface.
  sw::redis::AsyncRedisCluster rr{addr};
  std::vector<std::string> cmd{"XREAD", "COUNT", "100",
                               "STREAMS", "test_stream", id};
  rr.template command<std::unordered_map<std::string, ItemStream>>(
      cmd.begin(), cmd.end(), [](auto &&fut) {
        try {
          auto ret = fut.get();
        } catch (const std::exception &e) {
          std::cout << "22222:" << e.what() << std::endl;
        }
      });
  std::cout << "start wait" << std::endl;
  sleep(10);
  return 0;
}
➜ redis-plus-plus git:(master) ✗ ./a.out
1765610707120-0
sw::redis::RedisCluster xread:1765610707121-0
start wait
22222:too many moved error
127.0.0.1:7002> monitor
OK
1765610949.499480 [0 172.17.0.1:54551] "XADD" "test_stream" "*" "f1" "v1" "f2" "v2"
1765610949.499979 [0 172.17.0.1:54551] "XADD" "test_stream" "*" "f1" "v1" "f2" "v2"
1765610949.500526 [0 172.17.0.1:54551] "XREAD" "COUNT" "10" "STREAMS" "test_stream" "1765610949499-0"
Using the same cluster, sw::redis::RedisCluster works but sw::redis::AsyncRedisCluster does not (22222:too many moved error), so I think this is a bug in the asynchronous interface.
So sorry, I forgot to reply. My bad!
You cannot use AsyncRedisCluster::command to send commands that are too complicated, such as xread. I'll add built-in support for it ASAP.
Regards
Maybe you could consider adding a generic command overload that lets the caller specify the key. The issue with xread is that the key is identified incorrectly.
template <typename Result, typename Input, typename Callback>
auto command(std::string_view key, Input first, Input last, Callback &&cb)
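To show why the choice of key matters for routing, here is a small sketch of the Redis Cluster hash-slot calculation: CRC16 (XMODEM variant) of the key, modulo 16384, with hash-tag handling. The function names `crc16` and `key_slot` are made up for this example and are not part of redis-plus-plus. Which argument the generic `command` actually treats as the key is not shown in this thread, so the scenario in the note below the code is an assumption.

```cpp
#include <cstdint>
#include <string>
#include <string_view>

// CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0, no reflection.
// This is the variant named in the Redis Cluster specification.
std::uint16_t crc16(std::string_view data) {
    std::uint16_t crc = 0;
    for (unsigned char byte : data) {
        crc ^= static_cast<std::uint16_t>(byte << 8);
        for (int bit = 0; bit < 8; ++bit) {
            crc = (crc & 0x8000)
                      ? static_cast<std::uint16_t>((crc << 1) ^ 0x1021)
                      : static_cast<std::uint16_t>(crc << 1);
        }
    }
    return crc;
}

// Hash slot for a key. If the key contains a non-empty "{...}" hash tag,
// only the text between the first '{' and the next '}' is hashed.
std::uint16_t key_slot(std::string_view key) {
    auto open = key.find('{');
    if (open != std::string_view::npos) {
        auto close = key.find('}', open + 1);
        if (close != std::string_view::npos && close > open + 1) {
            key = key.substr(open + 1, close - open - 1);
        }
    }
    return crc16(key) % 16384;
}
```

If a client hashed an option word such as "COUNT" instead of the stream name "test_stream", `key_slot` would generally return a different slot, and the request could be sent to a node that does not own the stream. Passing the key explicitly, as in the signature proposed above, would avoid that guess.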
@nqf xread for the async interface has been merged into the master branch. Sorry for the delay.
Regards
Thanks, but it does not seem to support setting BLOCK milliseconds. I still need the BLOCK parameter, but I cannot set it now.
@sewenew Could you check the MR above?
@nqf I did not merge that MR, because it's not a good idea to call blocking operations through the async interface. You should be very careful with this usage, since commands that share the same underlying connection with a blocking operation might be blocked until that operation finishes.
I checked the other APIs of the async interface. Unfortunately, some of them already support blocking operations, so I'll merge the MR after the author makes some minor changes. However, be careful with this kind of usage.
Regards
Thanks. I need this kind of usage to work with my coroutines. For this, I will create a separate client that only sends async blocking XREAD requests.
Closing the issue, since the code has been merged.
Regards