dinofei

Results 5 comments of dinofei

> 在kafkaQueue.startProducers中开Consumers个goroutine去拉取消息再向一个channel写,感觉没什么意义啊。FetchMessage本身也是从kafka-go的缓存channel里面获取消息,真实的拉取消息是kafka-go异步批量拉取的。从一个缓冲channel并发读再写入另一个缓冲channel,为什么不省掉这一步呢 感觉是为了做异步,因为fetch是个很轻量的动作,但是consume中有业务会很重,所以中间加一个channel解耦

> 还是没什么意义,fetch本身已经是从一个带缓冲的channel中获取即已经是异步,多增加的这个channel看不出有什么解藕。因为consume消费慢了会导致channel背压,自然会导致调用FetchMessage的生产者goroutine阻塞,再反向传递到kafka-go的自有缓冲channel。 目前这种实现,并发从一个channel拿再并发写入另一个channel,再并发从最后这个channel拿数据consume,相当于凭白多了一轮并发写和并发读,损耗了性能 加的这个channel就是为了在业务层可以并发消费,利用kafka-go的merge commit进行批量提交(底层会对msg的offset进行排序),应该是可以提升性能的,相当于框架帮我们实现了本应该在业务层实现的东西。

> > ( > > ??,加不加channel跟业务层并发有啥关系,在go-queue这个框架的封装里取channel里面的数据跟调用fetch有啥区别,你再看看kafka-go源码,kafka-go这个库已经偏高层api了,很多细节已经实现好了 你说的有道理,看了下别的库的处理方式都是直接fetchMessage后直接consume,然后提交,感觉只保留 startProducer 应该就够了

> 感谢对`kafka-go`进行服务化封装,的确用起来更简单了! > > 我对消费代码有个疑惑,望解答: > > ``` > for i := 0; i < q.c.Processors; i++ { > q.consumerRoutines.Run(func() { > for msg := range q.channel { > if...

> > > 感谢对`kafka-go`进行服务化封装,的确用起来更简单了! > > > 我对消费代码有个疑惑,望解答: > > > ``` > > > for i := 0; i < q.c.Processors; i++ { > > > q.consumerRoutines.Run(func() { >...