AvroSchemaUtil converts Iceberg's optional map into a confusing union.
While writing a few unit tests for https://github.com/apache/iceberg/pull/1477/files, I found that the encode/decode test would not pass because of an AvroSchemaUtil conversion issue.
The test is easy to understand:
package org.apache.iceberg.avro;

import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.junit.Assert;

public class TestAvroEncoderUtil extends AvroDataTest {
  @Override
  protected void writeAndValidate(org.apache.iceberg.Schema schema) throws IOException {
    List<GenericData.Record> expected = RandomAvroData.generate(schema, 100, 1990L);
    Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
    for (GenericData.Record record : expected) {
      byte[] serializedData = AvroEncoderUtil.encode(record, avroSchema);
      GenericData.Record expectedRecord = AvroEncoderUtil.decode(serializedData);
      byte[] serializedData2 = AvroEncoderUtil.encode(expectedRecord, avroSchema);
      Assert.assertArrayEquals(serializedData2, serializedData);
    }
  }
}
After digging into this issue, I found that the cause is here.
For example, if we convert this simple Iceberg schema to an Avro schema:

Schema schema = new Schema(
    required(0, "id", Types.LongType.get()),
    optional(1, "data", Types.MapType.ofOptional(2, 3,
        Types.LongType.get(),
        Types.StringType.get())));
org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
System.out.println(avroSchema.toString(true));
We will get
{
  "type" : "record",
  "name" : "rnull",
  "fields" : [ {
    "name" : "id",
    "type" : "long",
    "field-id" : 0
  }, {
    "name" : "data",
    "type" : [ "null", {
      "type" : "array",  // <- why does it add an array here? That's quite confusing
      "items" : {
        "type" : "record",
        "name" : "k2_v3",
        "fields" : [ {
          "name" : "key",
          "type" : "long",
          "field-id" : 2
        }, {
          "name" : "value",
          "type" : [ "null", "string" ],
          "default" : null,
          "field-id" : 3
        } ]
      },
      "logicalType" : "map"
    } ],
    "default" : null,
    "field-id" : 1
  } ]
}
In my understanding, it should instead be the following JSON:
{
  "type" : "record",
  "name" : "rnull",
  "fields" : [ {
    "name" : "id",
    "type" : "long",
    "field-id" : 0
  }, {
    "name" : "data",
    "type" : [ "null", {
      "type" : "record",
      "name" : "k2_v3",
      "fields" : [ {
        "name" : "key",
        "type" : "long",
        "field-id" : 2
      }, {
        "name" : "value",
        "type" : [ "null", "string" ],
        "default" : null,
        "field-id" : 3
      } ],
      "logicalType" : "map"
    } ],
    "default" : null,
    "field-id" : 1
  } ]
}
What is the reason for doing it this way? I don't quite understand the message from the commit https://github.com/apache/iceberg/commit/d8cecc411daf16955963766fa6336d4260e7c797 actually.
@rdblue
Let me take an example to show why this would break the encode/decode unit tests. Say we have the schema:
Schema schema = new Schema(
    required(0, "id", Types.LongType.get()),
    optional(1, "data", Types.MapType.ofOptional(2, 3,
        Types.LongType.get(),
        Types.StringType.get())));
And we have an original record:
record0 = { id=1, data = {Map{a=b, c=d, e=f}} }
First we encode record0 using the converted Avro schema, then decode it (using the same Avro schema); we get the unexpected record:
record1 = { id=1, data = {List{Record{a=b}, Record{c=d}, Record{e=f}}}}
The assertion fails because record0 != record1.
Maps with non-string keys are not allowed in Avro, but they are valid in Iceberg. That means we need to serialize m: map<long, string> differently. The approach we take is to store a map as a list of key/value pairs, just like you would get by calling entrySet. The list is needed because the map can have any number of key/value pairs. Omitting it would be equivalent to m: struct<key: long, value: string>.
The Iceberg readers are built to handle this and reconstruct the map correctly on read. And we use a logical type annotation, "logicalType": "map", to indicate to regular Avro readers that the list<struct<long, string>> that is decoded should be converted to map<long, string>.
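To make the entrySet analogy concrete, here is a stdlib-only sketch (not Iceberg's actual writer code; the class and method names are made up for illustration) of the in-memory shape that gets serialized: the map becomes a list of key/value pairs.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MapAsEntryList {
  // A map with non-string keys cannot be an Avro map, so it is stored as a
  // list of key/value pairs -- the same shape entrySet() gives you.
  static List<Map.Entry<Long, String>> toEntryList(Map<Long, String> map) {
    List<Map.Entry<Long, String>> pairs = new ArrayList<>();
    for (Map.Entry<Long, String> e : map.entrySet()) {
      pairs.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()));
    }
    return pairs;
  }

  public static void main(String[] args) {
    Map<Long, String> m = new LinkedHashMap<>();
    m.put(1L, "a");
    m.put(2L, "b");
    System.out.println(toEntryList(m)); // prints [1=a, 2=b]
  }
}
```

Each pair in the list corresponds to one element of the Avro array of key/value records, and the "logicalType": "map" annotation tells map-aware readers to fold the list back into a map.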
OK, got it, thanks for the explanation. Let me think how to make this unit test work.
== Update ==
OK, I found the root cause of why the decoder reads record1 as { id=1, data = {List{Record{a=b}, Record{c=d}, Record{e=f}}} }: I had missed registering the map logical type with LogicalTypes. Adding the following statement works for me now.
static {
  LogicalTypes.register(LogicalMap.NAME, schema -> LogicalMap.get());
}
Storing a map as a list of key/value pairs introduces another problem: its Avro schema type is array while its real data type in the GenericData.Record is Map, so we cannot compare those records with the default GenericData.Record comparator, because it will use the array type from the Avro schema to interpret the Map object.
That seems OK; we can fall back to comparing the toString output.
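A minimal sketch of that fallback (assuming both records render their map entries in a stable order, e.g. backed by a LinkedHashMap; the class and method names here are hypothetical): compare the rendered strings instead of using the schema-driven comparator.

```java
import java.util.Objects;

public class RecordCompareSketch {
  // Fallback comparison: compare rendered strings rather than using
  // GenericData's schema-driven comparator, which would try to read the
  // in-memory Map through the array-typed Avro schema.
  static boolean sameByString(Object expected, Object actual) {
    return Objects.equals(String.valueOf(expected), String.valueOf(actual));
  }

  public static void main(String[] args) {
    System.out.println(sameByString("{\"id\": 1, \"data\": {1: \"a\"}}",
                                    "{\"id\": 1, \"data\": {1: \"a\"}}"));
  }
}
```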
If you needed to register a logical type, then you must be using Avro's DatumReader instead of an Iceberg DatumReader. Iceberg has a set of readers for object construction that produce Avro generics used by its internals, in GenericAvroReader.
I think this is also used by default for IcebergDecoder and IcebergEncoder, which use Iceberg schemas and write Avro's single-message encoding.
If you needed to register a logical type, then you must be using Avro's DatumReader instead of an Iceberg DatumReader.
Yes, you're right. Here is the code: https://github.com/apache/iceberg/pull/1477/files#diff-863fca4ad067cd7dd66730275ed9dcb29fa4468b5b474461934bda9372f73280R72. It just uses GenericAvroReader to decode the datum. I see that IcebergDecoder & IcebergEncoder can only encode/decode the Iceberg Record, while ManifestFile is an IndexedRecord; in theory we should define an AvroDecoder and AvroEncoder, and provide those tests to address the schema evolution issues.
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'