Apache Spark cannot properly apply operators and arithmetic on Decimal columns when the data is uploaded by Embucket
This failure was caught by an integration test: some queries fail because a different row count is returned. Upon investigation it boils down to operations on Decimal somehow failing, e.g.:
In [41]: r1 = s.sql("""select max(i_current_price) from item_0bb8d24e_spark LIMIT 50""").show()
+--------------------+
|max(i_current_price)|
+--------------------+
| 99.89|
+--------------------+
In [42]: r1 = s.sql("""select max(i_current_price) from item_567433c5_embucket LIMIT 50""").show()
+--------------------+
|max(i_current_price)|
+--------------------+
| NULL|
+--------------------+
In [43]: r1 = s.sql("""select max(cast(i_current_price as float)) from item_567433c5_embucket LIMIT 50""").show()
+-----------------------------------+
|max(CAST(i_current_price AS FLOAT))|
+-----------------------------------+
| 99.89|
+-----------------------------------+
In [44]: r1 = s.sql("""select max(cast(i_current_price as Decimal)) from item_567433c5_embucket LIMIT 50""").show()
+-------------------------------------------+
|max(CAST(i_current_price AS DECIMAL(10,0)))|
+-------------------------------------------+
| 100|
+-------------------------------------------+
However, Spark has no problem reading the Parquet file directly:
In [51]: s.sql("""select max(CAST(i_current_price AS float)) from parquet.`item_spark.parquet` LIMIT 50""").show(n=50)
+-----------------------------------+
|max(CAST(i_current_price AS FLOAT))|
+-----------------------------------+
| 99.89|
+-----------------------------------+
In [52]: s.sql("""select max(i_current_price) from parquet.`item_spark.parquet` LIMIT 50""").show(n=50)
+--------------------+
|max(i_current_price)|
+--------------------+
| 99.89|
+--------------------+
@rampage644 I took a look at this and these were my findings.
Repro
CREATE TABLE demo.public.decimal_test (price DECIMAL(10,2));
INSERT INTO demo.public.decimal_test VALUES
(99.89), (49.50), (12.34);
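For completeness, a sketch of how the repro DDL might be submitted to Embucket programmatically. This assumes Embucket exposes a Snowflake-compatible SQL endpoint and that snowflake-connector-python is used as the client; the host, port, protocol, and credentials below are assumptions, not taken from the issue, so adjust them to your deployment:

import snowflake.connector

# Assumed connection parameters for a local Embucket instance (hypothetical values).
conn = snowflake.connector.connect(
    user="embucket",
    password="embucket",
    account="embucket",
    host="localhost",
    port=3000,
    protocol="http",
    database="demo",
    schema="public",
)
cur = conn.cursor()
cur.execute("CREATE TABLE demo.public.decimal_test (price DECIMAL(10,2))")
cur.execute("INSERT INTO demo.public.decimal_test VALUES (99.89), (49.50), (12.34)")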
I started pyspark with this:
pyspark \
--packages org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.0 \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.catalog.rest=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.rest.type=rest \
--conf spark.sql.catalog.rest.uri=http://localhost:3000/catalog \
--conf spark.sql.catalog.rest.warehouse=demo \
--driver-java-options "-Djava.security.manager=allow"
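The same setup can also be configured programmatically instead of via CLI flags; a minimal sketch mirroring the options above (the app name is arbitrary):

from pyspark.sql import SparkSession

# Programmatic equivalent of the pyspark CLI flags above; the Iceberg REST
# catalog is registered under the name "rest" and points at the local Embucket catalog.
spark = (
    SparkSession.builder
    .appName("decimal-repro")
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.0")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.rest.type", "rest")
    .config("spark.sql.catalog.rest.uri", "http://localhost:3000/catalog")
    .config("spark.sql.catalog.rest.warehouse", "demo")
    .config("spark.driver.extraJavaOptions", "-Djava.security.manager=allow")
    .getOrCreate()
)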
Running the query through Spark fails, as described in the issue:
>>> spark.sql("SELECT max(price) FROM rest.public.decimal_test").show()
+----------+
|max(price)|
+----------+
| NULL|
+----------+
>>> spark.sql("SELECT max(CAST(price AS Decimal)) FROM rest.public.decimal_test").show()
+---------------------------------+
|max(CAST(price AS DECIMAL(10,0)))|
+---------------------------------+
| 100|
+---------------------------------+
Querying the Parquet file directly works fine, as expected:
>>> df = spark.read.parquet("/homedir/demo/demo/public/decimal_test/data/e037a4/42650cb2-90a8-11f0-93c5-e037a4e13194.parquet")
>>> df.selectExpr("max(price)").show()
+----------+
|max(price)|
+----------+
| 99.89|
+----------+
Experiments
Casting to Decimal shows max(CAST(price AS DECIMAL(10,0))) in the results. I then experimented with different values for the scale:
>>> spark.sql("SELECT max(CAST(price AS Decimal(10, 1))) FROM rest.public.decimal_test").show()
+---------------------------------+
|max(CAST(price AS DECIMAL(10,1)))|
+---------------------------------+
| 99.9|
+---------------------------------+
>>> spark.sql("SELECT max(CAST(price AS Decimal(10, 2))) FROM rest.public.decimal_test").show()
+---------------------------------+
|max(CAST(price AS DECIMAL(10,2)))|
+---------------------------------+
| NULL|
+---------------------------------+
>>> spark.sql("SELECT max(CAST(price AS Decimal(10, 3))) FROM rest.public.decimal_test").show()
+---------------------------------+
|max(CAST(price AS DECIMAL(10,3)))|
+---------------------------------+
| 99.890|
+---------------------------------+
As you can see from the above, only max(CAST(price AS DECIMAL(10,2))), which matches the table column type exactly, returns NULL.
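To confirm the pattern, here is a small sketch that sweeps the cast scale and prints which casts come back NULL, assuming the `spark` session and the decimal_test table from the repro above:

# Sweep the cast scale; only the cast that matches the stored column
# type exactly (DECIMAL(10,2) for this table) is expected to return NULL.
for scale in range(0, 5):
    row = spark.sql(
        f"SELECT max(CAST(price AS DECIMAL(10,{scale}))) AS m "
        "FROM rest.public.decimal_test"
    ).collect()[0]
    print(f"scale={scale}: {row['m']}")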
Different experiment, same outcome
I repeated the experiment with a different scale for the decimal column, and the outcome was the same:
CREATE TABLE demo.public.decimal_test_2 (
price DECIMAL(10,3)
);
INSERT INTO demo.public.decimal_test_2 VALUES
(99.899), (49.500), (12.344);
>>> spark.sql("SELECT max(price) FROM rest.public.decimal_test_2").show()
+----------+
|max(price)|
+----------+
| NULL|
+----------+
>>> spark.sql("SELECT max(CAST(price AS Decimal)) FROM rest.public.decimal_test_2").show()
+---------------------------------+
|max(CAST(price AS DECIMAL(10,0)))|
+---------------------------------+
| 100|
+---------------------------------+
>>> spark.sql("SELECT max(CAST(price AS Decimal(10, 2))) FROM rest.public.decimal_test_2").show()
+---------------------------------+
|max(CAST(price AS DECIMAL(10,2)))|
+---------------------------------+
| 99.90|
+---------------------------------+
>>> spark.sql("SELECT max(CAST(price AS Decimal(10, 3))) FROM rest.public.decimal_test_2").show()
+---------------------------------+
|max(CAST(price AS DECIMAL(10,3)))|
+---------------------------------+
| NULL|
+---------------------------------+
In this case the table column type is DECIMAL(10,3), and it is the cast to that exact type that returns NULL.
Conclusion
This likely points to a bug in Spark's Iceberg connector, probably in how it resolves the decimal type.
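One way to narrow this down further would be to compare the schema Spark resolves through the Iceberg REST catalog against the schema stored in the underlying Parquet file; a sketch of that check (the Parquet path is the one from the direct read above):

# Schema as resolved through the Iceberg REST catalog.
spark.table("rest.public.decimal_test").printSchema()
spark.sql("DESCRIBE TABLE EXTENDED rest.public.decimal_test").show(truncate=False)

# Schema as read directly from the data file; a disagreement on the decimal
# precision/scale (or its physical encoding) would support the
# "decimal type resolution" hypothesis.
spark.read.parquet(
    "/homedir/demo/demo/public/decimal_test/data/e037a4/"
    "42650cb2-90a8-11f0-93c5-e037a4e13194.parquet"
).printSchema()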