
Bug: Connecting multiple consumers / producer with "sasl: oauthbearer" in Promise.all fails with "Error Code -172: Local: Erroneous state"

Open · nhr-one opened this issue on Feb 06 '25 · 4 comments

Environment Information

  • OS: Linux on arm64
  • Node Version: 20
  • NPM Version: [email protected]
  • C++ Toolchain: n/a
  • confluent-kafka-javascript version: 1.2.0

Steps to Reproduce

  1. Create multiple instances of a KafkaJS.Producer or KafkaJS.Consumer, with the following configuration in the KafkaJS.Kafka client:
{
  kafkaJS: {
    brokers: ["my_aws_brokers:9098"], // config value
    clientId: "singleton-client",
    logger: new KafkaCustomLogger(),
    logLevel: logLevel["WARN"], // config value
    retry: {
      maxRetryTime: 30_000, // config value
      retries: 500, // config value
    },
    ssl: true,
    sasl: {
      mechanism: "oauthbearer" as const,
      oauthBearerProvider: async () => {
        const authTokenResponse = await generateAuthToken(); // from aws-msk-iam-sasl-signer-js
        return {
          value: authTokenResponse.token,
          lifetime: authTokenResponse.expiryTime,
          principal: "",
        };
      },
    },
  },
}
  • Note: The same thing happens with multiple instances of a KafkaJS.Kafka client, or when there is a single client.
  2. Call await consumer.connect() or await producer.connect() for each of these instances in a Promise.all (a minimal sketch of this reproduction follows the error list below).
  3. The following errors appear:
  • KafkaJSConnectionError: broker transport failure, associated type is ERR__TRANSPORT, error code -195
  • Local: Erroneous state, error code -172
  • [thrd:sasl_ssl://{my_aws_broker}]: sasl_ssl://b-3.{my_aws_broker}:9098/bootstrap: SASL authentication error: SaslAuthenticateRequest failed: Local: Broker handle destroyed (after 0ms in state DOWN)
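
A minimal sketch of the reproduction described in the steps above, assuming the hypothetical names kafkaConfig (the client configuration from step 1) and consumerOptions (whatever consumer config the application uses); the client count here is arbitrary:

import { KafkaJS } from "@confluentinc/kafka-javascript";

// Hypothetical stand-ins: kafkaConfig is the object shown in step 1,
// consumerOptions is the application's consumer config (e.g. a group id).
declare const kafkaConfig: any;
declare const consumerOptions: any;

async function reproduce(): Promise<void> {
  // One KafkaJS.Kafka client per application; per the note above, the issue
  // also reproduces with a single shared client.
  const kafkaClients = Array.from({ length: 3 }, () => new KafkaJS.Kafka(kafkaConfig));

  // Connecting all the consumers concurrently is what triggers the reported
  // -172 "Local: Erroneous state" and -195 transport errors.
  await Promise.all(
    kafkaClients.map((client) => client.consumer(consumerOptions).connect()),
  );
}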

confluent-kafka-javascript Configuration Settings: Shared above.

Additional context

I believe this is related to the oauthbearer mechanism specifically, as I am able to successfully connect using an unauthenticated connection in a Promise.all. However, our production clusters utilize IAM auth, so this wouldn't work for us there.

Note that I am able to successfully connect using oauthbearer if I connect in a for loop, like:

for (const kafkaClient of kafkaClients) {
  await kafkaClient.consumer(consumerOptions).connect();
  await kafkaClient.producer(producerOptions).connect();
}

We will likely move forward with this pattern, but it is unexpected that connecting via a Promise.all causes this issue.

For additional context, we are using Promise.all as we are initializing a set of different Nest applications simultaneously. The consumer and producer connections occur in the initialization of each of these independent applications.
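
A hedged sketch of that plan, serializing only the Kafka connect() calls through a simple promise-chain queue while the rest of each application's initialization stays concurrent; serializeConnect, connectQueue, and initApp are illustrative names, not part of the library:

// Illustrative promise-chain queue: each connect() waits for the previous one.
let connectQueue: Promise<void> = Promise.resolve();

function serializeConnect(connect: () => Promise<void>): Promise<void> {
  const next = connectQueue.then(connect);
  // Keep the chain alive even if one connect() rejects.
  connectQueue = next.catch(() => {});
  return next;
}

async function initApp(kafkaClient: any, consumerOptions: any, producerOptions: any): Promise<void> {
  // ...other per-application initialization can still run concurrently here...
  await serializeConnect(async () => {
    await kafkaClient.consumer(consumerOptions).connect();
    await kafkaClient.producer(producerOptions).connect();
  });
}

// The applications still bootstrap in parallel; only the Kafka connects are serialized:
// await Promise.all(apps.map((app) => initApp(app.kafkaClient, app.consumerOptions, app.producerOptions)));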

nhr-one · Feb 06 '25 18:02

Thanks for filing this issue. I can't yet reproduce it locally, but we're seeing something like this in our CI intermittently. I'll investigate it further.

milindl · Feb 17 '25 12:02

Started noticing this recently as well. Running the components in a loop ~with a 0ms delay~ seems to work around it, but there definitely seems to be some kind of shared-state issue or something?

snypelife · Mar 07 '25 21:03

So, I was able to get around this.

I discovered that our logic for generating the auth token was requesting an initial set of temporary AWS credentials and then immediately requesting new credentials again during token generation. I suspect this was hitting some kind of limit, being throttled, or erroring on the AWS side. Once that was fixed so that existing valid credentials were re-used, this was no longer an issue.

This could totally be a red herring, and there could still be some kind of concurrency bug, but I wanted to provide that feedback in case someone comes across this with a similar scenario.
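
The exact fix described above lives in the credential-generation logic, but as a related illustration, here is a minimal sketch of caching the token so that several concurrent connects do not each trigger a fresh credential request. The cache helper is hypothetical, the region value is an example, and expiryTime is assumed to be a millisecond epoch timestamp (as it is already used for lifetime in the original config):

import { generateAuthToken } from "aws-msk-iam-sasl-signer-js";

// Illustrative cache: re-use a still-valid token instead of generating a new
// one (and potentially fresh temporary AWS credentials) on every connect.
let cachedToken: { value: string; lifetime: number; principal: string } | undefined;

async function getOauthBearerToken() {
  // Refresh only when there is no token or it expires within the next minute.
  if (!cachedToken || cachedToken.lifetime - Date.now() < 60_000) {
    const authTokenResponse = await generateAuthToken({ region: "us-east-1" }); // region is an example value
    cachedToken = {
      value: authTokenResponse.token,
      lifetime: authTokenResponse.expiryTime,
      principal: "",
    };
  }
  return cachedToken;
}

// Then, in the sasl block of the client config:
// sasl: { mechanism: "oauthbearer", oauthBearerProvider: getOauthBearerToken }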

snypelife · Mar 14 '25 18:03

@nhr-one , if you can reproduce this issue consistently, could you turn on debug logs and attach them to the ticket? While I can sometimes reproduce this on CI or locally, it doesn't show up when I turn the debug logs on.

For the debug logs, the config has to look like the following. It makes the logging much more verbose:

{
  kafkaJS: {
    // ....
  },
  'debug': 'all',
}
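
For concreteness, a sketch of how that looks with the client from the reproduction above; kafkaJSConfig stands in for the kafkaJS block shown earlier, and the point is that 'debug' sits next to kafkaJS rather than inside it:

import { KafkaJS } from "@confluentinc/kafka-javascript";

// kafkaJSConfig stands in for the kafkaJS block from the reproduction steps
// (brokers, ssl, sasl/oauthbearer, retry, ...).
declare const kafkaJSConfig: any;

const kafka = new KafkaJS.Kafka({
  kafkaJS: kafkaJSConfig,
  'debug': 'all', // librdkafka-level property, alongside (not inside) kafkaJS
});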

milindl · Jul 08 '25 08:07