iceberg icon indicating copy to clipboard operation
iceberg copied to clipboard

Problem with createTable

Open jessiedanwang opened this issue 3 years ago • 20 comments

Query engine

Spark shell on EMR 6.5

Question

We run spark-shell on EMR 6.5 in account B, and tried to create table in Glue catalog in account A, and we have grant cross-account access using resource policy on both Glue catalog and S3 bucket in account A.

spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.0,software.amazon.awssdk:bundle:2.17.243,software.amazon.awssdk:url-connection-client:2.17.243 --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.my_catalog.warehouse=s3://my_bucket/prefix/ —conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.hadoop.hive.metastore.glue.catalogid=aaaaaa

scala> val schema = new Schema(Types.NestedField.required(1, "c1", Types.StringType.get(), "c1"));

scala> val partitionSpec = PartitionSpec.builderFor(schema).build();

scala> val icebergCatalogConfigs = Map(CatalogProperties.WAREHOUSE_LOCATION -> "s3://my_bucket/prefix/", CatalogProperties.CATALOG_IMPL -> "org.apache.iceberg.aws.glue.GlueCatalog",CatalogProperties.FILE_IO_IMPL -> "org.apache.iceberg.aws.s3.S3FileIO","glue.id" -> "aaaaaa");

scala> val gluecatalog = new GlueCatalog();

scala> gluecatalog.initialize("my_catalog",scala.collection.JavaConverters.mapAsJavaMap(icebergCatalogConfigs));

scala> val props = scala.collection.JavaConverters.mapAsJavaMap(Map("location" -> "s3://my_bucket/prefix/ns_test"));

scala> gluecatalog.createNamespace(Namespace.of("ns_test"), props);

scala> gluecatalog.createTable(TableIdentifier.of(Namespace.of("ns_test"),"table_test"), schema, partitionSpec);

Got the following error messages when trying to create table, would appreciate any advise on how to resolve the error, thanks a lot.

software.amazon.awssdk.services.glue.model.EntityNotFoundException: Database ns_test not found. (Service: Glue, Status Code: 400, Request ID: d30543c5-bf38-4217-aca0-015d74445533) at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:125) at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:82) at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:60) at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:41) at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40) at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42) at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78) at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36) at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81) at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56) at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48) at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206) at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37) at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26) at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:167) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:175) at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76) at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45) at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56) at software.amazon.awssdk.services.glue.DefaultGlueClient.getDatabase(DefaultGlueClient.java:5675) at org.apache.iceberg.aws.glue.GlueCatalog.defaultWarehouseLocation(GlueCatalog.java:226) at org.apache.iceberg.BaseMetastoreCatalog$BaseMetastoreCatalogTableBuilder.create(BaseMetastoreCatalog.java:167) at org.apache.iceberg.catalog.Catalog.createTable(Catalog.java:78) at org.apache.iceberg.catalog.Catalog.createTable(Catalog.java:96)

jessiedanwang avatar Aug 02 '22 02:08 jessiedanwang

can you please try once with AssumeRoleAwsClientFactory and creating cross-account IAM role. ref : https://iceberg.apache.org/docs/latest/aws/#cross-account-and-cross-region-access for more details.

singhpk234 avatar Aug 02 '22 06:08 singhpk234

thanks for getting back. I have tried cross-account IAM role, and the issue with createTable is gone, however, i am getting the following error when trying to write data to the table using df.writeTo("ns_name.xxxxxx").append()

Do i need to add inputFormat when creating table? Here is how iceberg table is created scala> val tableProperties = Map( "table_type" -> "iceberg", "format-version" -> "2", "write.distribution-mode" -> "hash", "write.spark.fanout.enabled" -> "true", "write.parquet.compression-codec" -> "snappy", "write.avro.compression-codec" -> "snappy", "write.metadata.delete-after-commit.enabled" -> "true", "write.metadata.previous-versions-max" -> "3", "write.target-file-size-bytes" -> s"$GIGABYTE" ) scala> catalog.createTable(TableIdentifier.of(Namespace.of(database), tableName), schema, partitionSpec, JavaConverters.mapAsJavaMap(tableProperties))

org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: Unable to fetch table xxxxxx. StorageDescriptor#InputFormat cannot be null for table: xxxxxx (Service: null; Status Code: 0; Error Code: null; Request ID: null; Proxy: null) at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:135) at org.apache.spark.sql.hive.HiveExternalCatalog.tableExists(HiveExternalCatalog.scala:879) at org.apache.spark.sql.catalyst.catalog.ExternalCatalogWithListener.tableExists(ExternalCatalogWithListener.scala:146) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.tableExists(SessionCatalog.scala:462) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.requireTableExists(SessionCatalog.scala:197) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableRawMetadata(SessionCatalog.scala:488) at org.apache.spark.sql.catalyst.catalog.SessionCatalog.getTableMetadata(SessionCatalog.scala:474) at org.apache.spark.sql.execution.datasources.v2.V2SessionCatalog.loadTable(V2SessionCatalog.scala:65) at org.apache.spark.sql.connector.catalog.CatalogV2Util$.loadTable(CatalogV2Util.scala:282) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.loaded$lzycompute$1(Analyzer.scala:1183) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.loaded$1(Analyzer.scala:1183) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.$anonfun$lookupRelation$3(Analyzer.scala:1221) at scala.Option.orElse(Option.scala:447) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.org$apache$spark$sql$catalyst$analysis$Analyzer$ResolveRelations$$lookupRelation(Analyzer.scala:1220) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$10.applyOrElse(Analyzer.scala:1130) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$$anonfun$apply$10.applyOrElse(Analyzer.scala:1112) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$3(AnalysisHelper.scala:90) at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:75) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUp$1(AnalysisHelper.scala:90) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:221) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:86) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:84) at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:29) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1112) at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations$.apply(Analyzer.scala:1077) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:220) at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126) at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122) at scala.collection.immutable.List.foldLeft(List.scala:89) at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeBatch$1(RuleExecutor.scala:217) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$6(RuleExecutor.scala:290) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at org.apache.spark.sql.catalyst.rules.RuleExecutor$RuleExecutionContext$.withContext(RuleExecutor.scala:333) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5(RuleExecutor.scala:290) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$5$adapted(RuleExecutor.scala:280) at scala.collection.immutable.List.foreach(List.scala:392) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:280) at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:192) at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:196) at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:190) at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:155) at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:183) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:183) at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:174) at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:228) at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:173) at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:73) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:163) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:163) at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:73) at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:71) at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:63) at org.apache.spark.sql.execution.QueryExecution.$anonfun$withCachedData$1(QueryExecution.scala:77) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:76) at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:76) at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:87) at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:192) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:163) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:163) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:84) at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:84) at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:95) at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:113) at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:110) at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:105) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:135) at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:107) at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:232) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:135) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:253) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:134) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:68) at org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:196) at org.apache.spark.sql.DataFrameWriterV2.append(DataFrameWriterV2.scala:149)

jessiedanwang avatar Aug 02 '22 22:08 jessiedanwang

however, i am getting the following error when trying to write data to the table using df.writeTo("ns_name.xxxxxx").append()

I think this is happening because you haven't specified the catalog name in identifier of write to and niether made my_catalog as default ....

can you please try doing : df.writeTo("my_catalog.ns_name.xxxxxx").append() instead (ref : https://iceberg.apache.org/docs/latest/spark-writes/#appending-data)

ref : A ticket stating similar issue : https://github.com/apache/iceberg/issues/2202#issuecomment-786301938

singhpk234 avatar Aug 03 '22 09:08 singhpk234

The issue is resolved after specifying the catalog name, thanks a lot!

Now i am getting KyroException when using 'MERGE INTO'

Here is what we did, df.createOrReplaceGlobalTempView("temp_view")

MERGE INTO my_catalog.ns_name.table_name t USING (SELECT * FROM global_temp.temp_view) s ON s.id = t.id WHEN MATCHED THEN UPDATE SET * WHEN NOT MATCHED THEN INSERT *

Here is error message 22/08/03 14:29:11 WARN TaskSetManager: Lost task 8.0 in stage 15.0 (TID 1041) (xxx.xxx.xxx.xxx executor 12): java.io.IOException: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException Serialization trace: properties (org.apache.iceberg.aws.s3.S3FileIO) io (org.apache.iceberg.spark.source.SerializableTableWithSize) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1412) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:226) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:103) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:566) at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:561) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:408) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException Serialization trace: properties (org.apache.iceberg.aws.s3.S3FileIO) io (org.apache.iceberg.spark.source.SerializableTableWithSize) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:297) at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$unBlockifyObject$4(TorrentBroadcast.scala:336) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:338) at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:257) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:231) at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:226) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1405) ... 15 more Caused by: java.lang.UnsupportedOperationException at org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap.put(ImmutableMap.java:714) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:162) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)

jessiedanwang avatar Aug 03 '22 15:08 jessiedanwang

Caused by: java.lang.UnsupportedOperationException at org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap.put(ImmutableMap.java:714)

I think this is happening because Kryo is not able to serialize / de-serialize Immutable map (this comes from properties in S3FileIO). (ref : https://groups.google.com/g/kryo-users/c/KP2FiJHFTzs).

possible work-around :

  1. use Java serializer.
  2. you can use https://github.com/magro/kryo-serializers which has classes for Immutable Map, and register via methods described here https://spark.apache.org/docs/latest/tuning.html#data-serialization. As the twitter chill library also doesn't contains this : https://github.com/twitter/chill/issues/66

Here is a sample test case in iceberg for the same, which fails in master :

  @Test
  public void testImmutableMapWithKryo() throws IOException {
    Map<String, String> dummyMap = ImmutableMap.of("x", "y");
    KryoHelpers.roundTripSerialize(dummyMap);
  }

singhpk234 avatar Aug 03 '22 18:08 singhpk234

Thanks for getting back to me quickly. I am still getting the same error after adding the following, do i miss anything here?

val sparkConf = new SparkConf().setAppName(appName).registerKryoClasses(Array(classOf[com.google.common.collect.ImmutableList[Any]], classOf[com.google.common.collect.ImmutableMap[Any, Any]]))

spark = SparkSession .builder() .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED") .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") // Iceberg related configs .config("spark.sql.autoBroadcastJoinThreshold", "-1") .config("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1") .config(s"spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog") .config(s"spark.sql.catalog.iceberg_catalog.warehouse", warehousePath) .config(s"spark.sql.catalog.iceberg_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") .config(s"spark.sql.catalog.iceberg_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") .config(s"spark.sql.catalog.iceberg_catalog.client.factory", "org.apache.iceberg.aws.AssumeRoleAwsClientFactory") .config(s"spark.sql.catalog.iceberg_catalog.client.assume-role.arn", s"arn:aws:iam::$catalogId:role/$role") .config(s"spark.sql.catalog.iceberg_catalog.client.assume-role.region", "us-east-2") .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true") .config("spark.hadoop.hive.metastore.glue.catalogid", catalogId) .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config(sparkConf) .enableHiveSupport() .getOrCreate()

I have also tried the following, same error as well,

spark = SparkSession .builder() .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config("spark.kryo.registrator", "xxx.xxx.CustomKryoRegistrator") ....

import com.esotericsoftware.kryo.Kryo import de.javakaffee.kryoserializers.guava.{ImmutableListSerializer, ImmutableSetSerializer, ImmutableMapSerializer} import org.apache.spark.serializer.KryoRegistrator

class CustomKryoRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo): Unit = { ImmutableListSerializer.registerSerializers(kryo) ImmutableSetSerializer.registerSerializers(kryo) ImmutableMapSerializer.registerSerializers(kryo) } }

jessiedanwang avatar Aug 04 '22 02:08 jessiedanwang

I see now, why this would not work basically in iceberg we have relocated guava, from com.google.common.collect.ImmutableMap to org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap

the serializer for guava in https://github.com/magro/kryo-serializers, won't work now as they are trying to make obj of com.google.common.collect.ImmutableMap during deserialization ref CP

I was able to make the test case above pass via writing a custom serializer which on deserialization creates org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap in KryoHelper ...

import com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

    Kryo kryo = new KryoSerializer(new SparkConf()).newKryo();

    kryo.register(java.util.HashMap.class);
    final ImmutableRelocatedMapSerializer serializer = new ImmutableRelocatedMapSerializer();
    kryo.register(obj.getClass(), serializer);

  public static class ImmutableRelocatedMapSerializer extends
      Serializer<ImmutableMap> {

    private static final boolean DOES_NOT_ACCEPT_NULL = true;
    private static final boolean IMMUTABLE = true;

    public ImmutableRelocatedMapSerializer() {
      super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
    }

    @Override
    public void write(Kryo kryo, Output output, ImmutableMap object) {
      kryo.writeObject(output, Maps.newHashMap(object));
    }

    @Override
    public ImmutableMap read(Kryo kryo, Input input, Class<ImmutableMap> type) {
      Map map = kryo.readObject(input, HashMap.class);
      return ImmutableMap.copyOf(map);
    }
  }

so in this case you might wanna do something like this here, to achieve kryo serialization ..

singhpk234 avatar Aug 04 '22 06:08 singhpk234

We have faced similar issues before, for ex : https://github.com/apache/iceberg/pull/549/files

I think the fix was to use to use java collection instead https://github.com/apache/iceberg/pull/546, which needs to be made at iceberg end.

will try attempting a fix for it.

cc @rdblue @kbendick @jzhuge

singhpk234 avatar Aug 04 '22 06:08 singhpk234

thanks for looking into this, do i have to wait for the fix at iceberg end? Or i can create a custom serializer for org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap using the code you provided above?

jessiedanwang avatar Aug 04 '22 12:08 jessiedanwang

i can create a custom serializer for org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap using the code you provided above

I think attempting custom serializer as temp workaround should be fair, iceberg has point releases as well, please keep an eye for 0.14.1, there are also snapshots for present master published in maven as well (not a recommended approach) once this is in.

singhpk234 avatar Aug 04 '22 15:08 singhpk234

thanks a lot, will keep an eye for 0.14.1. In the meantime, i am trying to create custom serializer as follows, but got compilation error for newHashMap, looks like it does not work with relocated immutableMap?

overloaded method value newHashMap with alternatives: [K, V](x$1: java.util.Map[_ <: K, _ <: V])java.util.HashMap[K,V] K, Vjava.util.HashMap[K,V] cannot be applied to (org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap[Any,Any])

import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.Serializer import com.esotericsoftware.kryo.io.Input import com.esotericsoftware.kryo.io.Output import com.google.common.collect.Maps import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap

class ImmutableMapSerializer(acceptNull: Boolean = true, immutable: Boolean = true) extends Serializer[ImmutableMap[Any,Any]](acceptNull, immutable) {

override def write (kryo: Kryo, output: Output, map: ImmutableMap[Any,Any]): Unit = { kryo.writeObject(output, Maps.newHashMap(map) ) }

override def read (kryo: Kryo, input: Input, dataype: Class[ImmutableMap[Any,Any]]): ImmutableMap[Any,Any] = { val map: java.util.Map[Any,Any] = kryo.readObject (input, classOf[java.util.HashMap[Any,Any]] ) return ImmutableMap.copyOf(map) } }

and also,

class CustomKryoRegistrator extends KryoRegistrator { override def registerClasses(kryo: Kryo): Unit = { val serializer = new ImmutableMapSerializer() kryo.register(classOf[java.util.HashMap[Any,Any]]) kryo.register(classOf[ImmutableMap[Any,Any]], serializer) kryo.register(Class.forName("org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap"), serializer) } }

jessiedanwang avatar Aug 04 '22 17:08 jessiedanwang

where is the source for org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap? Can not seem to find it in the apache iceberg repo?

jessiedanwang avatar Aug 05 '22 02:08 jessiedanwang

i have change it to java to work around the compilation error, but i am still seeing the same kryo serialization issue on ImmutableMap Caused by: java.lang.UnsupportedOperationException at org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap.put(ImmutableMap.java:714)

Here is the code change I have made, do i miss anything?

Add spark config .config("spark.kryo.registrator", "xxx.CustomKryoRegistrator")

import com.esotericsoftware.kryo.Kryo; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.spark.serializer.KryoRegistrator;

public class CustomKryoRegistrator implements KryoRegistrator { @Override public void registerClasses(Kryo kryo){ final ImmutableRelocatedMapSerializer serializer = new ImmutableRelocatedMapSerializer(); kryo.register(java.util.HashMap.class); kryo.register(ImmutableMap.class, serializer); } }

import com.esotericsoftware.kryo.Kryo; import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; import com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class ImmutableRelocatedMapSerializer extends Serializer<ImmutableMap> {

private static final boolean DOES_NOT_ACCEPT_NULL = true;
private static final boolean IMMUTABLE = true;

public ImmutableRelocatedMapSerializer() {
    super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
}

@Override
public void write(Kryo kryo, Output output, ImmutableMap object) {
    kryo.writeObject(output, Maps.newHashMap(object));
}

@Override
public ImmutableMap read(Kryo kryo, Input input, Class<ImmutableMap> type) {
    java.util.Map map = kryo.readObject(input, java.util.HashMap.class);
    return ImmutableMap.copyOf(map);
}

}

I have also tried the following, did not work either

val sparkConf = new SparkConf().setAppName(appName).registerKryoClasses(Array(classOf[org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap[Any, Any]]))

spark = SparkSession .builder() .config("spark.sql.legacy.parquet.datetimeRebaseModeInWrite", "CORRECTED") .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") // Iceberg related configs .config("spark.sql.autoBroadcastJoinThreshold", "-1") .config("spark.sql.adaptive.autoBroadcastJoinThreshold", "-1") .config(s"spark.sql.catalog.iceberg_catalog", "org.apache.iceberg.spark.SparkCatalog") .config(s"spark.sql.catalog.iceberg_catalog.warehouse", warehousePath) .config(s"spark.sql.catalog.iceberg_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") .config(s"spark.sql.catalog.iceberg_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO") .config(s"spark.sql.catalog.iceberg_catalog.client.factory", "org.apache.iceberg.aws.AssumeRoleAwsClientFactory") .config(s"spark.sql.catalog.iceberg_catalog.client.assume-role.arn", s"arn:aws:iam::$catalogId:role/$role") .config(s"spark.sql.catalog.iceberg_catalog.client.assume-role.region", "us-east-2") .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true") .config("spark.hadoop.hive.metastore.glue.catalogid", catalogId) .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .config(sparkConf) .enableHiveSupport() .getOrCreate()

jessiedanwang avatar Aug 05 '22 19:08 jessiedanwang

Thanks for pinging me, @singhpk234. Looks like we need to convert the properties map over to a HashMap rather than an immutable map. Good catch!

rdblue avatar Aug 05 '22 21:08 rdblue

Just wondering you have an estimation for when the changes can be released? Let me know, thanks

jessiedanwang avatar Aug 06 '22 02:08 jessiedanwang

@singhpk234 i have specified java serializer, but still getting the same kryo exception?

.config("spark.serializer", "org.apache.spark.serializer.JavaSerializer")

22/08/08 20:19:23 WARN TaskSetManager: Lost task 4.0 in stage 15.0 (TID 1037) (ip-10-158-77-88.cmhemr.indeed.net executor 4): java.io.IOException: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException Serialization trace: properties (org.apache.iceberg.aws.s3.S3FileIO) io (org.apache.iceberg.spark.source.SerializableTableWithSize) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1412) at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:226) at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:103) at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70) at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:566) at org.apache.iceberg.spark.source.SparkWrite$WriterFactory.createWriter(SparkWrite.java:561) at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:408) at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:360) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:131) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Caused by: com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException Serialization trace: properties (org.apache.iceberg.aws.s3.S3FileIO) io (org.apache.iceberg.spark.source.SerializableTableWithSize) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543) at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813) at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:297) at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$unBlockifyObject$4(TorrentBroadcast.scala:336) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439) at org.apache.spark.broadcast.TorrentBroadcast$.unBlockifyObject(TorrentBroadcast.scala:338) at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$4(TorrentBroadcast.scala:257) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$2(TorrentBroadcast.scala:231) at org.apache.spark.util.KeyLock.withLock(KeyLock.scala:64) at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$readBroadcastBlock$1(TorrentBroadcast.scala:226) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1405) ... 15 more

Caused by: java.lang.UnsupportedOperationException at org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap.put(ImmutableMap.java:714) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:162) at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39) at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125) ... 30 more

jessiedanwang avatar Aug 08 '22 20:08 jessiedanwang

@jessiedanwang I am not sure how kryo serializer comes to play here, are your spark confs overriden somewhere else ?

Ideally if you don't specify any serializer by default i think spark uses java serializer.

Note : I tried it with both with specifying spark.serializer=org.apache.spark.serializer.JavaSerializer and without specifying any serializer and can confirm it worked for me .. here is a sample spark submit (mostly taken from your spark submit example posted above)

spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.14.0,software.amazon.awssdk:bundle:2.17.243,software.amazon.awssdk:url-connection-client:2.17.243 --conf spark.serializer=org.apache.spark.serializer.JavaSerializer  \
--conf spark.driver.memory=4g --conf spark.executor.cores=4 --conf spark.dynamicAllocation.enabled=true --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.my_catalog.warehouse=s3://<bucket>/<prefix>  \
--conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager --conf spark.sql.catalog.my_catalog.lock.table=myGlueLockTable

Regarding kryo serializer class for name :

kryo.register(Class.forName("org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap"), serializer)

you should this class instead :

kryo.register(
          Class.forName("org.apache.iceberg.relocated.com.google.common.collect.SingletonImmutableBiMap"),
          serializer);

as this is the actual class being used... for just POC purpose in the custom serializer i wrote in comment above i just did kryo.register(obj.getClass(), serializer);

Here is a complete git diff of my changes with custom serializer using class.forName : https://gist.github.com/singhpk234/5e570b04a8f8145846e1019a820eaef7

I hope it helps !

singhpk234 avatar Aug 09 '22 06:08 singhpk234

thanks for getting back to me. I cleaned up the existing database created previously, and now JavaSerializer works. However, kryo serialization is still generating the same error even after i have changed the class name to org.apache.iceberg.relocated.com.google.common.collect.SingletonImmutableBiMap for some reason, which is puzzling. I noticed that the change you made is on spark 3.3 branch, and i am using spark 3.1.2, but i do not think spark version should make a difference, correct?

jessiedanwang avatar Aug 09 '22 17:08 jessiedanwang

but i do not think spark version should make a difference, correct?

yup, i also don't think it should make difference. Just to be double sure, I applied this patch to 3.1 directory and can confirm it worked for me.

singhpk234 avatar Aug 10 '22 09:08 singhpk234

thanks for getting back. I have tried both spark 3.1 and spark 3.2 with SingletonImmutableBiMap as follows, still same error, i guess we will have to work with JavaSerializer for now

Add spark config .config("spark.kryo.registrator", "xxx.CustomKryoRegistrator")

import com.esotericsoftware.kryo.Kryo; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.spark.serializer.KryoRegistrator;

public class CustomKryoRegistrator implements KryoRegistrator { @Override public void registerClasses(Kryo kryo){ final ImmutableRelocatedMapSerializer serializer = new ImmutableRelocatedMapSerializer(); //kryo.register(java.util.HashMap.class); try { kryo.register(Class.forName("org.apache.iceberg.relocated.com.google.common.collect.SingletonImmutableBiMap"), serializer); } catch (Exception e) { // do nothing } } }

jessiedanwang avatar Aug 10 '22 21:08 jessiedanwang

Also having issues with kryo serialization of ImmutableMap for S3FileIO. Was trying to register a customer serializer for org.apache.iceberg.relocated.com.google.common.collect.SingletonImmutableBiMap as suggested above, however I was still getting kryo exceptions. Turns out, there are other concrete implementations of ImmutableMap that could potentially fail to serialize. So I just registered a serializer with all of them and it worked. I'm not sure which one in particular it was failing on.....

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.common.collect.Maps;

import java.util.HashMap;
import java.util.Map;

import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;

public class ImmutableRelocatedMapSerializer extends
        Serializer<ImmutableMap> {

    private static final boolean DOES_NOT_ACCEPT_NULL = true;
    private static final boolean IMMUTABLE = true;

    public ImmutableRelocatedMapSerializer() {
        super(DOES_NOT_ACCEPT_NULL, IMMUTABLE);
    }

    @Override
    public void write(Kryo kryo, Output output, ImmutableMap object) {
        kryo.writeObject(output, Maps.newHashMap(object));
    }

    @Override
    public ImmutableMap read(Kryo kryo, Input input, Class<ImmutableMap> type) {
        Map map = kryo.readObject(input, HashMap.class);
        return ImmutableMap.copyOf(map);
    }

    public void register(Kryo kryo) {
        try {
            kryo.register(Class.forName("org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMap"), this);
            kryo.register(Class.forName("org.apache.iceberg.relocated.com.google.common.collect.RegularImmutableBiMap"), this);
            kryo.register(Class.forName("org.apache.iceberg.relocated.com.google.common.collect.JdkBackedImmutableBiMap"), this);
            kryo.register(Class.forName("org.apache.iceberg.relocated.com.google.common.collect.SingletonImmutableBiMap"), this);
            kryo.register(Class.forName("org.apache.iceberg.relocated.com.google.common.collect.RegularImmutableMap"), this);
            kryo.register(Class.forName("org.apache.iceberg.relocated.com.google.common.collect.ImmutableSortedMapFauxverideShim"), this);
            kryo.register(Class.forName("org.apache.iceberg.relocated.com.google.common.collect.ImmutableSortedMap"), this);
            kryo.register(Class.forName("org.apache.iceberg.relocated.com.google.common.collect.ImmutableEnumMap"), this);
            kryo.register(Class.forName("org.apache.iceberg.relocated.com.google.common.collect.JdkBackedImmutableMap"), this);
            kryo.register(Class.forName("org.apache.iceberg.relocated.com.google.common.collect.ImmutableBiMapFauxverideShim"), this);
            kryo.register(Class.forName("org.apache.iceberg.relocated.com.google.common.collect.ImmutableEnumMap"), this);
        } catch (Exception ex) {
            // do nothing
        }
    }
}

calvin-pietersen avatar Aug 25 '22 02:08 calvin-pietersen

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] avatar Mar 07 '23 00:03 github-actions[bot]

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'

github-actions[bot] avatar Mar 24 '23 00:03 github-actions[bot]