
Apache Spark cannot properly apply operators and arithmetic on Decimal when uploaded by embucket

rampage644 opened this issue 3 months ago · 2 comments

This failure was caught by an integration test: some queries return a different row count than expected. Upon investigation, it boils down to operations on Decimal failing somewhere along the way, e.g.:

In [41]: r1 = s.sql("""select max(i_current_price) from item_0bb8d24e_spark  LIMIT 50""").show()
+--------------------+
|max(i_current_price)|
+--------------------+
|               99.89|
+--------------------+


In [42]: r1 = s.sql("""select max(i_current_price) from item_567433c5_embucket  LIMIT 50""").show()
+--------------------+
|max(i_current_price)|
+--------------------+
|                NULL|
+--------------------+


In [43]: r1 = s.sql("""select max(cast(i_current_price as float)) from item_567433c5_embucket  LIMIT 50""").show()
+-----------------------------------+
|max(CAST(i_current_price AS FLOAT))|
+-----------------------------------+
|                              99.89|
+-----------------------------------+


In [44]: r1 = s.sql("""select max(cast(i_current_price as Decimal)) from item_567433c5_embucket  LIMIT 50""").show()
+-------------------------------------------+
|max(CAST(i_current_price AS DECIMAL(10,0)))|
+-------------------------------------------+
|                                        100|
+-------------------------------------------+
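
A quick sanity check (my addition, not part of the session above) is to compare how Spark resolves the i_current_price column for the Spark-written and the embucket-written tables:

# Diagnostic sketch: compare the resolved column type for both tables
# (table names taken from the session above).
s.table("item_0bb8d24e_spark").select("i_current_price").printSchema()
s.table("item_567433c5_embucket").select("i_current_price").printSchema()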

rampage644 · Sep 04 '25 23:09

However, Spark has no problem reading the parquet file directly:

In [51]: s.sql("""select max(CAST(i_current_price AS float)) from parquet.`item_spark.parquet` LIMIT 50""").show(n=50)
+-----------------------------------+
|max(CAST(i_current_price AS FLOAT))|
+-----------------------------------+
|                              99.89|
+-----------------------------------+


In [52]: s.sql("""select max(i_current_price) from parquet.`item_spark.parquet` LIMIT 50""").show(n=50)
+--------------------+
|max(i_current_price)|
+--------------------+
|               99.89|
+--------------------+
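
For completeness, the decimal type actually written into the parquet file can be inspected outside Spark as well; a minimal sketch with pyarrow (my addition, using the same file name as the query above):

# Sketch: check the precision/scale recorded in the parquet file itself.
import pyarrow.parquet as pq
print(pq.read_schema("item_spark.parquet"))         # Arrow-level schema
print(pq.ParquetFile("item_spark.parquet").schema)  # Parquet-level schema (decimal(p,s))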

rampage644 · Sep 04 '25 23:09

@rampage644 I took a look at this and these were my findings.

Repro

CREATE TABLE demo.public.decimal_test (price DECIMAL(10,2));

INSERT INTO demo.public.decimal_test VALUES
  (99.89), (49.50), (12.34);

I started pyspark with this:

pyspark \
  --packages org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.0 \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.rest=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.rest.type=rest \
  --conf spark.sql.catalog.rest.uri=http://localhost:3000/catalog \
  --conf spark.sql.catalog.rest.warehouse=demo \
  --driver-java-options "-Djava.security.manager=allow"
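
For reference, the same catalog can also be configured programmatically; a sketch that mirrors the CLI flags above (endpoint and warehouse values copied verbatim):

# Sketch: SparkSession configured the same way as the pyspark command above.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.0")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.rest", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.rest.type", "rest")
    .config("spark.sql.catalog.rest.uri", "http://localhost:3000/catalog")
    .config("spark.sql.catalog.rest.warehouse", "demo")
    .getOrCreate()
)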

Running the query through Spark fails, as described in the issue:

>>> spark.sql("SELECT max(price) FROM rest.public.decimal_test").show()
+----------+
|max(price)|
+----------+
|      NULL|
+----------+

>>> spark.sql("SELECT max(CAST(price AS Decimal)) FROM rest.public.decimal_test").show()
+---------------------------------+
|max(CAST(price AS DECIMAL(10,0)))|
+---------------------------------+
|                              100|
+---------------------------------+

Querying the parquet file directly works fine as expected:

>>> df = spark.read.parquet("/homedir/demo/demo/public/decimal_test/data/e037a4/42650cb2-90a8-11f0-93c5-e037a4e13194.parquet")
>>> df.selectExpr("max(price)").show()
+----------+
|max(price)|
+----------+
|     99.89|
+----------+
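
To isolate whether the problem only affects embucket-written data, the same rows can be written back through Spark's own Iceberg writer and aggregated there; a sketch (the target table name decimal_test_spark is made up for this experiment):

# Sketch: round-trip through Spark's Iceberg writer; if max() works here,
# the issue is specific to the embucket-written table.
df.writeTo("rest.public.decimal_test_spark").create()
spark.sql("SELECT max(price) FROM rest.public.decimal_test_spark").show()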

Experiments

Casting to Decimal with no arguments shows up as max(CAST(price AS DECIMAL(10,0))) in the results, so I experimented with different values for the scale:

>>> spark.sql("SELECT max(CAST(price AS Decimal(10, 1))) FROM rest.public.decimal_test").show()
+---------------------------------+
|max(CAST(price AS DECIMAL(10,1)))|
+---------------------------------+
|                             99.9|
+---------------------------------+

>>> spark.sql("SELECT max(CAST(price AS Decimal(10, 2))) FROM rest.public.decimal_test").show()
+---------------------------------+
|max(CAST(price AS DECIMAL(10,2)))|
+---------------------------------+
|                             NULL|
+---------------------------------+

>>> spark.sql("SELECT max(CAST(price AS Decimal(10, 3))) FROM rest.public.decimal_test").show()
+---------------------------------+
|max(CAST(price AS DECIMAL(10,3)))|
+---------------------------------+
|                           99.890|
+---------------------------------+

As you can see from the above, only max(CAST(price AS DECIMAL(10,2))), the cast that exactly matches the table column type, returns NULL.
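
To confirm which type Spark resolves for the column, the table schema and the aggregate's plan can be inspected directly (plain Spark APIs; I did not capture this output):

# Sketch: check the resolved column type and the physical plan of the aggregate.
spark.table("rest.public.decimal_test").printSchema()
spark.sql("SELECT max(price) FROM rest.public.decimal_test").explain(True)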

Different experiment, same outcome

I experimented with a different value for the scale in decimal and the outcome was the same:

CREATE TABLE demo.public.decimal_test_2 (
    price DECIMAL(10,3)
);

INSERT INTO demo.public.decimal_test_2 VALUES
  (99.899), (49.500), (12.344);

>>> spark.sql("SELECT max(price) FROM rest.public.decimal_test_2").show()
+----------+
|max(price)|
+----------+
|      NULL|
+----------+

>>> spark.sql("SELECT max(CAST(price AS Decimal)) FROM rest.public.decimal_test_2").show()
+---------------------------------+
|max(CAST(price AS DECIMAL(10,0)))|
+---------------------------------+
|                              100|
+---------------------------------+

>>> spark.sql("SELECT max(CAST(price AS Decimal(10, 2))) FROM rest.public.decimal_test_2").show()
+---------------------------------+
|max(CAST(price AS DECIMAL(10,2)))|
+---------------------------------+
|                            99.90|
+---------------------------------+

>>> spark.sql("SELECT max(CAST(price AS Decimal(10, 3))) FROM rest.public.decimal_test_2").show()
+---------------------------------+
|max(CAST(price AS DECIMAL(10,3)))|
+---------------------------------+
|                             NULL|
+---------------------------------+

In this case, the table column type is DECIMAL(10,3), and that is exactly the cast that returns NULL.
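
As a further check, Iceberg's files metadata table can be used to locate the table's data files, which can then be read as plain parquet the same way as in the repro above (a sketch; the second line needs a real file path filled in from the first query's output):

# Sketch: find the data files behind the table, then read one directly as parquet.
spark.sql("SELECT file_path FROM rest.public.decimal_test_2.files").show(truncate=False)
spark.read.parquet("<file_path from the output above>").selectExpr("max(price)").show()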

Conclusion

This likely points to a bug in Spark's Iceberg connector, probably in how the decimal type is resolved.

AmosAidoo · Sep 15 '25 07:09