snowflake-kafka-connector
No way to set connection properties, connector fails with "Authentication token has expired. The user must authenticate again."
The Snowflake connector does not accept connection properties in the Snowflake URL: https://github.com/snowflakedb/snowflake-kafka-connector/blob/4d358495ea172244ede28c42c39fb20b559f1fad/src/main/java/com/snowflake/kafka/connector/internal/SnowflakeURL.java#L53
We need to be able to pass CLIENT_SESSION_KEEP_ALIVE set to true. Without this setting, the connector fails after the connection has been idle for a few hours, with:
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:611)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: com.snowflake.kafka.connector.internal.SnowflakeKafkaConnectorException:
[SF_KAFKA_CONNECTOR] Exception: Failed to upload file with cache
[SF_KAFKA_CONNECTOR] Error Code: 2011
[SF_KAFKA_CONNECTOR] Detail: Failed to upload file to Snowflake Stage though credential caching
[SF_KAFKA_CONNECTOR] Message:
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Exception: Max retry exceeded
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Error Code: 2010
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Detail: Api retry exceeded the max retry limit
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Message:
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Exception: Failed to execute cached put
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Error Code: 5018
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Detail: Error in cached put command
[SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] [SF_KAFKA_CONNECTOR] Message: Authentication token has expired. The user must authenticate again.
Similar issue happened to me as well.
Would it make sense to set CLIENT_SESSION_KEEP_ALIVE = true for the connector user?
@amkartashov which version of connector is this? I couldn't find the message of "Authentication token has expired. The user must authenticate again." in the latest code
@sfc-gh-tzhang For me, it happened with version 1.6.9
@sfc-gh-tzhang which is the recommended version to use according to the documentation: https://docs.snowflake.com/en/user-guide/kafka-connector-install.html#installing-the-connector
@sfc-gh-tzhang it's 1.8.0
The error message comes from the Snowflake JDBC driver, and they suggest setting a session parameter to fix this, but there is no way to do that with the Kafka connector because it accepts a bare URL only, without any parameters. See, for example, https://github.com/snowflakedb/snowflake-jdbc/issues/182
Thanks, looks like the error is actually from Snowflake server side instead of JDBC. For the fix, we could support reading it from the config file and instruct this code to accept it as a property.
@sfc-gh-tzhang
I see below possible options:
- additional options in the connector configuration file, something like snowflake.connection.properties, in the same place where we have snowflake.url.name
- env vars
- JVM system properties
The current workaround from Snowflake support is to set this parameter on the Snowflake side: ALTER USER [user_name] SET CLIENT_SESSION_KEEP_ALIVE = true
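To make the configuration-file option from the list above more concrete, here is a minimal sketch of how extra connection properties might be collected from the connector config. It is not the connector's actual code: the class name, the `fromConfig` method, and the `snowflake.connection.properties.` key prefix are all assumptions made for illustration (the thread only suggests a name along the lines of snowflake.connection.properties). The resulting `Properties` object is the kind of thing that could be handed to the JDBC driver next to the URL.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper, not part of snowflake-kafka-connector.
final class ConnectionPropertiesSketch {
    // Assumed key prefix; the connector does not define this option today.
    static final String PREFIX = "snowflake.connection.properties.";

    // Copies every config entry that starts with PREFIX into a Properties
    // object, stripping the prefix so only the parameter name remains.
    static Properties fromConfig(Map<String, String> connectorConfig) {
        Properties props = new Properties();
        for (Map.Entry<String, String> entry : connectorConfig.entrySet()) {
            String key = entry.getKey();
            String value = entry.getValue();
            if (key.startsWith(PREFIX) && value != null) {
                String name = key.substring(PREFIX.length());
                if (!name.isEmpty()) {
                    props.setProperty(name, value);
                }
            }
        }
        return props;
    }

    public static void main(String[] args) {
        // Example connector config; the account host is a placeholder.
        Map<String, String> config = Map.of(
            "snowflake.url.name", "myaccount.snowflakecomputing.com:443",
            PREFIX + "CLIENT_SESSION_KEEP_ALIVE", "true");

        Properties extra = fromConfig(config);
        System.out.println(extra);
        // In the connector, these entries would be merged into the properties
        // already built for the connection before the JDBC connection is opened.
    }
}
```

A prefix is used here rather than a single packed value so that each parameter stays an ordinary key/value pair in the config file; the environment-variable and JVM-system-property options could feed the same `Properties` object.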