张高峰
Properties props = new Properties();
props.put("zk.connect", zkHosts);
props.put("groupid", groupId.groupID());
ConsumerConfig config = new ConsumerConfig(props);
ConsumerConnector connector = Consumer.create(config);
Map topicStreams = connector.createMessageStreams(ImmutableMap.of(groupId.topic().topicName(), 3), new StringDecoder());
List streams = topicStreams.get(groupId.topic().topicName());
ExecutorService...
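If I switch to a new group ID, all of the messages are received!
I started three brokers, each with the default of one partition, and I tried with both one and two consumers. However, after my producer sent its messages, the application stopped.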
[2016-01-28 18:51:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line:578) -No broker partitions consumed by consumer thread test4_localhost-1453978297875-8c763ac9-0 for topic test
[2016-01-28 18:51:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line:579) -Check the consumer threads or the brokers for topic test
[2016-01-28 18:51:38]WARN com.sohu.jafka.consumer.ZookeeperConsumerConnector(line:578)...
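One possible reading of this warning, sketched below: if Jafka assigns partitions to consumer threads the way the Kafka 0.7 consumer does (an assumption, not checked against the Jafka source), then any thread beyond the number of available partitions ends up owning nothing. The class and method names here are hypothetical and only illustrate the arithmetic. With three partitions in total and two consumers each asking for 3 streams, there would be six threads competing for three partitions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of range-style partition assignment.
// Assumption: Jafka behaves like the Kafka 0.7 consumer rebalance here.
public class RangeAssignmentSketch {

    // For each consumer thread (by its position in sorted order), returns
    // the list of partition indices that thread would own.
    public static List<List<Integer>> assign(int partitionCount, int threadCount) {
        int perThread = partitionCount / threadCount; // base share per thread
        int extra = partitionCount % threadCount;     // first `extra` threads get one more
        List<List<Integer>> result = new ArrayList<>();
        for (int pos = 0; pos < threadCount; pos++) {
            int start = perThread * pos + Math.min(pos, extra);
            int count = perThread + (pos < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int p = start; p < start + count; p++) {
                owned.add(p);
            }
            result.add(owned);
        }
        return result;
    }

    public static void main(String[] args) {
        // 3 partitions (3 brokers x 1 partition), 6 threads (2 consumers x 3 streams)
        List<List<Integer>> assignment = assign(3, 6);
        for (int i = 0; i < assignment.size(); i++) {
            List<Integer> owned = assignment.get(i);
            System.out.println("thread " + i + " -> "
                    + (owned.isEmpty() ? "no partitions" : owned.toString()));
        }
    }
}
```

Under that assumption, threads 3 to 5 receive no partitions, which would match a "No broker partitions consumed by consumer thread" warning. It would not by itself explain the warning when only one consumer with three threads is running.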