Allow for dynamic SQL filtering of datasets through lazy loading
Description
We are pulling from some upstream tables that have a long history, and when running our feature engineering, training, etc. pipelines we only want to grab a certain window of data (the last year, for example). This isn't a static query and can change with each successive run of the pipeline.
Currently, the SQL Datasets only load either a static query or an entire table. I would like to be able to dynamically filter these tables based on certain conditions.
Context
It is important because, when consuming upstream tables with 6 years of history but only caring about the last year, it currently takes far too long to ingest data that we then immediately discard (>80% of it).
Possible Implementation
An idea I am considering, thanks to @datajoely's feedback, is a new dataset type whose load returns an object that, when called, actually loads the data, working the same way as PartitionedDataSet. This object can be manipulated within a node to add filters to it before finally calling pd.read_sql or the Spark equivalent. Below is a possible implementation which is working for me. I haven't written it up as a true Kedro dataset, since that involves a lot of extra code which wouldn't help illustrate the functionality. The first block of code is the dataset definition, and the second simulates how it would run in a Kedro workflow (without actually implementing any nodes or pipelines, just for illustration).
lazy_load_sql_query_dataset.py
from collections import namedtuple
from typing import List, Optional

import pandas as pd
from sqlalchemy import MetaData, Table, create_engine, select


# TODO: support conditions where `value` is another column
Condition = namedtuple('Condition', ['column', 'operator', 'value'])


def build_query(table, columns=None, conditions: Optional[List[Condition]] = None):
    """
    Take a SQLAlchemy table object and build a query from it.
    Optionally, select columns to load and conditions to filter on.

    :param table: SQLAlchemy table object
    :param columns: list of column names to load (all columns if None)
    :param conditions: list of Condition objects
    """
    sql_comparison_to_magic_method_map = {
        '=': '__eq__',
        '!=': '__ne__',
        '>': '__gt__',
        '>=': '__ge__',
        '<': '__lt__',
        '<=': '__le__',
    }
    if columns is None:
        columns = [table]
    else:
        columns = [table.c[col] for col in columns]
    query = select(*columns)
    if conditions is not None:
        for condition in conditions:
            # Accept either a SQL comparison operator ('>=') or the
            # corresponding magic method name ('__ge__') directly.
            method_name = sql_comparison_to_magic_method_map.get(
                condition.operator, condition.operator
            )
            try:
                comparison_attribute = getattr(table.c[condition.column], method_name)
            except AttributeError:
                raise ValueError(f"Unsupported operator: {condition.operator!r}")
            query = query.where(comparison_attribute(condition.value))
    return query


class LazyLoadSQLQueryDataset:
    def __init__(self, table_name, connection_string, load_method=pd.read_sql):
        """
        Save table and connection information for later loading.

        :param table_name: name of the table to load
        :param connection_string: SQLAlchemy connection string
        :param load_method: function used to load the data
            (for pandas and Spark compatibility)
        """
        self.table_name = table_name
        self.connection_string = connection_string
        self.load_method = load_method

    def load(self):
        """
        Initialize the SQLAlchemy engine and table objects without actually
        loading any data. Returns itself, a callable object that can be used
        to load the data. This is the method that would be called on node start.
        """
        engine = create_engine(self.connection_string)
        metadata = MetaData()
        self.table = Table(self.table_name, metadata, autoload_with=engine)
        self.engine = engine
        return self

    def __call__(self, columns=None, conditions=None):
        """Build a query from the table and load it using the specified load method."""
        query = build_query(self.table, columns, conditions)
        return self.load_method(query, self.engine)
test_dataset.py
from lazy_load_sql_query_dataset import LazyLoadSQLQueryDataset, Condition
#------
# catalog initialization
stock_table_dataset = LazyLoadSQLQueryDataset("stock_prices", "duckdb:///test_db/test-db-0.1")
#------
# catalog.load
stock_table_dataset_loader = stock_table_dataset.load()
#------
# node run
# define from config
columns_to_load = ['ticker', 'date', 'price_open']
filter_conditions = [Condition('ticker', '=', 'AAPL'), Condition('date', '__ge__', '2023-01-01')]
# call the loader inside a node with the selected columns and conditions
stocks = stock_table_dataset_loader(columns=columns_to_load, conditions=filter_conditions)
# stocks is a loaded pandas dataframe
print(stocks)
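For illustration, if this were eventually written up as a true Kedro dataset, the corresponding catalog entry might look something like the sketch below. The module path is hypothetical, and a real implementation would still need the usual _load/_save/_describe methods:

```yaml
# conf/base/catalog.yml -- hypothetical entry, module path invented
stock_prices:
  type: my_project.extras.datasets.lazy_sql.LazyLoadSQLQueryDataset
  table_name: stock_prices
  connection_string: duckdb:///test_db/test-db-0.1
```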
Possible Alternatives
Messing with the context and other ways of modifying Kedro behavior through TemplatedConfigLoader and hooks, but that's hacky and not scalable.
@inigohidalgo thank you for putting a lot of thought into this -
I spent last night reading about Ibis which I think may be the modern and correct way to approach the problem space you're trying to work with.
I'm still not sure how to make the syntax work neatly in Kedro world, but I'm hoping it can work like this polars dataset we nearly have merged.
@inigohidalgo while this doesn't solve for your more detailed use case, have you looked at using globals?
This is how I currently handle the use case of updating my query for specific dates, and I believe they can be passed at runtime.
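A minimal sketch of that globals-based approach with TemplatedConfigLoader. pandas.SQLQueryDataSet is Kedro's real dataset type; the table, column, and credential names here are invented for illustration:

```yaml
# conf/base/globals.yml -- value swapped per run
start_date: "2023-01-01"

# conf/base/catalog.yml -- ${start_date} is interpolated by TemplatedConfigLoader
stock_prices:
  type: pandas.SQLQueryDataSet
  sql: "SELECT * FROM stock_prices WHERE date >= '${start_date}'"
  credentials: db_credentials
```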
Thanks for the heads up about Ibis @datajoely; it does seem to be a direction we could go in, though it goes a step further than what we'd need (a full computational backend vs. just simple filtering, joining, etc.). I would note that I haven't been able to connect to our Azure SQL databases using our usual connection method (pyodbc), so I'm not sure how mature it is, in case you're considering putting development towards it. I might open an issue in the project about this if Ibis seems promising: at the moment they only provide the pymssql driver for connecting to MSSQL DBs, and it doesn't seem to be compatible with our authentication method (Azure AD).
Hey @bpmeek yeah that's the "compromise" solution we'd been using until now, basically just passing it as a general parameter. But it isn't really flexible enough for what we're trying to do a lot of the time.
For the moment I haven't made progress on this since I've been sidetracked on other issues, but it is still very much an important question for my team and me, and I am still considering pursuing my original idea since it seems simple enough to implement through SQLAlchemy which is much more mature and developed.
Hey all, I'm the lead developer of ibis!
We're happy to support alternative methods of connecting to mssql-oriented DBs. We haven't added that capability yet because no one has asked for it!
If that'll tip the scales in favor of ibis then I think we'd be happy to work on adding ODBC support to the MSSQL backend.
Thanks for chiming in @cpcloud! We haven't started exploring Ibis for real yet, but if the only limitation is MSSQL DBs, maybe we could build a proof of concept with PostgreSQL or something else. We'll surely reach out soon if we make progress on this front.
Went ahead and built a small poc using duckdb (link). I really like Ibis' API for achieving what I want, much simpler than SQLAlchemy. I will keep testing it, but it definitely seems like it could be the way forward for us.
I will open a separate issue in the appropriate repo when I investigate further, but: the ibis.Table object works flawlessly in the first node I load it into (filter_dates); if I call .execute on it, it returns the filtered dataframe as expected. But if I pass the filtered Table object into another node to then execute, as I do here in ibis_table_to_df, it throws AttributeError: 'Backend' object has no attribute 'con'
Full traceback:
Traceback (most recent call last):
File "/home/jovyan/my-conda-envs/py38/lib/python3.8/runpy.py", line 194, in _run_module_as_main
return _run_code(code, main_globals, None,
File "/home/jovyan/my-conda-envs/py38/lib/python3.8/runpy.py", line 87, in _run_code
exec(code, run_globals)
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/kedro/__main__.py", line 38, in <module>
main()
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/kedro/framework/cli/cli.py", line 681, in main
cli_collection(**cli_context)
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/click/core.py", line 829, in __call__
return self.main(*args, **kwargs)
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/click/core.py", line 782, in main
rv = self.invoke(ctx)
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/click/core.py", line 1259, in invoke
return _process_result(sub_ctx.command.invoke(sub_ctx))
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/click/core.py", line 1066, in invoke
return ctx.invoke(self.callback, **ctx.params)
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/click/core.py", line 610, in invoke
return callback(*args, **kwargs)
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/src/ibis_kedro_poc/cli.py", line 233, in run
session.run(
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/kedro/framework/session/session.py", line 403, in run
run_result = runner.run(filtered_pipeline, catalog, run_id)
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/kedro/runner/runner.py", line 100, in run
self._run(pipeline, catalog, run_id)
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/kedro/runner/sequential_runner.py", line 90, in _run
run_node(node, catalog, self._is_async, run_id)
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/kedro/runner/runner.py", line 212, in run_node
node = _run_node_sequential(node, catalog, run_id)
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/kedro/runner/runner.py", line 297, in _run_node_sequential
outputs = _call_node_run(node, catalog, inputs, is_async, run_id=run_id)
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/kedro/runner/runner.py", line 265, in _call_node_run
raise exc
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/kedro/runner/runner.py", line 255, in _call_node_run
outputs = node.run(inputs)
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/kedro/pipeline/node.py", line 466, in run
raise exc
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/kedro/pipeline/node.py", line 455, in run
outputs = self._run_with_one_input(inputs, self._inputs)
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/kedro/pipeline/node.py", line 488, in _run_with_one_input
return self._decorated_func(inputs[node_input])
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/src/ibis_kedro_poc/pipelines/ingest_nodes.py", line 10, in ibis_table_to_df
return data.execute()
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/ibis/expr/types/core.py", line 303, in execute
return self._find_backend(use_default=True).execute(
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/ibis/backends/base/sql/__init__.py", line 253, in execute
with self._safe_raw_sql(sql, **kwargs) as cursor:
File "/home/jovyan/my-conda-envs/py38/lib/python3.8/contextlib.py", line 113, in __enter__
return next(self.gen)
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/ibis/backends/base/sql/alchemy/__init__.py", line 129, in _safe_raw_sql
with self.begin() as con:
File "/home/jovyan/my-conda-envs/py38/lib/python3.8/contextlib.py", line 113, in __enter__
return next(self.gen)
File "/home/jovyan/Misc/kedro-datasets/ibis-kedro-poc/.venv/lib/python3.8/site-packages/ibis/backends/base/sql/alchemy/__init__.py", line 165, in begin
with self.con.begin() as bind:
AttributeError: 'Backend' object has no attribute 'con'
(Note: I tested it with both duckdb and sqlite as a backend and ran into the same issue)
I haven't been able to dig too deep into what is causing it, but I suspect it could be related to how kedro passes datasets from node to node (depending on the MemoryDataset copy_mode arg, deepcopy by default), as also seen in this issue kedro-org/kedro#730. Maybe the backend is somehow losing its con object when being deepcopied?
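As a minimal stdlib illustration of why the deepcopy hypothesis is plausible. The class below is a toy stand-in I invented for this sketch, not ibis' actual Backend:

```python
import copy
import sqlite3


class ToyBackend:
    """Toy stand-in for an object that holds a live DB connection,
    loosely analogous to an ibis Backend and its con attribute."""

    def __init__(self):
        self.con = sqlite3.connect(":memory:")


backend = ToyBackend()

# copy.deepcopy falls back to pickling semantics for attributes it doesn't
# know how to copy, and live DB connections such as sqlite3's refuse to be
# pickled, so the copy fails.
try:
    copy.deepcopy(backend)
    result = "deepcopy succeeded"
except TypeError as err:
    result = f"deepcopy failed: {err}"

print(result)
```

So an object whose connection cannot survive a deepcopy would plausibly lose or corrupt that state when shuttled between nodes via a default MemoryDataset.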
Hi @inigohidalgo, sorry it took some time to get back, but thanks a lot for making this POC! This week I spoke to @datajoely and @marlenezw about Ibis & Kedro and I'm thrilled to see this is moving forward.
Two comments:
- Rather than using .execute(), which is hardcoded to return a pandas.DataFrame, you can be more explicit and use .to_pandas(), .to_pyarrow() and so on, see https://github.com/ibis-project/ibis/issues/6351#issuecomment-1590882862
- Indeed, looks like the .con property is not part of __init__, but is created later on the fly: https://github.com/ibis-project/ibis/blob/6d4a34f74bf4880d33495ad94d44d58fdd97c2b0/ibis/backends/base/sql/alchemy/__init__.py#L122 cc @cpcloud
- I'd try out setting the copy_mode arg to assign to see if that makes it pass
Ideally there should be a repo that only contains the IbisDataSet that can be pip-installed, and then a showcase Kedro project that uses that (rather than putting the code in the extras subpackage). If you need help for the final stretch, ping me and I'll be happy to help!
I'd try out setting the copy_mode arg to assign to see if that makes it pass
Yeah, that solves it, but it's kind of cumbersome to have to specify copy_mode for each Ibis MemoryDataSet I'd be passing through, as at the moment I am not specifying any MemoryDataSets at all in my catalog (note my comments in the Kedro Slack). I suggested more flexibility in the _infer_copy_mode function, which at the moment special-cases pandas and numpy and assigns everything else to deepcopy by default. That is only a tangential issue to IbisDataSet anyway, and I am still interested in implementing it regardless of needing to specify copy_mode in parameters.
I like the idea of sharing the IbisDataSet as a standalone package, and I appreciate the comment regarding .to_pandas and .to_pyarrow; that particularity was highlighted by @gforsyth earlier today :) Although that is part of the node and not of the dataset, which only returns an ibis.Table which I then operate on.
Do you think the IbisDataSet as-is would be in a state to publish? With the required tests and documentation, obviously; all the necessary methods are implemented already, though potentially in a barebones manner.
Do you think the IbisDataSet as-is would be in a state to publish? With the required tests and documentation, obviously; all the necessary methods are implemented already, though potentially in a barebones manner.
Release early, release often.
I extracted the IbisDataSet and filled out the missing functionality here: https://github.com/inigohidalgo/kedro-ibis-dataset
I also updated the PoC to use the external dataset package instead of the bundled extra: https://github.com/inigohidalgo/ibis-kedro-poc
I'd appreciate feedback on the implementation :)
This is super cool @inigohidalgo - until now SQL has always been a 2nd class citizen in Kedro and this is a huge step forward in terms of usability and robustness.
Longer term it would be cool to think about how to enable more comprehensive ibis support in Kedro. A first step may be to bring this into the kedro-datasets repository under something like ibis.SQLDataFrameDataSet?
I'd love to provide blanket support for all ibis datasets, the problem I'm not sure how to solve is how to manage optional dependencies twice (e.g. pip install ibis[pyspark] and kedro[spark.SparkDataSet]) from both user workflow and project maintenance burden perspectives.
AFAIK @astrojuanlu is the Python packaging expert here, so he might have more insight into what is possible using the new PEP 621 metadata. But bringing it into kedro-datasets would probably complicate that a bit more than keeping it as a separate package (or a subpackage within the kedro-extras repo), which would allow specifying dependency groups for kedro-ibis-dataset aligned with those Ibis defines.
As far as project maintenance goes, a GH action that parses Ibis' dependency groups and reproduces them as kedro-ibis-dataset groups shouldn't be too complicated to implement, though it might be brittle if Ibis moves to a package manager other than Poetry. I do know the tool I'm using (PDM) has a workflow to import Poetry dependencies into PEP 621-compatible deps.
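To sketch the core mapping such a sync script could perform: given the optional-dependency groups Ibis publishes (shown here as a hand-written, hypothetical subset rather than parsed from ibis' real pyproject.toml, and with an invented version pin), emit matching extras for kedro-ibis-dataset that simply delegate to the corresponding ibis-framework extra:

```python
# Hypothetical subset of Ibis' backend dependency groups; a real sync
# script would read these from ibis' actual packaging metadata.
IBIS_EXTRAS = ["duckdb", "mssql", "snowflake", "pyspark"]


def build_delegating_extras(ibis_extras, pin=">=5.0,<6"):
    """For each Ibis backend group, emit a kedro-ibis-dataset extra that
    delegates to the matching ibis-framework extra (pin is illustrative)."""
    return {group: [f"ibis-framework[{group}]{pin}"] for group in ibis_extras}


extras = build_delegating_extras(IBIS_EXTRAS)
print(extras["mssql"])  # -> ['ibis-framework[mssql]>=5.0,<6']
```

This keeps the group names 1:1 with Ibis', so a user could write pip install kedro-ibis-dataset[mssql] and get exactly the backend dependencies Ibis itself declares.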
Sidenote: is there a better place we could be having this discussion? I guess the package would solve this issue, but maybe visibility is limited buried inside an issue.
I'd love to provide blanket support for all ibis datasets, the problem I'm not sure how to solve is how to manage optional dependencies twice
Yeah I don't think there's a way to have two levels of optional dependencies. We either include those as parts of the optional dependencies groups we already have:
# kedro-datasets/setup.py
extras_require["spark.SparkDataSet"] = [SPARK, HDFS, S3FS, "ibis[pyspark]"]
or we provide another group:
extras_require["spark.SparkDataSet"] = [SPARK, HDFS, S3FS]
extras_require["spark.SparkDataSet_lazy"] = [SPARK, HDFS, S3FS, "ibis[pyspark]"]
Bringing it into kedro-datasets would probably complicate that a bit more than keeping it as a separate package
We don't have a well-defined process on how a dataset should "graduate" to be part of kedro-datasets or whether we should control the maintenance burden in any way, so I'm improvising here. Since everybody can now do pip install kedro-ibis-dataset, I'd say let's promote this so users start playing with it, iron the first issues that arise, and keep this thread open so that we can use those learnings to see how "lazy SQL filtering" can be a core part of Kedro.
Sidenote: is there a better place we could be having this discussion?
We can connect on #plugins-integrations on the Kedro Slack!
I'd love to provide blanket support for all ibis datasets, the problem I'm not sure how to solve is how to manage optional dependencies twice
Yeah I don't think there's a way to have two levels of optional dependencies.
I think the more solid path forward would be to say "all things SQL in Kedro go through Ibis" and make it a "first level" optional dependency. No need to add more levels of indirection I'd say. To confidently commit to that plan, I propose we put this initial dataset in the hands of users and play around with it a bit more.
Dropping this here for future reference https://delta.io/blog/2023-06-15-using-ibis-pyspark-delta-lake/
we did recently add to_delta and read_delta methods to Ibis for DuckDB, Polars, and DataFusion backends. we have this issue for tracking for pyspark: https://github.com/ibis-project/ibis/issues/6383
let us know if prioritizing that would be helpful!
That's an amazing addition for interoperability. I don't use Spark so I can't comment on that, but the ability to read/write to Delta without needing a cluster running is very valuable.
That's super cool @lostmygithubaccount
Now that kedro-ibis-dataset demonstrates that this is possible, is there something that we can do at the Framework level to improve this functionality?
I think the next steps are using the package, finding bugs and corner cases, and promote it a bit more heavily. Maybe present a conference talk in 2024 :)
Other than that, any other loose ends? Or can we go ahead and close this issue?
https://kedro.org/blog/building-scalable-data-pipelines-with-kedro-and-ibis
Source code: https://github.com/deepyaman/jaffle-shop
So, technically this is possible. And now the question is what do we do about it:
- Should we bring https://github.com/deepyaman/jaffle-shop/blob/main/src/jaffle_shop/datasets/ibis/table_dataset.py to kedro-datasets?
- Should it be a separate, pip-installable package?
A third option is not doing anything, but I'd love to take one of the two paths.
We've discussed sometimes that, since Ibis has lots of different backends, managing that from within kedro-datasets existing layer of optional dependencies might be a mess. But @deepyaman you have a more informed opinion.
cc @merelcht @noklam
So I have 3 cascading questions:
- Do we do a mega ibis.IbisDataSet that supports all back-ends, SQL/DataFrame, whatever?
- If this isn't feasible, would separate ibis.SQLDataset and ibis.DataFrameDataset datasets make sense?
- Lastly, how on earth do we keep optional requirements in sync?
  - We pick conservative bounds on the Kedro side for each back-end and bump our versions frequently
  - We don't include them on Kedro's side and defer to the user specifically installing Ibis dependencies, e.g. pip install 'ibis-framework[oracle]'
  - We write some creative nightly CI script that bumps/releases kedro-datasets every time it spots a new Ibis release
Longer term I'd love to decrease the number of 1st party datasets we maintain on kedro-datasets and delegate that.
From my experience, at the moment ibis comes with quite a lot of dependencies even by default, so adding it to kedro-datasets imo would be difficult to manage. I think there might be something that can be done on the ibis side to lighten that a bit (@cpcloud), but I wouldn't really see the current dependencies as something desirable to be installed by default.
Keeping it as a separate package and managing that complexity on a more 1:1 basis between the IbisDataset package and Ibis makes more sense to me.
Agree with @inigohidalgo. Installing the union of all supported backends' dependencies by default would place quite a burden on end users.
We're in the middle of removing sqlalchemy as direct dependency in Ibis, which reduces the number of dependencies (direct and transitive) by quite a bit, but even after that with 20+ backends there's still a lot of dependencies if support for all backends were included in a single package.
We have to deal with roughly the same issue on our side, since most end users don't need 20 backends.
Could kedro-datasets take a similar approach to Ibis and use setuptools extras to allow users to install it like pip install kedro-datasets[snowflake] (for example)?
Yes @cpcloud we have something like this currently in our setup.py (sorry, pyproject.toml coming soon!) and the main point I was making is that we don't want to double up.
Could kedro-datasets take a similar approach to Ibis and use setuptools extras to allow users to install it like pip install kedro-datasets[snowflake] (for example)?
Yeah they already do that AFAIK, but here, if it were included within kedro-datasets, it would have to be:
- kedro-datasets[ibis] to actually install the ibis requirements
- kedro-datasets[ibis-mssql], kedro-datasets[ibis-snowflake], etc.
https://github.com/kedro-org/kedro/issues/2374#issuecomment-1598561951
That would be easier to manage if you just pip installed kedro-ibis, which would directly have the different dependency groups aligned with Ibis'.
Ah, right. Thanks for the link, I didn't realize we're dealing with two levels of optionality here.
Hm, I'm not entirely sure what the best option is here. Perhaps including it as part of the existing extras might be okay?
I'd be in favour of having a kedro-ibis separate package, otherwise our choices will be constrained by the existing optional dependencies layer. It will be better for discoverability (see https://github.com/kedro-org/kedro-plugins/issues/401) and for tracking (no need to do anything fancy with telemetry, we have PyPI stats).
Keeping it as a separate package and managing that complexity on a more 1:1 basis between the IbisDataset package and Ibis makes more sense to me.
Essentially this.
Longer term I'd love to decrease the number of 1st party datasets we maintain on kedro-datasets and delegate that.
In progress: https://github.com/kedro-org/kedro-plugins/issues/535
- Do we do a mega ibis.IbisDataSet that supports all back-ends, SQL/DataFrame whatever?
- If this isn't feasible would separate ibis.SQLDataset and ibis.DataFrameDatasets make sense?
This is the key design question I'd say.
I will defer to @deepyaman for his opinion on different backends, as I have only really worked with the mssql and polars backends, but I'm not sure what the functional difference would be between the SQL and DataFrame datasets, since the API should (here again I will defer to Deepyaman and Phillip) be the same. Instantiating the connection will have its own parameters, but I know the team tries to keep ibis.connect as a common entrypoint for all backends.
The main complexity is Ibis' different optional dependencies, and that would probably be the main maintenance burden within this hypothetical IbisDataset project. It could probably be automated relatively easily, though, as we would only need to keep track of Ibis' dependency groups and make the same groups available within IbisDataset, each just pointing to the correct ibis-framework[group].