KafkaJSLockTimeout when producing a large number of messages at once with SASL/OAUTHBEARER
This issue is very similar to https://github.com/tulios/kafkajs/issues/554; the difference is that I am using SASL/OAUTHBEARER for authentication.
Some background
I am watching a directory using chokidar. When a new file arrives in the directory, it is immediately sent to Kafka. I am currently stress testing my Kafka setup by copying ~1000 files into the directory with a bash script. Unfortunately, this causes errors, and only a fraction of the files (200-400) end up in Kafka.
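For context, the watcher wiring looks roughly like this (a minimal sketch, not the exact code from the issue; the directory path and topic name are placeholders, and sendDataToTopic is the helper shown under "Producer code" below):

const chokidar = require('chokidar');
const fs = require('fs').promises;
const { sendDataToTopic } = require('./producer');

chokidar.watch('/data/incoming', { ignoreInitial: true })
  .on('add', async (filePath) => {
    // Each new file is sent to Kafka as soon as it appears, so copying
    // ~1000 files at once triggers ~1000 near-simultaneous sends.
    const content = await fs.readFile(filePath);
    await sendDataToTopic([{ value: content }], 'test-topic');
  });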
Errors faced
Below is the list of errors thrown by KafkaJS. I have included as many errors as possible and formatted them for brevity.
{"level":"ERROR",
"message": "[BrokerPool] KafkaJSLockTimeout: Timeout while acquiring lock (55 waiting locks): "connect to broker localhost:9093"",
"retryCount": 0, "retryTime": 311,
"stack": "KafkaJSLockTimeout: Timeout while acquiring lock (55 waiting locks): "connect to broker localhost:9093"
at Timeout._onTimeout (node_modules\\kafkajs\\src\\utils\\lock.js:48:23)}
{"level": "ERROR", "message": "[Connection] Connection timeout", "broker": "localhost:9093", "clientId":"test-producer"}
{"level":"ERROR",
"message":"[SASLOAuthBearerAuthenticator] SASL OAUTHBEARER authentication failed: Not connected", "broker": "localhost:9093"}
{"level":"ERROR",
"message":"[BrokerPool] KafkaJSSASLAuthenticationError: SASL OAUTHBEARER authentication failed: Not connected",
"retryCount":0, "retryTime":306,
"stack": "KafkaJSSASLAuthenticationError: SASL OAUTHBEARER authentication failed: Not connected
at OAuthBearerAuthenticator.authenticate (node_modules\\kafkajs\\src\\broker\\saslAuthenticator\\oauthBearer.js:49:21)
---- snipped -----
at async sendBatch (node_modules\\kafkajs\\src\\producer\\messageProducer.js:95:12)"}
error: KafkaJSNonRetriableError
Caused by: KafkaJSLockTimeout: Timeout while acquiring
lock (107 waiting locks): "connect to broker localhost:9093"
at Timeout._onTimeout (node_modules\kafkajs\src\utils\lock.js:48:23)
at listOnTimeout (internal/timers.js:554:17)
at processTimers (internal/timers.js:497:7) {
name: 'KafkaJSNumberOfRetriesExceeded',
retriable: false,
helpUrl: undefined,
originalError: KafkaJSLockTimeout: Timeout while acquiring lock (107 waiting locks): "connect to broker localhost:9093"
at Timeout._onTimeout (node_modules\kafkajs\src\utils\lock.js:48:23)
at listOnTimeout (internal/timers.js:554:17)
at processTimers (internal/timers.js:497:7) {
retriable: false,
helpUrl: undefined
},
retryCount: 0,
retryTime: 291
}
{"level":"ERROR", "message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout", "retryCount":0,"retryTime":287}
{"level":"ERROR",
"message":"[Connection] Connection error: write after end",
"broker": "localhost:9093","clientId": "test-producer",
"stack":"Error [ERR_STREAM_WRITE_AFTER_END]: write after end
at Socket.Writable.write (_stream_writable.js:292:11)
at Object.sendRequest (node_modules\\kafkajs\\src\\network\\connection.js:312:27)
--- snipped ---
at sendRequest (node_modules\\kafkajs\\src\\network\\connection.js:302:14)
at async Connection.send (node_modules\\kafkajs\\src\\network\\connection.js:321:53)"}
error: KafkaJSNonRetriableError
Caused by: KafkaJSProtocolError: Request is not valid given the current SASL state
at createErrorFromCode (node_modules\kafkajs\src\protocol\error.js:581:10)
at Object.parse (node_modules\kafkajs\src\protocol\requests\metadata\v0\response.js:56:11)
at Connection.send (node_modules\kafkajs\src\network\connection.js:336:35)
--- snipped ---
at async Cluster.refreshMetadata (node_modules\kafkajs\src\cluster\index.js:134:5)
at async Cluster.findBroker (node_modules\kafkajs\src\cluster\index.js:221:9) {
name: 'KafkaJSNumberOfRetriesExceeded', retriable: false, helpUrl: undefined,
originalError: KafkaJSProtocolError: Request is not valid given the current SASL state
at createErrorFromCode (node_modules\kafkajs\src\protocol\error.js:581:10)
--- snipped --
at async Cluster.findBroker (\node_modules\kafkajs\src\cluster\index.js:221:9) {
retriable: false,
helpUrl: undefined,
type: 'ILLEGAL_SASL_STATE',
code: 34
},
retryCount: 0,
retryTime: 244
}
I logged each time the oauthBearerProvider was called to refresh the token. Ideally it should run every ~50 seconds, but the log shows it was sometimes being called multiple times simultaneously:
[Thu Mar 04 2021 23:47:46 GMT+0600] - Fetching token from http://keycloak:8080
[Thu Mar 04 2021 23:47:46 GMT+0600] - Fetching token from http://keycloak:8080
[Thu Mar 04 2021 23:47:46 GMT+0600] - Fetching token from http://keycloak:8080
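The concurrent calls suggest nothing deduplicates in-flight token fetches. One possible workaround (an assumption on my part, not a confirmed fix for the errors above) is to memoize the pending request in the provider; fetchTokenFromKeycloak is a hypothetical stand-in for the fetch code omitted in the issue:

// Memoize the in-flight token request so concurrent provider calls
// share a single fetch instead of hitting Keycloak several times at once.
let cached = null; // { fetchedAt, promise }
const TOKEN_TTL_MS = 50 * 1000; // matches the ~50 s refresh interval observed above

async function dedupedOauthBearerProvider() {
  const now = Date.now();
  if (!cached || now - cached.fetchedAt >= TOKEN_TTL_MS) {
    const promise = fetchTokenFromKeycloak() // hypothetical helper; real fetch omitted in the issue
      .then((token) => ({ value: token }));
    cached = { fetchedAt: now, promise };
    // Drop the cache entry on failure so the next call retries.
    promise.catch(() => {
      if (cached && cached.promise === promise) cached = null;
    });
  }
  return cached.promise;
}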
In addition, I noticed the following errors in the Kafka broker log:
broker | java.lang.IllegalStateException: Unexpected ApiVersions request received during SASL authentication state HANDSHAKE_REQUEST
broker |   at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleApiVersionsRequest(SaslServerAuthenticator.java:562)
broker |   at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.handleKafkaRequest(SaslServerAuthenticator.java:499)
broker |   at org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate(SaslServerAuthenticator.java:259)
broker |   at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:177)
broker |   at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:547)
broker |   at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
broker |   at kafka.network.Processor.poll(SocketServer.scala:893)
broker |   at kafka.network.Processor.run(SocketServer.scala:792)
broker |   at java.lang.Thread.run(Thread.java:748)
broker | [2021-02-25 10:40:11,239] WARN [SocketServer brokerId=1] Unexpected error from /172.18.0.1; closing connection (org.apache.kafka.common.network.Selector)
INFO [SocketServer brokerId=1] Failed re-authentication with /172.18.0.1 (Unexpected Kafka request of type SASL_HANDSHAKE during SASL authentication.) (org.apache.kafka.common.network.Selector)
[2021-03-03 18:14:44,782] INFO [SocketServer brokerId=1] Failed re-authentication with /172.18.0.1 (Unexpected Kafka request of type PRODUCE during SASL authentication.) (org.apache.kafka.common.network.Selector)
I think the error happens because KafkaJS tries to produce messages while the OAuth token is being re-authenticated. I have no idea how to fix this.
For comparison, I tried the same thing with the Java client, and it can send thousands of messages to the same Kafka broker without any issues.
For example, here is the debug log printed by the Java producer during re-authentication:
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to SEND_APIVERSIONS_REQUEST
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:178 - Creating SaslClient: client=null;service=kafka;serviceHostname=localhost;mechs=[OAUTHBEARER]
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG OAuthBearerSaslClient:155 - Setting SASL/OAUTHBEARER client state to SEND_CLIENT_FIRST_MESSAGE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to REAUTH_PROCESS_ORIG_APIVERSIONS_RESPONSE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to REAUTH_SEND_HANDSHAKE_REQUEST
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to REAUTH_RECEIVE_HANDSHAKE_OR_OTHER_RESPONSE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to REAUTH_INITIAL
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG OAuthBearerSaslClient:155 - Setting SASL/OAUTHBEARER client state to RECEIVE_SERVER_FIRST_MESSAGE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to INTERMEDIATE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG OAuthBearerSaslClient:109 - Successfully authenticated as test-producer
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG OAuthBearerSaslClient:155 - Setting SASL/OAUTHBEARER client state to COMPLETE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:348 - Set SASL client state to COMPLETE
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG SaslClientAuthenticator:620 - Finished re-authentication with session expiration in 55385 ms and session re-authentication on or after 49950 ms
2021-03-05 01:59:54 [kafka-producer-network-thread | test-producer] DEBUG Selector:553 - [Producer clientId=test-producer] Successfully re-authenticated with localhost/127.0.0.1
Notice the last message. A similar flow happens with KafkaJS too, yet with KafkaJS the broker throws the previously mentioned error:
java.lang.IllegalStateException: Unexpected ApiVersions request received during SASL authentication state HANDSHAKE_REQUEST
Kafka Setup
I am running Kafka in Docker using the confluentinc/cp-kafka:5.5.0 image. My sample broker setup is below:
broker:
  image: confluentinc/cp-kafka:5.5.0
  ports:
    - 29092:29092
    - 9093:9093
  environment:
    # --- common properties --- #
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CLIENT:SASL_PLAINTEXT
    KAFKA_LISTENERS: PLAINTEXT://broker:29092,CLIENT://broker:9093
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,CLIENT://localhost:9093
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
    ## OAuth Bearer SASL
    KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: OAUTHBEARER
    KAFKA_SASL_ENABLED_MECHANISMS: OAUTHBEARER
    KAFKA_CONNECTIONS_MAX_REAUTH_MS: 60000
Producer code
const { Kafka } = require('kafkajs');

const BROKER = 'localhost:9093';

const kafka = new Kafka({
  clientId: '<client ID>',
  brokers: [BROKER],
  ssl: false,
  sasl: {
    mechanism: 'oauthbearer',
    oauthBearerProvider: async () => {
      // fetch token from keycloak. code omitted.
      return {
        value: token,
      };
    },
  },
});

const producer = kafka.producer();

// Note: connect() is not awaited here, so sends may start before the
// connection is fully established.
producer.connect()
  .then(() => console.log('Producer connected'))
  .catch((err) => console.error('Failed to connect', err));

const sendDataToTopic = async (data, topic) => {
  return producer.send({
    topic: topic,
    messages: data,
  })
    .then((resp) => {
      console.log('producerData: ', resp);
    })
    .catch((err) => {
      console.error('error: ', err);
    });
};

module.exports = {
  sendDataToTopic,
};
To Reproduce
Call the sendDataToTopic method in a loop (~1000 iterations), for instance with the sketch below.
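A minimal reproduction sketch, assuming the module above is saved as producer.js and a topic named test-topic already exists:

const { sendDataToTopic } = require('./producer');

for (let i = 0; i < 1000; i++) {
  // Fire-and-forget, mirroring the watcher behavior: ~1000 sends start
  // almost simultaneously without awaiting one another.
  sendDataToTopic([{ value: `message-${i}` }], 'test-topic');
}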
Expected behavior
All the messages are sent to Kafka and connection re-authentication happens automatically.
Environment:
- OS: Windows 10
- KafkaJS version: 1.15.0
- Kafka version: 2.5 (using the confluentinc/cp-kafka:5.5.0 image in Docker)
- NodeJS version: 14.15.0
I was facing similar issues while load testing. Did you try producing in batches? It helped me avoid making many individual produce calls.
@sp-suresh yes, I tried producing in batches and still faced the same issue :( Are you using OAUTHBEARER too?
No, I am not using OAUTHBEARER. Also worth mentioning: setting connectionTimeout: 4000 and authenticationTimeout: 4000 has been a relief too.
https://github.com/tulios/kafkajs/issues/554#issuecomment-568168042
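For illustration, those options go on the Kafka client constructor (a sketch based on the producer config above; the clientId is taken from the logs in this issue, and the timeout values are the suggested ones, not defaults):

const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'test-producer',
  brokers: ['localhost:9093'],
  connectionTimeout: 4000,     // ms to wait for a successful connection
  authenticationTimeout: 4000, // ms to wait for SASL authentication
  // ...ssl and sasl options as in the producer code above
});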
Thanks for the info. I went through that issue multiple times and tried different configs without any success. I'll try again with your suggestions to see if the situation improves.
I have exactly the same issue when trying to push data coming from a websocket to a Kafka cluster.
Timeout while acquiring lock (712 waiting locks): "updating target topics" {"name":"KafkaJSNumberOfRetriesExceeded","retriable":false,"originalError":{"name":"KafkaJSLockTimeout","retriable":false},"retryCount":0,"retryTime":281,"stack":"KafkaJSNonRetriableError\n Caused by: KafkaJSLockTimeout: Timeout while acquiring lock (712 waiting locks): "updating target topics"\n at Timeout._onTimeout (F:\Environement\git-repo\crypto-feeds-api\node_modules\kafkajs\src\utils\lock.js:48:23)\n at listOnTimeout (internal/timers.js:549:17)\n at processTimers (internal/timers.js:492:7)"}
@j-a-h-i-r do you have updates for this issue?
@safonovklim Unfortunately no. I moved on to other tasks and haven't revisited this one. Are you also using SASL/OAUTHBEARER?
Regarding the timeout issue @kzay reported above, like: KafkaJSLockTimeout: Timeout while acquiring lock (2162 waiting locks): "updating target topics"
It looks like this specific lock timeout comes from the mutatingTargetTopics lock in the cluster module. Each message publish acquires and releases this lock before the actual publish, and with high publish parallelism, too many pending waiters (a few thousand) slow lock acquisition enough to time out. Increasing the requestTimeout can avoid these errors, but the lock is not really needed for the vast majority of calls. Here is a PR that avoids the lock when possible: https://github.com/tulios/kafkajs/pull/1185
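Until that PR lands, one possible mitigation (my own sketch, not something from the PR) is to coalesce messages so the lock is acquired once per batch instead of once per message; producer and the topic name are assumed to be configured as in the earlier producer code:

// Buffer messages briefly and flush them with a single producer.send(),
// so the cluster lock is taken once per batch rather than once per message.
const pending = [];
let flushTimer = null;

function enqueue(message) {
  pending.push(message);
  if (!flushTimer) {
    flushTimer = setTimeout(async () => {
      const batch = pending.splice(0, pending.length);
      flushTimer = null;
      try {
        await producer.send({ topic: 'test-topic', messages: batch });
      } catch (err) {
        console.error('batch send failed:', err);
      }
    }, 100); // flush window in ms; tune for your throughput
  }
}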
Any update on this? We're also running into the same java.lang.IllegalStateException: Unexpected ApiVersions request received during SASL authentication state HANDSHAKE_REQUEST every 60 seconds on all brokers.