confluent-kafka-dotnet

Infinite request loop when calling Committed

Open vchekan opened this issue 6 years ago • 2 comments

Description

I have a program that retrieves a consumer group's committed offsets. It works locally but hangs when deployed to an Azure cloud service.

There is a strange line in the logs, repeating every second: "Fetch backoff for 132348351ms". That is about 36.8 hours, and no error precedes its first appearance. Once this happens, the consumer falls into a loop of timing out and reopening the connection over and over again.
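The backoff value from that log line can be converted with System.TimeSpan to double-check the arithmetic. This sketch only restates the number from the log; it says nothing about why librdkafka computed it.

```fsharp
open System

// The delay reported in the log line "Fetch backoff for 132348351ms".
let backoff = TimeSpan.FromMilliseconds 132348351.0

// Total delay in hours.
printfn "%.2f hours" backoff.TotalHours // prints 36.76 hours

// The same delay split into days, hours, minutes and seconds.
printfn "%d day(s) %d h %d min %d s" backoff.Days backoff.Hours backoff.Minutes backoff.Seconds
// prints 1 day(s) 12 h 45 min 48 s
```

In other words, the client is asking to wait roughly a day and a half before the next fetch, which is far outside any normal retry or backoff interval.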

So my questions are: is Committed supposed to work for a client that has not joined a group (neither Subscribe nor Assign has been called)? Why does it work on dev boxes but fall into a loop on Azure? In #575 I asked whether it is possible to get group offsets without joining the group and was told that it currently is not. So is calling Committed without joining the group a legitimate use case, or is it non-deterministic (works only sometimes)?

The program is below; the log file is linked after it.

module Program

open System
open System.Collections.Generic
open Confluent.Kafka

let clientId = "eirik-tests"
let broker = "my-kafka-service.net:9092"
let topic = "some-topic-test-p16"
let topicPartitions = Array.init 16 (fun i -> TopicPartition(topic, i))
let groupId = "processcommand"

let logger = Log.create __SOURCE_FILE__

let mkConsumer() =
    let (=>) key value = KeyValuePair<string, obj>(key, box value)
    let config =
        [
            "bootstrap.servers" => broker
            "client.id" => clientId
            "debug" => "all"
            "group.id" => groupId
        ]

    new Consumer(config)


let getPartitionOffsets(consumer : Consumer) =
    consumer.Committed(topicPartitions, TimeSpan.FromMinutes 10.) 
    |> Seq.sortBy (fun tp -> tp.Topic, tp.TopicPartition.Partition)
    |> Seq.toArray


[<EntryPoint>]
let main _ = 
    Log.To.console "*" NLog.LogLevel.Info

    logger.info "creating consumer instance"
    let consumer = mkConsumer()
    logger.info "consumer instance created"

    logger.info "getting partition offsets %A" topicPartitions
    let offsets = getPartitionOffsets consumer // hangs here
    logger.info "partition offsets = %A" offsets

    0
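While investigating a hang like the one at getPartitionOffsets, a caller-side deadline lets the program report the stall instead of blocking for the full 10-minute timeout passed to Committed. Below is a minimal sketch, assuming a hypothetical helper named withDeadline that is not part of Confluent.Kafka. It only stops waiting: the wrapped call is not cancelled and keeps running on a thread-pool thread.

```fsharp
open System
open System.Threading.Tasks

/// Hypothetical helper (not part of Confluent.Kafka): runs a blocking call on the
/// thread pool and waits at most `deadline` for it to finish.
/// Returns Some result if it finishes in time, None otherwise.
/// The underlying call is NOT cancelled on timeout; it keeps running in the background.
let withDeadline (deadline: TimeSpan) (work: unit -> 'T) : 'T option =
    let task = Task.Run(Func<'T>(work))
    // Wait returns false on timeout; if `work` throws, Wait raises an AggregateException.
    if task.Wait(deadline) then Some task.Result else None
```

For example, withDeadline (TimeSpan.FromSeconds 30.) (fun () -> getPartitionOffsets consumer) would return None after 30 seconds rather than keeping main blocked. Because the stuck call is abandoned rather than stopped, this is a diagnostic aid, not a fix for the request loop itself.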

Log file: https://drive.google.com/file/d/1UtG1EKtOod_x9ITfdwpuFG2BfDFKMgF4/view?usp=sharing

How to reproduce

It is 100% reproducible when running in the Azure cloud service.

Checklist

Please provide the following information:

  • [x] Confluent.Kafka NuGet version: 0.11.5
  • [x] Apache Kafka version: 2.0.0-1
  • [x] Client configuration: see mkConsumer in the program above
  • [x] Operating system: Windows (Azure)
  • [x] Provide logs (with "debug" : "..." as necessary in configuration)
  • [x] Provide broker log excerpts
  • [ ] Critical issue

vchekan, Aug 16 '18 21:08