hangout
hangout copied to clipboard
重复消费问题
childe,你好: 首先非常感谢使用了hangout对我的帮助,我这里遇到一个问题,想请教你。 **问题:**我的hangout在消费kafka的并写入到Elasticsearch的时候,发现Elasticsearch上日志数据量比kafka的LOG-END-OFFSET数据要多,而kafka的LOG-END-OFFSET数据与我的日志文件数据量是一样的。 比如我的日志条数是48078条,我的kafka的LOG-END-OFFSE总和也是48078,但是我的Elasticsearch上查看到的日志是51334条,很好奇多余的几千条数据是怎么来的。 **我的理解:**我感觉可能是hangout消费kafka后,kafka并没有认为hangout消费了,所以数据显示没有被消费,则又被hangout消费了一遍,那么请问hangout消费一条数据之后,会做什么处理? **注:**由于某些原因,我的ES版本不能升级。 下面是我的应用环境: ES版本:Elasticsearch 2.3.5 hangout版本:hangout-0.1.8.2-ES2.3.5 kafka版本:kafka_2.11-1.1.0 zookeeper版本:zookeeper-3.4.12 下面是我的hangout配置
inputs:
- NewKafka:
codec: json
topic:
topic-test: 6
consumer_settings:
bootstrap.servers: 192.168.1.10:9092,192.168.1.11:9092,192.168.1.12:9092
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
group.id: topic-test
outputs:
- Elasticsearch:
cluster: es-cluster # cluster name, required
hosts: # required
- 192.168.10.10:9301
- 192.168.10.11:9301
- 192.168.10.12:9301
- 192.168.10.13:9301
- 192.168.10.14:9301
index: 'hangout-test'
index_type: logs # default logs
bulk_actions: 20000 # default 20000
bulk_size: 15 #default 15
flush_interval: 10 #default 10
concurrent_requests: 0 #default 0
timezone: "Asia/Shanghai" # defaut UTC 时区. 只用于生成索引名字的字符串格式化
sniff: false
很有可能是kafka group consumer 做了rebalance. 默认kafka是XX秒commit一次当前的offset. 如果新起一个消费者, 他会Join Group, 导致Group里面的成员之前的memberID失效. 当commit的时候, 新的memberID还没有拿到. 所以这段时间的数据可能会重复消费. 上面只是我的猜测 :) 你可以看一下client/server端日志, 是不是有rebalance.
childe,你好: 我说明以下三点: 1、如你所说,我做了个测试,重启hangout的时候,确实kafka会有rebalance group topic-test的日志信息出现,但是我昨天一直到今天并没有停止hangout,并且kafka日志也没有rebalance group topic-test的信息,最后查Elasticsearch日志数据量还是比file文件中日志数据量多(查询的是一小时的数据)。 2、为了多方面测试,我昨天用logstash2.3.4版本也消费kafka数据写入Elasticsearch,用的是同一组kafka,不同的topic,发现最后Elasticsearch和file文件中一小时的数据是一样的,并没有出现Elasticsearch多数据的情况。 3、我想到了一个问题:有没有可能是hangout消费完kafka数据并写入Elasticsearch之后成功之后返回给kafka的时候出现错误,然后并没有返回给kafka这条日志已经消费,所以再次消费了同一条数据?请问如果hangout写入Elasticsearch之后成功之后,返回给kafka的那段代码是怎么写的?有没有可能写入Elasticsearch成功因为某方面原因返回给kafka的结果是消费失败?
补充:我也重启过logstash,也么有出现多数据的情况
kafka读数据的代码与写数据到ES的代码, 之间并没有直接的调用关系. '写入Elasticsearch之后成功之后返回给kafka的时候出现错误' 这个应该是不存在的.
但kafka消费本身有可能出现offset没有正确记录的情况. 一时想不出还有什么其他情况了. 要不加个微信聊? 64973150 我也想搞清楚到底是为啥