
RestCatalog append table is slow (2+s)

HungYangChang opened this issue 9 months ago • 4 comments

Question

Hello PyIceberg devs,

I successfully set up Lakekeeper as the catalog and connected it to ADLS Gen2 storage, but I found that table.append takes a long time to finish (2+ s).

Here are my questions:

  1. Is there a faster way to append a table through the RestCatalog?
  2. If not, is there any way to speed up the write time? I started logging the detailed timing of append (results below). (PS: I already dug into the source code and saw there are both append and fast_append; see the property check sketched after this list.)
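
A side note on point 2, as a hedged sketch (assuming a stock PyIceberg setup; iceberg_table here is the loaded table from the snippet below): whether append goes through merge_append or fast_append is driven by a table property rather than by which method you call, so it can be checked directly:

# Sketch: append() chooses merge_append when the "commit.manifest-merge.enabled"
# table property is true and fast_append otherwise (see the source excerpt further
# down in this thread), so the property is worth checking before switching APIs.
merge_enabled = iceberg_table.properties.get("commit.manifest-merge.enabled")
print(f"commit.manifest-merge.enabled = {merge_enabled}")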


Here is my code

import time

from pyiceberg.catalog.rest import RestCatalog

catalog = RestCatalog(
    name=CATALOG_NAME,
    uri=CATALOG_URL,
    warehouse=CATALOG_WAREHOUSE_PATH,
    token=CATALOG_TOKEN,
    properties={
        "adlfs.account-name": AZURE_STORAGE_ACCOUNT_NAME,
        "adlfs.container": CONTAINER_NAME,
        "adlfs.client-id": AZURE_STORAGE_CLIENT_ID,
        "adlfs.tenant-id": AZURE_STORAGE_TENANT_ID,
        "adlfs.client-secret": AZURE_STORAGE_CLIENT_SECRET,
        "client_secret": AZURE_STORAGE_CLIENT_SECRET,
        "client_id": AZURE_STORAGE_CLIENT_ID,
        "tenant_id": AZURE_STORAGE_TENANT_ID,
        "io-impl": "pyiceberg.io.fsspec.FsspecFileIO",
    },
)

# My table is ready 

load_start = time.time()
iceberg_table = catalog.load_table(iceberg_table_identifier)
load_end = time.time()

# Perform the append with optimized options
append_start = time.time()
# solution 1 (seems slow):
iceberg_table.append(table)

# # solution 2: use a bulk transaction instead of a direct append
# # fails with an error (see the note after this snippet)
# with iceberg_table.transaction() as txn:
#     txn.append(table)
#     txn.commit_transaction()
append_end = time.time()
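
A hedged note on solution 2 (an assumption, not verified against the actual error): as far as I can tell, PyIceberg's Transaction commits when the with-block exits, so the explicit commit_transaction() call ends up committing the same transaction twice. If that is the cause, dropping the explicit call should be enough; either way this path still performs a single catalog commit, so it is unlikely to be faster than append:

# Hedged sketch, assuming Transaction commits on context-manager exit: calling
# commit_transaction() explicitly inside the block would commit a second time,
# which may be what triggers the error in solution 2.
with iceberg_table.transaction() as txn:
    txn.append(table)
# no explicit txn.commit_transaction() here; the exit of the with-block commits once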

Thanks for your help in advance :)

HungYangChang avatar Mar 18 '25 16:03 HungYangChang

I did some dirty logging in pyiceberg.table.append

def append(self, df: pa.Table, snapshot_properties: Dict[str, str] = EMPTY_DICT) -> None:
        """
        Shorthand API for appending a PyArrow table to a table transaction.

        Args:
            df: The Arrow dataframe that will be appended to overwrite the table
            snapshot_properties: Custom properties to be added to the snapshot summary
        """
        start_append_time = time.time()
        try:
            import pyarrow as pa
        except ModuleNotFoundError as e:
            raise ModuleNotFoundError("For writes PyArrow needs to be installed") from e

        from pyiceberg.io.pyarrow import _check_pyarrow_schema_compatible, _dataframe_to_data_files

        if not isinstance(df, pa.Table):
            raise ValueError(f"Expected PyArrow table, got: {df}")

        if unsupported_partitions := [
            field for field in self.table_metadata.spec().fields if not field.transform.supports_pyarrow_transform
        ]:
            raise ValueError(
                f"Not all partition types are supported for writes. Following partitions cannot be written using pyarrow: {unsupported_partitions}."
            )
        downcast_ns_timestamp_to_us = Config().get_bool(DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE) or False
        _check_pyarrow_schema_compatible(
            self.table_metadata.schema(), provided_schema=df.schema, downcast_ns_timestamp_to_us=downcast_ns_timestamp_to_us
        )
        manifest_merge_enabled = property_as_bool(
            self.table_metadata.properties,
            TableProperties.MANIFEST_MERGE_ENABLED,
            TableProperties.MANIFEST_MERGE_ENABLED_DEFAULT,
        )
        update_snapshot = self.update_snapshot(snapshot_properties=snapshot_properties)
        append_method = update_snapshot.merge_append if manifest_merge_enabled else update_snapshot.fast_append

        logging.info(append_method)
        end_time = time.time()
        logging.info(f"set up {end_time - start_append_time:.3f} seconds")


        with append_method() as append_files:
            # skip writing data files if the dataframe is empty
            if df.shape[0] > 0:
                start_time = time.time()
                data_files = _dataframe_to_data_files(
                    table_metadata=self.table_metadata, write_uuid=append_files.commit_uuid, df=df, io=self._table.io
                )
                end_time = time.time()
                logging.info(f"_dataframe_to_data_files  {end_time - start_time:.3f} seconds")
                start_time = time.time()
                for data_file in data_files:
                    append_files.append_data_file(data_file)
                end_time = time.time()
                logging.info(f"append_data_file {end_time - start_time:.3f} seconds")
        end_append_time = time.time()
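        # note: this log line reuses the "append_data_file" label but actually measures the whole append() call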
        logging.info(f"append_data_file {end_append_time - start_append_time:.3f} seconds")

Here is the result I got:

[2025-03-18T18:35:19.587Z] set up 0.018 seconds
[2025-03-18T18:35:19.605Z] _dataframe_to_data_files 0.000 seconds
[2025-03-18T18:35:20.342Z] append_data_file 0.838 seconds
[2025-03-18T18:35:21.799Z] append_data_file 2.333 seconds
[2025-03-18T18:35:22.413Z] Table append operation took 2.950 seconds
[2025-03-18T18:35:22.483Z] Successfully appended data to table: inboundrequesteventv2 in 3.393 seconds
[2025-03-18T18:35:22.505Z] Wrote to Iceberg in 3.395 seconds
[2025-03-18T18:35:22.516Z] Total processing time: 3.398 seconds

HungYangChang avatar Mar 18 '25 18:03 HungYangChang

Hi @HungYangChang - thanks for posting the logs!

A couple of things to unpack here: _dataframe_to_data_files produces an Iterator, which means that the task of actually writing the parquet files isn't done when you log the 0.000 seconds output.

https://github.com/apache/iceberg-python/blob/a294257e6dfe6298640d377e2c96a40400c38950/pyiceberg/io/pyarrow.py#L2563-L2569

Instead, it writes the parquet files when the iterator's elements are appended to append_files, which in your logs is at this point in time:

[2025-03-18T18:35:20.342Z] append_data_file 0.838 seconds
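
To make the laziness concrete, here is a tiny stand-alone generator example (plain Python, not PyIceberg code) showing why creating the iterator measures as roughly 0.000 seconds while consuming it carries the real cost:

import time

def slow_items():
    # stands in for _dataframe_to_data_files: each element costs real write time
    for i in range(3):
        time.sleep(0.5)
        yield i

t0 = time.time()
items = slow_items()                          # nothing has executed yet
print(f"create:  {time.time() - t0:.3f}s")    # ~0.000 s, like the _dataframe_to_data_files log line

t0 = time.time()
for _ in items:                               # the sleeps run here, as elements are pulled,
    pass                                      # just like the append_data_file loop above
print(f"consume: {time.time() - t0:.3f}s")    # ~1.5 s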

From my observation of the logs, your commit does seem to be taking roughly this long:

[2025-03-18T18:35:22.413Z] Table append operation took 2.950 seconds

Do you have access to the Lakekeeper logs that give you information on how long it takes the Rest Catalog to process the commit request? Once it accepts the commit request, the Rest Catalog must write the metadata on its end and then return an HTTP response back to PyIceberg. It would be good to compare this number against the request->response wall time Lakekeeper reports for your specific commit request.
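
One way to see the client-side timing of that round trip (a sketch, assuming the RestCatalog's HTTP calls go through requests/urllib3) is to enable DEBUG logging for urllib3 and read the timestamps around the commit request:

import logging

# Assumption: the RestCatalog issues its HTTP calls via requests/urllib3, whose
# connectionpool logger emits one line per request (method, path, status) at DEBUG.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s.%(msecs)03d %(name)s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)
logging.getLogger("urllib3.connectionpool").setLevel(logging.DEBUG)

iceberg_table.append(table)  # the commit request and its response now show up with timestamps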

sungwy avatar Mar 19 '25 01:03 sungwy

I wonder if @c-thiel has any thoughts about the best way to profile this from the Lakekeeper side? My guess is to enable tracing logs?

corleyma avatar Mar 21 '25 02:03 corleyma

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] avatar Nov 14 '25 00:11 github-actions[bot]

This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'

github-actions[bot] avatar Nov 29 '25 00:11 github-actions[bot]