jafka icon indicating copy to clipboard operation
jafka copied to clipboard

使用zk方式,如果生产端不断发送消息,相同组的消费端全停服,然后在起来,为什么收不到消息增加的消息了,还是有其他方式

Open zhanggaofeng opened this issue 9 years ago • 8 comments

zhanggaofeng avatar Jan 28 '16 09:01 zhanggaofeng

肯定可以的。因为zk记录了各个消费者的偏移量,消费者再启动时会从上次的偏移量开始消费消息。

如果有疑问,可贴出代码来分析下。

adyliu avatar Jan 28 '16 09:01 adyliu

Properties props = new Properties(); props.put("zk.connect", zkHosts); props.put("groupid", groupId.groupID()); ConsumerConfig config = new ConsumerConfig(props); ConsumerConnector connector = Consumer.create(config); Map<String, List<MessageStream<String>>> topicStreams = connector.createMessageStreams(ImmutableMap.of(groupId.topic().topicName(), 3), new StringDecoder()); List<MessageStream<String>> streams = topicStreams.get(groupId.topic().topicName()); ExecutorService executor = Executors.newFixedThreadPool(3); for (final MessageStream<String> stream : streams) { executor.submit(new Runnable() {

                            @Override
                            public void run() {
                                    for (String msg : stream) {
                                            System.err.println(msg);
                                    }
                            }
                    });
            }

这是我根据gi thub例子写的,麻烦给分析分析

zhanggaofeng avatar Jan 28 '16 10:01 zhanggaofeng

如果把组ID换个新的,就能全部收到!

zhanggaofeng avatar Jan 28 '16 10:01 zhanggaofeng

broker对应的topic有几个分区partition?

ExecutorService executor = Executors.newFixedThreadPool(3);

全部消费端的线程数加起来和 要比 broker的所有partition加起来和要多。 另外,有几个消费端?每个消费端都不消费消息么?

日志有什么WARN 提示?

adyliu avatar Jan 28 '16 10:01 adyliu

broker 启动了三个,每个broker partition默认1个,消费端1,2个都试了! 但是我生产端发送过消息之后应用就停了。

zhanggaofeng avatar Jan 28 '16 10:01 zhanggaofeng

broker和consumer 日志中是否有ERROR/WARN 可供分析?

adyliu avatar Jan 28 '16 10:01 adyliu

2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-0 for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:579) -Check the consumer threads or the brokers for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-1 for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:579) -Check the consumer threads or the brokers for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-4 for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:579) -Check the consumer threads or the brokers for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-5 for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:579) -Check the consumer threads or the brokers for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-2 for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:579) -Check the consumer threads or the brokers for topic test [2016-01-28 18/:51/:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line/:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-3 for topic test [2016-01-28 18/:51/:38]WARN com.sohu.j

zhanggaofeng avatar Jan 28 '16 11:01 zhanggaofeng

看起来像是zookeeper的配置问题,consumer连接的zk和broker发送的zk不是同一个。QQ ④①〇⑨④③②〇〇 临时沟通下

adyliu avatar Jan 28 '16 11:01 adyliu