
Manifest entry partition filtering on struct fields

GabrielM98 opened this issue on Apr 25 '25

Apache Iceberg version

main (development)

Please describe the bug 🐞

There appears to be an issue with how partition filtering is applied to manifest entries when partitioning by nested struct fields. In my case, I have a table with the following partition spec...

{
  "spec-id": 3,
  "fields": [
    {
      "name": "event_metadata.timing.created_at_year",
      "transform": "year",
      "source-id": 19,
      "field-id": 1000
    },
    {
      "name": "event_metadata.timing.created_at_month",
      "transform": "month",
      "source-id": 19,
      "field-id": 1001
    },
    {
      "name": "user_uuid_bucket_256",
      "transform": "bucket[256]",
      "source-id": 5,
      "field-id": 1002
    }
  ]
}

When I then attempt to query said table, the call to (*table.Scan).PlanFiles returns an empty []table.FileScanTask. With Delve & GoLand, I believe I've managed to narrow down the issue to the getPartitionRecord function here...

func getPartitionRecord(dataFile iceberg.DataFile, partitionType *iceberg.StructType) partitionRecord {
	partitionData := dataFile.Partition()

	out := make(partitionRecord, len(partitionType.FieldList))
	for i, f := range partitionType.FieldList {
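		// NOTE: the lookup below is by the field's display name; as shown
		// further down, the decoded partition map turns out to be keyed by
		// the Avro-sanitized name, so struct-sourced fields come back nil.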
		out[i] = partitionData[f.Name]
	}

	return out
}

It's returning a partitionRecord like so...

[debugger screenshot: the returned partitionRecord]

The first and second elements of the slice should be 55 and 663 respectively, but both come back nil.

If we look at the Name field values for the first two elements of partitionType.FieldList, they're given as event_metadata.timing.created_at_year and event_metadata.timing.created_at_month, whereas the corresponding keys in the partitionData map are event_metadata_x2Etiming_x2Ecreated_at_year and event_metadata_x2Etiming_x2Ecreated_at_month...

[debugger screenshot: partitionType.FieldList names vs. partitionData map keys]

Consequently, the access to the partitionData map by field name returns a nil empty interface, the partition filter is then evaluated against those nil values, and the manifest entry is incorrectly filtered out, resulting in an empty slice of table.FileScanTask.
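
For illustration, here's a minimal sketch of the kind of escaping that would produce those keys. sanitizeAvroName is a hypothetical helper (not iceberg-go API), assuming Avro-style name sanitization where characters that aren't valid in an Avro field name are escaped as "_x" plus the hex of their code point:

import (
	"fmt"
	"strings"
	"unicode"
)

// sanitizeAvroName sketches Avro field-name sanitization: any character that
// isn't a letter, digit (non-leading), or underscore is escaped as "_x" plus
// the uppercase hex of its code point. '.' is 0x2E, so
// sanitizeAvroName("event_metadata.timing.created_at_year") returns
// "event_metadata_x2Etiming_x2Ecreated_at_year".
func sanitizeAvroName(name string) string {
	var b strings.Builder
	for i, r := range name {
		valid := r == '_' || unicode.IsLetter(r) || (i > 0 && unicode.IsDigit(r))
		if valid {
			b.WriteRune(r)
		} else {
			fmt.Fprintf(&b, "_x%X", r)
		}
	}
	return b.String()
}

Running the partition field names through something like this produces exactly the keys I'm seeing in partitionData.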

From what I can tell, the partitionData map comes from the iceberg.DataFile that is instantiated when decoding the Avro manifest entry. Placing a breakpoint in the fetchManifestEntries function, I can see the following as the result of the decoded Avro...

[debugger screenshot: the decoded Avro manifest entry]

Is this perhaps some Avro decoding quirk that hasn't been accounted for? I don't believe there are any issues with how the data is being written, as I've been able to reproduce this regardless of whether the data was written via Spark or the Iceberg sink connector.

Thanks in advance!

GabrielM98 · Apr 25 '25

can you share the full schema and possibly the code used to write the data?

I think I know the fix here, but it seems weird to me. I'm also curious whether you run into the same issue if you try reading the data with pyiceberg.

zeroshade · Apr 25 '25

Thanks for the quick response @zeroshade!

can you share the full schema

The schema looks like so...

{
    "type": "struct",
    "schema-id": 0,
    "fields": [
      {
        "id": 1,
        "name": "event_metadata",
        "required": false,
        "type": {
          "type": "struct",
          "fields": [
            {
              "id": 6,
              "name": "uuid",
              "required": false,
              "type": "string"
            },
            {
              "id": 7,
              "name": "source",
              "required": false,
              "type": {
                "type": "struct",
                "fields": [
                  {
                    "id": 12,
                    "name": "type",
                    "required": false,
                    "type": "string"
                  },
                  {
                    "id": 13,
                    "name": "id",
                    "required": false,
                    "type": "string"
                  },
                  {
                    "id": 14,
                    "name": "service_id",
                    "required": false,
                    "type": "string"
                  }
                ]
              }
            },
            {
              "id": 8,
              "name": "subjects",
              "required": false,
              "type": {
                "type": "list",
                "element-id": 15,
                "element": {
                  "type": "struct",
                  "fields": [
                    {
                      "id": 16,
                      "name": "type",
                      "required": false,
                      "type": "string"
                    },
                    {
                      "id": 17,
                      "name": "id",
                      "required": false,
                      "type": "string"
                    },
                    {
                      "id": 18,
                      "name": "service_id",
                      "required": false,
                      "type": "string"
                    }
                  ]
                },
                "element-required": false
              }
            },
            {
              "id": 9,
              "name": "timing",
              "required": false,
              "type": {
                "type": "struct",
                "fields": [
                  {
                    "id": 19,
                    "name": "created_at",
                    "required": false,
                    "type": "timestamptz"
                  },
                  {
                    "id": 20,
                    "name": "emitted_at",
                    "required": false,
                    "type": "timestamptz"
                  }
                ]
              }
            },
            {
              "id": 10,
              "name": "collection",
              "required": false,
              "type": {
                "type": "struct",
                "fields": [
                  {
                    "id": 21,
                    "name": "collections",
                    "required": false,
                    "type": {
                      "type": "list",
                      "element-id": 22,
                      "element": {
                        "type": "struct",
                        "fields": [
                          {
                            "id": 23,
                            "name": "type",
                            "required": false,
                            "type": "string"
                          },
                          {
                            "id": 24,
                            "name": "id",
                            "required": false,
                            "type": "string"
                          },
                          {
                            "id": 25,
                            "name": "service_id",
                            "required": false,
                            "type": "string"
                          }
                        ]
                      },
                      "element-required": false
                    }
                  }
                ]
              }
            },
            {
              "id": 11,
              "name": "parent_event_uuid",
              "required": false,
              "type": "string"
            }
          ]
        }
      },
      {
        "id": 2,
        "name": "update_type",
        "required": false,
        "type": "string"
      },
      {
        "id": 3,
        "name": "old_value",
        "required": false,
        "type": "string"
      },
      {
        "id": 4,
        "name": "new_value",
        "required": false,
        "type": "string"
      },
      {
        "id": 5,
        "name": "user_uuid",
        "required": false,
        "type": "string"
      }
    ]
}

the code used to write the data?

The data is being written via the Kafka Connect Apache Iceberg Sink Connector, which we're building as per these instructions from the Iceberg docs, using the latest version (1.8.1). I also used PySpark (via a Docker container running the tabulario/spark-iceberg:3.5.5_1.8.1 image) to write to the same table, but still experienced the same issue when attempting to query it via iceberg-go.

I'm also curious if you try reading the data with pyiceberg whether you run into the same issue or not.

Will give this a go on Monday 👍

GabrielM98 · Apr 25 '25

Hey @zeroshade, apologies for not doing this on Monday as promised, but I've now tried querying the table with PyIceberg and was able to do so successfully.

GabrielM98 · Apr 30 '25

I think the real culprit here is that the partition fields are looked up by name rather than by ID: https://github.com/apache/iceberg-go/blob/091352672b4191a4bb11b603c1fb9bd2ab6c2aaf/table/scanner.go#L116-L125

pyiceberg uses the partition's source_id https://github.com/apache/iceberg-python/blob/34c89494c39916b9b1aa7e6da2c24c34c4d7f058/pyiceberg/partitioning.py#L412-L423
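
A rough Go sketch of what an ID-keyed variant of getPartitionRecord could look like. PartitionByID is an invented accessor standing in for "partition values keyed by field ID"; it isn't the actual iceberg-go API or the fix that landed, just an illustration of why f.ID is a safer join key than f.Name:

// Hypothetical ID-keyed variant of getPartitionRecord: field IDs survive
// Avro name sanitization, display names do not. PartitionByID (map[int]any)
// is invented for illustration only.
func getPartitionRecord(dataFile iceberg.DataFile, partitionType *iceberg.StructType) partitionRecord {
	byID := dataFile.PartitionByID() // hypothetical accessor

	out := make(partitionRecord, len(partitionType.FieldList))
	for i, f := range partitionType.FieldList {
		out[i] = byID[f.ID]
	}

	return out
}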

kevinjqliu · May 05 '25

this is a good test case from the pyiceberg side https://github.com/apache/iceberg-python/blob/main/tests/integration/test_writes/test_writes.py#L1158-L1177

kevinjqliu · May 05 '25

the only time we need to sanitize column names is when writing https://github.com/apache/iceberg-python/blob/34c89494c39916b9b1aa7e6da2c24c34c4d7f058/pyiceberg/io/pyarrow.py#L2418-L2422 https://github.com/apache/iceberg-python/issues/584
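
which implies the read path should either key by field ID or undo the escaping. Purely for illustration, the inverse mapping a name-based reader would need looks something like this (desanitizeAvroName is hypothetical, not part of any Iceberg library, and assumes the two-hex-digit escape form seen above):

import (
	"regexp"
	"strconv"
)

// desanitizeAvroName undoes "_xHH" escapes, e.g. "_x2E" -> ".".
var avroEscape = regexp.MustCompile(`_x([0-9A-F]{2})`)

func desanitizeAvroName(name string) string {
	return avroEscape.ReplaceAllStringFunc(name, func(m string) string {
		cp, _ := strconv.ParseUint(m[2:], 16, 32) // hex digits after "_x"
		return string(rune(cp))
	})
}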

kevinjqliu · May 05 '25

fixed by #427

zeroshade · May 13 '25