[BUG][Spark] Error when reading from the same Delta table as the MERGE INTO output in Delta Lake 3.2.0
Bug
Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)
Describe the problem
When executing a MERGE INTO command whose USING clause reads from the same Delta table that the command writes to, Delta Lake 3.2.0 throws the error org.apache.spark.sql.AnalysisException: Table does not support reads... It is worth mentioning that this error did not occur in Delta Lake 3.0.0 and earlier versions.
Steps to reproduce
import org.apache.spark.sql.SparkSession
import java.io.File
val deltaFilesLocation = new File("delta-files").getAbsolutePath
val spark = SparkSession.builder()
.master("local[1]")
.config("spark.app.name", "delta-test")
.config("spark.driver.bindAddress", "127.0.0.1")
.config("spark.sql.session.timeZone", "UTC")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
.config("spark.sql.caseSensitive", "false")
.getOrCreate()
spark.sql(
s"""
CREATE OR REPLACE TABLE delta.`$deltaFilesLocation/database=database_test/table_a_test/` (
hub_index_transaction_date string,
sid string,
event_datetime timestamp,
d_event_type_id int,
hub_transaction_datetime timestamp
)
USING DELTA
TBLPROPERTIES (delta.compatibility.symlinkFormatManifest.enabled=true, delta.dataSkippingNumIndexedCols=4, delta.enableDeletionVectors=false)
PARTITIONED BY (hub_index_transaction_date)
""")
spark.sql(
s"""
CREATE OR REPLACE TABLE delta.`$deltaFilesLocation/database=database_test/table_b_test/` (
hub_index_2d_sid string,
id bigint,
sid string,
begin_session timestamp,
end_session timestamp,
status string,
hub_transaction_datetime timestamp
)
USING DELTA
TBLPROPERTIES (delta.compatibility.symlinkFormatManifest.enabled=true, delta.dataSkippingNumIndexedCols=6, delta.enableDeletionVectors=false)
PARTITIONED BY (hub_index_2d_sid)
""")
spark.sql(s"DESCRIBE TABLE EXTENDED delta.`$deltaFilesLocation/database=database_test/table_a_test/`").show(false)
spark.sql(s"DESCRIBE TABLE EXTENDED delta.`$deltaFilesLocation/database=database_test/table_b_test/`").show(false)
spark.sql(
s"""
MERGE INTO delta.`$deltaFilesLocation/database=database_test/table_b_test/` AS trg
USING (
WITH table_a_test AS (
SELECT
sid,
event_datetime,
event_datetime as origin_datetime,
d_event_type_id,
hub_transaction_datetime
FROM delta.`$deltaFilesLocation/database=database_test/table_a_test/` tat
WHERE hub_index_transaction_date = '2024-05-15'
),
table_b_test AS (
SELECT
id,
sid,
begin_session,
end_session,
hub_transaction_datetime
FROM delta.`$deltaFilesLocation/database=database_test/table_b_test/`
),
last_table_b_id AS (
SELECT
COALESCE(MAX(id), 0) AS last_id
FROM table_b_test
),
new_sessions AS (
SELECT
SUBSTRING(sid, 1, 2) hub_index_2d_sid,
sid,
'O' status,
TO_TIMESTAMP('2000-01-01 00:00:00.000') begin_session,
TO_TIMESTAMP('2999-12-31 00:00:00.000') end_session,
min(event_datetime) origin_datetime,
max(tat.hub_transaction_datetime) hub_transaction_datetime
FROM table_a_test tat
WHERE NOT EXISTS (
SELECT 1
FROM table_b_test s
WHERE s.sid = tat.sid
AND s.begin_session = TO_TIMESTAMP('2000-01-01 00:00:00.000')
)
GROUP BY tat.sid
),
purchases AS (
SELECT
tat.sid sid,
min(tat.origin_datetime) origin_datetime,
max(hub_transaction_datetime) hub_transaction_datetime
FROM table_a_test tat
WHERE tat.d_event_type_id = 4
GROUP BY tat.sid
),
renew_sessions AS (
SELECT
SUBSTRING(sid, 1, 2) hub_index_2d_sid,
p.sid sid,
'O' status,
p.origin_datetime begin_session,
TO_TIMESTAMP('2999-12-31 00:00:00.000') end_session,
p.hub_transaction_datetime
FROM purchases p
WHERE (
EXISTS (
SELECT 1
FROM table_b_test ds
WHERE p.sid = ds.sid
)
OR EXISTS (
SELECT 1
FROM new_sessions ns
WHERE p.sid = ns.sid
)
)
),
processed_new_sessions AS (
SELECT
ns.hub_index_2d_sid,
ns.sid,
CASE
WHEN rs.status is not null THEN 'C'
ELSE ns.status
END status,
ns.begin_session,
CASE
WHEN rs.begin_session is not null THEN rs.begin_session
ELSE ns.end_session
END end_session,
COALESCE(ns.hub_transaction_datetime, rs.hub_transaction_datetime) AS hub_transaction_datetime
FROM new_sessions ns
LEFT JOIN renew_sessions rs ON ns.sid = rs.sid
AND ns.begin_session < rs.begin_session
),
close_opened_sessions AS (
SELECT
hub_index_2d_sid,
sid,
'' status,
begin_session,
end_session,
rs.hub_transaction_datetime
FROM renew_sessions rs
WHERE NOT EXISTS (
SELECT 1
FROM processed_new_sessions pns
WHERE rs.sid = pns.sid
)
),
final_table AS (
SELECT
hub_index_2d_sid,
ROW_NUMBER() OVER(ORDER BY sid) + (SELECT last_id FROM last_table_b_id) AS id,
sid,
status,
begin_session,
end_session,
hub_transaction_datetime
FROM (
SELECT
hub_index_2d_sid,
sid,
status,
begin_session,
end_session,
hub_transaction_datetime
FROM processed_new_sessions
UNION ALL
SELECT
hub_index_2d_sid,
sid,
status,
begin_session,
end_session,
hub_transaction_datetime
FROM renew_sessions
UNION ALL
SELECT
hub_index_2d_sid,
sid,
status,
begin_session,
end_session,
hub_transaction_datetime
FROM close_opened_sessions
) AS ft
WHERE NOT EXISTS (
SELECT 1
FROM table_b_test AS ds
WHERE ft.sid = ds.sid
AND ds.begin_session >= ft.begin_session
)
)
SELECT
hub_index_2d_sid,
id,
sid,
status,
begin_session,
end_session,
hub_transaction_datetime
FROM final_table
) AS updates ON trg.hub_index_2d_sid = updates.hub_index_2d_sid
AND trg.sid = updates.sid
AND updates.status = ''
AND trg.status = 'O'
AND trg.begin_session < updates.begin_session
WHEN MATCHED THEN
UPDATE SET
trg.status = 'C',
trg.end_session = updates.begin_session
WHEN NOT MATCHED AND updates.status <> '' THEN
INSERT (
hub_index_2d_sid,
id,
sid,
begin_session,
end_session,
status,
hub_transaction_datetime
)
VALUES (
hub_index_2d_sid,
id,
sid,
begin_session,
end_session,
status,
hub_transaction_datetime
)
""")
Observed results
The MERGE command does not execute successfully. A few seconds after the query starts, an error is thrown saying that the Delta table does not support reads. The full stack trace is in the Further details section.
Expected results
I expected the MERGE command to execute successfully, as it did in Delta Lake 3.0.0 and earlier versions.
Further details
24/05/15 12:24:04 ERROR MergeIntoCommand: Fatal error in MERGE with materialized source in attempt 1.
org.apache.spark.sql.AnalysisException: Table does not support reads: delta.`file:/hidden/hidden/hidden/hidden/delta-files/database=database_test/table_b_test`.
at org.apache.spark.sql.errors.QueryCompilationErrors$.tableDoesNotSupportError(QueryCompilationErrors.scala:1392)
at org.apache.spark.sql.errors.QueryCompilationErrors$.tableDoesNotSupportReadsError(QueryCompilationErrors.scala:1396)
at org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits$TableHelper.asReadable(DataSourceV2Implicits.scala:38)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$createScanBuilder$1.applyOrElse(V2ScanRelationPushDown.scala:58)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$$anonfun$createScanBuilder$1.applyOrElse(V2ScanRelationPushDown.scala:56)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:466)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1215)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1214)
at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:71)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:466)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:466)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1215)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1214)
at org.apache.spark.sql.catalyst.plans.logical.Aggregate.mapChildren(basicLogicalOperators.scala:1134)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:466)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:466)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1215)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1214)
at org.apache.spark.sql.catalyst.plans.logical.Subquery.mapChildren(basicLogicalOperators.scala:60)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:466)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:405)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.createScanBuilder(V2ScanRelationPushDown.scala:56)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.$anonfun$apply$1(V2ScanRelationPushDown.scala:43)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.$anonfun$apply$8(V2ScanRelationPushDown.scala:52)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:51)
at org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown$.apply(V2ScanRelationPushDown.scala:38)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at org.apache.spark.sql.catalyst.optimizer.Optimizer$OptimizeSubqueries$$anonfun$apply$4.applyOrElse(Optimizer.scala:322)
at org.apache.spark.sql.catalyst.optimizer.Optimizer$OptimizeSubqueries$$anonfun$apply$4.applyOrElse(Optimizer.scala:317)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:466)
at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren(TreeNode.scala:1243)
at org.apache.spark.sql.catalyst.trees.BinaryLike.mapChildren$(TreeNode.scala:1240)
at org.apache.spark.sql.catalyst.expressions.BinaryExpression.mapChildren(Expression.scala:653)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:466)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:466)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1215)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1214)
at org.apache.spark.sql.catalyst.expressions.UnaryExpression.mapChildren(Expression.scala:533)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:466)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$transformExpressionsDownWithPruning$1(QueryPlan.scala:167)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$1(QueryPlan.scala:208)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:208)
at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:219)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$3(QueryPlan.scala:224)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.Iterator.foreach(Iterator.scala:943)
at scala.collection.Iterator.foreach$(Iterator.scala:943)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
at scala.collection.IterableLike.foreach(IterableLike.scala:74)
at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
at org.apache.spark.sql.catalyst.plans.QueryPlan.recursiveTransform$1(QueryPlan.scala:224)
at org.apache.spark.sql.catalyst.plans.QueryPlan.$anonfun$mapExpressions$4(QueryPlan.scala:229)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:304)
at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:229)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsDownWithPruning(QueryPlan.scala:167)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsWithPruning(QueryPlan.scala:138)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformAllExpressionsWithPruning$1.applyOrElse(QueryPlan.scala:261)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformAllExpressionsWithPruning$1.applyOrElse(QueryPlan.scala:259)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$3(TreeNode.scala:466)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren(TreeNode.scala:1215)
at org.apache.spark.sql.catalyst.trees.UnaryLike.mapChildren$(TreeNode.scala:1214)
at org.apache.spark.sql.catalyst.plans.logical.Project.mapChildren(basicLogicalOperators.scala:71)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:466)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformWithPruning(TreeNode.scala:427)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformAllExpressionsWithPruning(QueryPlan.scala:259)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformAllExpressionsWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformAllExpressionsWithPruning(AnalysisHelper.scala:291)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformAllExpressionsWithPruning$(AnalysisHelper.scala:286)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformAllExpressionsWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.optimizer.Optimizer$OptimizeSubqueries$.apply(Optimizer.scala:317)
at org.apache.spark.sql.catalyst.optimizer.Optimizer$OptimizeSubqueries$.apply(Optimizer.scala:305)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at scala.collection.IndexedSeqOptimized.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.IndexedSeqOptimized.foldLeft$(IndexedSeqOptimized.scala:68)
at scala.collection.mutable.WrappedArray.foldLeft(WrappedArray.scala:38)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$optimizedPlan$1(QueryExecution.scala:152)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:148)
at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:144)
at org.apache.spark.sql.execution.QueryExecution.assertOptimized(QueryExecution.scala:162)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:182)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:179)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:120)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
at org.apache.spark.sql.Dataset.checkpoint(Dataset.scala:727)
at org.apache.spark.sql.Dataset.localCheckpoint(Dataset.scala:714)
at org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSource.prepareMergeSource(MergeIntoMaterializeSource.scala:314)
at org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSource.prepareMergeSource$(MergeIntoMaterializeSource.scala:282)
at org.apache.spark.sql.delta.commands.MergeIntoCommand.prepareMergeSource(MergeIntoCommand.scala:60)
at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$2(MergeIntoCommand.scala:117)
at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$2$adapted(MergeIntoCommand.scala:84)
at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:227)
at org.apache.spark.sql.delta.commands.MergeIntoCommand.$anonfun$runMerge$1(MergeIntoCommand.scala:84)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile(DeltaLogging.scala:168)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordFrameProfile$(DeltaLogging.scala:166)
at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordFrameProfile(MergeIntoCommand.scala:60)
at org.apache.spark.sql.delta.metering.DeltaLogging.$anonfun$recordDeltaOperationInternal$1(DeltaLogging.scala:136)
at com.databricks.spark.util.DatabricksLogging.recordOperation(DatabricksLogging.scala:128)
at com.databricks.spark.util.DatabricksLogging.recordOperation$(DatabricksLogging.scala:117)
at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordOperation(MergeIntoCommand.scala:60)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperationInternal(DeltaLogging.scala:135)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation(DeltaLogging.scala:125)
at org.apache.spark.sql.delta.metering.DeltaLogging.recordDeltaOperation$(DeltaLogging.scala:115)
at org.apache.spark.sql.delta.commands.MergeIntoCommand.recordDeltaOperation(MergeIntoCommand.scala:60)
at org.apache.spark.sql.delta.commands.MergeIntoCommand.runMerge(MergeIntoCommand.scala:82)
at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.$anonfun$run$1(MergeIntoCommandBase.scala:174)
at org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSource.runWithMaterializedSourceLostRetries(MergeIntoMaterializeSource.scala:106)
at org.apache.spark.sql.delta.commands.merge.MergeIntoMaterializeSource.runWithMaterializedSourceLostRetries$(MergeIntoMaterializeSource.scala:94)
at org.apache.spark.sql.delta.commands.MergeIntoCommand.runWithMaterializedSourceLostRetries(MergeIntoCommand.scala:60)
at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.run(MergeIntoCommandBase.scala:174)
at org.apache.spark.sql.delta.commands.MergeIntoCommandBase.run$(MergeIntoCommandBase.scala:152)
at org.apache.spark.sql.delta.commands.MergeIntoCommand.run(MergeIntoCommand.scala:60)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
at org.apache.spark.sql.Dataset.<init>(Dataset.scala:220)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:100)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:691)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:682)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:713)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:744)
at com.hotmart.spark.delta.test.Main$.main(Main.scala:76)
at com.hotmart.spark.delta.test.Main.main(Main.scala)
Environment information
- Delta Lake version: 3.2.0
- Spark version: 3.5.0
- Scala version: 2.12.17
Willingness to contribute
The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?
- [ ] Yes. I can contribute a fix for this bug independently.
- [ ] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
- [x] No. I cannot contribute a bug fix at this time.
It seems there is a conflict in resolving tables in the Analyzer when the same table is used as both the target and the source. A quick workaround could be:
val sourceDf = spark.sql("WITH ... FROM final_table")
sourceDf.createOrReplaceTempView("sourceDataView")
// use the view as the MERGE source:
spark.sql(s"""
MERGE INTO delta.`$deltaFilesLocation/database=database_test/table_b_test/` AS trg
USING sourceDataView AS src
ON ...""")
Here's a smaller repro:
spark.sql("create table mytab (id int) using delta")
spark.sql("""
merge into mytab as target
using (
select (select max(id) from mytab) as id
from (
select to_timestamp('2000-01-01 00:00:00') dummy
from mytab
)
) as source
on target.id = source.id
when matched then update set target.id = 0
""")
The root cause is that relation objects are shared through the relationCache in Spark's Analyzer. Since plan nodes are mutable with respect to tags, the cache should hand out clones instead of sharing the same object in multiple places. DeltaAnalysis adds a tag to the target relation to prevent the V2-to-V1 conversion, but the same relation object can also appear in the source clause, which leaves V2 relations in the source side of the plan and hence the "AnalysisException: Table does not support reads" error.
The issue should ideally be fixed on the Spark side, although a fix on the Delta side is also possible: clone the relation before adding the tag.
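For illustration only, a clone-before-tag change on the Delta side might look roughly like the sketch below. The tag name and helper function are hypothetical; the real tag and the place where it is set live inside DeltaAnalysis.
// Hypothetical sketch: clone the target relation before tagging it, so the tag
// does not leak to other occurrences of the same object handed out by the
// Analyzer's relationCache.
import org.apache.spark.sql.catalyst.trees.TreeNodeTag
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation

val keepAsV2Tag = TreeNodeTag[Unit]("__keep_as_v2_relation") // illustrative name only

def tagTargetRelation(target: DataSourceV2Relation): DataSourceV2Relation = {
  // A case-class copy creates a fresh plan node with an empty tag map, so the
  // source-side occurrences of the cached relation stay untagged and still go
  // through the normal V2-to-V1 conversion that makes them readable.
  val cloned = target.copy()
  cloned.setTagValue(keepAsV2Tag, ())
  cloned
}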