feat/2372 extend dataset querying API for incremental load
Implements #2372
@zilto after working more on the transformations, I have a few changes to the original ticket:
We need a `filter_by_load_id_gt(load_id: str, status: Union[int, List[int]] = 0)` method that returns all rows with a load_id greater than the one provided; I need this for incremental loading.
One more request: for all methods that filter by status, `status` should also accept `None`; in that case no filtering on status happens and the loads table doesn't even get joined.
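A rough sketch of the requested signature (the method name and defaults come from this thread; the surrounding class is only a stub, not the actual dlt relation):

```py
from typing import List, Optional, Union


class ReadableDBAPIRelation:  # stub for illustration only
    def filter_by_load_id_gt(
        self,
        load_id: str,
        status: Optional[Union[int, List[int]]] = 0,
    ) -> "ReadableDBAPIRelation":
        """Keep only rows whose _dlt_load_id is greater than `load_id`.

        When status is None, skip the status filter entirely so the
        _dlt_loads table never needs to be joined.
        """
        ...
```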
The change to `ReadableIbisRelation.__getitem__` triggers a mypy error because it doesn't respect the superclass signature. Let me know how I should approach this. Test coverage is not perfect, but I didn't want this PR to remain a blocker for longer.
In terms of the user-facing API, we discussed:
```py
pipeline = dlt.pipeline(...)
dataset = pipeline.dataset()
reviews = dataset.reviews.incremental()
items = dataset.items
t = reviews.join(items, reviews.item_id == items.id)
```
I believe `.incremental()` on a table (`dataset.reviews`) doesn't tell you that it's a filter / `WHERE` operation. It's also hard to distinguish from proxied Ibis methods.
Suggestion
What about filtering the dataset directly? The `.filter()` method would apply to the `_dlt_loads` table and be propagated to child tables (via root key, with future support for recursive propagation). A convenience method `.list_load_ids()` allows retrieving the desired load_id values.
This would allow the following:
```py
pipeline = dlt.pipeline(...)
dataset = pipeline.dataset()
last_load_id = dataset.last_load_id
reviews = dataset.filter(last_load_id).reviews  # filtered
items = dataset.items  # not filtered
t = reviews.join(items, reviews.item_id == items.id)
```
```py
pipeline = dlt.pipeline(...)
dataset = pipeline.dataset()
# .filter() returns a dataset with some attributes set
incremental_dataset = pipeline.dataset().filter(dataset.last_load_id)
reviews = incremental_dataset.reviews  # filtered
items = dataset.items  # not filtered
```
```py
pipeline = dlt.pipeline(...)
dataset = pipeline.dataset(load_ids=["specific-load-id"])
# both received the dataset filter
reviews = dataset.reviews  # filtered
items = dataset.items  # filtered
```
Changes and specs
- `ReadableDBAPIDataset` can store a set of load_ids as an attribute
- `ReadableDBAPIRelation` holds a reference to `ReadableDBAPIDataset` to access the load_id selection (already implemented)
- When `ReadableDBAPIRelation.query()` is called (i.e., the lazy query is executed), filter the loads table and apply the result to the current table via a join or filter
- `ReadableDBAPIDataset` gets a convenience method `list_load_ids()` that returns a descending ordered list, can filter by status, and accepts a limit (see the sketch after this list)
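A minimal sketch of how these pieces could fit together, assuming simplified constructors and plain SQL string building; the real dlt classes and query builder differ:

```py
from typing import List, Optional


class ReadableDBAPIDataset:
    def __init__(self, load_ids: Optional[List[str]] = None) -> None:
        # load_id selection shared by every relation created from this dataset
        self.load_ids = load_ids

    def filter(self, *load_ids: str) -> "ReadableDBAPIDataset":
        # return a new dataset carrying the selection; the original stays unfiltered
        return ReadableDBAPIDataset(load_ids=list(load_ids))

    def list_load_ids(
        self, status: Optional[int] = 0, limit: Optional[int] = None
    ) -> List[str]:
        # would query _dlt_loads ordered by load_id DESC, optionally
        # filtered by status and limited; stubbed here
        raise NotImplementedError


class ReadableDBAPIRelation:
    def __init__(self, dataset: ReadableDBAPIDataset, table_name: str) -> None:
        # keep a reference to the dataset to read its load_id selection
        self._dataset = dataset
        self._table_name = table_name

    def query(self) -> str:
        # build the lazy query; apply the load_id filter only when the
        # dataset carries a selection
        sql = f"SELECT * FROM {self._table_name}"
        if self._dataset.load_ids:
            ids = ", ".join(f"'{lid}'" for lid in self._dataset.load_ids)
            sql += f" WHERE _dlt_load_id IN ({ids})"
        return sql


# usage: relations created from the filtered dataset get the WHERE clause
filtered = ReadableDBAPIDataset().filter("1745000000.123")  # example load_id
print(ReadableDBAPIRelation(filtered, "reviews").query())
# SELECT * FROM reviews WHERE _dlt_load_id IN ('1745000000.123')
```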
Closing in favor of #2612.