[BUG][Spark] DELETE destination only supports Delta sources
Bug
Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)
Describe the problem
I have a Delta table registered in the metastore as silver.YAK. I want to call the delete operation on it. When I use the Python API the call fails, but the equivalent SQL DELETE on the same table works.
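As a minimal sketch of the two code paths (only the table name silver.YAK and the GLOBAL_ID column are taken from the plan in the traceback below; the rest is assumed):

```python
from delta.tables import DeltaTable
from pyspark.sql import functions as F

# SQL path: this DELETE succeeds against the same table.
spark.sql("DELETE FROM silver.YAK WHERE GLOBAL_ID IN ('test')")

# Python API path: the equivalent delete fails with AnalysisException.
deltaTable = DeltaTable.forName(spark, "silver.YAK")
deltaTable.delete(F.col("GLOBAL_ID").isin(["test"]))
```

The API call raises the following AnalysisException: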
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
Cell In[18], line 1
----> 1 deltaTable.delete((F.col("GLOBAL_ID").isin(['test'])))
File /usr/local/lib/python3.10/dist-packages/delta/tables.py:106, in DeltaTable.delete(self, condition)
104 self._jdt.delete()
105 else:
--> 106 self._jdt.delete(DeltaTable._condition_to_jcolumn(condition))
File /usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File /usr/local/lib/python3.10/dist-packages/pyspark/errors/exceptions/captured.py:175, in capture_sql_exception.<locals>.deco(*a, **kw)
171 converted = convert_exception(e.java_exception)
172 if not isinstance(converted, UnknownException):
173 # Hide where the exception came from that shows a non-Pythonic
174 # JVM exception message.
--> 175 raise converted from None
176 else:
177 raise
AnalysisException: DELETE destination only supports Delta sources.
Some(DeleteFromTable GLOBAL_ID#223 IN (test)
+- SubqueryAlias spark_catalog.silver.YAK
+- Project [FIRMA#38, LAGER#39, VON_AN_LAGER#40, BESCHAFFUNGSART#41, BELEGART#42, UNTERBELEGART#43, KENNZEICHEN_STORNO#44, BESTANDSSTATUS#45, AKSTB2#46, KUNDE_LIEFERANT#47, KUNDE#48, KUNDE_STATISTIK#49, KUNDENNR_KREDIT#50, KONTO_ZAHLUNG#51, KUNDENNUMMER_LIEFERANT_FIBU#52, HAUPTNUMMER_KUNDE_LIEFERANT#53, VERBAND_ZUGEORDNET#54, LIEFERANT#55, LIEFERANTENNUMMER_FRACHT#56, FRACHTSTATUS#57, ORIGINAL_POSITION#58, ORIGINAL_SET_POSITION#59, BELEGNUMMER#60, BELEGNUMMER_LIEFERSCHEIN#61, ... 162 more fields]
+- Relation spark_catalog.silver.yak[FIRMA#38,LAGER#39,VON_AN_LAGER#40,BESCHAFFUNGSART#41,BELEGART#42,UNTERBELEGART#43,KENNZEICHEN_STORNO#44,BESTANDSSTATUS#45,AKSTB2#46,KUNDE_LIEFERANT#47,KUNDE#48,KUNDE_STATISTIK#49,KUNDENNR_KREDIT#50,KONTO_ZAHLUNG#51,KUNDENNUMMER_LIEFERANT_FIBU#52,HAUPTNUMMER_KUNDE_LIEFERANT#53,VERBAND_ZUGEORDNET#54,LIEFERANT#55,LIEFERANTENNUMMER_FRACHT#56,FRACHTSTATUS#57,ORIGINAL_POSITION#58,ORIGINAL_SET_POSITION#59,BELEGNUMMER#60,BELEGNUMMER_LIEFERSCHEIN#61,... 162 more fields] parquet
Steps to reproduce
Get Spark 3.4.3 (git revision 1eb558c3a6f) built for Hadoop 3.3.4, and Delta Lake 2.4.0.
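A minimal sketch of how the session can be started with the Delta 2.4.0 package (the extension and catalog settings below are the standard ones from the Delta docs and are assumed, not copied from the affected environment):

```python
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("delta-delete-repro")
    # Pull the Delta 2.4.0 package; this triggers the Ivy resolution shown below.
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
    # Standard Delta Lake session settings (assumed to match the affected setup).
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)
```

Starting the session produces Ivy resolution output like the following: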
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2c58c979-8fcb-4371-b930-681b6429a110;1.0
confs: [default]
found io.delta#delta-core_2.12;2.4.0 in central
found io.delta#delta-storage;2.4.0 in central
found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 135ms :: artifacts dl 6ms
:: modules in use:
io.delta#delta-core_2.12;2.4.0 from central in [default]
io.delta#delta-storage;2.4.0 from central in [default]
org.antlr#antlr4-runtime;4.9.3 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 3 | 0 | 0 | 0 || 3 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-2c58c979-8fcb-4371-b930-681b6429a110
confs: [default]
0 artifacts copied, 3 already retrieved (0kB/5ms)
24/09/04 15:19:00 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/04 15:19:01 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/09/04 15:19:01 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
Get a table similar to the one above. Something like this then fails:
```python
from delta.tables import DeltaTable
from pyspark.sql import functions as F

deltaTable = DeltaTable.forName(spark, 'silver.' + TN)  # TN is the table name, 'YAK' in the plan above
deltaTable.delete(F.col("GLOBAL_ID").isin(['test']))
```
Observed results
For some reason, Delta treats the table as a plain Parquet source when the delete goes through the Python API (note the `Relation spark_catalog.silver.yak[...] parquet` node in the plan above), while the SQL DELETE on the same table works.
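One way to check what provider the metastore reports for the table (a diagnostic sketch, not part of the original report; the table name is taken from the plan above):

```python
# Shows table metadata, including the "Provider" row (delta vs. parquet).
spark.sql("DESCRIBE TABLE EXTENDED silver.YAK").show(100, truncate=False)

# For a Delta table, DESCRIBE DETAIL reports format, location and protocol.
spark.sql("DESCRIBE DETAIL silver.YAK").show(truncate=False)
```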
Expected results
The API delete succeeds, just as the SQL DELETE does.
Further details
Please note that if I create a fresh Delta table like so, it works:

```python
from pyspark.sql import functions as F

df = spark.createDataFrame([(1, 2, 3), (4, 5, 6), (7, 8, 9)])
df.write.format('delta').save('/tmp/temp')
spark.sql('create table temp using delta location "/tmp/temp"')
spark.sql('delete from temp where _1 in (1, 4)')

from delta.tables import *
deltaTable = DeltaTable.forName(spark, 'temp')
deltaTable.delete((F.col("_1").isin([1, 2])) & (F.col('_2') > 4))
```
Please note that we have just migrated from Spark 3.3.* and Delta 2.3.*, where this worked.
Environment information
- Delta Lake version: 2.4.*
- Spark version: 3.4.3
- Scala version: 2.12 (inferred from the delta-core_2.12 artifact above)
Willingness to contribute
The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?
- [ ] Yes. I can contribute a fix for this bug independently.
- [ ] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
- [x] No. I cannot contribute a bug fix at this time.