go-queue
go-queue copied to clipboard
kq.KqConf.Consumers 感觉没什么意义啊
在kafkaQueue.startProducers中开Consumers个goroutine去拉取消息再向一个channel写,感觉没什么意义啊。FetchMessage本身也是从kafka-go的缓存channel里面获取消息,真实的拉取消息是kafka-go异步批量拉取的。从一个缓冲channel并发读再写入另一个缓冲channel,为什么不省掉这一步呢
在kafkaQueue.startProducers中开Consumers个goroutine去拉取消息再向一个channel写,感觉没什么意义啊。FetchMessage本身也是从kafka-go的缓存channel里面获取消息,真实的拉取消息是kafka-go异步批量拉取的。从一个缓冲channel并发读再写入另一个缓冲channel,为什么不省掉这一步呢
感觉是为了做异步,因为fetch是个很轻量的动作,但是consume中有业务会很重,所以中间加一个channel解耦
还是没什么意义,fetch本身已经是从一个带缓冲的channel中获取即已经是异步,多增加的这个channel看不出有什么解藕。因为consume消费慢了会导致channel背压,自然会导致调用FetchMessage的生产者goroutine阻塞,再反向传递到kafka-go的自有缓冲channel。 目前这种实现,并发从一个channel拿再并发写入另一个channel,再并发从最后这个channel拿数据consume,相当于凭白多了一轮并发写和并发读,损耗了性能
还是没什么意义,fetch本身已经是从一个带缓冲的channel中获取即已经是异步,多增加的这个channel看不出有什么解藕。因为consume消费慢了会导致channel背压,自然会导致调用FetchMessage的生产者goroutine阻塞,再反向传递到kafka-go的自有缓冲channel。 目前这种实现,并发从一个channel拿再并发写入另一个channel,再并发从最后这个channel拿数据consume,相当于凭白多了一轮并发写和并发读,损耗了性能
加的这个channel就是为了在业务层可以并发消费,利用kafka-go的merge commit进行批量提交(底层会对msg的offset进行排序),应该是可以提升性能的,相当于框架帮我们实现了本应该在业务层实现的东西。
(
??,加不加channel跟业务层并发有啥关系,在go-queue这个框架的封装里取channel里面的数据跟调用fetch有啥区别,你再看看kafka-go源码,kafka-go这个库已经偏高层api了,很多细节已经实现好了
(
??,加不加channel跟业务层并发有啥关系,在go-queue这个框架的封装里取channel里面的数据跟调用fetch有啥区别,你再看看kafka-go源码,kafka-go这个库已经偏高层api了,很多细节已经实现好了
你说的有道理,看了下别的库的处理方式都是直接fetchMessage后直接consume,然后提交,感觉只保留 startProducer 应该就够了
(
??,加不加channel跟业务层并发有啥关系,在go-queue这个框架的封装里取channel里面的数据跟调用fetch有啥区别,你再看看kafka-go源码,kafka-go这个库已经偏高层api了,很多细节已经实现好了
你说的有道理,看了下别的库的处理方式都是直接fetchMessage后直接consume,然后提交,感觉只保留 startProducer 应该就够了
当用户传入的consumerHandler很重时,这里的q.channel 会引起消费者一侧的阻塞,感觉把这一部分的逻辑都交给业务层自行定制更好