
Issue after migrating to Spark 3.3.0 and Iceberg 0.14.0

zsvoboda opened this issue · 3 comments

Apache Iceberg version

0.14.0 (latest release)

Query engine

Spark

Please describe the bug 🐞

I'm migrating my solution from Spark 3.2.1 and Iceberg 0.13.2 to Spark 3.3.0 and Iceberg 0.14.0. I get an exception from the following PySpark code:

    from pyspark.sql.types import ArrayType, DoubleType, StringType

    spark.udf.register("sort_a", lambda x: sorted(x), ArrayType(StringType()))
    spark.udf.register("sort_zs", lambda x, k, v: [e[v] for e in sorted(x, key=lambda e: e[k])], ArrayType(StringType()))
    spark.udf.register("sort_zd", lambda x, k, v: [e[v] for e in sorted(x, key=lambda e: e[k])], ArrayType(DoubleType()))

    data = spark.sql(
        """
        create table warehouse.silver.consumption_aggregated_sorted as
        select
            consumption_date,
            user_id,
            eating_time_id,
            sort_a(food_category_ids) as food_category_ids,
            sort_zs(zipped_categories, 'food_category_ids', 'food_category_names') as food_category_names,
            sort_a(food_ids) as food_ids,
            sort_zs(zipped_names, 'food_ids', 'food_names') as food_names,
            sort_zd(zipped_units, 'food_ids', 'consumption_total_food_units') as food_units
        from warehouse.silver.consumption_zipped
        """)

Here is the exception:

py4j.protocol.Py4JJavaError: An error occurred while calling o39.sql.
: org.apache.spark.SparkException: Writing job aborted
at org.apache.spark.sql.errors.QueryExecutionErrors$.writingJobAbortedError(QueryExecutionErrors.scala:749)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:409)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2$(WriteToDataSourceV2Exec.scala:353)
at org.apache.spark.sql.execution.datasources.v2.AtomicCreateTableAsSelectExec.writeWithV2(WriteToDataSourceV2Exec.scala:108)
at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.$anonfun$writeToTable$1(WriteToDataSourceV2Exec.scala:503)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.writeToTable(WriteToDataSourceV2Exec.scala:491)
at org.apache.spark.sql.execution.datasources.v2.TableWriteExecHelper.writeToTable$(WriteToDataSourceV2Exec.scala:486)
at org.apache.spark.sql.execution.datasources.v2.AtomicCreateTableAsSelectExec.writeToTable(WriteToDataSourceV2Exec.scala:108)
at org.apache.spark.sql.execution.datasources.v2.AtomicCreateTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:131)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:622)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:617)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 0.0 failed 1 times, most recent failure: Lost task 3.0 in stage 0.0 (TID 3) (8ada6cf20eaa executor driver): java.io.UncheckedIOException: java.net.SocketException: Unexpected end of file from server
at software.amazon.awssdk.utils.FunctionalUtils.asRuntimeException(FunctionalUtils.java:180)
at software.amazon.awssdk.utils.FunctionalUtils.lambda$safeSupplier$4(FunctionalUtils.java:110)
at software.amazon.awssdk.utils.FunctionalUtils.invokeSafely(FunctionalUtils.java:136)
at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable.lambda$tryGetOutputStream$0(UrlConnectionHttpClient.java:311)
at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable.getAndHandle100Bug(UrlConnectionHttpClient.java:345)
at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable.tryGetOutputStream(UrlConnectionHttpClient.java:311)
at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable.call(UrlConnectionHttpClient.java:288)
at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable.call(UrlConnectionHttpClient.java:261)
at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:64)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:77)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:56)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:39)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:167)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:175)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:9325)
at org.apache.iceberg.aws.s3.S3OutputStream.completeUploads(S3OutputStream.java:392)
at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:253)
at org.apache.iceberg.shaded.org.apache.parquet.io.DelegatingPositionOutputStream.close(DelegatingPositionOutputStream.java:38)
at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1197)
at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:240)
at org.apache.iceberg.io.DataWriter.close(DataWriter.java:82)
at org.apache.iceberg.io.RollingFileWriter.closeCurrentWriter(RollingFileWriter.java:118)
at org.apache.iceberg.io.RollingFileWriter.close(RollingFileWriter.java:142)
at org.apache.iceberg.io.RollingDataWriter.close(RollingDataWriter.java:33)
at org.apache.iceberg.spark.source.SparkWrite$UnpartitionedDataWriter.close(SparkWrite.java:685)
at org.apache.iceberg.spark.source.SparkWrite$UnpartitionedDataWriter.commit(SparkWrite.java:667)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:453)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.net.SocketException: Unexpected end of file from server
at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:900)
at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:723)
at java.base/sun.net.www.protocol.http.HttpURLConnection.expect100Continue(HttpURLConnection.java:1277)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1381)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1342)
at software.amazon.awssdk.utils.FunctionalUtils.lambda$safeSupplier$4(FunctionalUtils.java:108)
... 66 more

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2672)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2608)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2607)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2607)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1182)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1182)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2860)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2802)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2791)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2228)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.writeWithV2(WriteToDataSourceV2Exec.scala:377)
... 50 more
Caused by: java.io.UncheckedIOException: java.net.SocketException: Unexpected end of file from server
at software.amazon.awssdk.utils.FunctionalUtils.asRuntimeException(FunctionalUtils.java:180)
at software.amazon.awssdk.utils.FunctionalUtils.lambda$safeSupplier$4(FunctionalUtils.java:110)
at software.amazon.awssdk.utils.FunctionalUtils.invokeSafely(FunctionalUtils.java:136)
at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable.lambda$tryGetOutputStream$0(UrlConnectionHttpClient.java:311)
at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable.getAndHandle100Bug(UrlConnectionHttpClient.java:345)
at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable.tryGetOutputStream(UrlConnectionHttpClient.java:311)
at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable.call(UrlConnectionHttpClient.java:288)
at software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient$RequestCallable.call(UrlConnectionHttpClient.java:261)
at software.amazon.awssdk.core.internal.util.MetricUtils.measureDurationUnsafe(MetricUtils.java:64)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.executeHttpRequest(MakeHttpRequestStage.java:77)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:56)
at software.amazon.awssdk.core.internal.http.pipeline.stages.MakeHttpRequestStage.execute(MakeHttpRequestStage.java:39)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:167)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:175)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
at software.amazon.awssdk.services.s3.DefaultS3Client.putObject(DefaultS3Client.java:9325)
at org.apache.iceberg.aws.s3.S3OutputStream.completeUploads(S3OutputStream.java:392)
at org.apache.iceberg.aws.s3.S3OutputStream.close(S3OutputStream.java:253)
at org.apache.iceberg.shaded.org.apache.parquet.io.DelegatingPositionOutputStream.close(DelegatingPositionOutputStream.java:38)
at org.apache.iceberg.shaded.org.apache.parquet.hadoop.ParquetFileWriter.end(ParquetFileWriter.java:1197)
at org.apache.iceberg.parquet.ParquetWriter.close(ParquetWriter.java:240)
at org.apache.iceberg.io.DataWriter.close(DataWriter.java:82)
at org.apache.iceberg.io.RollingFileWriter.closeCurrentWriter(RollingFileWriter.java:118)
at org.apache.iceberg.io.RollingFileWriter.close(RollingFileWriter.java:142)
at org.apache.iceberg.io.RollingDataWriter.close(RollingDataWriter.java:33)
at org.apache.iceberg.spark.source.SparkWrite$UnpartitionedDataWriter.close(SparkWrite.java:685)
at org.apache.iceberg.spark.source.SparkWrite$UnpartitionedDataWriter.commit(SparkWrite.java:667)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.$anonfun$run$1(WriteToDataSourceV2Exec.scala:453)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538)
at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:480)
at org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec.$anonfun$writeWithV2$2(WriteToDataSourceV2Exec.scala:381)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:136)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
... 1 more
Caused by: java.net.SocketException: Unexpected end of file from server
at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:900)
at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:723)
at java.base/sun.net.www.protocol.http.HttpURLConnection.expect100Continue(HttpURLConnection.java:1277)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream0(HttpURLConnection.java:1381)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getOutputStream(HttpURLConnection.java:1342)
at software.amazon.awssdk.utils.FunctionalUtils.lambda$safeSupplier$4(FunctionalUtils.java:108)
... 66 more

  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
    yield
  File "/usr/local/lib/python3.9/site-packages/dagster/utils/__init__.py", line 406, in iterate_with_context
    next_output = next(iterator)
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/compute_generator.py", line 66, in _coerce_solid_compute_fn_to_iterator
    result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
  File "/var/lib/ngods/dagster/spark.py", line 20, in normalize_food_arrays_op
    data = spark.sql(
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/session.py", line 1034, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self)
  File "/usr/local/lib/python3.9/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(

zsvoboda · Aug 06 '22 03:08

The stack trace reads like there's an S3 request timeout.

Can you provide the following information?

  1. The exact Iceberg runtime jar dependency used (ensure that you're using the Spark 3.3 Iceberg bundle).
  2. The catalog you are using (e.g. Hadoop, Hive, DynamoDB, etc.).
  3. The Spark configuration used, including the settings for initializing the Iceberg catalog plus any non-default Spark configs applied (a minimal sketch of this kind of setup follows the list).
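
For reference, here is a minimal sketch of the kind of setup those three questions are about, assuming a Hadoop catalog named warehouse backed by S3FileIO (the catalog name, bucket, and version pin are placeholders to adapt, not a prescription):

    from pyspark.sql import SparkSession

    # Hypothetical configuration sketch; catalog name and bucket are made up.
    spark = (
        SparkSession.builder
        .appName("iceberg-migration")
        # 1. Iceberg runtime built for Spark 3.3 / Scala 2.12
        .config("spark.jars.packages",
                "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:0.14.0")
        .config("spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
        # 2. Catalog definition (here a Hadoop catalog using S3FileIO)
        .config("spark.sql.catalog.warehouse", "org.apache.iceberg.spark.SparkCatalog")
        .config("spark.sql.catalog.warehouse.type", "hadoop")
        .config("spark.sql.catalog.warehouse.warehouse", "s3://my-bucket/warehouse")
        .config("spark.sql.catalog.warehouse.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
        # 3. Any non-default Spark settings would also go here
        .getOrCreate()
    )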

Have you confirmed that this same code over this same input data works with your previous Spark 3.2 setup? Not just previous runs, but this exact input data? Given that there seems to be an S3 upload timeout, it would be really helpful to compare the previous setup to the new setup over the exact same dataset so the two are truly comparable. Otherwise it's hard to be sure that the problem isn't simply skew in your input data (e.g. the input dataset is much larger than normal, or it's much more skewed on the columns being sorted and therefore takes much longer to sort).

It would also be very helpful to provide the query plan from the old setup and from the new setup (either the output of EXPLAIN EXTENDED or a screenshot of the whole DAG from the SQL tab), for both Spark 3.2 with Iceberg 0.13 and Spark 3.3 with Iceberg 0.14.
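
For example, a minimal way to dump those plans from PySpark (the trivial query below is a stand-in; the full SELECT from your CTAS would go in its place):

    # Run this on both the old and the new setup and diff the output.
    plan_df = spark.sql("select * from warehouse.silver.consumption_zipped")
    plan_df.explain(mode="extended")  # parsed, analyzed, optimized and physical plans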

Assuming that you're using the correct Iceberg Spark runtime JAR for Spark 3.3.0, I'm wondering if Spark's adaptive query execution is lowering the parallelism of the write stage, which would increase the size of each upload to S3 and could lead to the timeout.
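
One way to test that hypothesis (an assumption on my part, not a confirmed fix) is to rerun the job with AQE's partition coalescing disabled and compare the write-stage task count and task output sizes in the Spark UI:

    # Disable only the partition coalescing that can shrink write parallelism...
    spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
    # ...or disable adaptive execution entirely for a single comparison run.
    # spark.conf.set("spark.sql.adaptive.enabled", "false")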

kbendick · Aug 08 '22 01:08

To get around the issue, assuming you're using S3FileIO (which it seems like you are), you might consider increasing the number of multipart upload threads if the issue is indeed just a plain S3 upload timeout (a config sketch follows the link below):

  • https://iceberg.apache.org/docs/latest/aws/#progressive-multipart-upload
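
A rough sketch of what that could look like (the property names come from the Iceberg AWS docs linked above; the catalog name and values are placeholders to tune, not recommendations):

    from pyspark.sql import SparkSession

    # Illustrative only: "warehouse" and the values are placeholders. S3FileIO
    # properties are read when the catalog is initialized, so set them before the
    # catalog is first used (e.g. on the session builder or in spark-defaults.conf).
    spark = (
        SparkSession.builder
        .config("spark.sql.catalog.warehouse.s3.multipart.num-threads", "16")
        .config("spark.sql.catalog.warehouse.s3.multipart.part-size-bytes",
                str(64 * 1024 * 1024))
        .getOrCreate()
    )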

Other things that might help: let Spark sort the arrays without the Python UDFs by using array_sort instead (possibly with a UDF only for the comparison logic, though even that may not be needed; UDFs are best avoided whenever Spark's built-in functions can achieve the same thing).

Examples of sort_array and array_sort (including without a UDF, or with a UDF used only for the comparison rather than for sorting the whole array): https://towardsdatascience.com/the-definitive-way-to-sort-arrays-in-spark-1224f5529961
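
For illustration, here is a sketch of the zipped sort without any Python UDF (untested against your schema; it assumes zipped_categories is an array of structs carrying food_category_ids and food_category_names fields):

    # sort_a(...) can usually be replaced by the built-in array_sort()/sort_array().
    # For the zipped columns, array_sort with a comparator lambda plus transform
    # keeps everything in Spark SQL built-ins.
    sorted_df = spark.sql("""
        select
            array_sort(food_category_ids) as food_category_ids,
            transform(
                array_sort(
                    zipped_categories,
                    (l, r) -> case
                        when l.food_category_ids < r.food_category_ids then -1
                        when l.food_category_ids > r.food_category_ids then 1
                        else 0
                    end
                ),
                e -> e.food_category_names
            ) as food_category_names
        from warehouse.silver.consumption_zipped
    """)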

kbendick · Aug 08 '22 01:08

Based on the expanded output of the Python stack trace (I hadn't scrolled over to it at first), it seems like one of your tasks may be yield-ing empty data in your "dagster" library. That might lead to the unexpected EOF error (either when writing data or when reading it back into the JVM from Python; I suspect the latter).

Maybe check that that yield statement isn't returning empty data when trying to coerce to an iterator or something funky?

  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/utils.py", line 47, in solid_execution_error_boundary
    yield
  File "/usr/local/lib/python3.9/site-packages/dagster/utils/__init__.py", line 406, in iterate_with_context
    next_output = next(iterator)
  File "/usr/local/lib/python3.9/site-packages/dagster/core/execution/plan/compute_generator.py", line 66, in _coerce_solid_compute_fn_to_iterator
    result = fn(context, **kwargs) if context_arg_provided else fn(**kwargs)
  File "/var/lib/ngods/dagster/spark.py", line 20, in normalize_food_arrays_op
    data = spark.sql(
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/session.py", line 1034, in sql
    return DataFrame(self._jsparkSession.sql(sqlQuery), self)
  File "/usr/local/lib/python3.9/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/usr/local/lib/python3.9/site-packages/pyspark/sql/utils.py", line 190, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.9/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(

kbendick · Aug 08 '22 18:08

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] · Feb 05 '23 00:02

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'.

github-actions[bot] · Feb 23 '23 00:02