Bug: Connecting multiple consumers/producers with "sasl: oauthbearer" in Promise.all fails with "Error Code -172: Local: Erroneous state"
Environment Information
- OS [e.g. Mac, Arch, Windows 10]: Linux on arm64
- Node Version [e.g. 8.2.1]: 20
- NPM Version [e.g. 5.4.2]: [email protected]
- C++ Toolchain [e.g. Visual Studio, llvm, g++]: n/a
- confluent-kafka-javascript version [e.g. 2.3.3]: 1.2.0
Steps to Reproduce
- Create multiple instances of a `KafkaJS.Producer` or `KafkaJS.Consumer`, with the following configuration in the `KafkaJS.Kafka` client:
```ts
{
  kafkaJS: {
    brokers: ["my_aws_brokers:9098"], // config value
    clientId: "singleton-client",
    logger: new KafkaCustomLogger(),
    logLevel: logLevel["WARN"], // config value
    retry: {
      maxRetryTime: 30_000, // config value
      retries: 500, // config value
    },
    ssl: true,
    sasl: {
      mechanism: "oauthbearer" as const,
      oauthBearerProvider: async () => {
        const authTokenResponse = await generateAuthToken(); // from aws-msk-iam-sasl-signer-js
        return {
          value: authTokenResponse.token,
          lifetime: authTokenResponse.expiryTime,
          principal: "",
        };
      },
    },
  },
}
```
- Note: The same thing happens with multiple instances of a `KafkaJS.Kafka` client, or when there is a single client.
- Call `await consumer.connect()` or `await producer.connect()` for each of these instances in a `Promise.all`.
- The following errors appear:
```text
KafkaJSConnectionError: broker transport failure, associated type is ERR__TRANSPORT, error code -195
Local: Erroneous state, error code -172
[thrd:sasl_ssl://{my_aws_broker}]: sasl_ssl://b-3.{my_aws_broker}:9098/bootstrap: SASL authentication error: SaslAuthenticateRequest failed: Local: Broker handle destroyed (after 0ms in state DOWN)
```
confluent-kafka-javascript Configuration Settings
Shared above.
Additional context
I believe this is related to the oauthbearer mechanism specifically, as I am able to successfully connect using an unauthenticated connection in a Promise.all. However, our production clusters utilize IAM auth, so this wouldn't work for us there.
Note that I am able to successfully connect using `oauthbearer` if I connect in a `for` loop, like:

```ts
for (const kafkaClient of kafkaClients) {
  await kafkaClient.consumer(consumerOptions).connect();
  await kafkaClient.producer(producerOptions).connect();
}
```
We will likely move forward with this pattern, but it's an unexpected result that a `Promise.all` would cause this issue.
For additional context, we are using `Promise.all` because we are initializing a set of different Nest applications simultaneously. The consumer and producer connections occur during the initialization of each of these independent applications.
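To make the difference between the two patterns concrete, here is a minimal stub sketch with no Kafka involved (the `Connectable` type, the `measurePeakConcurrency` helper, and the stubbed `connect` are all hypothetical names, not part of the library): with `Promise.all`, every `connect()` is in flight at once, while the `for` loop serializes them.

```typescript
// Hypothetical stub illustrating the concurrency difference between
// Promise.all and a sequential for loop over connect() calls.
type Connectable = { connect: () => Promise<void> };

async function measurePeakConcurrency(
  clients: Connectable[],
  sequential: boolean
): Promise<number> {
  let active = 0; // connects currently in flight
  let peak = 0;   // highest number of simultaneous connects observed
  const wrap = async (c: Connectable) => {
    active++;
    peak = Math.max(peak, active);
    await c.connect();
    active--;
  };
  if (sequential) {
    // The for-loop pattern: one connect at a time.
    for (const c of clients) await wrap(c);
  } else {
    // The Promise.all pattern: all connects start before any finishes.
    await Promise.all(clients.map(wrap));
  }
  return peak;
}
```

If the SASL handshake touches any per-process shared state, the concurrent variant exercises it in parallel, which would be consistent with only the `Promise.all` variant failing.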
Thanks for filing this issue. I can't yet reproduce it locally, but we're seeing something like this in our CI intermittently. I'll investigate it further.
Started noticing this recently as well. Running the connects in a loop ~with a 0ms delay~ seems to work around it, but it definitely looks like some kind of shared-state issue?
So, I was able to get around this.
I discovered that our logic for generating the auth token was requesting an initial set of temporary AWS credentials, and then immediately requesting new AWS credentials during token generation. I suspect this was hitting some kind of limit, being throttled, or erroring on the AWS side. Once that was fixed so that existing valid credentials were re-used, this was no longer an issue.
This could totally be a red herring, however, and there could still be some kind of concurrency bug. I just wanted to provide that feedback in case someone comes across this with a similar scenario.
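For anyone hitting the same thing: one way to avoid requesting fresh AWS credentials on every connect is to cache the token until shortly before expiry and share a single in-flight request between concurrent connects. This is only a hypothetical sketch — `fetchToken` stands in for the real aws-msk-iam-sasl-signer-js call, `makeCachedProvider` is a made-up helper name, and the 60-second refresh margin is an assumption:

```typescript
// Hypothetical token cache: reuse a token until it is close to expiry,
// and let concurrent callers share one in-flight credential request.
type OauthToken = { value: string; lifetime: number; principal: string };

function makeCachedProvider(
  fetchToken: () => Promise<{ token: string; expiryTime: number }>,
  marginMs = 60_000 // refresh this long before expiry (assumed margin)
): () => Promise<OauthToken> {
  let cached: OauthToken | null = null;
  let pending: Promise<OauthToken> | null = null;

  return async () => {
    // Reuse the cached token while it is still comfortably valid.
    if (cached && cached.lifetime - marginMs > Date.now()) return cached;
    // Share one in-flight request so concurrent connects don't stampede.
    if (!pending) {
      pending = fetchToken()
        .then((r) => {
          cached = { value: r.token, lifetime: r.expiryTime, principal: "" };
          return cached;
        })
        .finally(() => {
          pending = null;
        });
    }
    return pending;
  };
}
```

The returned function could then be passed as `oauthBearerProvider`; several clients connecting inside a `Promise.all` would trigger at most one credential request per refresh window.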
@nhr-one, if you can reproduce this issue consistently, could you turn on debug logs and attach them to the ticket? While I can sometimes reproduce this on CI or locally, it doesn't show up when I turn the debug logs on.
For the debug logs, the config has to look like the following. Note that it makes the logging much more verbose:
```ts
{
  kafkaJS: {
    // ....
  },
  'debug': 'all',
}
```