Spark: Cannot read or write UUID columns
Because of the String -> fixed binary conversion, both the readers and the writers are incorrect.
The vectorized reader initializes a FixedBinary reader on a column that we report as a String, which causes an unsupported-reader exception:
java.lang.UnsupportedOperationException: Unsupported type: UTF8String
at org.apache.iceberg.arrow.vectorized.ArrowVectorAccessor.getUTF8String(ArrowVectorAccessor.java:82)
at org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector.getUTF8String(IcebergArrowColumnVector.java:140)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Sour
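For context, the read side needs the reverse conversion: turning the 16-byte fixed binary that Parquet stores back into the String that Spark expects. A minimal sketch of that conversion (the class and method names here are illustrative, not existing Iceberg APIs):
import java.nio.ByteBuffer;
import java.util.UUID;
import org.apache.spark.unsafe.types.UTF8String;

class UuidReadSketch {
  // Rebuild the canonical UUID string from the 16-byte fixed binary value.
  static UTF8String toUTF8String(byte[] fixed16) {
    ByteBuffer buffer = ByteBuffer.wrap(fixed16);
    UUID uuid = new UUID(buffer.getLong(), buffer.getLong());
    return UTF8String.fromString(uuid.toString());
  }
}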
The writer is broken because it gets String columns from Spark but needs to write fixed binary.
Something like this is needed as a fix:
private static PrimitiveWriter<UTF8String> uuids(ColumnDescriptor desc) {
  return new UUIDWriter(desc);
}

private static class UUIDWriter extends PrimitiveWriter<UTF8String> {
  private final ByteBuffer buffer = ByteBuffer.allocate(16);

  private UUIDWriter(ColumnDescriptor desc) {
    super(desc);
  }

  @Override
  public void write(int repetitionLevel, UTF8String string) {
    // Convert the String value Spark hands us back into the
    // 16-byte fixed binary that the Parquet column expects.
    UUID uuid = UUID.fromString(string.toString());
    buffer.rewind();
    buffer.putLong(uuid.getMostSignificantBits());
    buffer.putLong(uuid.getLeastSignificantBits());
    buffer.rewind();
    column.writeBinary(repetitionLevel, Binary.fromReusedByteBuffer(buffer));
  }
}
I'm working on a PR for this
Actually, after looking at this for a while, I think we should probably just always handle UUID as a binary type in Spark rather than trying to do a String conversion. I think if the end user needs the string representation they can always cast?
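One illustrative way to get the string form back from a 16-byte binary column would be to rebuild it with hex(); the column name id and the wrapper class here are placeholders, not part of any existing API:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class UuidBinaryToString {
  // Rebuild the canonical 8-4-4-4-12 UUID string from a 16-byte binary column.
  static Dataset<Row> withUuidString(Dataset<Row> df) {
    return df.selectExpr(
        "*",
        "concat_ws('-',"
            + " substr(lower(hex(id)), 1, 8),"
            + " substr(lower(hex(id)), 9, 4),"
            + " substr(lower(hex(id)), 13, 4),"
            + " substr(lower(hex(id)), 17, 4),"
            + " substr(lower(hex(id)), 21, 12)) AS id_str");
  }
}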
@RussellSpitzer: I am not sure whether this is still an actual issue or whether it was fixed in the current code, but I found a year ago that Parquet and ORC/Avro expect UUIDs differently for writes. See: #1881
And this is even before the Spark code 😄
I have kind of punted on this; I was just toying around with it for a types test and don't have any users yet 🤞 who have hit this.
Hi @RussellSpitzer,
when we read a Spark df using Iceberg and the df has a UUID column, exporting the df to CSV gives an error:
py4j.protocol.Py4JJavaError: An error occurred while calling o669.csv.
: org.apache.spark.SparkException: Job aborted.
at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:496)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:251)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
(...)
... 1 more
Caused by: java.lang.ClassCastException: class [B cannot be cast to class org.apache.spark.unsafe.types.UTF8String ([B is in module java.base of loader 'bootstrap'; org.apache.spark.unsafe.types.UTF8String is in unnamed module of loader 'app')
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String(rows.scala:46)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getUTF8String$(rows.scala:46)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getUTF8String(rows.scala:195)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
Can this be related to this issue? We could reproduce it with a table that has only one column of UUID type.
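For what it's worth, a minimal Java sketch of that reproduction, assuming a catalog named demo and a table db.uuid_table with a single UUID column (both names are placeholders):
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class UuidCsvRepro {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("uuid-csv-repro")
        .getOrCreate();

    // The read is lazy; the ClassCastException surfaces once the CSV writer
    // asks for the UUID column as a UTF8String.
    Dataset<Row> df = spark.table("demo.db.uuid_table");
    df.write().csv("/tmp/uuid_export");
  }
}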
Yep, currently the Spark code cannot read or write UUID correctly.
@RussellSpitzer could you point us to the code where the adjustments are needed?
Should it be SparkValueReaders.java and SparkValueWriters.java?
I guess there is no test that writes a UUID column since it is not working yet, so we should probably adjust tests like TestSparkParquetWriter as well.
Normally uuids(ColumnDescriptor desc) returns an instance of ValueWriter<UTF8String> and not PrimitiveWriter<UTF8String>; what is the reason we are going for PrimitiveWriter here?
Sorry, we totally deprioritized this and I haven't looked at it in a while, so I couldn't tell you why.
This has been fixed by https://github.com/apache/iceberg/pull/7399 and also backported to Spark 3.3 + 3.2 with https://github.com/apache/iceberg/pull/7496 / https://github.com/apache/iceberg/pull/7497
Reopening this, since I'm seeing the same error in https://github.com/apache/iceberg-python/pull/2007. The issue is that we use the vectorized reader to read the field as UTF8, which doesn't work since UTF8 is variable-length. Let me check if I can come up with a patch.
I think it has to do with the missing logical type annotation:
parq 00000-0-baffe582-fd33-46a2-8330-158b90403511.parquet -s
# Schema
<pyarrow._parquet.ParquetSchema object at 0x104554c40>
required group field_id=-1 schema {
  required fixed_len_byte_array(16) field_id=1 uuid;
}
Edit: also with the logical type annotation present, we're seeing the same error:
parq 00000-0-032d4c0d-b3be-4b11-acb1-3b5e56aeb684.parquet -s
# Schema
<pyarrow._parquet.ParquetSchema object at 0x109a58d40>
required group field_id=-1 schema {
  required fixed_len_byte_array(16) field_id=1 uuid (UUID);
}
I'm able to reproduce it when the table is bucket-partitioned on the UUID column; it works fine with an identity partition on the UUID column. This is because, for the identity partition, the value is extracted from the metadata rather than read from the Parquet file.
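For reference, a minimal sketch of the two partition specs being compared, using the Iceberg Java API (the field id and bucket count are arbitrary):
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.types.Types;

class UuidPartitionSpecs {
  static final Schema SCHEMA = new Schema(
      Types.NestedField.required(1, "uuid", Types.UUIDType.get()));

  // Bucket partitioning: the UUID values still have to be read back out of the
  // Parquet data files, so the broken read path is exercised.
  static final PartitionSpec BUCKETED =
      PartitionSpec.builderFor(SCHEMA).bucket("uuid", 16).build();

  // Identity partitioning: the value can be served from partition metadata,
  // which is why this case reads fine.
  static final PartitionSpec IDENTITY =
      PartitionSpec.builderFor(SCHEMA).identity("uuid").build();
}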
It looks like it was only fixed for plain-encoded UUIDs. https://github.com/apache/iceberg/pull/13324 adds support for dictionary-encoded columns.