secor
Writing and reading offsets only from kafka
Hi Team
I have noticed that offsets are still being written to ZooKeeper even though I specify the config variables below:
kafka.offsets.storage=kafka
kafka.dual.commit.enabled=false
Is this expected, or am I doing something wrong?
We are using Kafka 2.5.
Secor uses ZK for two purposes:
- keep track of Kafka message progress per topic/partition
- use ZK as a distributed lock during file uploading
Purpose 1 can be switched to Kafka as the offset storage, but the dependency on ZK as a global lock remains.
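The two roles can be modeled as a small Java sketch. The names ZkRoles, ZkUse and zookeeperUses are hypothetical, not Secor classes; the sketch only encodes the point above that moving offset storage to Kafka removes the first ZooKeeper role but not the upload lock.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical model of the two ZooKeeper roles described above;
// these names are illustrative and are not Secor's actual classes.
public class ZkRoles {
    enum ZkUse { OFFSET_TRACKING, UPLOAD_LOCK }

    // Returns the ZooKeeper roles still needed for a given kafka.offsets.storage value.
    static Set<ZkUse> zookeeperUses(String offsetStorage) {
        // The distributed upload lock always lives in ZooKeeper.
        Set<ZkUse> uses = EnumSet.of(ZkUse.UPLOAD_LOCK);
        // Offset tracking only needs ZooKeeper when Kafka is not the offset store.
        if (!"kafka".equals(offsetStorage)) {
            uses.add(ZkUse.OFFSET_TRACKING);
        }
        return uses;
    }

    public static void main(String[] args) {
        System.out.println(zookeeperUses("zookeeper")); // [OFFSET_TRACKING, UPLOAD_LOCK]
        System.out.println(zookeeperUses("kafka"));     // [UPLOAD_LOCK]
    }
}
```

Either way, a ZooKeeper connection is still required, which is why ZooKeeper cannot be dropped from a Secor deployment just by changing the offset storage.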
On Tue, Oct 20, 2020 at 12:49 PM naresh-kotha-ck wrote:
— You are receiving this because you are subscribed to this thread. Reply to this email directly, view it on GitHub https://github.com/pinterest/secor/issues/1647#issuecomment-713099165, or unsubscribe https://github.com/notifications/unsubscribe-auth/ABYJP75U7RSQCMQTQDW7ITDSLXSU5ANCNFSM4SYZM4XA .
Yes, but for purpose 1 I still see offsets in ZooKeeper even though I set the properties below:
kafka.offsets.storage=kafka
kafka.dual.commit.enabled=false
Can you please let me know if I am missing something?
For purpose 1, I think there is still some code that writes the offset to ZK (while also writing to Kafka), but the reader only reads offsets from Kafka. If you prefer, you can submit a PR to remove those writes (I think they are mostly in Uploader.java).
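A PR along these lines might guard each ZooKeeper offset write behind a single predicate. This is a minimal sketch, assuming a hypothetical helper name (shouldWriteOffsetsToZookeeper) and assuming the dual-commit semantics of Kafka's old consumer, where dual commit means writing ZooKeeper in addition to Kafka; it is not Secor's actual code.

```java
// Hypothetical helper sketching how ZooKeeper offset writes (e.g. in Uploader.java)
// could be gated; the method name and parameters are illustrative only.
public class ZkWriteGate {
    // Assumption: with offsets stored in Kafka, ZooKeeper is written only when
    // dual commit is explicitly enabled; any other storage keeps ZooKeeper writes.
    static boolean shouldWriteOffsetsToZookeeper(String offsetStorage, boolean dualCommitEnabled) {
        return !"kafka".equals(offsetStorage) || dualCommitEnabled;
    }

    public static void main(String[] args) {
        System.out.println(shouldWriteOffsetsToZookeeper("zookeeper", false)); // true
        System.out.println(shouldWriteOffsetsToZookeeper("kafka", true));      // true
        System.out.println(shouldWriteOffsetsToZookeeper("kafka", false));     // false
    }
}
```

Centralizing the check in one predicate means every write site applies the same rule, so the configuration from the first post would stop producing ZooKeeper offset entries.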
On Wed, Oct 21, 2020 at 10:30 AM naresh-kotha-ck wrote:
Sure, I will create a PR for that. Thanks for the details.
Hey @HenryCaiHaiying,
I also think there is a bug or inconsistency in SecorKafkaMessageIterator, or I don't understand something correctly.
Currently, there is a variable that is responsible for skipping reading offsets from ZooKeeper, but it is set to offsetStorage.equals("kafka") && dualCommitEnabled.equals("true"). So, if I understand correctly, if we set offset.storage=kafka and dual.commit.enabled=false, we are going to try to read offsets from ZooKeeper, even though they will never be there because dual.commit.enabled=false. Shouldn't the check be for false instead of true?
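The two conditions can be compared directly over every combination of the two settings. The class and method names are hypothetical; currentSkip reproduces the condition as quoted above, and proposedSkip is the literal alternative of checking for false.

```java
// Compares the quoted skip condition with the suggested alternative.
// Class and method names are illustrative, not Secor's.
public class SkipZkSeekCheck {
    // Condition as quoted in this thread from SecorKafkaMessageIterator.
    static boolean currentSkip(String offsetStorage, String dualCommitEnabled) {
        return offsetStorage.equals("kafka") && dualCommitEnabled.equals("true");
    }

    // The alternative asked about above: check for "false" instead of "true".
    static boolean proposedSkip(String offsetStorage, String dualCommitEnabled) {
        return offsetStorage.equals("kafka") && dualCommitEnabled.equals("false");
    }

    public static void main(String[] args) {
        String[] storages = {"zookeeper", "kafka"};
        String[] dualCommit = {"true", "false"};
        // Print whether the ZooKeeper seek is skipped for each configuration.
        for (String storage : storages) {
            for (String dual : dualCommit) {
                System.out.printf("storage=%-9s dual=%-5s currentSkip=%-5b proposedSkip=%b%n",
                        storage, dual, currentSkip(storage, dual), proposedSkip(storage, dual));
            }
        }
    }
}
```

With offset.storage=kafka and dual.commit.enabled=false, currentSkip returns false, so the ZooKeeper read is attempted, which is exactly the case described above.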
I think that with 'dual.commit.enabled' = false, the offsets will be written to ZK by default.
On Thu, Dec 3, 2020 at 5:22 AM DomWos wrote:
Yes, that is correct for the old Kafka consumer used by the legacy iterator. However, SecorKafkaMessageIterator uses the new Kafka consumer, which doesn't even have a dual.commit.enabled property, nor does Secor set one. So, with the new consumer we never save offsets to ZooKeeper, but for some reason Secor always tries to read them from there.
Hello @HenryCaiHaiying and @naresh-kotha-ck, are there any updates on, or clarification of, this situation? Since Kafka 2.0, Kafka's bundled tools no longer support working with offsets stored in ZooKeeper, so it is very hard to manage them if you need to reset offsets, etc. Is it possible to confirm that with the configuration
kafka.offsets.storage=kafka
kafka.dual.commit.enabled=true
Secor will read offsets only from Kafka storage? I understand that it will write to ZooKeeper as well, but we want to migrate from ZooKeeper offset storage to Kafka and want to be sure we understand how Secor will behave after a restart with the updated configuration.
Looking at the code (SecorConsumerRebalanceListener.java), it appears to skip ZooKeeper when kafka.offsets.storage=kafka:
public void onPartitionsAssigned(Collection<TopicPartition> collection) {
    if (skipZookeeperOffsetSeek) {
        LOG.debug("offset storage set to kafka. Skipping reading offsets from zookeeper");
        return;
    }
    Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(collection);
    // ... rest of the method omitted in this excerpt
}
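The early return in the excerpt can be exercised without Kafka or ZooKeeper using a self-contained mock. Everything here is an assumption for illustration: String stands in for TopicPartition, a Map stands in for the ZooKeeper lookup behind getCommittedOffsets, and the method returns the offsets it would seek to instead of seeking.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Self-contained mock of the early-return pattern in the excerpt above.
// String stands in for Kafka's TopicPartition and a Map stands in for the
// ZooKeeper lookup; neither is Secor's real implementation.
public class RebalanceSketch {
    private final boolean skipZookeeperOffsetSeek;
    private final Map<String, Long> zookeeperOffsets;

    RebalanceSketch(boolean skipZookeeperOffsetSeek, Map<String, Long> zookeeperOffsets) {
        this.skipZookeeperOffsetSeek = skipZookeeperOffsetSeek;
        this.zookeeperOffsets = zookeeperOffsets;
    }

    // Returns the offsets the listener would seek to. An empty map means no
    // explicit seek, so the new consumer resumes from the group's offsets
    // committed in Kafka (or falls back to auto.offset.reset).
    Map<String, Long> onPartitionsAssigned(Collection<String> partitions) {
        if (skipZookeeperOffsetSeek) {
            return Collections.emptyMap();
        }
        Map<String, Long> seeks = new HashMap<>();
        for (String partition : partitions) {
            // Only partitions with a ZooKeeper offset get an explicit seek.
            Long offset = zookeeperOffsets.get(partition);
            if (offset != null) {
                seeks.put(partition, offset);
            }
        }
        return seeks;
    }

    public static void main(String[] args) {
        Map<String, Long> zk = Map.of("events-0", 42L);
        List<String> assigned = List.of("events-0", "events-1");
        System.out.println(new RebalanceSketch(true, zk).onPartitionsAssigned(assigned));  // {}
        System.out.println(new RebalanceSketch(false, zk).onPartitionsAssigned(assigned)); // {events-0=42}
    }
}
```

In this mock, which store the consumer resumes from depends entirely on how skipZookeeperOffsetSeek is computed, so that condition decides the answer to the migration question above.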