pika icon indicating copy to clipboard operation
pika copied to clipboard

咨询问题pipeline

Open wseven10 opened this issue 5 years ago • 2 comments

pipeline试了是可以支持的,但是没看到文档说明,不知道是不是有问题

wseven10 avatar Jul 08 '20 07:07 wseven10

没有问题

whoiami avatar Jul 09 '20 00:07 whoiami

pipeline别用,有bug,例如以下语句,在并发执行的时候redis中正确执行,在pika中数据执行错乱。 go版本

pipe := client.Pipeline()
pipe.LRange("key", 0, 200-1)
pipe.LTrim("key", 200, -1)
cmders, err := pipe.Exec()

python版本

with handle.pipeline(transaction=False) as p:
        p.lrange('key', 0, 200-1)
        p.ltrim('key', 200, -1)
        res = p.execute()
        items = res[0]

ownthink avatar Aug 26 '21 03:08 ownthink

@ForestLH 负责验证该功能

AlexStocks avatar Apr 29 '23 11:04 AlexStocks

pipeline别用,有bug,例如以下语句,在并发执行的时候redis中正确执行,在pika中数据执行错乱。 go版本

pipe := client.Pipeline()
pipe.LRange("key", 0, 200-1)
pipe.LTrim("key", 200, -1)
cmders, err := pipe.Exec()

python版本

with handle.pipeline(transaction=False) as p:
        p.lrange('key', 0, 200-1)
        p.ltrim('key', 200, -1)
        res = p.execute()
        items = res[0]

可以详细一点儿吗,是不是使用的客户端的问题,我用这个"github.com/go-redis/redis/v8"测试没有发现问题呀

ForestLH avatar Apr 30 '23 05:04 ForestLH

pipeline别用,有bug,例如以下语句,在并发执行的时候redis中正确执行,在pika中数据执行错乱。 go版本

pipe := client.Pipeline()
pipe.LRange("key", 0, 200-1)
pipe.LTrim("key", 200, -1)
cmders, err := pipe.Exec()

python版本

with handle.pipeline(transaction=False) as p:
        p.lrange('key', 0, 200-1)
        p.ltrim('key', 200, -1)
        res = p.execute()
        items = res[0]

可以详细一点儿吗,是不是使用的客户端的问题,我用这个"github.com/go-redis/redis/v8"测试没有发现问题呀

So long time that you maybe can not get a reply.

AlexStocks avatar Apr 30 '23 06:04 AlexStocks

用的这个。go get github.com/go-redis/redis

ownthink avatar May 04 '23 02:05 ownthink

单个测试没问题,在多协程中同时执行以下语句会有问题,测试的时候可以往pika队列里面写入1-10000000的数字字符串,然后1000个协程同时执行以下语句将获取到的值存入txt,对比txt是否是1-10000000。

pipe := client.Pipeline()
pipe.LRange("key", 0, 200-1)
pipe.LTrim("key", 200, -1)
cmders, err := pipe.Exec()

ownthink avatar May 04 '23 02:05 ownthink

单个测试没问题,在多协程中同时执行以下语句会有问题,测试的时候可以往pika队列里面写入1-10000000的数字字符串,然后1000个协程同时执行以下语句将获取到的值存入txt,对比txt是否是1-10000000。

pipe := client.Pipeline()
pipe.LRange("key", 0, 200-1)
pipe.LTrim("key", 200, -1)
cmders, err := pipe.Exec()

我这样测试还是没有测试出来问题,你可以看看这样测试对不对吗,谢谢您了 https://github.com/ForestLH/goredisclient

ForestLH avatar May 08 '23 03:05 ForestLH

哈喽不好意思,到现在才来提交。往redis里写数据参考put.py,从redis里读数据参考get.go。已经提交到https://github.com/ForestLH/goredisclient。redis写入和读取数据一致,pika会多数据,不知道这样编写是否有错误哈,仅供参考。

ownthink avatar Jun 25 '23 12:06 ownthink

哈喽不好意思,到现在才来提交。往 redis 里写数据参考 put.py,从 redis 里读数据参考 get.go。已经提交到 https://github.com/ForestLH/goredisclient。redis 写入和读取数据一致,pika 会多数据,不知道这样编写是否有错误哈,仅供参考。

你能否加下 Pika 微信助手 PikiwiDB【加的时候请注明暗号 pika pipeline】,然后它会把你拉进 Pika 微信群,我们在微信群里讨论下这个问题?

AlexStocks avatar Jun 25 '23 13:06 AlexStocks

https://github.com/OpenAtomFoundation/pika/blob/bf3574dbc4d877ebaacc423b0a8261fabe4ff466/src/net/src/redis_conn.cc#L89-L112

89 行从网络 read 一次,得到 nread,112 行,把 nread 传给 redis parser。

https://github.com/OpenAtomFoundation/pika/blob/bf3574dbc4d877ebaacc423b0a8261fabe4ff466/src/net/src/redis_parser.cc#L313-L321

parser 里逻辑,会判断上一次 read,有没有读“一半”的,有剩的一半,给拼到本次读的前面,然后进行对 request 进行 parse(解 cmd)

https://github.com/OpenAtomFoundation/pika/blob/bf3574dbc4d877ebaacc423b0a8261fabe4ff466/src/net/src/redis_parser.cc#L344-L346

可以看到解的过程是个循环,也就是说会把读到的可完整解出来的,都解出来,然后就是给 worker 现成执行了。

但是问题是读到的,比如 pipeline 里命令很多的话,或者网络 io 并发高等等因素,导致一次 read 没有把所有 pipeline 里命令都读完(不然正常 read 函数也不承诺一口气都读完),则 pipeline 里的多个命令,可能会在不同的 worker 线程里执行,导致响应的顺序跟请求的顺序对不上。

tedli avatar Jun 27 '23 01:06 tedli

还有一个点是 go-redis 的 pipeline 实现问题

https://github.com/redis/go-redis/blob/50f04c14dee344147e9cf061f752ee913fd6c3ee/redis.go#L497-L501

https://github.com/redis/go-redis/blob/50f04c14dee344147e9cf061f752ee913fd6c3ee/internal/proto/writer.go#L37-L53

https://github.com/redis/go-redis/blob/50f04c14dee344147e9cf061f752ee913fd6c3ee/command.go#L59-L61

可以看到,只是把 cmd 对象攒起来,然后循环 write 而已。

而看 redis 自己的 client pipeline 实现:

https://github.com/redis/redis/blob/22a29935ff7a354b8d90c110040f5bd0ea80068e/deps/hiredis/hiredis.c#L1106-L1117

可以看到 redis 自己的 client 的 pipeline 实现,攒 cmd 的逻辑是在拼 buffer,newbuf = hi_sdscatlen(c->obuf,cmd,len),这个 buffer 是最终一口气一次 write。

不过这个差别也不是硬伤,毕竟 write 也不承诺一口气都能写完。

tedli avatar Jun 27 '23 02:06 tedli

the oldest pika issue https://github.com/OpenAtomFoundation/pika/issues/19

AlexStocks avatar Jun 27 '23 04:06 AlexStocks

问题分 2 个维度,1 是 pipeline 本身维度,2 是并发读写状态维度。

  1. pipeline 本身维度: 少部分用户反馈的小概率遇到 pipeline 出错问题。 原因是,redis 网络协议层面判断不出来是不是 pipeline,io 层面只是知道 read 到了若干 command。pika 成功解出来 command 就放 worker 线程里执行了,command 执行是并发的,会有几率导致 pipeline 里后执行的 command,先 response 了,如果 pipeline 里前后 command 的返回值类型不同,比如 lpushx 的响应是个 int,lrange 的响应是个 string[],这种先后顺序回乱了,反映到 client 端代码,就是报错(解响应失败)。这个情况,pipeline 串起来的命令越长越容易复现。

  2. 并发读写维度: @ownthink 反馈的问题。这个见 复现代码。这个可以认为是 by design。至于 redis 为啥不会有这个问题,因为 redis 是单线程实现的。

tedli avatar Jun 28 '23 07:06 tedli

        * 0429 李浩负责验证推进 
        * 0506 李浩继续跟进
        * 0520 提问者未回复,不再跟进
        * 0624 李浩继续跟进,预计两周后做完
        * 0701 转到tedli继续跟进
        * 0708 已提交 PR,待合并
        * 0715 需要在确认复现
        * 0805 金鸽跟进复现
        * 0813 跟李振对一下,继续推进复现

AlexStocks avatar Jul 07 '23 11:07 AlexStocks

pika server解析出完整命令之后交由工作线程处理,然后会del对应connfd的读写事件,所以我理解在上一个请求处理完成向client回包之前,不会在收包,所以不会出现同一个连接上多个请求同时被工作线程池中的多个线程处理的情况。 但pika server不能保证多条conn的串行执行,所以可能跟客户端实现有关。即:多线程发起pipeline请求,最终每个线程的pipeline可能是由不同连接发送出去的。github.com/redis/go-redis/v9这里的客户端实现看起来就是这种情况。

wangshao1 avatar Aug 07 '23 07:08 wangshao1