csredis icon indicating copy to clipboard operation
csredis copied to clipboard

Redis集群主从切换后SetAsync方法报错

Open liucy1983 opened this issue 3 years ago • 0 comments

  • 版本情况

  • CSRedis:3.6.6

  • Redis版本:4.0.10

  • 重现步骤

    • Redis使用主备方式部署集群
    • 业务程序正常启动,业务正常运行
    • 模拟所有Redis Master发生故障,进行主备切换
    • 调用业务中会使用redisclient.setasync方法的接口
  • 故障表现

    • 业务代码中redisclient.setasync方法调用不会成功
    • 日志中会有报错:CSRedis.RedisException:Moved 5180 xxx.xxx.xxx.xxx:8379
  • 原因

    • 经过排查,发现原因为CSRedisClientAsync中的SetAsync方法代码处理逻辑错误
    • CSRedisClientAsync中SetAsync方法有两种调用方式
async public Task<bool> SetAsync(string key, object value, int expireSeconds = -1, RedisExistence? exists = null)
{
    object redisValule = this.SerializeRedisValueInternal(value);
    if (expireSeconds <= 0 && exists == null) return await ExecuteScalarAsync(key, (c, k) => c.Value.SetAsync(k, redisValule)) == "OK";
    if (expireSeconds <= 0 && exists != null) return await ExecuteScalarAsync(key, (c, k) => c.Value.SetAsync(k, redisValule, null, exists)) == "OK";
    if (expireSeconds > 0 && exists == null) return await ExecuteScalarAsync(key, (c, k) => c.Value.SetAsync(k, redisValule, expireSeconds, null)) == "OK";
    if (expireSeconds > 0 && exists != null) return await ExecuteScalarAsync(key, (c, k) => c.Value.SetAsync(k, redisValule, expireSeconds, exists)) == "OK";
    return false;
}
async public Task<bool> SetAsync(string key, object value, TimeSpan expire, RedisExistence? exists = null)
{
    object redisValule = this.SerializeRedisValueInternal(value);
    if (expire <= TimeSpan.Zero && exists == null) return await ExecuteScalar(key, (c, k) => c.Value.SetAsync(k, redisValule)) == "OK";
    if (expire <= TimeSpan.Zero && exists != null) return await ExecuteScalar(key, (c, k) => c.Value.SetAsync(k, redisValule, null, exists)) == "OK";
    if (expire > TimeSpan.Zero && exists == null) return await ExecuteScalar(key, (c, k) => c.Value.SetAsync(k, redisValule, expire, null)) == "OK";
    if (expire > TimeSpan.Zero && exists != null) return await ExecuteScalar(key, (c, k) => c.Value.SetAsync(k, redisValule, expire, exists)) == "OK";
    return false;
}
  • 这两种方式的差异是,超时参数的类型,有int和timespan两种
  • 当超时参数为timespan类型时,会执行await ExecuteScalar(key, (c, k) => c.Value.SetAsync(k, redisValule))方法
  • 但这里是把把c.Value.SetAsync()这个异步方法,做为参数传递到了同步resdisclient中提供的ExecuteScalar同步方法
  • 在ExecuteScalar中,执行这个调用时也没有加await方法,就导致了异常没有被捕获,直接报错了
 T GetAndExecute<T>(RedisClientPool pool, Func<Object<RedisClient>, T> handler, int jump = 100, int errtimes = 0)
        {
            Object<RedisClient> obj = null;
            Exception ex = null;
            var redirect = ParseClusterRedirect(null);
            try
            {
                obj = pool.Get();
                while (true)
                { //因网络出错重试,默认1次
                    try
                    {
                        var ret = handler(obj);
                        return ret;
                    }
                    catch (RedisException ex3)
                    {
                        redirect = ParseClusterRedirect(ex3); //官方集群跳转
                        if (redirect == null || jump <= 0)
                        {
                            ex = ex3;
                            if (SentinelManager != null && ex.Message.Contains("READONLY"))
                            { //哨兵轮询
                                if (pool.SetUnavailable(ex) == true)
                                    BackgroundGetSentinelMasterValue();
                            }
                            throw ex;
                        }
                        break;
                    }

liucy1983 avatar Sep 28 '21 02:09 liucy1983