csredis
csredis copied to clipboard
Redis集群主从切换后SetAsync方法报错
-
版本情况
-
CSRedis:3.6.6
-
Redis版本:4.0.10
-
重现步骤
- Redis使用主备方式部署集群
- 业务程序正常启动,业务正常运行
- 模拟所有Redis Master发生故障,进行主备切换
- 调用业务中会使用redisclient.setasync方法的接口
-
故障表现
- 业务代码中redisclient.setasync方法调用不会成功
- 日志中会有报错:CSRedis.RedisException:Moved 5180 xxx.xxx.xxx.xxx:8379
-
原因
- 经过排查,发现原因为CSRedisClientAsync中的SetAsync方法代码处理逻辑错误
- CSRedisClientAsync中SetAsync方法有两种调用方式
async public Task<bool> SetAsync(string key, object value, int expireSeconds = -1, RedisExistence? exists = null)
{
object redisValule = this.SerializeRedisValueInternal(value);
if (expireSeconds <= 0 && exists == null) return await ExecuteScalarAsync(key, (c, k) => c.Value.SetAsync(k, redisValule)) == "OK";
if (expireSeconds <= 0 && exists != null) return await ExecuteScalarAsync(key, (c, k) => c.Value.SetAsync(k, redisValule, null, exists)) == "OK";
if (expireSeconds > 0 && exists == null) return await ExecuteScalarAsync(key, (c, k) => c.Value.SetAsync(k, redisValule, expireSeconds, null)) == "OK";
if (expireSeconds > 0 && exists != null) return await ExecuteScalarAsync(key, (c, k) => c.Value.SetAsync(k, redisValule, expireSeconds, exists)) == "OK";
return false;
}
async public Task<bool> SetAsync(string key, object value, TimeSpan expire, RedisExistence? exists = null)
{
object redisValule = this.SerializeRedisValueInternal(value);
if (expire <= TimeSpan.Zero && exists == null) return await ExecuteScalar(key, (c, k) => c.Value.SetAsync(k, redisValule)) == "OK";
if (expire <= TimeSpan.Zero && exists != null) return await ExecuteScalar(key, (c, k) => c.Value.SetAsync(k, redisValule, null, exists)) == "OK";
if (expire > TimeSpan.Zero && exists == null) return await ExecuteScalar(key, (c, k) => c.Value.SetAsync(k, redisValule, expire, null)) == "OK";
if (expire > TimeSpan.Zero && exists != null) return await ExecuteScalar(key, (c, k) => c.Value.SetAsync(k, redisValule, expire, exists)) == "OK";
return false;
}
- 这两种方式的差异是,超时参数的类型,有int和timespan两种
- 当超时参数为timespan类型时,会执行
await ExecuteScalar(key, (c, k) => c.Value.SetAsync(k, redisValule))
方法 - 但这里是把把c.Value.SetAsync()这个异步方法,做为参数传递到了同步resdisclient中提供的ExecuteScalar同步方法
- 在ExecuteScalar中,执行这个调用时也没有加await方法,就导致了异常没有被捕获,直接报错了
T GetAndExecute<T>(RedisClientPool pool, Func<Object<RedisClient>, T> handler, int jump = 100, int errtimes = 0)
{
Object<RedisClient> obj = null;
Exception ex = null;
var redirect = ParseClusterRedirect(null);
try
{
obj = pool.Get();
while (true)
{ //因网络出错重试,默认1次
try
{
var ret = handler(obj);
return ret;
}
catch (RedisException ex3)
{
redirect = ParseClusterRedirect(ex3); //官方集群跳转
if (redirect == null || jump <= 0)
{
ex = ex3;
if (SentinelManager != null && ex.Message.Contains("READONLY"))
{ //哨兵轮询
if (pool.SetUnavailable(ex) == true)
BackgroundGetSentinelMasterValue();
}
throw ex;
}
break;
}