Incremental Changelog Scan
Feature Request / Improvement
Hello,
The Java API supports performing a table scan for just the incremental changes between two snapshots: https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/IncrementalChangelogScan.html
How feasible would it be to support the same in PyIceberg? Are there any special considerations to keep in mind?
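To make the ask concrete, here is a rough sketch of what an equivalent API could look like on the PyIceberg side. None of the names below (incremental_changelog_scan, from_snapshot_id, to_snapshot_id) exist in PyIceberg today; they are purely illustrative of the shape of the feature:

from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
table = catalog.load_table("db.events")

history = table.history()
start_snapshot_id = history[-2].snapshot_id
end_snapshot_id = history[-1].snapshot_id

# Hypothetical API shape, mirroring the Java IncrementalChangelogScan;
# neither incremental_changelog_scan() nor these keyword arguments exist yet.
scan = table.incremental_changelog_scan(
    from_snapshot_id=start_snapshot_id,  # exclusive lower bound
    to_snapshot_id=end_snapshot_id,      # inclusive upper bound
)
changes = scan.to_arrow()  # appended rows, and eventually deletes/updates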
Thanks!
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'.
Reopening issue as it was marked stale.
This would be great. In the meantime I naively hacked this to get newly appended rows -- it seems to work for my use case. Looking at the code, wouldn't this feature be easier to implement if plan_files accepted an optional snapshot_id argument?
https://github.com/apache/iceberg-python/blob/861c5631587f0d54e2550733d0f8557d57f5060a/pyiceberg/table/__init__.py#L1929-L1937
from typing import Iterable, Optional, Tuple, Union

from pyiceberg.table import (
    DataScan, FileScanTask, Table, Properties, ALWAYS_TRUE, EMPTY_DICT, BooleanExpression
)


class AppendScan(DataScan):
    start_snapshot_id: Optional[int] = None

    @classmethod
    def from_table(
        cls,
        table: Table,
        row_filter: Union[str, BooleanExpression] = ALWAYS_TRUE,
        selected_fields: Tuple[str, ...] = ("*",),
        case_sensitive: bool = True,
        start_snapshot_id: Optional[int] = None,
        snapshot_id: Optional[int] = None,
        options: Properties = EMPTY_DICT,
        limit: Optional[int] = None,
    ) -> DataScan:
        instance = cls(
            table_metadata=table.metadata,
            io=table.io,
            row_filter=row_filter,
            selected_fields=selected_fields,
            case_sensitive=case_sensitive,
            snapshot_id=snapshot_id,
            options=options,
            limit=limit,
        )
        instance.start_snapshot_id = start_snapshot_id
        return instance

    def plan_files(self) -> Iterable[FileScanTask]:
        current_plan = super().plan_files()
        if self.start_snapshot_id is None:
            return current_plan

        # Filter out the files that were already present in the start snapshot.
        orig_snapshot_id = self.snapshot_id
        try:
            self.snapshot_id = self.start_snapshot_id
            # Materialize so the membership tests below don't exhaust an iterator.
            prev_plan = list(super().plan_files())
            return [task for task in current_plan if task not in prev_plan]
        finally:
            # Restore the original snapshot id.
            self.snapshot_id = orig_snapshot_id
append_scan = AppendScan.from_table(product, start_snapshot_id=product.history()[-2].snapshot_id)
append_scan.to_pandas()
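One caveat with the hack above: filtering with "task not in prev_plan" relies on FileScanTask equality holding across two separate plan_files() calls. A slightly more defensive sketch (assuming each task exposes its data file path as task.file.file_path, as current PyIceberg does) compares file paths instead:

from typing import Iterable

from pyiceberg.table import DataScan, FileScanTask


class AppendScanByPath(AppendScan):
    """Variant of AppendScan that filters by data file path instead of task equality."""

    def plan_files(self) -> Iterable[FileScanTask]:
        # Call DataScan.plan_files directly to bypass AppendScan's own filtering.
        current_plan = list(DataScan.plan_files(self))
        if self.start_snapshot_id is None:
            return current_plan

        orig_snapshot_id = self.snapshot_id
        try:
            self.snapshot_id = self.start_snapshot_id
            # Data file paths already reachable from the start snapshot.
            start_paths = {task.file.file_path for task in DataScan.plan_files(self)}
        finally:
            self.snapshot_id = orig_snapshot_id

        # Keep only tasks whose data file was added after the start snapshot.
        return [task for task in current_plan if task.file.file_path not in start_paths]

Usage is the same as above: AppendScanByPath.from_table(product, start_snapshot_id=...).to_pandas(). Like the original hack, this only surfaces newly appended data files; it does not account for deletes, overwrites, or delete files the way a true changelog scan would.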
Hi folks, any update on this? I see https://github.com/apache/iceberg-python/pull/533 and https://github.com/apache/iceberg-python/pull/782, but I'm unsure how close they are. Is this currently being worked on?
Reading PyIceberg tables incrementally is crucial for us. Happy to help out here!
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in the next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'.
Not stale! There are a few PRs related to this; I'll take a look after the 0.10 release.