
AvroSchemaUtil converts Iceberg's optional map into a confusing union

openinx opened this issue 5 years ago · 7 comments

While writing a few unit tests for https://github.com/apache/iceberg/pull/1477/files, I found that the encode/decode test would not pass because of an AvroSchemaUtil conversion issue.

The test is easy to understand:

package org.apache.iceberg.avro;


import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.junit.Assert;

public class TestAvroEncoderUtil extends AvroDataTest {

  @Override
  protected void writeAndValidate(org.apache.iceberg.Schema schema) throws IOException {
    List<GenericData.Record> expected = RandomAvroData.generate(schema, 100, 1990L);
    Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());

    for (GenericData.Record record : expected) {
      // Round-trip: encode, decode, re-encode, then check that both
      // serialized forms are identical.
      byte[] serializedData = AvroEncoderUtil.encode(record, avroSchema);
      GenericData.Record expectedRecord = AvroEncoderUtil.decode(serializedData);

      byte[] serializedData2 = AvroEncoderUtil.encode(expectedRecord, avroSchema);
      Assert.assertArrayEquals(serializedData2, serializedData);
    }
  }
}

After digging into this issue, I found that the cause is here.

For example, if we convert this simple Iceberg schema to an Avro schema:

    // required(...) and optional(...) are static imports from
    // org.apache.iceberg.types.Types.NestedField.
    Schema schema = new Schema(
        required(0, "id", Types.LongType.get()),
        optional(1, "data", Types.MapType.ofOptional(2, 3,
            Types.LongType.get(),
            Types.StringType.get())));
    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
    System.out.println(avroSchema.toString(true));

We will get:

{
  "type" : "record",
  "name" : "rnull",
  "fields" : [ {
    "name" : "id",
    "type" : "long",
    "field-id" : 0
  }, {
    "name" : "data",
    "type" : [ "null", {
      "type" : "array",       // <-  it will add an array ???   That's quite confusing ? 
      "items" : {
        "type" : "record",
        "name" : "k2_v3",
        "fields" : [ {
          "name" : "key",
          "type" : "long",
          "field-id" : 2
        }, {
          "name" : "value",
          "type" : [ "null", "string" ],
          "default" : null,
          "field-id" : 3
        } ]
      },
      "logicalType" : "map"
    } ],
    "default" : null,
    "field-id" : 1
  } ]
}

In my understanding, it should be this plain JSON instead:

{
  "type" : "record",
  "name" : "rnull",
  "fields" : [ {
    "name" : "id",
    "type" : "long",
    "field-id" : 0
  }, {
    "name" : "data",
    "type" : [ "null", {
      "type" : "record",
        "name" : "k2_v3",
        "fields" : [ {
          "name" : "key",
          "type" : "long",
          "field-id" : 2
        }, {
          "name" : "value",
          "type" : [ "null", "string" ],
          "default" : null,
          "field-id" : 3
        } ],
      "logicalType" : "map"
    } ],
    "default" : null,
    "field-id" : 1
  } ]
}

What's the reason we do it like that? I don't quite understand the log message from the commit https://github.com/apache/iceberg/commit/d8cecc411daf16955963766fa6336d4260e7c797, actually.

@rdblue

openinx avatar Oct 10 '20 13:10 openinx

Let's take an example to show why this breaks the encode/decode unit tests. Say we have the schema:

    Schema schema = new Schema(
        required(0, "id", Types.LongType.get()),
        optional(1, "data", Types.MapType.ofOptional(2, 3,
            Types.LongType.get(),
            Types.StringType.get())));

And we have an original record:

record0 = { id=1, data = Map{a=b, c=d, e=f} }

First we encode record0 using the converted Avro schema, then decode it (also using the Avro schema); we get the unexpected record:

record1 = { id=1, data = List{Record{a=b}, Record{c=d}, Record{e=f}} }

The assertion fails because record0 != record1.
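
In code terms (a hypothetical assertion, just to illustrate the mismatch), the decoded data field comes back as an Avro array rather than a java.util.Map:

// record1 was decoded by a plain Avro generic reader, so its "data" field
// follows the schema's declared array type instead of the original Map.
Object data = record1.get("data");
Assert.assertTrue(data instanceof GenericData.Array);  // what we actually get
Assert.assertFalse(data instanceof java.util.Map);     // what record0 held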

openinx avatar Oct 10 '20 13:10 openinx

Maps with non-string keys are not allowed in Avro, but they are valid in Iceberg. That means we need to serialize m: map<long, string> differently. The approach we take is to store a map as a list of key/value pairs, just like you would get by calling entrySet. The list is needed because the map can have any number of key/value pairs. Omitting it would be equivalent to m: struct<key: long, value: string>.
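
To make that concrete, here is a minimal sketch of building the entrySet-style representation by hand (not Iceberg's actual writer code; the field names follow the converted schema printed above):

import java.util.Map;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

// Build the array-of-key/value-records representation of a map<long, string>.
// "avroSchema" is the converted record schema shown earlier in this thread.
static GenericData.Array<GenericData.Record> toEntryList(Schema avroSchema, Map<Long, String> map) {
  Schema unionSchema = avroSchema.getField("data").schema(); // [ "null", array ]
  Schema arraySchema = unionSchema.getTypes().get(1);        // the array branch
  Schema entrySchema = arraySchema.getElementType();         // record "k2_v3"

  GenericData.Array<GenericData.Record> entries = new GenericData.Array<>(map.size(), arraySchema);
  for (Map.Entry<Long, String> e : map.entrySet()) {
    GenericData.Record entry = new GenericData.Record(entrySchema);
    entry.put("key", e.getKey());     // required long, field-id 2
    entry.put("value", e.getValue()); // optional string, field-id 3
    entries.add(entry);
  }
  return entries;
}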

The Iceberg readers are built to handle this and reconstruct the map correctly on read. And we use a logical type annotation, "logicalType": "map", to indicate to regular Avro readers that the list<struct<long, string>> that is decoded should be converted to map<long, string>.
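
Continuing the sketch above (again illustrative, not Iceberg's reader), a regular Avro reader can check that annotation and rebuild the map from the decoded array:

// Uses java.util.HashMap plus the arraySchema/entries from the sketch above.
// Rebuild map<long, string> when the array schema carries the annotation.
if ("map".equals(arraySchema.getProp("logicalType"))) {
  Map<Long, String> map = new HashMap<>();
  for (GenericData.Record entry : entries) {
    map.put((Long) entry.get("key"), (String) entry.get("value"));
  }
}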

rdblue avatar Oct 10 '20 22:10 rdblue

OK, got it, thanks for the explanation. Let me think about how to make this unit test work.

== Update ==

OK, I found the root cause of why the decoder reads record1 as { id=1, data = List{Record{a=b}, Record{c=d}, Record{e=f}} }: I had forgotten to register the map logical type with LogicalTypes. Adding the following statement works for me now.

  // requires: import org.apache.avro.LogicalTypes;
  static {
    // Register Iceberg's map logical type so that plain Avro readers
    // recognize the "logicalType": "map" annotation on the array schema.
    LogicalTypes.register(LogicalMap.NAME, schema -> LogicalMap.get());
  }

openinx avatar Oct 11 '20 02:10 openinx

Storing a map as a list of key/value pairs introduces another problem: the Avro schema type is array while the actual data type inside the GenericData.Record is Map, so we cannot compare those records with the default GenericData comparator, because it uses the array type from the Avro schema to traverse the Map object.

That seems to be OK; we can fall back to comparing their toString output.
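
A minimal sketch of that fallback, reusing the record/expectedRecord names from the test at the top of this thread (my assumption of the comparison, not code from the PR):

// GenericData's default comparison walks the Avro schema, so it tries to
// traverse the Map value through the declared array type and fails;
// comparing the rendered strings sidesteps that.
Assert.assertEquals(record.toString(), expectedRecord.toString());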

openinx avatar Oct 11 '20 03:10 openinx

If you needed to register a logical type, then you must be using Avro's DatumReader instead of an Iceberg DatumReader. Iceberg has a set of readers for object construction that produce the Avro generics used by internals, in GenericAvroReader.

I think this is also used by default for IcebergDecoder and IcebergEncoder, which use Iceberg schemas and write Avro's single-message encoding.
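
For reference, here is a minimal round trip with those classes (a sketch; package locations assumed from the Iceberg codebase):

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.avro.IcebergDecoder;
import org.apache.iceberg.data.avro.IcebergEncoder;

// Encode/decode an Iceberg Record using Avro's single-message encoding.
// No logical-type registration is needed on this path because the Iceberg
// readers reconstruct the map themselves.
static Record roundTrip(Schema schema, Record record) throws IOException {
  IcebergEncoder<Record> encoder = new IcebergEncoder<>(schema);
  IcebergDecoder<Record> decoder = new IcebergDecoder<>(schema);
  ByteBuffer data = encoder.encode(record);
  return decoder.decode(data);
}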

rdblue avatar Oct 12 '20 22:10 rdblue

If you needed to register a logical type, then you must be using Avro's DatumReader instead of an Iceberg DatumReader.

Yes, you're right. Here is the code: https://github.com/apache/iceberg/pull/1477/files#diff-863fca4ad067cd7dd66730275ed9dcb29fa4468b5b474461934bda9372f73280R72. It just uses the GenericAvroReader to decode the datum. I see that IcebergDecoder & IcebergEncoder can only encode/decode the Iceberg Record, while ManifestFile is an IndexedRecord; in theory we should define an AvroDecoder and an AvroEncoder, and provide tests like these to address the schema evolution issues.

openinx avatar Oct 13 '20 03:10 openinx

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] avatar Feb 27 '24 00:02 github-actions[bot]

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'.

github-actions[bot] avatar Mar 13 '24 00:03 github-actions[bot]