
sync state with filesystem destination

Open rudolfix opened this issue 1 year ago • 0 comments

Background

The filesystem destination still does not support state sync.

dlt syncs local state with the destination by downloading the newest set of schemas and retrieving the newest version of the state. This happens when (1) there is no local state at all, in which case the state is recreated, or (2) the local state exists but the destination dataset was dropped, in which case the local state is wiped and resynced. This is implemented in sync_destination of the Pipeline class. dlt relies on the SqlJobClient and SqlClient implementations to carry out those tasks. Here we want to extract the base operations into an interface that any destination can implement. For example, we could then easily add state sync to the filesystem and weaviate destinations.
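The two trigger conditions described above can be sketched as a small decision function. This is only an illustration of the rule as stated in this issue, not the actual sync_destination code; `SyncAction` and `decide_sync_action` are hypothetical names.

```python
from enum import Enum


class SyncAction(Enum):
    """Hypothetical labels for the outcomes described in the issue text."""

    NONE = "none"                        # neither condition applies
    RESTORE = "restore"                  # no local state: recreate it from the destination
    WIPE_AND_RESYNC = "wipe_and_resync"  # dataset was dropped: wipe local state, then resync


def decide_sync_action(has_local_state: bool, destination_dataset_exists: bool) -> SyncAction:
    # Case (1): there is no local state at all.
    if not has_local_state:
        return SyncAction.RESTORE
    # Case (2): local state exists but the destination dataset was dropped.
    if not destination_dataset_exists:
        return SyncAction.WIPE_AND_RESYNC
    return SyncAction.NONE
```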

Tasks

    • [x] add a new ABC to reference.py that, when implemented by a JobClient, allows fetching the newest state and schema from the destination
    • [x] define a method to retrieve the newest schema by name, i.e. get_newest_schema_from_storage(name) - see the current SqlJobClient implementation
    • [x] define a method to retrieve the newest state, i.e. load_state_from_destination(pipeline_name: str, sql_client: SqlClientBase[Any]) -> TPipelineState, which is found in state_sync.py
    • [x] the current sync_destination detects that the dataset was deleted by catching DatabaseUndefinedRelation, which happens when (1) the dataset does not exist or (2) any of the tables that store state or schemas do not exist. We want to preserve this behavior but formalize it. IMO checking upfront is costly, so maybe we should raise a specialized exception, e.g. DatasetDeleted, in the methods above and catch it
    • [x] move all relevant SQL code to SqlJobClient and implement the new interface. Align all the tests
    • [ ] implement the interface for filesystem. That will require saving the schemas as files and then retrieving the most recently saved one. Retrieval of state will be even harder (and may require that we additionally save state as JSON into the load package, like we save the schema) - we can move this to a separate ticket
    • [x] if the weaviate destination is merged, implement this for weaviate.
    • [x] extend the tests to the new destinations. This behavior is well tested and we may parametrize it like we parametrize the destination tests
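A minimal sketch of what the interface from the tasks above might look like. Only get_newest_schema_from_storage, load_state_from_destination and DatasetDeleted are names taken from this issue; `StateSyncClient`, `InMemoryClient`, `restore_from_destination` and the plain-dict state and schema types are assumptions made for illustration. The sql_client parameter is dropped here on the assumption that the method lives on the client itself.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional, Tuple

# Stand-ins for dlt's TPipelineState and schema objects; the real types differ.
StateDict = Dict[str, Any]
SchemaDict = Dict[str, Any]


class DatasetDeleted(Exception):
    """Hypothetical exception: the dataset, or a state/schema table in it, is missing."""


class StateSyncClient(ABC):
    """Hypothetical name for the new ABC that a JobClient could implement."""

    @abstractmethod
    def get_newest_schema_from_storage(self, name: str) -> Optional[SchemaDict]:
        """Return the newest stored schema with this name, or None if there is none."""

    @abstractmethod
    def load_state_from_destination(self, pipeline_name: str) -> Optional[StateDict]:
        """Return the newest stored state for this pipeline, or None if there is none."""


class InMemoryClient(StateSyncClient):
    """Toy destination used only to exercise the interface."""

    def __init__(self, dataset: Optional[Dict[str, Dict[str, Any]]]) -> None:
        # None models a dropped dataset; a missing key models a missing table.
        self.dataset = dataset

    def _table(self, table: str) -> Dict[str, Any]:
        if self.dataset is None or table not in self.dataset:
            raise DatasetDeleted(table)
        return self.dataset[table]

    def get_newest_schema_from_storage(self, name: str) -> Optional[SchemaDict]:
        return self._table("schemas").get(name)

    def load_state_from_destination(self, pipeline_name: str) -> Optional[StateDict]:
        return self._table("state").get(pipeline_name)


def restore_from_destination(
    client: StateSyncClient, pipeline_name: str, schema_name: str
) -> Tuple[Optional[StateDict], Optional[SchemaDict], bool]:
    """Return (state, schema, dataset_deleted) without checking the dataset upfront."""
    try:
        state = client.load_state_from_destination(pipeline_name)
        schema = client.get_newest_schema_from_storage(schema_name)
    except DatasetDeleted:
        # The caller would wipe local state here and resync.
        return None, None, True
    return state, schema, False
```

Raising inside the fetch methods and catching once in the caller avoids a separate existence check against the destination, which is the cost concern raised in the task list.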

Notes by Dave

  • [ ] We also need to manage the loads table properly in the filesystem destination. This is already somewhat abstracted through complete_load, but as it stands, it can't be properly read back when getting the state.
  • [ ] We need to define how to save all these tables when only the filesystem is present. On Athena it already works quite well, because those schema definitions are queryable. Most likely we will write out to the same format as the actual loaded data and implement a reliable way to delete/append data.
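One possible shape for the open filesystem task, saving schemas as files and retrieving the most recently saved one, is sketched below. The file naming scheme, `save_schema`, `get_newest_schema` and the local-directory layout are all assumptions for illustration; the format the filesystem destination ends up using is still to be defined, as the notes above say.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict, List, Optional


class DatasetDeleted(Exception):
    """Hypothetical exception: the dataset folder does not exist."""


def _schema_files(dataset_dir: Path, name: str) -> List[Path]:
    # Zero-padded sequence numbers make lexicographic order match save order.
    # Assumes schema names contain no "__" and no glob wildcards.
    return sorted(dataset_dir.glob(f"{name}__*.schema.json"))


def save_schema(dataset_dir: Path, name: str, schema: Dict[str, Any]) -> Path:
    """Write the schema as a new JSON file with the next sequence number."""
    dataset_dir.mkdir(parents=True, exist_ok=True)
    existing = _schema_files(dataset_dir, name)
    last_seq = int(existing[-1].name.split("__")[-1].split(".")[0]) if existing else 0
    path = dataset_dir / f"{name}__{last_seq + 1:010d}.schema.json"
    path.write_text(json.dumps(schema), encoding="utf-8")
    return path


def get_newest_schema(dataset_dir: Path, name: str) -> Optional[Dict[str, Any]]:
    """Return the most recently saved schema, or None if none was saved."""
    if not dataset_dir.is_dir():
        raise DatasetDeleted(str(dataset_dir))
    files = _schema_files(dataset_dir, name)
    if not files:
        return None
    return json.loads(files[-1].read_text(encoding="utf-8"))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        dataset = Path(tmp) / "my_dataset"
        save_schema(dataset, "events", {"version": 1})
        save_schema(dataset, "events", {"version": 2})
        print(get_newest_schema(dataset, "events"))
```

A sequence number in the file name is used instead of file modification times, since timestamps can be coarse or unreliable on some storage backends. A real implementation would also need to handle concurrent writers, which this sketch ignores.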

rudolfix, Aug 04 '23 09:08