sync state with filesystem destination
Background

The `filesystem` destination still does not support state sync. `dlt` syncs local state with the destination by downloading the newest set of schemas and retrieving the newest version of the state. This happens when (1) there is no local state at all, in which case the state is recreated, or (2) the local state exists but the destination dataset was dropped, in which case the local state is wiped and resynced. This is implemented in `sync_destination` of the `Pipeline` class.

`dlt` relies on the `SqlJobClient` and `SqlClient` implementations to carry out those tasks. Here we want to extract the base operations into an interface that any destination can implement. For example, we could then easily add state sync to the `filesystem` and `weaviate` destinations.
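The two resync situations above can be sketched roughly like this. This is an illustrative model, not dlt's actual implementation; the `_state_version` key is used here only as an example of a version counter:

```python
from typing import Any, Dict, Optional

def sync_state(
    local_state: Optional[Dict[str, Any]],
    remote_state: Optional[Dict[str, Any]],
) -> Optional[Dict[str, Any]]:
    """Return the state the pipeline should continue with after a sync."""
    if remote_state is None:
        # case (2): destination dataset was dropped - wipe the local state
        return None
    if local_state is None:
        # case (1): no local state at all - recreate it from the destination
        return dict(remote_state)
    # both exist: keep whichever carries the newer version counter
    if remote_state.get("_state_version", 0) > local_state.get("_state_version", 0):
        return dict(remote_state)
    return local_state
```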
Tasks
- [x] add a new ABC to `reference.py` that, when implemented by a `JobClient`, allows fetching the newest state and schema from the destination
- [x] define a method to retrieve the newest schema by name, i.e. `get_newest_schema_from_storage(name)` - see the current `SqlJobClient` implementation
- [x] define a method to retrieve the newest state, i.e. `load_state_from_destination(pipeline_name: str, sql_client: SqlClientBase[Any]) -> TPipelineState`, which is found in `state_sync.py`
- [x] the current `sync_destination` detects that the dataset was deleted by catching `DatabaseUndefinedRelation`, which is raised when (1) the dataset does not exist or (2) any of the tables that store state or schemas do not exist. We want to preserve this behavior but formalize it. IMO checking upfront is costly, so maybe we should raise a specialized exception, i.e. `DatasetDeleted`, in the methods above and catch it
- [x] move all relevant SQL code to `SqlJobClient` and implement the new interface. Align all the tests
- [ ] implement the interface for `filesystem`. This will require saving the schemas as files and then retrieving the most recently saved one. Retrieval of state will be even harder (and may require that we additionally save state as JSON in the load package, like we save the schema) - we can move this to a separate ticket
- [x] if weaviate is merged, implement this for weaviate
- [x] extend tests to the new destinations. This behavior is well tested and we may parametrize it like we parametrize the destination tests
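A minimal sketch of what such an ABC could look like. The names (`WithStateSync`, `get_stored_state`) and signatures here are assumptions for illustration; in particular, a destination-agnostic version of `load_state_from_destination` would likely drop the `sql_client` argument:

```python
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional

class WithStateSync(ABC):
    """Destination-agnostic state/schema retrieval, as proposed in the tasks."""

    @abstractmethod
    def get_newest_schema_from_storage(self, name: str) -> Optional[str]:
        """Return the most recently stored schema named `name` as a JSON
        string, or None if no schema of that name was ever stored."""

    @abstractmethod
    def get_stored_state(self, pipeline_name: str) -> Optional[Dict[str, Any]]:
        """Return the newest stored state for `pipeline_name`, or None.

        Implementations should raise a specialized exception (e.g. the
        proposed DatasetDeleted) when the dataset itself is gone, so the
        caller can distinguish "no state yet" from "dataset dropped"."""
```

Any destination (SQL-based, `filesystem`, `weaviate`) could then implement these two methods in its own storage terms.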
Notes by Dave
- [ ] We also need to manage the loads table properly in the filesystem destination. This is already somewhat abstracted through `complete_load`, but as it stands the table can't be properly read back when getting the state.
- [ ] We need to define how to save all these tables when only the filesystem is present. On Athena it already works quite well, because we have queryable schema definitions. Most likely we will write them out in the same format as the actual loaded data and implement a reliable way to delete/append data.