
[🐞] Collection of a few bugs

Open kevinjqliu opened this issue 1 year ago • 12 comments

Apache Iceberg version

None

Please describe the bug 🐞

While working on a Python REST server implementation (https://github.com/kevinjqliu/iceberg-rest-catalog), I noticed a few PyIceberg bugs while integrating with PySpark. The REST server implementation uses SqlCatalog as the underlying catalog.

Here's the collection of bugs that I would like to upstream (https://github.com/apache/iceberg-python/compare/main...kevinjqliu:iceberg-python:kevinjqliu/iceberg-rest-catalog). To summarize:

  • [x] rest.py's drop_namespace function does not handle the 409 status code. (#868)
  • [x] sql.py's drop_namespace function does not raise NoSuchNamespaceError when the namespace is missing. (#865)
  • [x] sql.py's update_namespace_properties function only updates the first property and drops the rest. (#873)
  • [ ] TableMetadata is initialized with default Pydantic objects for schema, partition_spec, and sort_order, which does not play well with table updates (see the sketch below).
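
To illustrate the last bullet, here's a minimal sketch (plain Pydantic, not PyIceberg's actual models) of why default-initialized fields cause trouble: a freshly constructed "empty" metadata object is indistinguishable from one that genuinely contains a spec with id 0.

from pydantic import BaseModel, Field

class Spec(BaseModel):
    spec_id: int = 0

class Metadata(BaseModel):
    # Defaults to a list holding a "real" Spec, mirroring how TableMetadata
    # seeds schema, partition_spec, and sort_order.
    partition_specs: list[Spec] = Field(default_factory=lambda: [Spec()])

m = Metadata()
print(m.partition_specs)  # [Spec(spec_id=0)] -- the "empty" metadata already has a spec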

I'll be happy to work on these, but I think they are "good first issues". We can break them up into separate PRs with the relevant tests.

kevinjqliu avatar Jun 27 '24 17:06 kevinjqliu

@kevinjqliu, I've just created a PR for the second item on the list.

uatach avatar Jun 28 '24 02:06 uatach

@kevinjqliu created a PR to fix the 3rd item on the list

ranli avatar Jun 30 '24 03:06 ranli

TableMetadata is initialized with the default Pydantic object for schema, partition_spec, and sort_order, which does not play well with table updates.

Hi @kevinjqliu. Could you please elaborate on this? I am curious about the use-cases that are affected by the default values of these fields.

I remember encountering similar difficulties when working on https://github.com/apache/iceberg-python/pull/498, and I ended up with a workaround that lets update_table_metadata handle some "initial" table updates specially.

HonahX avatar Jul 10 '24 04:07 HonahX

@HonahX OK, so I've found a way to reproduce this. It requires a bit of setup. I found this while working on https://github.com/kevinjqliu/iceberg-rest-catalog and testing its integration with Spark.

Setup

Have pyiceberg's integration test Docker setup running:

make test-integration

Pull the kevinjqliu/iceberg-rest-catalog repo and run the FastAPI server locally:

poetry install
poetry run fastapi dev src/iceberg_rest/main.py

It requires an on-disk SQLite db located in /tmp/warehouse, so you also need to create the directory:

mkdir /tmp/warehouse
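
For reference, a SqlCatalog backed by that on-disk SQLite database would be constructed roughly like this (the catalog name and property values here are assumptions, not copied from the iceberg-rest-catalog repo):

from pyiceberg.catalog.sql import SqlCatalog

# Hypothetical construction of the server's backing catalog; the real
# setup lives in the iceberg-rest-catalog repo.
catalog = SqlCatalog(
    "default",
    uri="sqlite:////tmp/warehouse/catalog.db",  # on-disk SQLite db under /tmp/warehouse
    warehouse="file:///tmp/warehouse",
)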

Reset the iceberg-python vendor folder to the main branch (tracking the official project's main branch), instead of the branch with the fixes:

cd vendor/iceberg-python
git checkout main
git pull 

Run the Spark test

Have the FastAPI server running in one terminal:

poetry run fastapi dev src/iceberg_rest/main.py

In another terminal, run the Spark test:

poetry run python tests/pyspark_test.py

The FastAPI server will error, returning an HTTP 500 response.

Here's the stack trace:

INFO:     127.0.0.1:55910 - "POST /v1/namespaces/default/tables/test_null_nan HTTP/1.1" 500 Internal Server Error
ERROR:    Exception in ASGI application
Traceback (most recent call last):
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 399, in run_asgi
    result = await app(  # type: ignore[func-returns-value]
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 70, in __call__
    return await self.app(scope, receive, send)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
    await super().__call__(scope, receive, send)
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
    raise exc
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
    await self.app(scope, receive, _send)
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 65, in __call__
    await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/routing.py", line 756, in __call__
    await self.middleware_stack(scope, receive, send)
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/routing.py", line 776, in app
    await route.handle(scope, receive, send)
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/routing.py", line 297, in handle
    await self.app(scope, receive, send)
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/routing.py", line 77, in app
    await wrap_app_handling_exceptions(app, request)(scope, receive, send)
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
    raise exc
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
    await app(scope, receive, sender)
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/routing.py", line 72, in app
    response = await func(request)
               ^^^^^^^^^^^^^^^^^^^
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/fastapi/routing.py", line 278, in app
    raw_response = await run_endpoint_function(
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/fastapi/routing.py", line 193, in run_endpoint_function
    return await run_in_threadpool(dependant.call, **values)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/concurrency.py", line 42, in run_in_threadpool
    return await anyio.to_thread.run_sync(func, *args)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
    return await get_async_backend().run_sync_in_worker_thread(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2177, in run_sync_in_worker_thread
    return await future
           ^^^^^^^^^^^^
  File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 859, in run
    result = context.run(func, *args)
             ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kevinliu/repos/iceberg-rest-catalog/src/iceberg_rest/api/catalog_api.py", line 453, in update_table
    resp = catalog._commit_table(commit_table_request)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kevinliu/repos/iceberg-rest-catalog/vendor/iceberg-python/pyiceberg/catalog/sql.py", line 423, in _commit_table
    updated_staged_table = self._update_and_stage_table(current_table, table_request)
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kevinliu/repos/iceberg-rest-catalog/vendor/iceberg-python/pyiceberg/catalog/__init__.py", line 781, in _update_and_stage_table
    updated_metadata = update_table_metadata(
                       ^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kevinliu/repos/iceberg-rest-catalog/vendor/iceberg-python/pyiceberg/table/__init__.py", line 1179, in update_table_metadata
    new_metadata = _apply_table_update(update, new_metadata, context)
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kevinliu/.pyenv/versions/3.11.0/lib/python3.11/functools.py", line 909, in wrapper
    return dispatch(args[0].__class__)(*args, **kw)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/kevinliu/repos/iceberg-rest-catalog/vendor/iceberg-python/pyiceberg/table/__init__.py", line 1022, in _
    raise ValueError(f"Partition spec with id {spec.spec_id} already exists: {spec}")
ValueError: Partition spec with id 0 already exists: []

From here, you can add breakpoints to the codebase in vendor/iceberg-python and debug.

Here are the fixes that I added to make the Spark test work: https://github.com/apache/iceberg-python/compare/main...kevinjqliu:iceberg-python:kevinjqliu/iceberg-rest-catalog

kevinjqliu avatar Jul 12 '24 03:07 kevinjqliu

I don't think the REST server API is at fault here, since this request corresponds to the update_table function, which just serializes the CommitTableRequest object and passes it to _commit_table.

INFO:     127.0.0.1:55910 - "POST /v1/namespaces/default/tables/test_null_nan HTTP/1.1" 500 Internal Server Error

And since I assume Spark is sending the right request, it has to be an issue downstream of _commit_table.
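
For reference, the endpoint is shaped roughly like this (a hand-written sketch, not the exact source of src/iceberg_rest/api/catalog_api.py; catalog stands for the server's backing SqlCatalog instance):

from fastapi import FastAPI
from pyiceberg.table import CommitTableRequest

app = FastAPI()

@app.post("/v1/namespaces/{namespace}/tables/{table}")
def update_table(namespace: str, table: str, commit_table_request: CommitTableRequest):
    # FastAPI deserializes the request body into a CommitTableRequest;
    # the server forwards it verbatim to the underlying catalog.
    return catalog._commit_table(commit_table_request)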

kevinjqliu avatar Jul 12 '24 03:07 kevinjqliu

TableMetadata is initialized with the default Pydantic object for schema, partition_spec, and sort_order, which does not play well with table updates.

Hi @kevinjqliu - I just added a test case similar to yours and verified that it works with the Tabular REST catalog image in the test suite. Could we double-check whether this is an issue in the REST catalog implementation?

Branch with test: https://github.com/syun64/iceberg-python/tree/rest-integration

The test is: pytest tests/integration/test_rest.py::test_write_round_trip

I would appreciate you double checking my work as well to make sure it correctly replicates your test!

sungwy avatar Jul 19 '24 19:07 sungwy

Thanks for taking a look at this!

I reran the steps above; the issue is actually with the CREATE OR REPLACE TABLE statement https://github.com/kevinjqliu/iceberg-rest-catalog/blob/main/tests/pyspark_test.py#L94-L108

I verified that all PySpark statements work with the Tabular REST catalog: https://github.com/kevinjqliu/iceberg-rest-catalog/blob/7c5548133ae266d4fac215b063911c35f08461d9/tests/pyspark_test.py#L75-L116

To reproduce, add this test to test_rest_manifest.py and run pytest tests/integration/test_rest_manifest.py::test_write_round_trip -v -m integration:

import pytest
from pyspark.sql import SparkSession

from pyiceberg.catalog import Catalog

@pytest.mark.integration
def test_write_round_trip(catalog: Catalog, spark: SparkSession) -> None:
    from pyiceberg.exceptions import NoSuchTableError
    from pyiceberg.schema import Schema
    from pyiceberg.types import NestedField, UUIDType, FixedType
    schema = Schema(
        NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False),
        NestedField(field_id=2, name="fixed_col", field_type=FixedType(25), required=False),
    )

    identifier = "default.test_table_write_round_trip"
    try:
        catalog.drop_table(identifier)
    except NoSuchTableError:
        pass

    catalog.create_table(
        identifier=identifier, schema=schema
    )

    # write to table with spark sql
    spark.sql(
        f"""
        INSERT INTO integration.{identifier} VALUES
        ('102cb62f-e6f8-4eb0-9973-d9b012ff0967', CAST('1234567890123456789012345' AS BINARY)),
        ('ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226', CAST('1231231231231231231231231' AS BINARY)),
        ('639cccce-c9d2-494a-a78c-278ab234f024', CAST('12345678901234567ass12345' AS BINARY)),
        ('c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b', CAST('asdasasdads12312312312111' AS BINARY)),
        ('923dae77-83d6-47cd-b4b0-d383e64ee57e', CAST('qweeqwwqq1231231231231111' AS BINARY));
        """
    )

    tbl = catalog.load_table(identifier)
    assert tbl.schema() == schema
    df = tbl.scan().to_arrow().to_pandas()
    assert len(df) == 5
    assert b"1234567890123456789012345" in df["fixed_col"].to_list()

    # create a table with spark sql
    spark.sql(
        f"""
        CREATE OR REPLACE TABLE integration.{identifier}
        USING iceberg
        AS SELECT
        1            AS idx,
        float('NaN') AS col_numeric
    UNION ALL SELECT
        2            AS idx,
        null         AS col_numeric
    UNION ALL SELECT
        3            AS idx,
        1            AS col_numeric
    """
    )

    # write to table with spark sql
    spark.sql(
        f"""
        INSERT INTO integration.{identifier} VALUES
        (4, 999);
        """
    )

Could we double-check whether this is an issue in the REST catalog implementation?

The issue is with this specific REST catalog implementation. However, the implementation uses the PyIceberg library to process catalog requests: CREATE OR REPLACE TABLE triggers an update_table request to the catalog, which handles it by proxying the request directly to the underlying PyIceberg function https://github.com/kevinjqliu/iceberg-rest-catalog/blob/7c5548133ae266d4fac215b063911c35f08461d9/src/iceberg_rest/api/catalog_api.py#L453

I believe the issue is with the _commit_table function and how it deals with the incoming CommitTableRequest.

kevinjqliu avatar Jul 19 '24 20:07 kevinjqliu

I think this is related to the CreateTableTransaction use-case and a current limitation of update_table_metadata, which is used in every non-REST catalog's _commit_table.

As @kevinjqliu mentioned in the issue description:

TableMetadata is initialized with the default Pydantic object for schema, partition_spec, and sort_order, which does not play well with table updates.

This causes problems in the CreateTableTransaction case. Currently, our workaround is to initialize the table metadata https://github.com/apache/iceberg-python/blob/846fc0886d0e5cdfea9448e7b769f6e660a5c786/pyiceberg/catalog/__init__.py#L913-L924

and attach a special marker to some of the TableUpdates indicating that they are the initial_change https://github.com/apache/iceberg-python/blob/fd2af567f225754bd3e4ece69fac05e0dd7dc7dc/pyiceberg/table/__init__.py#L723-L725

so that update_table_metadata will handle those updates in a separate way to accommodate the default values in the "empty" metadata.
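
For concreteness, a condensed sketch of that workaround (paraphrased from the linked code, not a verbatim copy):

from typing import Literal

from pydantic import Field

from pyiceberg.partitioning import PartitionSpec
from pyiceberg.typedef import IcebergBaseModel

class AddPartitionSpecUpdate(IcebergBaseModel):
    action: Literal["add-spec"] = Field(default="add-spec")
    spec: PartitionSpec
    # Python-only marker, excluded from serialization, so it never crosses
    # the REST boundary.
    initial_change: bool = Field(default=False, exclude=True)

def _apply_add_partition_spec_update(update, base_metadata, context):
    for spec in base_metadata.partition_specs:
        # The "empty" metadata already holds PartitionSpec(spec_id=0), so an
        # incoming add-spec with spec_id=0 collides unless it is flagged as
        # part of the initial create.
        if spec.spec_id == update.spec.spec_id and not update.initial_change:
            raise ValueError(f"Partition spec with id {spec.spec_id} already exists: {spec}")
    ...  # otherwise append the spec and record the change in `context`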

However, this won't work in the case of a REST catalog server implementation, where the TableUpdates coming from an arbitrary REST client (for example, Spark) do not contain this extra marker.

So, I agree with @kevinjqliu that there is some room for improvement in update_table_metadata or TableMetadata here.

HonahX avatar Jul 19 '24 22:07 HonahX

Thanks for the pointer! Looks like this comes from the interactions between CreateTableTransaction, StagedTable, and the table updates. I found a difference in implementation between Java and Python: the initial_change field in the table update classes is Python-specific, and it defaults to False for updates in a CommitTableRequest.

Example

I was able to capture the request objects that Spark sends to the REST catalog server.

For CREATE OR REPLACE TABLE, Spark sends multiple HTTP requests. The two relevant ones are:

POST /v1/namespaces/default/tables

Request:

CreateTableRequest(
    name='test_null_nan', 
    location=None, 
    schema=Schema(
        NestedField(field_id=0, name='idx', field_type=IntegerType(), required=False), 
        NestedField(field_id=1, name='col_numeric', field_type=FloatType(), required=False), 
        schema_id=0, 
        identifier_field_ids=[]
    ), 
    partition_spec=PartitionSpec(spec_id=0), 
    write_order=None, 
    stage_create=True, 
    properties={'owner': 'kevinliu'}
)

Response:

LoadTableResult(
    metadata_location='s3://warehouse/rest/default.db/test_null_nan/metadata/00000-30f7e048-7033-4d95-a130-e5cd3683d2d1.metadata.json',
    metadata=TableMetadataV2(
        location='s3://warehouse/rest/default.db/test_null_nan',
        table_uuid=UUID('575c87cb-14c2-4480-b9de-d2f55f28d7d8'),
        last_updated_ms=1721514513906,
        last_column_id=2,
        schemas=[
            Schema(
                NestedField(field_id=1, name='idx', field_type=IntegerType(), required=False),
                NestedField(field_id=2, name='col_numeric', field_type=FloatType(), required=False),
                schema_id=0,
                identifier_field_ids=[]
            )
        ],
        current_schema_id=0,
        partition_specs=[
            PartitionSpec(spec_id=0)
        ],
        default_spec_id=0,
        last_partition_id=999,
        properties={'owner': 'kevinliu'},
        current_snapshot_id=None,
        snapshots=[],
        snapshot_log=[],
        metadata_log=[],
        sort_orders=[
            SortOrder(order_id=0)
        ],
        default_sort_order_id=0,
        refs={},
        format_version=2,
        last_sequence_number=0
    ),
    config={'owner': 'kevinliu'}
)

POST /v1/namespaces/default/tables/test_null_nan

CommitTableRequest(
    identifier=TableIdentifier(
        namespace=Namespace(root=['default']),
        name='test_null_nan'
    ),
    requirements=[
        AssertCreate(type='assert-create')
    ],
    updates=[
        AssignUUIDUpdate(
            action='assign-uuid',
            uuid=UUID('575c87cb-14c2-4480-b9de-d2f55f28d7d8')
        ),
        UpgradeFormatVersionUpdate(
            action='upgrade-format-version',
            format_version=2
        ),
        AddSchemaUpdate(
            action='add-schema',
            schema_=Schema(
                NestedField(field_id=1, name='idx', field_type=IntegerType(), required=False),
                NestedField(field_id=2, name='col_numeric', field_type=FloatType(), required=False),
                schema_id=0,
                identifier_field_ids=[]
            ),
            last_column_id=2,
            initial_change=False
        ),
        SetCurrentSchemaUpdate(
            action='set-current-schema',
            schema_id=-1
        ),
        AddPartitionSpecUpdate( # ValueError: Partition spec with id 0 already exists: []
            action='add-spec',
            spec=PartitionSpec(spec_id=0),
            initial_change=False
        ),
        SetDefaultSpecUpdate(
            action='set-default-spec',
            spec_id=-1
        ),
        AddSortOrderUpdate(
            action='add-sort-order',
            sort_order=SortOrder(order_id=0),
            initial_change=False
        ),
        SetDefaultSortOrderUpdate(
            action='set-default-sort-order',
            sort_order_id=-1
        ),
        SetLocationUpdate(
            action='set-location',
            location='s3://warehouse/rest/default.db/test_null_nan'
        ),
        SetPropertiesUpdate(
            action='set-properties',
            updates={'owner': 'kevinliu'}
        ),
        AddSnapshotUpdate(
            action='add-snapshot',
            snapshot=Snapshot(
                snapshot_id=5611385456663920621,
                parent_snapshot_id=None,
                sequence_number=1,
                timestamp_ms=1721514574649,
                manifest_list='s3://warehouse/rest/default.db/test_null_nan/metadata/snap-5611385456663920621-1-5904fa53-d5f7-49c5-a86b-8df022d1dbdf.avro',
                summary=Summary(
                    Operation.APPEND,
                    **{
                        'spark.app.id': 'local-1721514510400',
                        'added-data-files': '3',
                        'added-records': '3',
                        'added-files-size': '1919',
                        'changed-partition-count': '1',
                        'total-records': '3',
                        'total-files-size': '1919',
                        'total-data-files': '3',
                        'total-delete-files': '0',
                        'total-position-deletes': '0',
                        'total-equality-deletes': '0'
                    }
                ),
                schema_id=0
            )
        ),
        SetSnapshotRefUpdate(
            action='set-snapshot-ref',
            ref_name='main',
            type='branch',
            snapshot_id=5611385456663920621,
            max_ref_age_ms=None,
            max_snapshot_age_ms=None,
            min_snapshots_to_keep=None
        )
    ]
)

Note: initial_change defaults to False, since this field isn't present in Spark's request.
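
Since initial_change is excluded from serialization (it's Python-specific and not part of the REST spec), the flag can't survive a client/server round trip even between two PyIceberg processes. A rough illustration, assuming the current table-update classes:

from pyiceberg.partitioning import PartitionSpec
from pyiceberg.table import AddPartitionSpecUpdate

update = AddPartitionSpecUpdate(spec=PartitionSpec(spec_id=0), initial_change=True)
payload = update.model_dump_json()  # initial_change is not serialized
restored = AddPartitionSpecUpdate.model_validate_json(payload)
assert restored.initial_change is False  # the server always sees the default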

kevinjqliu avatar Jul 21 '24 18:07 kevinjqliu

For context, this is the CreateTableTransaction's self._updates after _initial_changes:

AssignUUIDUpdate(
    action='assign-uuid', 
    uuid=UUID('14406c1d-1bce-43d7-99d6-6617e9c1ad80')
)

UpgradeFormatVersionUpdate(
    action='upgrade-format-version', 
    format_version=2
)

AddSchemaUpdate(
    action='add-schema', 
    schema_=Schema(
        NestedField(
            field_id=1, 
            name='foo', 
            field_type=StringType(), 
            required=False
        ), 
        schema_id=0, 
        identifier_field_ids=[]
    ), 
    last_column_id=1
)

SetCurrentSchemaUpdate(
    action='set-current-schema', 
    schema_id=-1
)

AddPartitionSpecUpdate(
    action='add-spec', 
    spec=PartitionSpec(spec_id=0)
)

SetDefaultSpecUpdate(
    action='set-default-spec', 
    spec_id=-1
)

AddSortOrderUpdate(
    action='add-sort-order', 
    sort_order=SortOrder(order_id=0)
)

SetDefaultSortOrderUpdate(
    action='set-default-sort-order', 
    sort_order_id=-1
)

SetLocationUpdate(
    action='set-location', 
    location='file:///private/var/folders/f1/3_vzsn7x1jq9hszb3z9y6f0m0000gn/T/pytest-of-kevinliu/pytest-37/test_sql0/default.db/arrow_create_table_transaction_test_sql_catalog_2'
)

SetPropertiesUpdate(
    action='set-properties', 
    updates={}
)

kevinjqliu avatar Jul 21 '24 18:07 kevinjqliu

Also, this is what _empty_table_metadata creates:

from pyiceberg.table.metadata import TableMetadataV1
from pyiceberg.schema import Schema
TableMetadataV1(location="", last_column_id=-1, schema=Schema()) 

TableMetadataV1(
    location='',
    table_uuid=UUID('262bafa3-eccd-46e3-9250-e4ea753c338c'),
    last_updated_ms=1721585218373,
    last_column_id=-1,
    schemas=[Schema(schema_id=0,identifier_field_ids=[])],
    current_schema_id=0,
    partition_specs=[PartitionSpec(spec_id=0)],
    default_spec_id=0,
    last_partition_id=999,
    properties={},
    current_snapshot_id=None,
    snapshots=[],
    snapshot_log=[],
    metadata_log=[],
    sort_orders=[SortOrder(order_id=0)],
    default_sort_order_id=0,
    refs={},
    format_version=1,
    schema_=Schema(schema_id=0,identifier_field_ids=[]),
    partition_spec=PartitionSpec(spec_id=0)
)

Interestingly, it's already initialized with schemas, partition_specs, and sort_orders.
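
In fact, replaying just the add-spec update from the CommitTableRequest above onto this default metadata reproduces the error, with the current visitor behavior:

from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import AddPartitionSpecUpdate, update_table_metadata
from pyiceberg.table.metadata import TableMetadataV1

base = TableMetadataV1(location="", last_column_id=-1, schema=Schema())
# base.partition_specs already contains PartitionSpec(spec_id=0), so the
# incoming add-spec with spec_id=0 collides:
update_table_metadata(base, (AddPartitionSpecUpdate(spec=PartitionSpec(spec_id=0)),))
# ValueError: Partition spec with id 0 already exists: []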

kevinjqliu avatar Jul 21 '24 18:07 kevinjqliu

Opened #950. I was able to remove the initial_change field and fix the table update visitor functions to make the tests pass.

I think we need more robust testing for the table update visitor functions. I also noticed that RemoveSnapshotRefUpdate was not implemented.

I think we should also look at _empty_table_metadata and create truly empty table metadata, without the pre-initialized fields (see the sketch below).
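
Something like the following shape is what I have in mind (hypothetical; TableMetadata's validators would likely need changes to accept it):

from pyiceberg.schema import Schema
from pyiceberg.table.metadata import TableMetadataV1

# Hypothetical "truly empty" metadata: no pre-seeded schema/spec/sort-order,
# so a client's initial add-schema/add-spec/add-sort-order updates cannot
# collide with defaults.
TableMetadataV1(
    location="",
    last_column_id=-1,
    schema=Schema(),
    partition_specs=[],
    sort_orders=[],
)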

kevinjqliu avatar Jul 21 '24 18:07 kevinjqliu