feat/2372 extend dataset querying API for incremental load

Open zilto opened this issue 10 months ago • 5 comments

Implements #2372

zilto · Mar 06 '25 16:03

Deploy Preview for dlt-hub-docs canceled.

Latest commit: 06d5fd49e056ca96101eb5ca4509abb44ad39dca
Latest deploy log: https://app.netlify.com/sites/dlt-hub-docs/deploys/67edb1c22a86e500081e61ee

netlify[bot] · Mar 06 '25 16:03

@zilto after working more on the transformations, I have a few changes to the original ticket. We need a filter_by_load_id_gt(load_id: str, status: Union[int, List[int]] = 0) method that returns all rows with a load id greater than the one provided; I need this for incremental loading.

sh-rp · Mar 06 '25 17:03
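A minimal in-memory sketch of the semantics requested above, assuming dlt's convention that each data row carries a `_dlt_load_id` and that the `_dlt_loads` table has `load_id` and `status` columns (status 0 meaning a completed load). The standalone function and the list-of-dicts tables are hypothetical stand-ins for the dataset API, and comparing load ids as strings assumes they sort in load order, as same-length timestamp strings do.

```python
from typing import Any, Dict, List, Union

Row = Dict[str, Any]


def filter_by_load_id_gt(
    rows: List[Row],
    loads: List[Row],
    load_id: str,
    status: Union[int, List[int]] = 0,
) -> List[Row]:
    """Hypothetical helper: rows newer than `load_id` whose load has an accepted status."""
    accepted = {status} if isinstance(status, int) else set(status)
    # Load ids from the _dlt_loads table whose status is accepted.
    valid_load_ids = {load["load_id"] for load in loads if load["status"] in accepted}
    return [
        row
        for row in rows
        # Assumption: string comparison follows load order.
        if row["_dlt_load_id"] > load_id and row["_dlt_load_id"] in valid_load_ids
    ]


loads = [
    {"load_id": "1741270000.1", "status": 0},
    {"load_id": "1741280000.2", "status": 0},
    {"load_id": "1741290000.3", "status": 1},
]
reviews = [
    {"id": 1, "_dlt_load_id": "1741270000.1"},
    {"id": 2, "_dlt_load_id": "1741280000.2"},
    {"id": 3, "_dlt_load_id": "1741290000.3"},
]

newer = filter_by_load_id_gt(reviews, loads, "1741270000.1")
print([row["id"] for row in newer])  # [2]
```

Row 3 is excluded only because its load has status 1; passing `status=[0, 1]` would return rows 2 and 3.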

One more request: for all methods that filter by status, status should also accept None. In that case, no status filtering happens and the loads table isn't joined at all.

sh-rp · Mar 06 '25 18:03
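A rough sketch of how `status=None` could change the generated query: with a status, the data table is joined to `_dlt_loads`; with `None`, the join is skipped entirely. The function name and the hand-built SQL strings are illustrative assumptions only; a real implementation would go through the dataset's query builder with bound parameters rather than string formatting.

```python
from typing import List, Optional, Union


def build_load_id_gt_query(
    table: str,
    load_id: str,
    status: Optional[Union[int, List[int]]] = 0,
) -> str:
    """Hypothetical helper: SQL selecting rows of `table` newer than `load_id`."""
    where = f"t._dlt_load_id > '{load_id}'"
    if status is None:
        # No status filtering requested: the loads table is never joined.
        return f"SELECT t.* FROM {table} AS t WHERE {where}"
    statuses = [status] if isinstance(status, int) else list(status)
    status_list = ", ".join(str(s) for s in statuses)
    # Join _dlt_loads only to filter on the load status.
    return (
        f"SELECT t.* FROM {table} AS t "
        f"JOIN _dlt_loads AS l ON t._dlt_load_id = l.load_id "
        f"WHERE {where} AND l.status IN ({status_list})"
    )


print(build_load_id_gt_query("reviews", "1741270000.1", status=None))
print(build_load_id_gt_query("reviews", "1741270000.1", status=[0, 1]))
```

Skipping the join when no status filter is requested saves a lookup against `_dlt_loads` on every query.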

The change to ReadableIbisRelation.__getitem__ triggers a mypy error because its signature is incompatible with the superclass signature. Let me know how I should approach this. Test coverage is not complete, but I didn't want this PR to remain a blocker any longer.

zilto · Mar 11 '25 13:03
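For context on this kind of mypy error: mypy rejects an override that narrows a parameter type, because code holding a reference typed as the superclass could pass arguments the override can't handle. Assuming that is what happens here (the actual `ReadableIbisRelation` signatures aren't shown in this thread), one common fix, sketched below with hypothetical classes, is to keep the override's parameters at least as wide as the parent's and branch at runtime; narrowing the return type to the subclass is allowed.

```python
from typing import List, Sequence, Union


class BaseRelation:
    """Hypothetical stand-in for the superclass relation."""

    def __init__(self, columns: List[str]) -> None:
        self.columns = columns

    def __getitem__(self, columns: Union[str, Sequence[str]]) -> "BaseRelation":
        # Accept one column name or a sequence of names.
        selected = [columns] if isinstance(columns, str) else list(columns)
        return BaseRelation([c for c in self.columns if c in selected])


class IbisLikeRelation(BaseRelation):
    """Hypothetical subclass; declaring `columns: str` here would be an incompatible override."""

    def __getitem__(self, columns: Union[str, Sequence[str]]) -> "IbisLikeRelation":
        # Same parameter type as the parent, narrower (covariant) return type.
        selected = [columns] if isinstance(columns, str) else list(columns)
        return IbisLikeRelation([c for c in self.columns if c in selected])


relation = IbisLikeRelation(["id", "item_id", "rating"])
print(relation["rating"].columns)
print(relation[["rating", "id"]].columns)
```

If the narrower signature is intentional, `# type: ignore[override]` on the method silences the error, at the cost of the substitutability guarantee.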

In terms of the user-facing API, we discussed:

import dlt

pipeline = dlt.pipeline(...)
dataset = pipeline.dataset()
reviews = dataset.reviews.incremental()
items = dataset.items
t = reviews.join(items, reviews.item_id == items.id)

I believe .incremental() on a table (dataset.reviews) doesn't tell you that it's a filter / WHERE operation. It's also hard to distinguish from proxied Ibis methods.

Suggestion

What about filtering the dataset directly? This .filter() method applies to the _dlt_loads table and is propagated to child tables (via the root key, with recursive propagation supported in the future). A convenience method, .list_load_ids(), retrieves the desired load_id values.

This would allow the following:

import dlt

# Example 1: only the table accessed through .filter() is filtered
pipeline = dlt.pipeline(...)
dataset = pipeline.dataset()
last_load_id = dataset.last_load_id
reviews = dataset.filter(last_load_id).reviews  # filtered
items = dataset.items  # not filtered
t = reviews.join(items, reviews.item_id == items.id)

# Example 2: keep a separate filtered dataset
pipeline = dlt.pipeline(...)
dataset = pipeline.dataset()
# .filter() returns a dataset with some attributes set
incremental_dataset = pipeline.dataset().filter(dataset.last_load_id)
reviews = incremental_dataset.reviews  # filtered
items = dataset.items  # not filtered

# Example 3: filter the whole dataset when creating it
pipeline = dlt.pipeline(...)
dataset = pipeline.dataset(load_ids=["specific-load-id"])
# both tables receive the dataset filter
reviews = dataset.reviews  # filtered
items = dataset.items  # filtered
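The propagation of a dataset filter to child tables can be sketched in memory: the root table is filtered on `_dlt_load_id`, and child-table rows are kept when their `_dlt_root_id` points at a surviving root row. This assumes the child table carries dlt's `_dlt_root_id` column (the root key) and that root rows are identified by `_dlt_id`; the function names and list-of-dicts tables are hypothetical, and recursive propagation through parent keys is left out, as in the proposal.

```python
from typing import Any, Dict, List, Set

Row = Dict[str, Any]


def filter_root_by_load_ids(root_rows: List[Row], load_ids: Set[str]) -> List[Row]:
    """Hypothetical helper: keep root-table rows written by one of the selected loads."""
    return [row for row in root_rows if row["_dlt_load_id"] in load_ids]


def filter_child_by_root(child_rows: List[Row], kept_root_rows: List[Row]) -> List[Row]:
    """Hypothetical helper: keep child rows whose root key points at a kept root row."""
    kept_ids = {row["_dlt_id"] for row in kept_root_rows}
    return [row for row in child_rows if row["_dlt_root_id"] in kept_ids]


reviews = [
    {"_dlt_id": "r1", "_dlt_load_id": "1741270000.1", "item_id": 10},
    {"_dlt_id": "r2", "_dlt_load_id": "1741280000.2", "item_id": 11},
]
# Hypothetical child table of reviews (e.g. a nested list of tags).
reviews__tags = [
    {"_dlt_root_id": "r1", "value": "old"},
    {"_dlt_root_id": "r2", "value": "new"},
    {"_dlt_root_id": "r2", "value": "fresh"},
]

kept_reviews = filter_root_by_load_ids(reviews, {"1741280000.2"})
kept_tags = filter_child_by_root(reviews__tags, kept_reviews)
print([tag["value"] for tag in kept_tags])  # ['new', 'fresh']
```

Going through the root key means a child row of any depth needs only one lookup against the root table, which is why recursive propagation can be deferred.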

Changes and specs

  • ReadableDBAPIDataset can store a set of load_id values as an attribute
  • ReadableDBAPIRelation holds a reference to ReadableDBAPIDataset to access the load_id selection (already implemented)
  • When ReadableDBAPIRelation.query() is called (i.e., when the lazy query is executed), filter the loads table and apply the result to the current table via a join or filter
  • ReadableDBAPIDataset gets a convenience method, list_load_ids(), that returns load ids in descending order and supports filtering by status and setting a limit
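A minimal sketch of these specs, using hypothetical in-memory classes in place of `ReadableDBAPIDataset` and `ReadableDBAPIRelation`: the dataset stores an optional set of load ids, `filter()` returns a new dataset with that set attached, each relation keeps a reference to its dataset, and the filter is applied only when `query()` runs. `list_load_ids()` returns load ids in descending order with optional status filtering and a limit. All class names, signatures, and the dict-based storage are assumptions for illustration, not the dlt API.

```python
from typing import Any, Dict, List, Optional, Set, Union

Row = Dict[str, Any]


class SketchDataset:
    """Hypothetical stand-in for ReadableDBAPIDataset."""

    def __init__(
        self,
        tables: Dict[str, List[Row]],
        loads: List[Row],
        load_ids: Optional[Set[str]] = None,
    ) -> None:
        self.tables = tables
        self.loads = loads  # rows of the _dlt_loads table
        self.load_ids = load_ids  # None means "no load filter"

    def filter(self, *load_ids: str) -> "SketchDataset":
        # New dataset sharing the data but carrying the load id selection.
        return SketchDataset(self.tables, self.loads, set(load_ids))

    def list_load_ids(
        self, status: Optional[Union[int, List[int]]] = None, limit: Optional[int] = None
    ) -> List[str]:
        rows = self.loads
        if status is not None:
            accepted = {status} if isinstance(status, int) else set(status)
            rows = [row for row in rows if row["status"] in accepted]
        # Newest first (assumes load ids sort in load order).
        ids = sorted((row["load_id"] for row in rows), reverse=True)
        return ids if limit is None else ids[:limit]

    def __getattr__(self, name: str) -> "SketchRelation":
        # Table access such as dataset.reviews builds a lazy relation.
        if name in self.__dict__.get("tables", {}):
            return SketchRelation(self, name)
        raise AttributeError(name)


class SketchRelation:
    """Hypothetical stand-in for ReadableDBAPIRelation."""

    def __init__(self, dataset: SketchDataset, table: str) -> None:
        self.dataset = dataset  # reference used to read the load id selection
        self.table = table

    def query(self) -> List[Row]:
        rows = self.dataset.tables[self.table]
        # The load filter is applied only when the lazy query is executed.
        if self.dataset.load_ids is None:
            return list(rows)
        return [row for row in rows if row["_dlt_load_id"] in self.dataset.load_ids]


loads = [
    {"load_id": "1741270000.1", "status": 0},
    {"load_id": "1741280000.2", "status": 0},
    {"load_id": "1741290000.3", "status": 1},
]
tables = {
    "reviews": [
        {"id": 1, "_dlt_load_id": "1741270000.1"},
        {"id": 2, "_dlt_load_id": "1741280000.2"},
    ],
}
dataset = SketchDataset(tables, loads)
last_completed = dataset.list_load_ids(status=0, limit=1)
incremental = dataset.filter(*last_completed)
print([row["id"] for row in incremental.reviews.query()])  # [2]
print([row["id"] for row in dataset.reviews.query()])  # [1, 2]
```

Because the relation reads the selection from its dataset at `query()` time, the same table definition can serve both the filtered and the unfiltered dataset.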

zilto · Apr 01 '25 23:04

Closing in favor of #2612.

zilto · May 07 '25 17:05