[Bug]: Table already exist when writing using Spark
What happened?
When trying to use Spark to ingest data into Iceberg tables using createOrReplace. An AlreadyExistException is thrown even if the table doesn't exist before the call of this method.
Affects Versions
0.8.0-incubating
What table formats are you seeing the problem on?
Iceberg
What engines are you seeing the problem on?
Spark
How to reproduce
Doing the following writing using Spark:
dataset.writeTo(collectionName)
.createOrReplace();
This command send twice a POST request: 1st to start the transaction and second to commit it.
Right now, we workaround the issue by retrying the createOrReplace() operation multiples times. We are using Spark 3.5.2 with Iceberg bundle v1.7.2 (also tried with v1.6.1 with same result).
Relevant log output
25/05/18 19:40:14 ERROR Utils: Aborting task
org.apache.iceberg.exceptions.AlreadyExistsException: Requirement failed: table already exists
at org.apache.iceberg.rest.ErrorHandlers$TableErrorHandler.accept(ErrorHandlers.java:115)
at org.apache.iceberg.rest.ErrorHandlers$TableErrorHandler.accept(ErrorHandlers.java:102)
at org.apache.iceberg.rest.HTTPClient.throwFailure(HTTPClient.java:211)
at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:323)
at org.apache.iceberg.rest.HTTPClient.execute(HTTPClient.java:262)
at org.apache.iceberg.rest.HTTPClient.post(HTTPClient.java:368)
at org.apache.iceberg.rest.RESTClient.post(RESTClient.java:112)
at org.apache.iceberg.rest.RESTTableOperations.commit(RESTTableOperations.java:152)
at org.apache.iceberg.BaseTransaction.commitCreateTransaction(BaseTransaction.java:327)
at org.apache.iceberg.BaseTransaction.commitTransaction(BaseTransaction.java:306)
at org.apache.iceberg.CommitCallbackTransaction.commitTransaction(CommitCallbackTransaction.java:126)
at org.apache.iceberg.spark.source.StagedSparkTable.commitStagedChanges(StagedSparkTable.java:34)
at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.$anonfun$writeToTable$1(WriteToDataSourceV2Exec.scala:585)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1397)
at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.writeToTable(WriteToDataSourceV2Exec.scala:578)
at org.apache.spark.sql.execution.datasources.v2.V2CreateTableAsSelectBaseExec.writeToTable$(WriteToDataSourceV2Exec.scala:572)
at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.writeToTable(WriteToDataSourceV2Exec.scala:186)
at org.apache.spark.sql.execution.datasources.v2.AtomicReplaceTableAsSelectExec.run(WriteToDataSourceV2Exec.scala:221)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result$lzycompute(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.result(V2CommandExec.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2CommandExec.executeCollect(V2CommandExec.scala:49)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
at org.apache.spark.sql.DataFrameWriterV2.runCommand(DataFrameWriterV2.scala:201)
at org.apache.spark.sql.DataFrameWriterV2.internalReplace(DataFrameWriterV2.scala:213)
at org.apache.spark.sql.DataFrameWriterV2.createOrReplace(DataFrameWriterV2.scala:135)
at com.soprabanking.dxp.pure.bf.db.iceberg.IcebergDataSource.createReplaceWithRetry(IcebergDataSource.java:97)
at com.soprabanking.dxp.pure.bf.db.iceberg.IcebergDataSource.saveDataset(IcebergDataSource.java:75)
at com.soprabanking.dxp.pure.integrate.data.step.ResultManager.storeGoodData(ResultManager.java:234)
at com.soprabanking.dxp.pure.integrate.data.step.ResultManager.doStoreData(ResultManager.java:189)
at com.soprabanking.dxp.pure.integrate.data.step.ResultManager.storeData(ResultManager.java:172)
at com.soprabanking.dxp.pure.integrate.data.dataset.management.IntegrateDataDatasetManagement.storeData(IntegrateDataDatasetManagement.java:115)
at java.base/java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:732)
at org.mockito.internal.util.reflection.InstrumentationMemberAccessor$Dispatcher$ByteBuddy$3YA8MpMo.invokeWithArguments(Unknown Source)
at org.mockito.internal.util.reflection.InstrumentationMemberAccessor.invoke(InstrumentationMemberAccessor.java:251)
at org.mockito.internal.util.reflection.ModuleMemberAccessor.invoke(ModuleMemberAccessor.java:55)
at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.tryInvoke(MockMethodAdvice.java:314)
at org.mockito.internal.creation.bytebuddy.MockMethodAdvice$RealMethodCall.invoke(MockMethodAdvice.java:234)
at org.mockito.internal.invocation.InterceptedInvocation.callRealMethod(InterceptedInvocation.java:142)
at org.mockito.internal.stubbing.answers.CallsRealMethods.answer(CallsRealMethods.java:45)
at org.mockito.Answers.answer(Answers.java:90)
at org.mockito.internal.handler.MockHandlerImpl.handle(MockHandlerImpl.java:111)
at org.mockito.internal.handler.NullResultGuardian.handle(NullResultGuardian.java:29)
at org.mockito.internal.handler.InvocationNotifierHandler.handle(InvocationNotifierHandler.java:34)
at org.mockito.internal.creation.bytebuddy.MockMethodInterceptor.doIntercept(MockMethodInterceptor.java:82)
at org.mockito.internal.creation.bytebuddy.MockMethodAdvice.handle(MockMethodAdvice.java:134)
at com.soprabanking.dxp.pure.integrate.data.dataset.management.IntegrateDataDatasetManagement.storeData(IntegrateDataDatasetManagement.java:115)
at com.soprabanking.dxp.pure.integrate.data.steps.StoreDataStep.lambda$storeResultInPrimaryDB$0(StoreDataStep.java:110)
at java.base/java.util.HashMap$Values.forEach(HashMap.java:1065)
at com.soprabanking.dxp.pure.integrate.data.steps.StoreDataStep.storeResultInPrimaryDB(StoreDataStep.java:108)
at com.soprabanking.dxp.pure.integrate.data.steps.StoreDataStep.execute(StoreDataStep.java:76)
at com.soprabanking.dxp.pure.integrate.data.steps.StoreDataStep.execute(StoreDataStep.java:34)
at com.soprabanking.dxp.pure.batch.core.AbstractBatchInstance.executeSteps(AbstractBatchInstance.java:174)
at com.soprabanking.dxp.pure.batch.core.AbstractBatchInstance.execute(AbstractBatchInstance.java:96)
at com.soprabanking.dxp.pure.integrate.data.it.IntegrateDataJobIT.execute_shouldSucceed_whenJobIsLaunchedWithCustomerMappingRules(IntegrateDataJobIT.java:317)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:767)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:156)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:147)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:86)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(InterceptingExecutableInvoker.java:103)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:86)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$8(TestMethodTestDescriptor.java:217)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:213)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:138)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:68)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:156)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:146)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:144)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:100)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:160)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:146)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:144)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:100)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:160)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:146)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:144)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:100)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:107)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.processAllTestClasses(JUnitPlatformTestClassProcessor.java:124)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor$CollectAllTestClassesExecutor.access$000(JUnitPlatformTestClassProcessor.java:99)
at org.gradle.api.internal.tasks.testing.junitplatform.JUnitPlatformTestClassProcessor.stop(JUnitPlatformTestClassProcessor.java:94)
at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.stop(SuiteTestClassProcessor.java:63)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:92)
at jdk.proxy2/jdk.proxy2.$Proxy6.stop(Unknown Source)
at org.gradle.api.internal.tasks.testing.worker.TestWorker$3.run(TestWorker.java:200)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.executeAndMaintainThreadName(TestWorker.java:132)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:103)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.execute(TestWorker.java:63)
at org.gradle.process.internal.worker.child.ActionExecutionWorker.execute(ActionExecutionWorker.java:56)
at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:121)
at org.gradle.process.internal.worker.child.SystemApplicationClassLoaderWorker.call(SystemApplicationClassLoaderWorker.java:71)
at worker.org.gradle.process.internal.worker.GradleWorkerMain.run(GradleWorkerMain.java:69)
at worker.org.gradle.process.internal.worker.GradleWorkerMain.main(GradleWorkerMain.java:74)
Anything else
No response
Are you willing to submit a PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [x] I agree to follow this project's Code of Conduct
Can you provide more details on how to create a sprak session on the client side and what the logs are in the REST catalog service?
We are using the following configuration for SparkSession:
SparkSession.builder()
.master("local[*]"))
.config("spark.app.id", appId)
.config("spark.driver.memory", "4g")
.config("spark.executor.memory", "4g")
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
.config("spark.sql.catalog.iceberg", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.iceberg.type", "rest")
.config("spark.sql.catalog.iceberg.uri", "http://localhost:1630/api/iceberg/rest")
.config("spark.sql.catalog.iceberg.warehouse", "iceberg").getOrCreate()
There is no error only INFO message:
2025-05-24 20:57:03,368 INFO [thrift-server-optimize-manager-0] [org.apache.amoro.server.optimizing.SchedulingPolicy] [] - Using sorter instance org.apache.amoro.server.optimizing.sorter.QuotaOccupySorter corresponding to the scheduling policy quota
2025-05-24 20:57:05,616 INFO [async-table-runtime-refresh-executor-5] [org.apache.amoro.utils.CatalogUtil] [] - TableMetaStore use auth config in catalog meta, authType is custom
2025-05-24 20:57:05,616 INFO [async-table-runtime-refresh-executor-5] [org.apache.amoro.table.TableMetaStore] [] - Build table meta store with configurations: authMethod:null, hadoopUsername:null, krbPrincipal:null
2025-05-24 20:57:05,616 INFO [async-table-runtime-refresh-executor-5] [org.apache.iceberg.CatalogUtil] [] - Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO
2025-05-24 20:57:05,616 INFO [async-table-runtime-refresh-executor-5] [org.apache.amoro.utils.CatalogUtil] [] - TableMetaStore use auth config in catalog meta, authType is custom
2025-05-24 20:57:05,616 INFO [async-table-runtime-refresh-executor-5] [org.apache.amoro.table.TableMetaStore] [] - Build table meta store with configurations: authMethod:null, hadoopUsername:null, krbPrincipal:null
2025-05-24 20:57:08,905 INFO [async-table-runtime-refresh-executor-0] [org.apache.amoro.utils.CatalogUtil] [] - TableMetaStore use auth config in catalog meta, authType is custom
2025-05-24 20:57:08,905 INFO [async-table-runtime-refresh-executor-0] [org.apache.amoro.table.TableMetaStore] [] - Build table meta store with configurations: authMethod:null, hadoopUsername:null, krbPrincipal:null
2025-05-24 20:57:08,905 INFO [async-table-runtime-refresh-executor-0] [org.apache.iceberg.CatalogUtil] [] - Loading custom FileIO implementation: org.apache.iceberg.aws.s3.S3FileIO
2025-05-24 20:57:08,905 INFO [async-table-runtime-refresh-executor-0] [org.apache.amoro.utils.CatalogUtil] [] - TableMetaStore use auth config in catalog meta, authType is custom
2025-05-24 20:57:08,905 INFO [async-table-runtime-refresh-executor-0] [org.apache.amoro.table.TableMetaStore] [] - Build table meta store with configurations: authMethod:null, hadoopUsername:null, krbPrincipal:null
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.