使用zk方式,如果生产端不断发送消息,相同组的消费端全停服,然后在起来,为什么收不到消息增加的消息了,还是有其他方式
肯定可以的。因为zk记录了各个消费者的偏移量,消费者再启动时会从上次的偏移量开始消费消息。
如果有疑问,可贴出代码来分析下。
Properties props = new Properties(); props.put("zk.connect", zkHosts); props.put("groupid", groupId.groupID()); ConsumerConfig config = new ConsumerConfig(props); ConsumerConnector connector = Consumer.create(config); Map<String, List<MessageStream<String>>> topicStreams = connector.createMessageStreams(ImmutableMap.of(groupId.topic().topicName(), 3), new StringDecoder()); List<MessageStream<String>> streams = topicStreams.get(groupId.topic().topicName()); ExecutorService executor = Executors.newFixedThreadPool(3); for (final MessageStream<String> stream : streams) { executor.submit(new Runnable() {
@Override
public void run() {
for (String msg : stream) {
System.err.println(msg);
}
}
});
}
这是我根据gi thub例子写的,麻烦给分析分析
如果把组ID换个新的,就能全部收到!
broker对应的topic有几个分区partition?
ExecutorService executor = Executors.newFixedThreadPool(3);
全部消费端的线程数加起来和 要比 broker的所有partition加起来和要多。 另外,有几个消费端?每个消费端都不消费消息么?
日志有什么WARN 提示?
broker 启动了三个,每个broker partition默认1个,消费端1,2个都试了! 但是我生产端发送过消息之后应用就停了。
broker和consumer 日志中是否有ERROR/WARN 可供分析?
2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-0 for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:579) -Check the consumer threads or the brokers for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-1 for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:579) -Check the consumer threads or the brokers for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-4 for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:579) -Check the consumer threads or the brokers for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-5 for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:579) -Check the consumer threads or the brokers for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-2 for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:579) -Check the consumer threads or the brokers for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-3 for topic test [2016-01-28 18/:51/:38]WARN com.sohu.j
看起来像是zookeeper的配置问题,consumer连接的zk和broker发送的zk不是同一个。QQ ④①〇⑨④③②〇〇 临时沟通下