secor

Writing and reading offsets only from Kafka #1647

Open • naresh-kotha-ck opened this issue 4 years ago • 10 comments

Hi Team

I have noticed that offsets are still being written to ZooKeeper even when I specify the config variables below:

    kafka.offsets.storage=kafka
    kafka.dual.commit.enabled=false

Is this expected, or am I doing something wrong?

naresh-kotha-ck • Oct 20 '20

We are using Kafka 2.5.

naresh-kotha-ck • Oct 20 '20

Secor uses ZK for two purposes:

  1. keeping track of Kafka message progress per topic/partition
  2. acting as a distributed lock during file uploads

Purpose 1 can be switched to Kafka as the offset storage, but the dependency on ZK as a global lock remains.
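To make the split between the two purposes concrete, here is a minimal Java sketch. It is not Secor's actual code, which this thread does not show. The upload path takes a lock and commits through a separate offset store. `OffsetStore`, `InMemoryOffsetStore`, and `UploadSketch` are hypothetical names, and a `ReentrantLock` stands in for the ZooKeeper distributed lock. Swapping the offset store for a Kafka-backed one only changes purpose 1. The lock, and with it the ZK dependency, stays.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical abstraction over purpose 1 (offset tracking). In Secor this role
// is played by ZooKeeper or Kafka; here it is just an interface.
interface OffsetStore {
    void commit(String topicPartition, long nextOffset);

    Long read(String topicPartition);
}

// In-memory store, used only to keep the sketch self-contained.
class InMemoryOffsetStore implements OffsetStore {
    private final Map<String, Long> offsets = new HashMap<>();

    @Override
    public void commit(String topicPartition, long nextOffset) {
        offsets.put(topicPartition, nextOffset);
    }

    @Override
    public Long read(String topicPartition) {
        return offsets.get(topicPartition);
    }
}

class UploadSketch {
    // Purpose 2: stand-in for the ZooKeeper lock. A ReentrantLock only works
    // inside one JVM; the real lock has to be shared across Secor processes.
    private final Lock uploadLock;
    // Purpose 1: can be swapped without touching the lock.
    private final OffsetStore offsetStore;

    UploadSketch(Lock uploadLock, OffsetStore offsetStore) {
        this.uploadLock = uploadLock;
        this.offsetStore = offsetStore;
    }

    static UploadSketch withLocalLock(OffsetStore offsetStore) {
        return new UploadSketch(new ReentrantLock(), offsetStore);
    }

    // Uploads while holding the lock; the offset is committed only if the upload succeeds.
    void uploadAndCommit(String topicPartition, long nextOffset, Runnable upload) {
        uploadLock.lock();
        try {
            upload.run();
            offsetStore.commit(topicPartition, nextOffset);
        } finally {
            uploadLock.unlock();
        }
    }
}
```

Keeping the lock and the offset store as separate dependencies is what allows offsets to move to Kafka while locking still goes through ZK.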

HenryCaiHaiying • Oct 21 '20

Yes, but for purpose 1 I still see offsets in ZooKeeper even though I set the properties below:

    kafka.offsets.storage=kafka
    kafka.dual.commit.enabled=false

Can you please let me know if I am missing something?

naresh-kotha-ck • Oct 21 '20

For purpose 1, I think there is still some code that writes the offsets to ZK (while also writing them to Kafka), but the reader only reads offsets from Kafka. If you like, you can submit a PR to remove those writes (I think they are mostly in Uploader.java).
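A PR along those lines could route every ZooKeeper offset write through a single check. The sketch below is hypothetical and not part of Secor. The helper name `ZookeeperOffsetWriteGuard` is invented, and the rule it encodes is an assumption taken from this thread: ZK keeps receiving offsets unless storage is `kafka` and dual commit is off.

```java
// Hypothetical helper, not part of Secor: decides whether the uploader should
// still write offsets to ZooKeeper, following the behaviour described above.
final class ZookeeperOffsetWriteGuard {
    private ZookeeperOffsetWriteGuard() {}

    // Trims values so that "kafka.dual.commit.enabled= false" is treated like "=false" here.
    private static String normalize(String value) {
        return value == null ? null : value.trim();
    }

    static boolean shouldWriteToZookeeper(String offsetsStorage, String dualCommitEnabled) {
        // ZooKeeper stays the offset store unless storage is explicitly "kafka".
        if (!"kafka".equals(normalize(offsetsStorage))) {
            return true;
        }
        // With Kafka storage, write to ZooKeeper only when dual commit is enabled.
        return "true".equals(normalize(dualCommitEnabled));
    }
}
```

Each ZooKeeper offset write would then be wrapped in `if (ZookeeperOffsetWriteGuard.shouldWriteToZookeeper(...))`. The exact call sites would still have to be located in the code.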

HenryCaiHaiying • Oct 21 '20

Sure, I will create a PR for that. Thanks for the details.

naresh-kotha-ck • Oct 21 '20

Hey @HenryCaiHaiying, I also think there is a bug or inconsistency in SecorKafkaMessageIterator, or I am misunderstanding something. Currently, there is a variable that controls whether reading offsets from ZooKeeper is skipped, but it is set to offsetStorage.equals("kafka") && dualCommitEnabled.equals("true"). So, if I understand correctly, with offset.storage=kafka and dual.commit.enabled=false we will still try to read offsets from ZooKeeper, even though they will never be there because dual.commit.enabled=false. Shouldn't the check be for false instead of true?
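The difference between the quoted condition and the proposed one is easiest to see side by side. In this small Java sketch, `current` copies the expression quoted above. `proposed` flips `"true"` to `"false"` as suggested. The class name `SkipZookeeperSeek` is hypothetical.

```java
// Models the two conditions discussed above; the class name is hypothetical.
final class SkipZookeeperSeek {
    private SkipZookeeperSeek() {}

    // Condition as quoted from SecorKafkaMessageIterator in the comment above.
    static boolean current(String offsetStorage, String dualCommitEnabled) {
        return offsetStorage.equals("kafka") && dualCommitEnabled.equals("true");
    }

    // Alternative proposed in the comment: skip when dual commit is disabled.
    static boolean proposed(String offsetStorage, String dualCommitEnabled) {
        return offsetStorage.equals("kafka") && dualCommitEnabled.equals("false");
    }
}
```

With `offsetStorage = "kafka"` and `dualCommitEnabled = "false"`, `current` returns false, so the ZooKeeper read is still attempted. `proposed` returns true and skips it.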

DomWos • Dec 03 '20

I think that when 'dual.commit.enabled' = false, the offsets are written to ZK by default.

HenryCaiHaiying • Dec 06 '20

Yeah, that is correct for the old Kafka consumer used by the legacy iterator. However, SecorKafkaMessageIterator uses the new Kafka consumer, which doesn't even have a dual.commit.enabled property, nor does Secor set one. So, with the new consumer we will never save offsets to ZooKeeper, yet for some reason Secor will always try to read them from there.

DomWos • Dec 09 '20

Hello @HenryCaiHaiying and @naresh-kotha-ck, are there any updates on, or clarification of, this situation? Since Kafka 2.0, the bundled Kafka tools no longer support working with offsets stored in ZooKeeper, so it is very hard to use them if you need to reset offsets, etc. Is it possible to confirm that with the configuration

    kafka.offsets.storage=kafka
    kafka.dual.commit.enabled=true

Secor will read offsets only from Kafka? I understand that it will write to ZooKeeper as well, but we want to migrate from ZooKeeper offset storage to Kafka and want to be sure we understand how Secor will behave after a restart with the updated configuration.

dpavlov-smartling • Dec 19 '23

Looking at the code (SecorConsumerRebalanceListener.java), it seems it will skip ZooKeeper when kafka.offsets.storage=kafka:

    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        if (skipZookeeperOffsetSeek) {
            LOG.debug("offset storage set to kafka. Skipping reading offsets from zookeeper");
            return;
        }
        Map<TopicPartition, Long> committedOffsets = getCommittedOffsets(collection);
        // ... (rest of the method not quoted)
    }
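The control flow of that early return can be modelled without Kafka on the classpath. In this Java sketch, partitions are plain strings instead of `TopicPartition`, and a "seek" only records the target offset. `RebalanceListenerSketch` and its fields are hypothetical. The excerpt does not show how `skipZookeeperOffsetSeek` is derived. The assumption here, based on the reading above, is that it depends only on `kafka.offsets.storage`.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

// Hypothetical, dependency-free model of the early return in onPartitionsAssigned.
class RebalanceListenerSketch {
    private final boolean skipZookeeperOffsetSeek;
    private final Map<String, Long> zookeeperOffsets; // stand-in for offsets read from ZK
    final Map<String, Long> seeks = new HashMap<>();  // records where a seek would go

    RebalanceListenerSketch(String offsetsStorage, Map<String, Long> zookeeperOffsets) {
        // Assumption: the flag depends only on kafka.offsets.storage.
        this.skipZookeeperOffsetSeek = "kafka".equals(offsetsStorage);
        this.zookeeperOffsets = zookeeperOffsets;
    }

    void onPartitionsAssigned(Collection<String> partitions) {
        if (skipZookeeperOffsetSeek) {
            // No seek: a real KafkaConsumer resumes from the group's committed offsets in Kafka.
            return;
        }
        for (String partition : partitions) {
            Long offset = zookeeperOffsets.get(partition);
            if (offset != null) {
                seeks.put(partition, offset);
            }
        }
    }
}
```

Under this assumption, a restart with `kafka.offsets.storage=kafka` would not consult ZooKeeper offsets at all. Whether dual commit still writes to ZK is a separate question.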

HenryCaiHaiying • Dec 22 '23