Spark: Cannot read or write UUID columns

Open · RussellSpitzer opened this issue 3 years ago · 13 comments

Because of the String -> fixed-length binary conversion, both the readers and the writers are incorrect.

The vectorized reader initializes a FixedBinary reader on a column that we report as a String, causing an unsupported reader exception.

java.lang.UnsupportedOperationException: Unsupported type: UTF8String
	at org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor.getUTF8String(ArrowVectorAccessor.java:82)
	at org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector.getUTF8String(IcebergArrowColumnVector.java:140)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Sour
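
To see where this bites, here is a minimal reproduction sketch. Everything in it is an assumption for illustration (catalog location, table and column names are hypothetical, and `spark` is an existing SparkSession), not taken from the original report:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.iceberg.Schema;
  import org.apache.iceberg.catalog.TableIdentifier;
  import org.apache.iceberg.hadoop.HadoopCatalog;
  import org.apache.iceberg.types.Types;

  // Spark SQL has no UUID type, so the table has to be created through the Iceberg API.
  HadoopCatalog catalog = new HadoopCatalog(new Configuration(), "/tmp/warehouse");
  Schema schema = new Schema(Types.NestedField.required(1, "id", Types.UUIDType.get()));
  catalog.createTable(TableIdentifier.of("db", "uuid_test"), schema);

  // Once the table contains data, selecting the UUID column trips the vectorized reader:
  // Iceberg reports the column to Spark as a string, but the Parquet files store
  // fixed_len_byte_array(16), so getUTF8String() throws UnsupportedOperationException.
  spark.read().format("iceberg").load("/tmp/warehouse/db/uuid_test").show();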

The writer is broken because it gets String columns from Spark but needs to write fixed-length binary.

Something like this is needed as a fix:

  private static PrimitiveWriter<UTF8String> uuids(ColumnDescriptor desc) {
    return new UUIDWriter(desc);
  }

  private static class UUIDWriter extends PrimitiveWriter<UTF8String> {
    // Reusable 16-byte buffer holding the big-endian UUID bits for each write.
    private ByteBuffer buffer = ByteBuffer.allocate(16);

    private UUIDWriter(ColumnDescriptor desc) {
      super(desc);
    }

    @Override
    public void write(int repetitionLevel, UTF8String string) {
      // Parse the String value Spark hands us and repack it as the 16-byte
      // fixed-length binary that the Parquet UUID logical type expects.
      UUID uuid = UUID.fromString(string.toString());
      buffer.rewind();
      buffer.putLong(uuid.getMostSignificantBits());
      buffer.putLong(uuid.getLeastSignificantBits());
      buffer.rewind();
      column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
    }
  }
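
The snippet above only covers the write path. The read path needs the inverse conversion; here is a hedged sketch of that direction (the helper name is hypothetical, not the method from the eventual fix), turning the 16-byte fixed binary read from Parquet back into the UTF8String Spark expects:

  import java.nio.ByteBuffer;
  import java.util.UUID;
  import org.apache.spark.unsafe.types.UTF8String;

  // Rebuild the canonical hyphenated string from the big-endian 16-byte UUID value.
  static UTF8String uuidToUtf8(byte[] fixedLenBytes) {
    ByteBuffer buffer = ByteBuffer.wrap(fixedLenBytes);
    UUID uuid = new UUID(buffer.getLong(), buffer.getLong());
    return UTF8String.fromString(uuid.toString());
  }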

RussellSpitzer · Apr 18 '22

I'm working on a PR for this

RussellSpitzer · Apr 18 '22

Actually after looking at this for a while I think we should probably just always handle UUID as binary type in Spark rather than trying to do a String conversion. I think if the end user needs the string representation they can always cast?
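
To make the "they can always cast" idea concrete: a plain CAST(binary AS string) would not give the hyphenated form, so users would need a small conversion, for example a UDF. This is only a sketch of that workaround under the assumption that the UUID column (here called id, in a hypothetical table) is surfaced to Spark as 16-byte binary, with `spark` an existing SparkSession:

  import java.nio.ByteBuffer;
  import java.util.UUID;
  import org.apache.spark.sql.api.java.UDF1;
  import org.apache.spark.sql.types.DataTypes;

  // Register a UDF that rebuilds the canonical UUID string from the 16 raw bytes.
  spark.udf().register("uuid_str", (UDF1<byte[], String>) bytes -> {
    ByteBuffer buf = ByteBuffer.wrap(bytes);
    return new UUID(buf.getLong(), buf.getLong()).toString();
  }, DataTypes.StringType);

  spark.table("db.uuid_table").selectExpr("uuid_str(id) AS id").show();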

RussellSpitzer · Apr 18 '22

@RussellSpitzer: I am not sure whether this is still an actual issue or whether it was fixed in the current code, but I found a year ago that Parquet and ORC/Avro expect UUIDs differently for writes. See: #1881

And this is even before the Spark code 😄

pvary · Apr 26 '22

I have kind of punted on this; I was just toying around with it for a types test and don't have any users yet 🤞 who have hit this.

RussellSpitzer · Apr 26 '22

Hi @RussellSpitzer,

When we read a Spark DataFrame using Iceberg and the DataFrame has a UUID column, we get the following error while exporting it to CSV:

py4j.protocol.Py4JJavaError: An error occurred while calling o669.csv.
: org.apache.spark.SparkException: Job aborted.
        at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:496)
        at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:251)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
(...)
        ... 1 more
Caused by: java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.unsafe.types.UTF8String ([B is in module java.base of loader 'bootstrap'; org.apache.spark.unsafe.types.UTF8String is in unnamed module of loader 'app')
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
        at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
        at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)

Can this be related to this issue? We could reproduce it with a table that has only one column of UUID type.
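
For reference, a rough Java equivalent of the failing export (the original report used PySpark, hence the py4j wrapper in the trace; the table name and output path here are hypothetical, and `spark` is an existing SparkSession):

  import org.apache.spark.sql.Dataset;
  import org.apache.spark.sql.Row;

  // Read an Iceberg table whose only column is a UUID, then export it to CSV.
  Dataset<Row> df = spark.table("local.db.uuid_table");
  // The scan hands Spark raw byte[] values for the UUID column while the reported type
  // is string, so the CSV writer fails with ClassCastException: [B -> UTF8String.
  df.write().option("header", "true").csv("/tmp/uuid_export");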

wobrycki · Jul 15 '22

Yep, currently the Spark code cannot read or write UUID correctly.

RussellSpitzer · Jul 15 '22

@RussellSpitzer could you point us to the code where the adjustments are needed? Should it be SparkValueReaders.java and SparkValueWriters.java?

I guess there is no test that writes a UUID column, as it is not working yet, so we should probably adjust tests like TestSparkParquetWriter as well.

Normally uuids(ColumnDescriptor desc) returns an instance of ValueWriter<UTF8String> and not PrimitiveWriter<UTF8String>; what is the reason we are going for PrimitiveWriter here?

wobrycki · Mar 20 '23

Sorry, we totally deprioritized this and I haven't looked at it in a while, so I couldn't tell you why.

RussellSpitzer · Mar 20 '23

This has been fixed by https://github.com/apache/iceberg/pull/7399 and also backported to Spark 3.3 + 3.2 with https://github.com/apache/iceberg/pull/7496 / https://github.com/apache/iceberg/pull/7497

nastra · May 02 '23

Reopening this, since I'm seeing the same error in https://github.com/apache/iceberg-python/pull/2007. The issue is that we use the vectorized reader to read the field as UTF8, which doesn't work since it is variable length. Let me check if I can come up with a patch.

Fokko · Jun 16 '25

I think it has to do with the logical annotation missing:

parq 00000-0-baffe582-fd33-46a2-8330-158b90403511.parquet -s 

 # Schema 
 <pyarrow._parquet.ParquetSchema object at 0x104554c40>
required group field_id=-1 schema {
  required fixed_len_byte_array(16) field_id=1 uuid;
}

Edit: Also, with the logical type, we're seeing the same:

parq 00000-0-032d4c0d-b3be-4b11-acb1-3b5e56aeb684.parquet -s 

 # Schema 
 <pyarrow._parquet.ParquetSchema object at 0x109a58d40>
required group field_id=-1 schema {
  required fixed_len_byte_array(16) field_id=1 uuid (UUID);
}

I'm able to reproduce it when the table is bucket partitioned on the UUID column; it works fine with an identity partition on the UUID column. This is because for the identity column, the value is extracted from the metadata rather than read from the Parquet file.
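
A sketch of the two partition specs involved (identifiers are hypothetical), to make the distinction concrete:

  import org.apache.iceberg.PartitionSpec;
  import org.apache.iceberg.Schema;
  import org.apache.iceberg.types.Types;

  Schema schema = new Schema(Types.NestedField.required(1, "uuid", Types.UUIDType.get()));

  // Bucket partitioning: the UUID values still have to be read from the data files,
  // so the vectorized-reader bug is hit.
  PartitionSpec bucketed = PartitionSpec.builderFor(schema).bucket("uuid", 16).build();

  // Identity partitioning: the value is carried in the partition metadata, so reads succeed.
  PartitionSpec identity = PartitionSpec.builderFor(schema).identity("uuid").build();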

Fokko · Jun 16 '25

It looks like it was only fixed for plain-encoded UUIDs. https://github.com/apache/iceberg/pull/13324 adds support for dictionary-encoded columns.

Fokko · Jun 16 '25