confluent-kafka-dotnet icon indicating copy to clipboard operation
confluent-kafka-dotnet copied to clipboard

OffsetsForTimes giving Local: Unknown partition

Open mhowlett opened this issue 4 years ago • 15 comments

Description

It has been reported that Consumer.OffsetsForTimes throws an exception "Local: Unknown partition" intermittently for partitions that exist, and that were not recently created.

mhowlett avatar Jan 25 '21 20:01 mhowlett

We have the same issue and can reproduce with version 1.5.3. We downgraded on all our clients to version 1.5.2 and the error does not occur. Do you have any plans when this problem will be solved ? Thank you.

ericrameil avatar Feb 11 '21 14:02 ericrameil

I believe we fixed some issues with leader lookups in librdkafka v1.6.0: https://github.com/edenhill/librdkafka/commit/5514d2de87d41a6283b92fab32b964908dedae02

edenhill avatar Feb 11 '21 15:02 edenhill

We tested again now and the is it working with Confluent.Kafka version 1.5.3 and librdkafka version 1.5.3, but not with librdkafka 1.6.0. After our testing the problem have to be in librdkafka version 1.6.0.

ericrameil avatar Feb 12 '21 09:02 ericrameil

Can you reproduce with Debug: "all" set and provide us the logs?

edenhill avatar Feb 12 '21 10:02 edenhill

Here the log with the method QueryWatermarkOffsets, I am sorry but does not use OffsetsForTimes on the client anymore. But we had the same error some days ago with both methods. In the logfile you can see two times the error and one time it does work without an error. kafkalog3.txt

ericrameil avatar Feb 12 '21 14:02 ericrameil

There's alot going on in that log and it's hard to make out where you are calling which API. Could you add a printout where you call QueryWatermarkOffsets and for which topic+partitions, and add a second printout with the results / error from that same call?

edenhill avatar Feb 12 '21 15:02 edenhill

Attached the code and error and debug log, the error occurs after the the assign of the consumer:

ErrorMessageCSharp

log3.txt

` var conf = new ConsumerConfig();

            if (this.Fertigungsauftrag != null && this.Fertigungsauftrag.Maschine != null && this.Fertigungsauftrag.CancelToken != null)
            {
                if (this.Fertigungsauftrag.Maschine.KafkaCluster != null && this.Fertigungsauftrag.Maschine.KafkaCluster.IsValid)
                {
                    conf = new ConsumerConfig                        
                    {
                        GroupId = "Consumer-" + Environment.MachineName,
                        BootstrapServers = this.Fertigungsauftrag.Maschine.KafkaCluster.BootstrapServer,
                        AutoOffsetReset = AutoOffsetReset.Latest,
                        SessionTimeoutMs = 30000,
                        Debug = "all"
                    };
                }
                else
                {
                    Logger.CustomLogger.Error("Fertigungsauftrag oder Maschine null in Kafka Consume");
                    return;
                }
            }


            using (var c = new ConsumerBuilder<string, string>(conf)
                .Build())
            {

                if (this.Fertigungsauftrag != null && this.Fertigungsauftrag.Maschine != null)
                {
                    var topic = Fertigungsauftrag.Maschine.ID.ToString() + "_121";
                    var topicNio = Fertigungsauftrag.Maschine.ID.ToString() + "_122";

                    List<CustomKafkaTopicOffset> list = new List<CustomKafkaTopicOffset>();
                    list.Add(new CustomKafkaTopicOffset(c.QueryWatermarkOffsets(new TopicPartition(topic, 0), new TimeSpan(0, 0, 10)), topic));


                    list.Add(new CustomKafkaTopicOffset(c.QueryWatermarkOffsets(new TopicPartition(topicNio, 0), new TimeSpan(0, 0, 10)), topicNio));

                    List<TopicPartitionOffset> offsets = new List<TopicPartitionOffset>();

                    foreach (var item in list)
                    {
                        //Offset zuweisen
                        if (item.Offset.High > 1)
                            offsets.Add(new TopicPartitionOffset(item.Topic, 0, item.Offset.High - 1));
                        else
                            offsets.Add(new TopicPartitionOffset(item.Topic, 0, 0));
                    }

                    c.Assign(offsets);

`

ericrameil avatar Feb 12 '21 20:02 ericrameil

Anything new ?

ericrameil avatar Mar 01 '21 13:03 ericrameil

I got similar problems though in a bit more random way. https://github.com/confluentinc/confluent-kafka-python/issues/1043

Rolling back to version 1.5.0 seems to fix the issue.

qiuwei avatar Mar 03 '21 06:03 qiuwei

I have the same issue. Confluent.Kafka 1.6.3, Kafka 2.7.0. Call to QueryWatermarkOffsets() periodically throws an exception "Local: Partition unknown". It happens just after the consumer is created, I don't assign or subscribe the consumer before calling QueryWatermarkOffsets(). It's hard to reproduce this error deliberately, but it happens sometimes.

SergeSavel avatar Apr 04 '21 18:04 SergeSavel

This is fixed and will be available in the next release (~3w). As a workaround, call QueryWatermarkOffsets again with the same arguments.

edenhill avatar Apr 06 '21 13:04 edenhill

This is fixed and will be available in the next release (~3w). As a workaround, call QueryWatermarkOffsets again with the same arguments.

Hey there, has a fix for this been released or is this issue actually still open?

a-elsheikh avatar Aug 06 '21 11:08 a-elsheikh

Should be fixed in v1.7.0

edenhill avatar Aug 09 '21 06:08 edenhill

Should be fixed in v1.7.0

I dont think so

default-work avatar Oct 20 '21 03:10 default-work

Hello, we still have this error today :

│ sentry-post-process-forward-transactions-756dc65f9b-lhwtr cimpl.KafkaException: KafkaError{code=_UNKNOWN_PARTITION,val=-190,str="Failed to get watermark offsets: Local: Unknown partition"}                 

Is there a workaround ?

kopax-polyconseil avatar May 13 '24 13:05 kopax-polyconseil