pykafka
Zookeeper topic and consumer names with b literal
I think a problem that I am facing relates to these two issues:
https://github.com/Parsely/pykafka/issues/567 https://github.com/Parsely/pykafka/pull/569
Specifically, my problem is that Zookeeper is saving the `b` literal inside the topic and consumer group name strings; see the log lines below:
```
2018-11-09 18:18:31 INFO [pykafka.balancedconsumer] Rebalancing consumer "b'DESKTOP-K7VPQI3:f1ea4868-71c1-44f2-a55c-00e7a908b1dc'" for topic "b'this-is-a-topic'".
```
and:
```
2018-11-09 18:22:59 INFO [pykafka.cluster] Attempting to discover offset manager for consumer group 'b'this-is-a-consumer-group''
```
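The `b'...'` wrapper in these log lines is what Python 3 produces when a `bytes` value is passed through `str()` or `%s` formatting instead of being decoded. The sketch below only demonstrates that general mechanism; where exactly pykafka does this is not established in this thread, and `to_text` is a hypothetical helper name.

```python
def to_text(value, encoding="utf-8"):
    """Return value as str, decoding bytes instead of calling str() on them."""
    if isinstance(value, bytes):
        return value.decode(encoding)
    return value


topic = b"this-is-a-topic"

# str() on a bytes object yields its repr, including the b'' wrapper
leaked = str(topic)
# "%s" formatting calls str() too, so it leaks the wrapper the same way
leaked_fmt = "for topic %s" % topic
# Decoding produces the plain name
clean = to_text(topic)

print(leaked)      # b'this-is-a-topic'
print(leaked_fmt)  # for topic b'this-is-a-topic'
print(clean)       # this-is-a-topic
```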
My initialization of pykafka looks like this:
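```python
from kazoo.client import KazooClient
import pykafka
from pykafka.common import OffsetType


def get_consumer(self):  # hypothetical name; the snippet is the body of a method on my consumer class
    self.kazoo = KazooClient(hosts=self.zookeeper_hosts)
    # start() already blocks until connected (or raises on timeout), so this loop is only a safeguard
    self.kazoo.start()
    while not self.kazoo.connected:
        continue
    # Set up the client with the hosts
    self.client = pykafka.KafkaClient(
        zookeeper_hosts=self.zookeeper_hosts,
        socket_timeout_ms=self.socket_timeout_ms,
        broker_version=self.broker_version,
    )
    # Balanced consumer that shares the already-started Kazoo client
    return self.client.topics[self.topic].get_balanced_consumer(
        consumer_group=self.group_id,
        zookeeper=self.kazoo,
        consumer_timeout_ms=self._consumer_timeout_ms,
        auto_offset_reset=OffsetType.LATEST,
        auto_commit_enable=self._auto_commit_enable,
        auto_commit_interval_ms=self._auto_commit_interval_ms,
    )
```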
If I query Zookeeper with Kazoo with this bit of code:
```python
print(kazoo.get_children("/consumers/this-is-a-consumer-group/owners"))
```
I get this:
```
["b'this-is-a-topic'"]
```
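If you need to read those ownership nodes back while the names are mangled, a minimal sketch, assuming the stored name is exactly the Python repr of a `bytes` object as in the output above, can recover the intended name with `ast.literal_eval`. `recover_name` is a hypothetical helper, not part of pykafka or Kazoo; in practice you would map it over the list returned by `kazoo.get_children(...)`.

```python
import ast


def recover_name(znode_name):
    """Turn a name like "b'this-is-a-topic'" back into "this-is-a-topic".

    Names that are not bytes reprs are returned unchanged.
    """
    if znode_name.startswith(("b'", 'b"')) and znode_name.endswith(znode_name[1]):
        try:
            value = ast.literal_eval(znode_name)
        except (ValueError, SyntaxError):
            # Looked like a bytes repr but isn't a valid literal
            return znode_name
        if isinstance(value, bytes):
            return value.decode("utf-8")
    return znode_name


children = ["b'this-is-a-topic'", "plain-topic"]
print([recover_name(c) for c in children])  # ['this-is-a-topic', 'plain-topic']
```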
I don't know if this is an issue with my setup being wrong, but it's not obvious to me why it should behave like this. Because of it, I can't get the consumer to claim the correct partitions and start consuming from the latest offset there.
PyKafka version: 2.8.0
Kafka version: 0.8.2
Python version: 3.6.6
Thanks for reporting this issue, @Atheuz. What are the types of `self.topic` and `self.group_id`? Since https://github.com/Parsely/pykafka/pull/760, they should be `str` instances.
I've tried with both `str` and `b` literal strings, and the behaviour is the same. If I run a consumer with either type:
- It does not appear to commit the offsets of its messages: the lag for that consumer group just grows unbounded, and it doesn't appear that any offsets are committed.
- It doesn't matter whether I use `b` literals or `str` instances; the behaviour is the same.
- It's giving me these partitions:

```
My partitions: ["b'my-unused-topic'-4-112", "b'my-unused-topic'-3-63", "b'my-unused-topic'-5-125", "b'my-unused-topic'-4-40", "b'my-unused-topic'-2-110", ...]
```
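The labels in that list appear to have the shape `<topic>-<number>-<number>`, and the topic part again carries the `b'...'` wrapper, so the same bytes-to-str leak seems to reach the partition labels too. A small hypothetical helper that splits such a label from the right (the topic name itself contains hyphens) makes that visible. What the two trailing numbers mean is not stated in this thread, so the sketch deliberately leaves them unnamed.

```python
def split_partition_label(label):
    """Split a label such as "b'my-unused-topic'-4-112" into its parts.

    ASSUMPTION: the label is "<topic>-<int>-<int>". The topic may itself
    contain hyphens, so only the last two hyphens are split on.
    """
    topic, first, second = label.rsplit("-", 2)
    return topic, int(first), int(second)


labels = ["b'my-unused-topic'-4-112", "b'my-unused-topic'-3-63"]
for label in labels:
    print(split_partition_label(label))
# ("b'my-unused-topic'", 4, 112)
# ("b'my-unused-topic'", 3, 63)
```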
Maybe I'm just not using it correctly. Can you tell me where ZK/pykafka saves the partitions and offsets? Is it in `/consumers/my-unused-group/offsets/my-unused-topic`? That's where my lag metric is being read from. I saw a post where you mentioned that the offset is not stored in ZK but in Kafka, but I'd assume it would have to be replicated to ZK, given that I don't think the partitions can be stored in Kafka in 0.8.2, and I know those offsets don't match.
Regarding the `b` literal in Zookeeper, I think that's a bug that is definitely annoying but doesn't strictly preclude use of the consumer.
PyKafka stores offset information in Kafka itself via the `OffsetCommitRequest` and `OffsetFetchRequest` interface; it does not use Zookeeper for offset storage in any capacity. PyKafka's `BalancedConsumer`, which you're using, does use Zookeeper to store partition ownership information, which is used to balance consumer groups and is distinct from offset information. Thus I believe your assertion that offset information is "replicated in ZK" is not correct.
The code you've provided appears to be correct with respect to automatic offset committing, assuming that you're setting `auto_commit_enable` to `True` and `auto_commit_interval_ms` to a reasonable number of milliseconds. If that's the case, it's hard to debug further without additional code. You mention that "the lag for that consumer group just grows unbounded": what do you mean by that? More information about exactly how you're checking for committed offsets would be helpful in fixing your issue.
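As a sketch of the configuration described above, the auto-commit options can be collected once and passed to `get_balanced_consumer` as keyword arguments. The parameter names are the ones already used in the reporter's code; `consumer_kwargs` is a hypothetical helper, and the 5000 ms interval is an arbitrary example value, not a recommendation.

```python
def consumer_kwargs(group_id, zookeeper_client, commit_interval_ms=5000):
    """Keyword arguments for topic.get_balanced_consumer() with auto-commit on.

    commit_interval_ms=5000 is an illustrative value only.
    """
    if commit_interval_ms <= 0:
        raise ValueError("commit_interval_ms must be a positive number of milliseconds")
    return {
        "consumer_group": group_id,
        "zookeeper": zookeeper_client,
        "auto_commit_enable": True,
        "auto_commit_interval_ms": commit_interval_ms,
    }


# Usage against a live cluster (not executed here):
# consumer = client.topics[topic].get_balanced_consumer(**consumer_kwargs(group_id, kazoo))
```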
@emmett9001 My mistake. The problem was that I could not get the offset lag reported the way I expected. I switched to the method used in kafka_tools.py, and that gave me the correct offset lag. More precisely, the problem was that the offsets were not being stored in Zookeeper, where I was expecting them to be.
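Once both sets of offsets have been fetched from Kafka, per-partition lag is simple arithmetic: the latest available offset minus the group's committed offset. The sketch below shows only that arithmetic on plain dicts; fetching the offsets is left out, `consumer_lag` is a hypothetical helper rather than the kafka_tools.py code, and treating a missing or negative committed offset as "nothing committed yet" is an assumption of this sketch.

```python
def consumer_lag(latest_offsets, committed_offsets):
    """Per-partition lag = latest available offset - committed offset.

    Both arguments map partition id -> offset. A missing or negative
    committed offset (assumed to mean nothing committed) yields None.
    """
    lag = {}
    for partition_id, latest in latest_offsets.items():
        committed = committed_offsets.get(partition_id, -1)
        lag[partition_id] = latest - committed if committed >= 0 else None
    return lag


print(consumer_lag({0: 150, 1: 80}, {0: 140, 1: -1}))
# {0: 10, 1: None}
```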
I still get the weird `b` literal in the pykafka logs, and I'm not sure what to make of it, but it seems to be consuming/producing messages properly.
This is no longer a 'bug' for me; I just had to fully grok what pykafka was actually doing.