go-queue icon indicating copy to clipboard operation
go-queue copied to clipboard

kq.KqConf.Consumers 感觉没什么意义啊

Open welllog opened this issue 1 year ago • 6 comments

在kafkaQueue.startProducers中开Consumers个goroutine去拉取消息再向一个channel写,感觉没什么意义啊。FetchMessage本身也是从kafka-go的缓存channel里面获取消息,真实的拉取消息是kafka-go异步批量拉取的。从一个缓冲channel并发读再写入另一个缓冲channel,为什么不省掉这一步呢

welllog avatar Jul 17 '23 07:07 welllog

在kafkaQueue.startProducers中开Consumers个goroutine去拉取消息再向一个channel写,感觉没什么意义啊。FetchMessage本身也是从kafka-go的缓存channel里面获取消息,真实的拉取消息是kafka-go异步批量拉取的。从一个缓冲channel并发读再写入另一个缓冲channel,为什么不省掉这一步呢

感觉是为了做异步,因为fetch是个很轻量的动作,但是consume中有业务会很重,所以中间加一个channel解耦

dinofei avatar Mar 29 '24 09:03 dinofei

还是没什么意义,fetch本身已经是从一个带缓冲的channel中获取即已经是异步,多增加的这个channel看不出有什么解藕。因为consume消费慢了会导致channel背压,自然会导致调用FetchMessage的生产者goroutine阻塞,再反向传递到kafka-go的自有缓冲channel。 目前这种实现,并发从一个channel拿再并发写入另一个channel,再并发从最后这个channel拿数据consume,相当于凭白多了一轮并发写和并发读,损耗了性能

welllog avatar Mar 30 '24 03:03 welllog

还是没什么意义,fetch本身已经是从一个带缓冲的channel中获取即已经是异步,多增加的这个channel看不出有什么解藕。因为consume消费慢了会导致channel背压,自然会导致调用FetchMessage的生产者goroutine阻塞,再反向传递到kafka-go的自有缓冲channel。 目前这种实现,并发从一个channel拿再并发写入另一个channel,再并发从最后这个channel拿数据consume,相当于凭白多了一轮并发写和并发读,损耗了性能

加的这个channel就是为了在业务层可以并发消费,利用kafka-go的merge commit进行批量提交(底层会对msg的offset进行排序),应该是可以提升性能的,相当于框架帮我们实现了本应该在业务层实现的东西。

dinofei avatar Mar 30 '24 04:03 dinofei

??,加不加channel跟业务层并发有啥关系,在go-queue这个框架的封装里取channel里面的数据跟调用fetch有啥区别,你再看看kafka-go源码,kafka-go这个库已经偏高层api了,很多细节已经实现好了

welllog avatar Mar 30 '24 05:03 welllog

??,加不加channel跟业务层并发有啥关系,在go-queue这个框架的封装里取channel里面的数据跟调用fetch有啥区别,你再看看kafka-go源码,kafka-go这个库已经偏高层api了,很多细节已经实现好了

你说的有道理,看了下别的库的处理方式都是直接fetchMessage后直接consume,然后提交,感觉只保留 startProducer 应该就够了

dinofei avatar Mar 30 '24 06:03 dinofei

??,加不加channel跟业务层并发有啥关系,在go-queue这个框架的封装里取channel里面的数据跟调用fetch有啥区别,你再看看kafka-go源码,kafka-go这个库已经偏高层api了,很多细节已经实现好了

你说的有道理,看了下别的库的处理方式都是直接fetchMessage后直接consume,然后提交,感觉只保留 startProducer 应该就够了

当用户传入的consumerHandler很重时,这里的q.channel 会引起消费者一侧的阻塞,感觉把这一部分的逻辑都交给业务层自行定制更好

planktonzp avatar Apr 08 '24 05:04 planktonzp