[🐞] Collection of a few bugs
Apache Iceberg version
None
Please describe the bug 🐞
While working on a Python REST server implementation (https://github.com/kevinjqliu/iceberg-rest-catalog), I noticed a few Pyiceberg bugs while integrating with PySpark. The REST server implementation is using SqlCatalog as the underlying catalog.
Here's the collection of bugs that I would like to upstream (https://github.com/apache/iceberg-python/compare/main...kevinjqliu:iceberg-python:kevinjqliu/iceberg-rest-catalog). To summarize:
- [x] `rest.py`'s `drop_namespace` function is missing handling of the `409` status code. (#868)
- [x] `sql.py`'s `drop_namespace` function does not raise `NoSuchNamespaceError` when the namespace is missing. (#865)
- [x] `sql.py`'s `update_namespace_properties` function only updates the first property and drops the rest. (#873; the expected behavior for this and the previous item is sketched after this list)
- [ ] `TableMetadata` is initialized with the default Pydantic object for `schema`, `partition_spec`, and `sort_order`, which does not play well with table updates.
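For the second and third items, this is roughly the behavior I'd expect from SqlCatalog (an illustrative sketch, not code from the branch above; the namespace and property names are arbitrary):

```python
# Hedged sketch of the expected SqlCatalog behavior for items 2 and 3 (illustrative only):
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.exceptions import NoSuchNamespaceError

catalog = SqlCatalog("default", uri="sqlite:///:memory:", warehouse="file:///tmp/warehouse")

# item 2: dropping a missing namespace should raise instead of silently succeeding
try:
    catalog.drop_namespace("namespace_that_does_not_exist")
except NoSuchNamespaceError:
    pass

# item 3: every key passed in `updates` should survive, not just the first one
catalog.create_namespace("ns")
catalog.update_namespace_properties("ns", updates={"k1": "v1", "k2": "v2"})
props = catalog.load_namespace_properties("ns")
assert props["k1"] == "v1" and props["k2"] == "v2"  # before #873, only the first update survived
```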
I'll be happy to work on this but I think these are "good first issues". We can break these up into separate PRs with the relevant tests.
@kevinjqliu, I've just created a PR for the second item on the list.
@kevinjqliu created a PR to fix the 3rd item on the list
TableMetadata is initialized with the default Pydantic object for schema, partition_spec, and sort_order, which does not play well with table updates.
Hi @kevinjqliu. Could you please elaborate more on this? I am curious about the use-cases that are affected by the default values of these fields.
I remembered I also encountered similar difficulties when working on https://github.com/apache/iceberg-python/pull/498 and I ended up with a workaround to let update_table_metadata handle some "initial" table updates specially.
@HonahX Ok so I've found a way to reproduce this. Requires a bit of a setup. I found this while working on https://github.com/kevinjqliu/iceberg-rest-catalog and testing its integration with Spark.
Setup
Have pyiceberg's integration test docker running
make test-integration
Pull the kevinjqliu/iceberg-rest-catalog repo and run the fastapi server locally
poetry install
poetry run fastapi dev src/iceberg_rest/main.py
It requires an on-disk SQLite DB located in /tmp/warehouse, so you also need to create the dir:
mkdir /tmp/warehouse
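For context, this is roughly how the server's SqlCatalog ends up pointing at that directory (my assumption for illustration; the exact property values live in the repo, not here):

```python
# Roughly the SqlCatalog configuration behind the server (an assumption for illustration;
# the real settings are defined in the iceberg-rest-catalog repo):
from pyiceberg.catalog.sql import SqlCatalog

catalog = SqlCatalog(
    "default",
    uri="sqlite:////tmp/warehouse/iceberg_catalog.db",  # on-disk sqlite file, hence the mkdir above
    warehouse="file:///tmp/warehouse",
)
```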
Reset the iceberg-python vendor folder to the main branch (tracking the official project's main branch) instead of the branch with the fixes.
cd vendor/iceberg-python
git checkout main
git pull
Run Spark test
Have fastapi server running in one terminal.
poetry run fastapi dev src/iceberg_rest/main.py
In another terminal, run the spark test
poetry run python tests/pyspark_test.py
The fastapi server will error, returning an HTTP 500 response.
Here's the stack trace
INFO: 127.0.0.1:55910 - "POST /v1/namespaces/default/tables/test_null_nan HTTP/1.1" 500 Internal Server Error
ERROR: Exception in ASGI application
Traceback (most recent call last):
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/uvicorn/protocols/http/httptools_impl.py", line 399, in run_asgi
result = await app( # type: ignore[func-returns-value]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/uvicorn/middleware/proxy_headers.py", line 70, in __call__
return await self.app(scope, receive, send)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/fastapi/applications.py", line 1054, in __call__
await super().__call__(scope, receive, send)
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/applications.py", line 123, in __call__
await self.middleware_stack(scope, receive, send)
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 186, in __call__
raise exc
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/middleware/errors.py", line 164, in __call__
await self.app(scope, receive, _send)
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/middleware/exceptions.py", line 65, in __call__
await wrap_app_handling_exceptions(self.app, conn)(scope, receive, send)
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
raise exc
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
await app(scope, receive, sender)
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/routing.py", line 756, in __call__
await self.middleware_stack(scope, receive, send)
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/routing.py", line 776, in app
await route.handle(scope, receive, send)
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/routing.py", line 297, in handle
await self.app(scope, receive, send)
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/routing.py", line 77, in app
await wrap_app_handling_exceptions(app, request)(scope, receive, send)
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 64, in wrapped_app
raise exc
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/_exception_handler.py", line 53, in wrapped_app
await app(scope, receive, sender)
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/routing.py", line 72, in app
response = await func(request)
^^^^^^^^^^^^^^^^^^^
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/fastapi/routing.py", line 278, in app
raw_response = await run_endpoint_function(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/fastapi/routing.py", line 193, in run_endpoint_function
return await run_in_threadpool(dependant.call, **values)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/starlette/concurrency.py", line 42, in run_in_threadpool
return await anyio.to_thread.run_sync(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/anyio/to_thread.py", line 56, in run_sync
return await get_async_backend().run_sync_in_worker_thread(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 2177, in run_sync_in_worker_thread
return await future
^^^^^^^^^^^^
File "/Users/kevinliu/repos/iceberg-rest-catalog/.venv/lib/python3.11/site-packages/anyio/_backends/_asyncio.py", line 859, in run
result = context.run(func, *args)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kevinliu/repos/iceberg-rest-catalog/src/iceberg_rest/api/catalog_api.py", line 453, in update_table
resp = catalog._commit_table(commit_table_request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kevinliu/repos/iceberg-rest-catalog/vendor/iceberg-python/pyiceberg/catalog/sql.py", line 423, in _commit_table
updated_staged_table = self._update_and_stage_table(current_table, table_request)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kevinliu/repos/iceberg-rest-catalog/vendor/iceberg-python/pyiceberg/catalog/__init__.py", line 781, in _update_and_stage_table
updated_metadata = update_table_metadata(
^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kevinliu/repos/iceberg-rest-catalog/vendor/iceberg-python/pyiceberg/table/__init__.py", line 1179, in update_table_metadata
new_metadata = _apply_table_update(update, new_metadata, context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kevinliu/.pyenv/versions/3.11.0/lib/python3.11/functools.py", line 909, in wrapper
return dispatch(args[0].__class__)(*args, **kw)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/Users/kevinliu/repos/iceberg-rest-catalog/vendor/iceberg-python/pyiceberg/table/__init__.py", line 1022, in _
raise ValueError(f"Partition spec with id {spec.spec_id} already exists: {spec}")
ValueError: Partition spec with id 0 already exists: []
From here, you can add breakpoints to the codebase in vendor/iceberg-python and debug.
Here are the fixes that I added to make the spark test work https://github.com/apache/iceberg-python/compare/main...kevinjqliu:iceberg-python:kevinjqliu/iceberg-rest-catalog
I don't think the REST server API is at fault here, since this request corresponds to the update_table function, which just passes the deserialized CommitTableRequest object to _commit_table.
INFO: 127.0.0.1:55910 - "POST /v1/namespaces/default/tables/test_null_nan HTTP/1.1" 500 Internal Server Error
And since I assume Spark is sending the right request, it has to be an issue downstream of _commit_table.
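For reference, the endpoint is a thin proxy around the catalog, roughly of this shape (a sketch; the real handler is in src/iceberg_rest/api/catalog_api.py, and the route and variable names here are illustrative):

```python
# Sketch of the proxy endpoint (illustrative shape; the real handler lives in
# src/iceberg_rest/api/catalog_api.py around line 453 of the stack trace above):
from fastapi import APIRouter
from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.table import CommitTableRequest, CommitTableResponse

router = APIRouter()
catalog = SqlCatalog("default", uri="sqlite:////tmp/warehouse/iceberg_catalog.db",
                     warehouse="file:///tmp/warehouse")

@router.post("/v1/namespaces/{namespace}/tables/{table}")
def update_table(namespace: str, table: str, commit_table_request: CommitTableRequest) -> CommitTableResponse:
    # no translation layer: the deserialized CommitTableRequest is handed straight to pyiceberg
    return catalog._commit_table(commit_table_request)
```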
TableMetadata is initialized with the default Pydantic object for schema, partition_spec, and sort_order, which does not play well with table updates.
Hi @kevinjqliu - I just added a test case similar to yours and verified that it works with the tabular rest catalog image in the test suite. Could we double check if this is an issue in the Rest catalog implementation?
Branch with test: https://github.com/syun64/iceberg-python/tree/rest-integration
The test is: pytest tests/integration/test_rest.py::test_write_round_trip
I would appreciate you double checking my work as well to make sure it correctly replicates your test!
Thanks for taking a look at this!
I reran the steps above; the issue is actually with the CREATE OR REPLACE TABLE statement
https://github.com/kevinjqliu/iceberg-rest-catalog/blob/main/tests/pyspark_test.py#L94-L108
I verified that all Pyspark statements work with the tabular REST catalog. https://github.com/kevinjqliu/iceberg-rest-catalog/blob/7c5548133ae266d4fac215b063911c35f08461d9/tests/pyspark_test.py#L75-L116
To reproduce, add this test to test_rest_manifest.py and run `pytest tests/integration/test_rest_manifest.py::test_write_round_trip -v -m integration`:
```python
import pytest
from pyspark.sql import SparkSession

from pyiceberg.catalog import Catalog


@pytest.mark.integration
def test_write_round_trip(catalog: Catalog, spark: SparkSession) -> None:
    from pyiceberg.exceptions import NoSuchTableError
    from pyiceberg.schema import Schema
    from pyiceberg.types import NestedField, UUIDType, FixedType

    schema = Schema(
        NestedField(field_id=1, name="uuid_col", field_type=UUIDType(), required=False),
        NestedField(field_id=2, name="fixed_col", field_type=FixedType(25), required=False),
    )
    identifier = "default.test_table_write_round_trip"

    try:
        catalog.drop_table(identifier)
    except NoSuchTableError:
        pass

    catalog.create_table(identifier=identifier, schema=schema)

    # write to table with spark sql
    spark.sql(
        f"""
        INSERT INTO integration.{identifier} VALUES
        ('102cb62f-e6f8-4eb0-9973-d9b012ff0967', CAST('1234567890123456789012345' AS BINARY)),
        ('ec33e4b2-a834-4cc3-8c4a-a1d3bfc2f226', CAST('1231231231231231231231231' AS BINARY)),
        ('639cccce-c9d2-494a-a78c-278ab234f024', CAST('12345678901234567ass12345' AS BINARY)),
        ('c1b0d8e0-0b0e-4b1e-9b0a-0e0b0d0c0a0b', CAST('asdasasdads12312312312111' AS BINARY)),
        ('923dae77-83d6-47cd-b4b0-d383e64ee57e', CAST('qweeqwwqq1231231231231111' AS BINARY));
        """
    )

    tbl = catalog.load_table(identifier)
    assert tbl.schema() == schema

    df = tbl.scan().to_arrow().to_pandas()
    assert len(df) == 5
    assert b"1234567890123456789012345" in df["fixed_col"].to_list()

    # create a table with spark sql
    spark.sql(
        f"""
        CREATE OR REPLACE TABLE integration.{identifier}
        USING iceberg
        AS SELECT
        1 AS idx,
        float('NaN') AS col_numeric
        UNION ALL SELECT
        2 AS idx,
        null AS col_numeric
        UNION ALL SELECT
        3 AS idx,
        1 AS col_numeric
        """
    )

    # write to table with spark sql
    spark.sql(
        f"""
        INSERT INTO integration.{identifier} VALUES
        (4, 999);
        """
    )
```
Could we double check if this is an issue in the Rest catalog implementation?
The issue is with this specific Rest catalog implementation. However, the implementation uses the Pyiceberg library to process catalog requests.
CREATE OR REPLACE TABLE triggers an update_table request to the catalog, and the catalog handles it by proxying the request directly to the underlying pyiceberg function: https://github.com/kevinjqliu/iceberg-rest-catalog/blob/7c5548133ae266d4fac215b063911c35f08461d9/src/iceberg_rest/api/catalog_api.py#L453
I believe the issue is with the _commit_table function and how it deals with the incoming CommitTableRequest
I think this is related to the CreateTableTransaction use-case and the current limitation of update_table_metadata, which is used in every non-REST catalog's _commit_table.
As @kevinjqliu mentioned in the issue description
TableMetadata is initialized with the default Pydantic object for schema, partition_spec, and sort_order, which does not play well with table updates.
This causes problems in the CreateTableTransaction case. Currently, our workaround is to initialize the table metadata
https://github.com/apache/iceberg-python/blob/846fc0886d0e5cdfea9448e7b769f6e660a5c786/pyiceberg/catalog/__init__.py#L913-L924
and mark some of the TableUpdates with a special note that this is the initial_change
https://github.com/apache/iceberg-python/blob/fd2af567f225754bd3e4ece69fac05e0dd7dc7dc/pyiceberg/table/__init__.py#L723-L725
such that update_table_metadata will handle those updates in a separate way to accommodate the default value in the "empty" metadata.
However, this won't work in the case of a RestCatalog server implementation, where the TableUpdates coming from an arbitrary REST client (for example Spark) do not contain the extra special note.
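Concretely, the guard that ends up firing in the stack trace above is roughly the following (a simplified paraphrase of the AddPartitionSpecUpdate handler inside update_table_metadata, not the verbatim pyiceberg source):

```python
# Simplified paraphrase of the AddPartitionSpecUpdate handler in update_table_metadata
# (not the verbatim pyiceberg source):
def _apply_add_partition_spec(update, base_metadata, context):
    for spec in base_metadata.partition_specs:
        # the "empty" metadata already carries PartitionSpec(spec_id=0), and an update built
        # by a non-pyiceberg client has initial_change=False, so this guard fires for spec id 0
        if spec.spec_id == update.spec.spec_id and not update.initial_change:
            raise ValueError(f"Partition spec with id {spec.spec_id} already exists: {spec}")
    # ...otherwise the new spec is appended to the metadata's partition_specs
```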
So, I agree with @kevinjqliu that there is some room for improvement for update_table_metadata or TableMetadata here.
Thanks for the pointer! Looks like this is from the interactions between CreateTableTransaction, StagedTable, and the table updates.
I found a difference in implementation between Java and Python: the initial_change field in the table update classes is Python-specific, and it defaults to False for updates arriving in a CommitTableRequest.
Example
I was able to capture the request objects that Spark sends to the REST catalog server.
For CREATE OR REPLACE TABLE, Spark sends multiple HTTP requests. The two relevant ones are:
POST /v1/namespaces/default/tables
Request:
CreateTableRequest(
name='test_null_nan',
location=None,
schema=Schema(
NestedField(field_id=0, name='idx', field_type=IntegerType(), required=False),
NestedField(field_id=1, name='col_numeric', field_type=FloatType(), required=False),
schema_id=0,
identifier_field_ids=[]
),
partition_spec=PartitionSpec(spec_id=0),
write_order=None,
stage_create=True,
properties={'owner': 'kevinliu'}
)
Response:
LoadTableResult(
metadata_location='s3://warehouse/rest/default.db/test_null_nan/metadata/00000-30f7e048-7033-4d95-a130-e5cd3683d2d1.metadata.json',
metadata=TableMetadataV2(
location='s3://warehouse/rest/default.db/test_null_nan',
table_uuid=UUID('575c87cb-14c2-4480-b9de-d2f55f28d7d8'),
last_updated_ms=1721514513906,
last_column_id=2,
schemas=[
Schema(
NestedField(field_id=1, name='idx', field_type=IntegerType(), required=False),
NestedField(field_id=2, name='col_numeric', field_type=FloatType(), required=False),
schema_id=0,
identifier_field_ids=[]
)
],
current_schema_id=0,
partition_specs=[
PartitionSpec(spec_id=0)
],
default_spec_id=0,
last_partition_id=999,
properties={'owner': 'kevinliu'},
current_snapshot_id=None,
snapshots=[],
snapshot_log=[],
metadata_log=[],
sort_orders=[
SortOrder(order_id=0)
],
default_sort_order_id=0,
refs={},
format_version=2,
last_sequence_number=0
),
config={'owner': 'kevinliu'}
)
POST /v1/namespaces/default/tables/test_null_nan
CommitTableRequest(
identifier=TableIdentifier(
namespace=Namespace(root=['default']),
name='test_null_nan'
),
requirements=[
AssertCreate(type='assert-create')
],
updates=[
AssignUUIDUpdate(
action='assign-uuid',
uuid=UUID('575c87cb-14c2-4480-b9de-d2f55f28d7d8')
),
UpgradeFormatVersionUpdate(
action='upgrade-format-version',
format_version=2
),
AddSchemaUpdate(
action='add-schema',
schema_=Schema(
NestedField(field_id=1, name='idx', field_type=IntegerType(), required=False),
NestedField(field_id=2, name='col_numeric', field_type=FloatType(), required=False),
schema_id=0,
identifier_field_ids=[]
),
last_column_id=2,
initial_change=False
),
SetCurrentSchemaUpdate(
action='set-current-schema',
schema_id=-1
),
AddPartitionSpecUpdate( # ValueError: Partition spec with id 0 already exists: []
action='add-spec',
spec=PartitionSpec(spec_id=0),
initial_change=False
),
SetDefaultSpecUpdate(
action='set-default-spec',
spec_id=-1
),
AddSortOrderUpdate(
action='add-sort-order',
sort_order=SortOrder(order_id=0),
initial_change=False
),
SetDefaultSortOrderUpdate(
action='set-default-sort-order',
sort_order_id=-1
),
SetLocationUpdate(
action='set-location',
location='s3://warehouse/rest/default.db/test_null_nan'
),
SetPropertiesUpdate(
action='set-properties',
updates={'owner': 'kevinliu'}
),
AddSnapshotUpdate(
action='add-snapshot',
snapshot=Snapshot(
snapshot_id=5611385456663920621,
parent_snapshot_id=None,
sequence_number=1,
timestamp_ms=1721514574649,
manifest_list='s3://warehouse/rest/default.db/test_null_nan/metadata/snap-5611385456663920621-1-5904fa53-d5f7-49c5-a86b-8df022d1dbdf.avro',
summary=Summary(
Operation.APPEND,
**{
'spark.app.id': 'local-1721514510400',
'added-data-files': '3',
'added-records': '3',
'added-files-size': '1919',
'changed-partition-count': '1',
'total-records': '3',
'total-files-size': '1919',
'total-data-files': '3',
'total-delete-files': '0',
'total-position-deletes': '0',
'total-equality-deletes': '0'
}
),
schema_id=0
)
),
SetSnapshotRefUpdate(
action='set-snapshot-ref',
ref_name='main',
type='branch',
snapshot_id=5611385456663920621,
max_ref_age_ms=None,
max_snapshot_age_ms=None,
min_snapshots_to_keep=None
)
]
)
Note that initial_change defaults to False, since this field isn't present in Spark's request.
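For reference, this is roughly how the flag is declared on the add-schema / add-spec / add-sort-order update models (a paraphrase with a simplified stand-in class; check pyiceberg/table/__init__.py for the exact declaration):

```python
from pydantic import BaseModel, Field

class AddPartitionSpecUpdate(BaseModel):  # simplified stand-in for the real update model
    initial_change: bool = Field(default=False, exclude=True)
    # exclude=True keeps the flag out of the serialized request, and the False default means
    # an update built by any non-pyiceberg client (Spark via REST here) never carries the marker

print(AddPartitionSpecUpdate().model_dump_json())  # -> "{}": the flag never goes over the wire
```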
For context, this is the CreateTableTransaction's self._updates after _initial_changes
AssignUUIDUpdate(
action='assign-uuid',
uuid=UUID('14406c1d-1bce-43d7-99d6-6617e9c1ad80')
)
UpgradeFormatVersionUpdate(
action='upgrade-format-version',
format_version=2
)
AddSchemaUpdate(
action='add-schema',
schema_=Schema(
NestedField(
field_id=1,
name='foo',
field_type=StringType(),
required=False
),
schema_id=0,
identifier_field_ids=[]
),
last_column_id=1
)
SetCurrentSchemaUpdate(
action='set-current-schema',
schema_id=-1
)
AddPartitionSpecUpdate(
action='add-spec',
spec=PartitionSpec(spec_id=0)
)
SetDefaultSpecUpdate(
action='set-default-spec',
spec_id=-1
)
AddSortOrderUpdate(
action='add-sort-order',
sort_order=SortOrder(order_id=0)
)
SetDefaultSortOrderUpdate(
action='set-default-sort-order',
sort_order_id=-1
)
SetLocationUpdate(
action='set-location',
location='file:///private/var/folders/f1/3_vzsn7x1jq9hszb3z9y6f0m0000gn/T/pytest-of-kevinliu/pytest-37/test_sql0/default.db/arrow_create_table_transaction_test_sql_catalog_2'
)
SetPropertiesUpdate(
action='set-properties',
updates={}
)
Also, this is what _empty_table_metadata creates
from pyiceberg.table.metadata import TableMetadataV1
from pyiceberg.schema import Schema
TableMetadataV1(location="", last_column_id=-1, schema=Schema())
TableMetadataV1(
location='',
table_uuid=UUID('262bafa3-eccd-46e3-9250-e4ea753c338c'),
last_updated_ms=1721585218373,
last_column_id=-1,
schemas=[Schema(schema_id=0,identifier_field_ids=[])],
current_schema_id=0,
partition_specs=[PartitionSpec(spec_id=0)],
default_spec_id=0,
last_partition_id=999,
properties={},
current_snapshot_id=None,
snapshots=[],
snapshot_log=[],
metadata_log=[],
sort_orders=[SortOrder(order_id=0)],
default_sort_order_id=0,
refs={},
format_version=1,
schema_=Schema(schema_id=0,identifier_field_ids=[]),
partition_spec=PartitionSpec(spec_id=0)
)
Interestingly, it's initialized with schemas, partition_specs, and sort_orders.
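Putting the two together, the failure reproduces without Spark or the REST server at all. A minimal sketch (import paths follow the vendored pyiceberg from the stack trace above and may have moved since):

```python
# Minimal repro of the collision, no Spark or REST server involved (a sketch):
from pyiceberg.partitioning import PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import AddPartitionSpecUpdate, update_table_metadata
from pyiceberg.table.metadata import TableMetadataV1

# the same "empty" metadata shown above -- it already carries PartitionSpec(spec_id=0)
base = TableMetadataV1(location="", last_column_id=-1, schema=Schema())

# a Spark-style update: no initial_change marker, so it defaults to False
update_table_metadata(base, (AddPartitionSpecUpdate(spec=PartitionSpec(spec_id=0)),))
# ValueError: Partition spec with id 0 already exists: []
```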
Opened #950; I was able to remove the initial_change field and fix the table update visitor functions to make the tests pass.
I think we need more robust testing for the table update visitor functions. I also noticed that RemoveSnapshotRefUpdate was not implemented.
I think we should look at _empty_table_metadata and create a truly empty table metadata without the already-initialized fields.