kedro-plugins
Unable to save and load from versioned ManagedTableDatasets
Description
I am trying to use a versioned ManagedTableDataset so that I can correctly load and save different versions of a Delta table. I'm encountering an error when loading from a catalog, because the version is incorrectly configured for a Delta table.
Steps to Reproduce
from kedro.io import DataCatalog

conf = {
    "ds": {
        "type": "kedro_datasets.databricks.ManagedTableDataset",
        "versioned": True,
        "database": "dev",
        "table": "table",
        "write_mode": "overwrite",
    },
    "ds_2": {
        "type": "kedro_datasets.databricks.ManagedTableDataset",
        "versioned": True,
        "database": "dev",
        "table": "table",
        "write_mode": "overwrite",
    },
}
catalog = DataCatalog.from_config(conf)
catalog.load("ds")
Expected Result
When creating these datasets, I expect the load version to be resolved from the current table version, the specified version number, or 0 if the table doesn't exist.
When calling ManagedTableDataset.save, the load and save version numbers should be incremented accordingly.
ds = catalog.datasets.ds
ds_2 = catalog.datasets.ds_2
# check initial versions
assert ds.resolve_load_version() == 0
assert ds.resolve_save_version() == 1
assert ds_2.resolve_load_version() == 0
assert ds_2.resolve_save_version() == 1
ds.save(spark.createDataFrame(...))
# check ds has updated versions
assert ds.resolve_load_version() == 1
assert ds.resolve_save_version() == 2
# check load version of ds_2 is fixed and save version is updated
assert ds_2.resolve_load_version() == 0
assert ds_2.resolve_save_version() == 2
Actual Result
DatasetError: Failed while loading data from dataset ManagedTableDataset(database=dev, dataframe_type=spark, table=table, version=Version(load=None, save='2024-10-30T10.17.55.613Z'), write_mode=overwrite).
'>=' not supported between instances of 'NoneType' and 'int'
File /local_disk0/.ephemeral_nfs/envs/pythonEnv-74f91739-2f89-4042-8eb7-77d385bb6dde/lib/python3.10/site-packages/kedro_datasets/databricks/_base_table_dataset.py:359, in BaseTableDataset._load(self)
348 """Loads the version of data in the format defined in the init
349 (spark|pandas dataframe).
350
(...)
357 in the format defined in the init.
358 """
--> 359 if self._version and self._version.load >= 0:
360 try:
TypeError: '>=' not supported between instances of 'NoneType' and 'int'
Your Environment
Include as many relevant details about the environment in which you experienced the bug:
- Kedro version used (pip show kedro or kedro -V): 0.19.9
- Kedro plugin and kedro plugin version used (pip show kedro-airflow): kedro-datasets 5.1.0
- Python version used (python -V): 3.10.12
- Operating system and version: Linux 5.15.0-1073-azure #82~20.04.1-Ubuntu SMP Tue Sep 3 12:27:43 UTC 2024 x86_64 x86_64 x86_64 GNU/Linux
Hey @jstammers, thanks for raising this issue! The _load method used by ManagedTableDataset (inherited from BaseTableDataset) compares None with 0, which causes the invalid type comparison you're seeing. Until this is addressed, you can try explicitly setting the initial version of your datasets to prevent it from defaulting to None.
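For reference, a minimal sketch of that workaround, assuming load_versions passed to DataCatalog.from_config is forwarded to the dataset as its load version (the value 0 is only illustrative, and whether ManagedTableDataset accepts a Delta version number here is an assumption):

from kedro.io import DataCatalog

conf = {
    "ds": {
        "type": "kedro_datasets.databricks.ManagedTableDataset",
        "versioned": True,
        "database": "dev",
        "table": "table",
        "write_mode": "overwrite",
    },
}

# Pin the load version so the dataset is not created with Version(load=None, ...).
catalog = DataCatalog.from_config(conf, load_versions={"ds": 0})
catalog.load("ds")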
@MinuraPunchihewa Do you have an idea?
I am actually confused that ManagedTableDataset is an AbstractVersionedDataset, which has resolve_load_version() etc. That is designed for file-based data, not for tables. Table versioning is possible with Delta, but that is a separate versioning scheme that comes with Delta and is not compatible with Kedro's native one (a specially formatted timestamp).
We can probably implement this for a specific dataset, but before that, can I understand the use case a bit more? Do you plan to use this as in the example you stated, or are you just checking whether it's versioned?
@noklam I am happy to take a look at this.
About your comment on AbstractVersionedDataset being used for file-based data: is that how it is meant to be used? I was under the impression that it could be used for any dataset (or rather, any data source) that implements versioning. I would love an explanation of how exactly it is designed to be used.
@noklam yes, the use case I have in mind is to be able to load the previous version of a delta table, so that I can perform some validation of the changes to the table after updating it.
As a pipeline, it would look something like
from kedro.pipeline import Pipeline, node

pipeline = Pipeline([
    node(update_table, inputs=["table", "staging_table"], outputs="updated_table"),
    node(validate_changes, inputs=["table", "updated_table"], outputs="changes"),
])
where "table" and "updated_table" reference the same underlying delta table. When calling validate_changes, I expect "updated_table" to load from version n and "table" to load from version n-1
As for inferring the version number, I think the simplest way to do that is with the following Spark SQL statement:
# DESCRIBE HISTORY lists the most recent version first, so .first() returns the current version
current_version = spark.sql("DESCRIBE HISTORY <catalog>.<database>.<table>").select("version").first()[0]
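Once the current version is known, loading a specific older version could presumably use Delta time travel, for example (a sketch; spark and current_version are as above, and the table name is a placeholder):

# "versionAsOf" is the standard Delta Lake time-travel option.
previous_df = (
    spark.read.option("versionAsOf", current_version - 1)
    .table("<catalog>.<database>.<table>")
)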
I'll be looking at a PoC to play with Iceberg and versioning and may come back to this in a little bit.
@jstammers The other option is to do this validation with a hook instead of a node (nothing wrong with the current approach either). How does the node generate the delta change? I see that the node has two inputs and splits out "changes" as an output. Is this some kind of incremental pipeline?
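For reference, a rough sketch of the hook-based variant (after_dataset_saved is a real Kedro hook spec; the dataset name and the validation helper are hypothetical):

from kedro.framework.hooks import hook_impl


def validate_changes(previous_df, updated_df):
    """Hypothetical validation helper; the real checks would live in the project."""
    ...


class TableValidationHooks:
    @hook_impl
    def after_dataset_saved(self, dataset_name: str, data, node):
        # React only to the dataset of interest; "updated_table" is illustrative.
        if dataset_name == "updated_table":
            # Loading the previous Delta version (e.g. via time travel) is left out;
            # this only sketches where the validation would run.
            validate_changes(previous_df=None, updated_df=data)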
About your comment on AbstractVersionedDataset being used for file-based data. Is that how it is meant to be used? I was under the impression that it can be used for any dataset (or data source rather) that implements versioning?
From my understanding, it was designed for file-based data. Version is a class that takes a load_version and a save_version.
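For concreteness, a tiny illustration of that class (the timestamp format is the native Kedro one seen in the traceback above):

from kedro.io.core import Version, generate_timestamp

# Kedro's native versioning: for file-based datasets both slots are
# timestamp strings, not table version numbers.
save_version = generate_timestamp()  # e.g. "2024-10-30T10.17.55.613Z"
pinned = Version(load="2024-10-30T10.17.55.613Z", save=save_version)
latest = Version(load=None, save=save_version)  # load=None means "resolve the latest"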
There are a couple of requirements here:
- The version number needs to be monotonic, as Kedro requires this to ensure the pipeline runs with the correct data, i.e. resolve_load_version should pick the latest file; right now it assumes the latest timestamp.
- Optionally, this load_version can be specified per dataset with a --load-versions argument to kedro run. This is a less important requirement, because worst case we can choose not to support arbitrary versions via the CLI for this specific dataset.
- save_version comes from the session_id, which is the timestamp Kedro generates.
Take this example:
my_data:
  type: some.FileDataset
  path: my_folder/abc.file
  versioned: true
This is expected to save files as:
my_folder/abc.file/timestamp1/abc.file
my_folder/abc.file/timestamp2/abc.file
my_folder/abc.file/timestamp3/abc.file
Note that abc.file is both a folder name (slightly awkward) and the file name. So the assumption here is that a file needs to have a path (not necessarily true for a database table), and most table formats have their own versioning schemes that were designed with much stronger feature sets (ACID for Delta/Iceberg).
Cc @merelcht