notes icon indicating copy to clipboard operation
notes copied to clipboard

某个队列的消费者不足时,再给这个队列添加 work进程

Open MrKLL opened this issue 6 years ago • 12 comments

你好,我现在使用tp5队列来做一个风控,买卖股票的订单会随着股票价钱波动产生盈亏,达到某个亏损值的时候我需要进行平仓,可以队列每一秒只能处理15个订单,我想问一下怎么可以加进程,让系统处理更多的订单呢

MrKLL avatar Aug 15 '18 03:08 MrKLL

每执行一次 php think queue:work --queue xxxx 命令, 就可以增加该队列的一个消费者
如果一个消费者不够, 就多执行几次, 达到消息的推送与消费的平衡

可以使用 supervisor 等运维工具自动化地管理消费者进程数量

coolseven avatar Aug 15 '18 03:08 coolseven

因为我这个风控需要每一秒去查看所有订单是否需要进行平仓,所以我这个队列使用了--daemon 参数让它一直执行,这样的话还能使用你的这个方法,多开进程吗,可以的话,会不会出现重复消费的情况呢

MrKLL avatar Aug 15 '18 04:08 MrKLL

从你的描述来看, 你说的需求实际上是定时任务, 而不是消费消息

消息队列的特点是有消息就消费, 没有消息就等着

你说的 每一秒去查看所有订单是否需要进行平仓 ,这个属于定时任务的范畴

coolseven avatar Aug 15 '18 04:08 coolseven

对的,因为crontab最小只能支持每分钟执行一次,而我的需求是每秒要执行一次,而且为了减轻数据库压力,我每次订单需要存在缓存中,刚好用这个队列满足可以每秒执行一次以及把对应的数据给我,就这么做了,但是后面发现一秒钟执行的任务有限,所以一直在找解决办法,你能给点建议吗,谢谢

MrKLL avatar Aug 15 '18 04:08 MrKLL

你可以尝试一下当订单生成之后主动向队列推送一条消息, 然后由队列的消费者去处理这个订单

而不是定时去扫描现有的所有订单

coolseven avatar Aug 15 '18 06:08 coolseven

我现在的做法貌似就是这样,我是订单任务都推到队列中,然后在消费者方法中取任务进行判断,没有达到某个点就用release(1)隔一秒继续执行,达到了就去平仓,删除任务,但是这样我看了一下,每秒就只能处理15个订单,而且我换成release(),每秒处理的也就是15单

MrKLL avatar Aug 15 '18 07:08 MrKLL

你可以把需求拆分成两个部分: 第一部分负责从找出达到平仓条件的订单, 并将达到平仓条件的订单推送到队列中 第二个部分负责处理达到平仓条件的订单

第一部分可以不用队列来做, 而是用常驻脚本来做, 该常驻脚本的伪代码如下:

while(true){
     // 从数据库或缓存中按照时间或者其他维度取出一定数量的订单
     $ordersToCheck = $this->queryOrders();
    
     // 从这些订单中过滤出达到平仓条件的订单
     $orders = $this->filterSuitableOrders($ordersToCheck) ;   

     foreach($orders as $order){
           // 将达到平仓条件的订单推送到 'close-position' 队列中, 由消费进程去进行平仓操作
           Queue::push($order, 'close-position');
     }
}

第二部分负责消费 'close-position' 队列中的订单

你可以找一下每秒处理15单的瓶颈在哪里,

  • 如果瓶颈在过滤出达到平仓条件的订单的话, 就优化第一部分的逻辑, 核心思路是并行过滤 比如: 每次将订单存储到缓存或者数据库时, 根据订单的某个特征,如 订单号的 前3位数字, 将订单存储到不同的缓存 list 里面 或者 不同的表里面 然后, 针对每个 缓存 list , 或者针对每个表, 分别启动一个或多个常驻脚本, 这个常驻脚本负责从它对应的 缓存 list 或者 表里面进行扫描和过滤

  • 如果瓶颈在处理达到平仓条件的订单的话, 就增加 'close-position' 队列的消费进程数量

coolseven avatar Aug 15 '18 12:08 coolseven

好的,十分感谢

MrKLL avatar Aug 16 '18 01:08 MrKLL

你好,我想请教一下,开启多个消费进程,任务不会重复消费的原理是什么?是因为redis的原子性么。

im55cc avatar Apr 29 '20 08:04 im55cc

你好,我想请教一下,开启多个消费进程,任务不会重复消费的原理是什么?是因为redis的原子性么。

获取一个待消费的任务的本质是调用 redis 的 pop() 方法得到一个 job payload
由于 redis 服务是单进程的, 所以即使多个消费进程同时调用 pop() 方法, 也不会得到重复的 job payload

coolseven avatar May 10 '20 06:05 coolseven

嗯嗯,好的,非常谢谢你的讲解。

---原始邮件--- 发件人: "HuZhongyuan"<[email protected]> 发送时间: 2020年5月10日(周日) 下午2:37 收件人: "coolseven/notes"<[email protected]>; 抄送: "Comment"<[email protected]>;"55"<[email protected]>; 主题: Re: [coolseven/notes] 某个队列的消费者不足时,再给这个队列添加 work进程 (#12)

你好,我想请教一下,开启多个消费进程,任务不会重复消费的原理是什么?是因为redis的原子性么。

获取一个待消费的任务的本质是调用 redis 的 pop() 方法得到一个 job payload 由于 redis 服务是单进程的, 所以即使多个消费进程同时调用 pop() 方法, 也不会得到重复的 job payload

— You are receiving this because you commented. Reply to this email directly, view it on GitHub, or unsubscribe.

im55cc avatar May 10 '20 06:05 im55cc

你好我想请问一下。我创建multiTask的时候,taskA跟taskB都进入队列了,然后运行php think queue:work --queue multiTaskJobQueue的时候,第一次消费了TaskA,第二次消费了TaskB。但是命令行中并无输出。只有Processed: application\index\job\MultiTask@taskB这种提示,请问是什么原因?

Caesar0220 avatar Jul 21 '20 08:07 Caesar0220