
[Bug]: Flink using the Unified Catalog to read/write an Iceberg table encounters a Kerberos problem

Open • lklhdu opened this issue 1 year ago • 0 comments

What happened?

When reading from or writing to an Iceberg table via the Unified Catalog, Kerberos authentication fails, causing the job to fail.
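
From the trace below, FlinkUnifiedCatalog.getTable delegates straight into the shaded Iceberg FlinkCatalog, and the subsequent HDFS metadata read (HadoopInputFile.newStream) fails for lack of a TGT. A plausible fix direction is to establish the Kerberos login context around the delegated call. A minimal sketch, assuming keytab-based login via Hadoop's UserGroupInformation (callWithKerberos is a hypothetical helper, not an existing Amoro API; the principal and keytab values are placeholders):

    import java.security.PrivilegedExceptionAction;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;

    // Hypothetical helper, not an existing Amoro API: logs in from a keytab
    // and runs the delegated catalog call inside doAs, so every Hadoop RPC
    // issued while loading Iceberg metadata carries Kerberos credentials.
    public final class KerberosAwareCall {

      public static <T> T callWithKerberos(
          Configuration hadoopConf,
          String principal,   // placeholder, e.g. "flink/host@REALM"
          String keytabPath,  // placeholder, e.g. "/etc/security/flink.keytab"
          PrivilegedExceptionAction<T> action) throws Exception {
        UserGroupInformation.setConfiguration(hadoopConf);
        UserGroupInformation ugi =
            UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath);
        // All Hadoop RPC issued inside doAs (e.g. getBlockLocations) runs
        // with the logged-in subject instead of the bare OS user.
        return ugi.doAs(action);
      }
    }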

Affects Versions

master

What table formats are you seeing the problem on?

Iceberg

What engines are you seeing the problem on?

Flink

How to reproduce

  1. start the Flink SQL Client
  2. create a Flink Unified Catalog

     create catalog arctic_catalog with (
         'type' = 'unified',
         'metastore.url' = 'thrift://{ams-address}:18070/unified_catalog'
     );

  3. create a print table

     create table printsink (
         id int,
         name string,
         dt string
     ) with (
         'connector' = 'print'
     );

  4. read the Iceberg table and write to the print table (the statement fails during SQL validation; see the diagnostic sketch after these steps)

     insert into printsink select * from arctic_catalog.iceberg_test.iceberg_by_uc_spark /*+ OPTIONS('streaming'='true','table.format'='ICEBERG') */;
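
Note that the stack trace shows the failure firing inside SqlValidatorImpl / FlinkPlannerImpl, i.e. while the planner resolves the table, before any job is submitted. A quick way to confirm the missing login is to inspect the SQL Client JVM's current UGI; a hedged diagnostic sketch (not part of the reproduction, just a check):

    import org.apache.hadoop.security.UserGroupInformation;

    // Diagnostic sketch: on the failing setup this is expected to report a
    // SIMPLE (non-Kerberos) login with no credentials at the moment the
    // planner resolves the table.
    public final class UgiProbe {
      public static void main(String[] args) throws Exception {
        UserGroupInformation current = UserGroupInformation.getCurrentUser();
        System.out.println("login user:  " + current.getUserName());
        System.out.println("auth method: " + current.getAuthenticationMethod());
        System.out.println("has TGT:     " + current.hasKerberosCredentials());
      }
    }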

Relevant log output

javax.security.sasl.SaslException: GSS initiate failed
	at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:220) ~[?:1.8.0_332]
	at org.apache.hadoop.security.SaslRpcClient.saslConnect(SaslRpcClient.java:407) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.ipc.Client$Connection.setupSaslConnection(Client.java:629) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.ipc.Client$Connection.access$2200(Client.java:423) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:833) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.ipc.Client$Connection$2.run(Client.java:829) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_332]
	at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_332]
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1938) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.ipc.Client$Connection.setupIOstreams(Client.java:828) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.ipc.Client$Connection.access$3700(Client.java:423) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.ipc.Client.getConnection(Client.java:1621) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.ipc.Client.call(Client.java:1450) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.ipc.Client.call(Client.java:1403) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:230) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:118) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at com.sun.proxy.$Proxy42.getBlockLocations(Unknown Source) ~[?:?]
	at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:263) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_332]
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_332]
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_332]
	at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_332]
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:433) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeMethod(RetryInvocationHandler.java:166) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invoke(RetryInvocationHandler.java:158) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.io.retry.RetryInvocationHandler$Call.invokeOnce(RetryInvocationHandler.java:96) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:362) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at com.sun.proxy.$Proxy43.getBlockLocations(Unknown Source) ~[?:?]
	at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:870) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:859) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:848) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:348) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:307) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:292) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1087) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:327) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.hdfs.DistributedFileSystem$4.doCall(DistributedFileSystem.java:324) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:324) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:920) ~[flink-shaded-hadoop-2-uber-2.10.2-10.0.jar:2.10.2-10.0]
	at org.apache.amoro.shade.org.apache.iceberg.hadoop.HadoopInputFile.newStream(HadoopInputFile.java:183) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:272) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.TableMetadataParser.read(TableMetadataParser.java:266) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.BaseMetastoreTableOperations.lambda$refreshFromMetadataLocation$0(BaseMetastoreTableOperations.java:189) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.BaseMetastoreTableOperations.lambda$refreshFromMetadataLocation$1(BaseMetastoreTableOperations.java:208) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.util.Tasks$Builder.runTaskWithRetry(Tasks.java:413) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.util.Tasks$Builder.runSingleThreaded(Tasks.java:219) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:203) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.util.Tasks$Builder.run(Tasks.java:196) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:208) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:185) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.BaseMetastoreTableOperations.refreshFromMetadataLocation(BaseMetastoreTableOperations.java:180) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.hive.HiveTableOperations.doRefresh(HiveTableOperations.java:178) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.BaseMetastoreTableOperations.refresh(BaseMetastoreTableOperations.java:97) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.BaseMetastoreTableOperations.current(BaseMetastoreTableOperations.java:80) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.BaseMetastoreCatalog.loadTable(BaseMetastoreCatalog.java:47) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.com.github.benmanes.caffeine.cache.BoundedLocalCache.lambda$doComputeIfAbsent$14(BoundedLocalCache.java:2406) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at java.util.concurrent.ConcurrentHashMap.compute(ConcurrentHashMap.java:1853) ~[?:1.8.0_332]
	at org.apache.amoro.shade.com.github.benmanes.caffeine.cache.BoundedLocalCache.doComputeIfAbsent(BoundedLocalCache.java:2404) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.com.github.benmanes.caffeine.cache.BoundedLocalCache.computeIfAbsent(BoundedLocalCache.java:2387) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.com.github.benmanes.caffeine.cache.LocalCache.computeIfAbsent(LocalCache.java:108) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.com.github.benmanes.caffeine.cache.LocalManualCache.get(LocalManualCache.java:62) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.CachingCatalog.loadTable(CachingCatalog.java:166) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.flink.FlinkCatalog.loadIcebergTable(FlinkCatalog.java:339) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.flink.FlinkCatalog.getTable(FlinkCatalog.java:333) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.shade.org.apache.iceberg.flink.FlinkCatalog.getTable(FlinkCatalog.java:93) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.amoro.flink.catalog.FlinkUnifiedCatalog.getTable(FlinkUnifiedCatalog.java:182) ~[amoro-mixed-format-flink-runtime-1.17-0.7-SNAPSHOT.jar:0.7-SNAPSHOT]
	at org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
	at org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:364) ~[flink-table-api-java-uber-1.17.1.jar:1.17.1]
	at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:73) ~[?:?]
	at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:126) ~[?:?]
	at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:295) ~[?:?]
	at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:145) ~[?:?]
	at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:101) ~[?:?]
	at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:214) ~[?:?]
	at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:116) ~[?:?]
	at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:188) ~[?:?]
	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:89) ~[?:?]
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1042) ~[?:?]
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1017) ~[?:?]
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3287) ~[?:?]
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3272) ~[?:?]
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3558) ~[?:?]
	at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:64) ~[?:?]
	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:89) ~[?:?]
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1042) ~[?:?]
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1017) ~[?:?]
	at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:247) ~[?:?]
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:992) ~[?:?]
	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:741) ~[?:?]
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:183) ~[?:?]
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) ~[?:?]
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validateRichSqlInsert(FlinkPlannerImpl.scala:292) ~[?:?]
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:175) ~[?:?]
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) ~[?:?]
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:281) ~[?:?]
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) ~[?:?]
	at org.apache.flink.table.gateway.service.operation.OperationExecutor.executeStatement(OperationExecutor.java:186) ~[flink-sql-gateway-1.17.1.jar:1.17.1]
	at org.apache.flink.table.gateway.service.SqlGatewayServiceImpl.lambda$executeStatement$1(SqlGatewayServiceImpl.java:212) ~[flink-sql-gateway-1.17.1.jar:1.17.1]
	at org.apache.flink.table.gateway.service.operation.OperationManager.lambda$submitOperation$1(OperationManager.java:119) ~[flink-sql-gateway-1.17.1.jar:1.17.1]
	at org.apache.flink.table.gateway.service.operation.OperationManager$Operation.lambda$run$0(OperationManager.java:258) ~[flink-sql-gateway-1.17.1.jar:1.17.1]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_332]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_332]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_332]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_332]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_332]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_332]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_332]
Caused by: org.ietf.jgss.GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
	at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:162) ~[?:1.8.0_332]
	at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:122) ~[?:1.8.0_332]
	at sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:189) ~[?:1.8.0_332]
	at sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:224) ~[?:1.8.0_332]
	at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212) ~[?:1.8.0_332]
	at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179) ~[?:1.8.0_332]
	at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:201) ~[?:1.8.0_332]
	... 107 more

[ERROR] Could not execute SQL statement. Reason:
GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
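
For context, the root-cause message "Failed to find any Kerberos tgt" means the RPC client's JAAS subject holds no ticket at all: the Hadoop Configuration may declare kerberos auth, but no login (keytab or kinit ticket cache) was performed before the first HDFS RPC. A minimal sketch of that precondition, assuming a ticket cache from kinit is available for the current OS user:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.security.UserGroupInformation;

    public final class KerberosPrecondition {
      public static void main(String[] args) throws Exception {
        // Declare kerberos auth, then log in before the first HDFS RPC;
        // getLoginUser() picks up an existing kinit ticket cache if present.
        Configuration conf = new Configuration();
        conf.set("hadoop.security.authentication", "kerberos");
        UserGroupInformation.setConfiguration(conf);
        UserGroupInformation login = UserGroupInformation.getLoginUser();
        System.out.println("has Kerberos credentials: " + login.hasKerberosCredentials());
      }
    }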

Anything else

No response

Are you willing to submit a PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

  • [X] I agree to follow this project's Code of Conduct

lklhdu • Jul 10 '24 08:07