Dynamic Pipeline
Introduction
We had a discussion about dynamic pipelines in #1993, partly related to #1963. This issue summarises that discussion and lays out the work we need to do.
Related Issues:
- #2626
Background
Dynamic pipelines have been one of the most frequently asked-about topics. Various solutions exist, but they are often case-by-case, so they come in all shapes, and users have asked whether Kedro can provide a first-class feature for this.
What is "Dynamic Pipeline"
When people refer to "Dynamic Pipeline", they are often not talking about the same thing. We need to make a clear distinction between the different meanings before we start building a solution.
We can roughly categorise them into two buckets:
- Dynamic construction of Pipeline
- Dynamic behavior at runtime
Dynamic construction of Pipeline (easier)
Examples of these are (see the sketch after this list):
- Time series forecasting - the pipeline makes a prediction for Day 1, and the next pipeline requires the Day 1 prediction as input.
- Hyperparameter tuning
- Combining a variable number of features - feature engineering combines N features into one DataFrame
- A list of countries - each needs to be saved as a catalog entry, and the data is then combined in a pipeline for further processing
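To make this first bucket concrete, a minimal sketch of constructing a pipeline in a for loop over a configured list of countries might look like this. The country list and the process_country / combine functions are illustrative, not from any real project:
# pipeline.py - sketch only; the country list and node functions are illustrative
import pandas as pd

from kedro.pipeline import Pipeline, node, pipeline

COUNTRIES = ["uk", "de", "fr"]  # in practice this would come from configuration


def process_country(raw_sales: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical per-country processing step."""
    return raw_sales


def combine(*country_sales: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical step that concatenates the per-country outputs."""
    return pd.concat(country_sales)


def create_pipeline(**kwargs) -> Pipeline:
    country_nodes = [
        node(
            func=process_country,
            inputs=f"{country}.raw_sales",
            outputs=f"{country}.processed_sales",
            name=f"process_{country}",
        )
        for country in COUNTRIES
    ]
    combine_node = node(
        func=combine,
        inputs=[f"{country}.processed_sales" for country in COUNTRIES],
        outputs="combined_sales",
        name="combine_countries",
    )
    return pipeline(country_nodes + [combine_node])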
Dynamic behavior at runtime (harder)
Examples of these are:
- 2nd-order pipelines - pipelines generated from some node's output
- "I have a scenario where I would like to run model training and model evaluation based on labels in a dataset. Each label would trigger an individual pipeline."
- "A pipeline that makes a prediction for one user: fetch a list of N users, then run the pipeline on each of them."
- Running a node conditionally - run A if B does not exist, otherwise run C
Why is it challenging for Kedro Users?
It needs experience with Kedro: often you have to combine advanced features, e.g. TemplatedConfig + Jinja in the catalog + a for loop in your pipeline.py.
In addition, each of these use cases needs a different solution. Part of Kedro's value proposition is standardisation, yet there is no well-known pattern for these solutions, and they are hard to reason about and debug with Jinja.
What's not in scope
- Non-DAG pipelines - i.e. GitHub Actions or CircleCI-style pipelines.
- Skipping nodes - i.e. if A exists, don't run B and C (a workaround with hooks is possible)
- Dynamic node generation during a run
These types of pipelines are fundamentally different from Kedro's "data-centric" approach.
What are some possible solutions?
Follow-up
- [ ] https://github.com/kedro-org/kedro-devrel/issues/7
Reference
- https://miro.com/app/board/uXjVMSX0s6s=/
Thanks for this @noklam -- I was looking back at the technical discussion recently and wanted to flag it again, so I'm pleased you did. Would you be comfortable writing this blog post? Maybe we can put it in one of the next couple of sprints?
@stichbury Yes! I am happy to write something about this.
The Miro board linked above seems not to be viewable by the public?
@desmond-dsouza Sorry about that! You are right, the link is private, as this is still in draft and there is some internal discussion on the Miro board. I will try to post some screenshots here once we have discussed it.
Do you have any views on this? The board is mainly examples; I tried to go through all the Slack & Discord questions to summarise what the user problems are.
Another question on dynamic pipelines https://www.linen.dev/s/kedro/t/12647827/hello-thank-you-for-this-great-library-i-am-a-ds-working-in-#d170c710-8e6a-4c56-a623-058c3ec33da7
[step 1] > [step 2] > [if score_step2 < X ] > [step 4]
                    > [if score_step2 >= X ] > [step 5]
@astrojuanlu This sort of runtime-generated DAG is not supported currently. There are a few possible ways to get around this:
- Embed the if-else condition inside a node
- Use hooks or dynamic injection
We usually advise avoiding these kinds of conditional DAGs as much as possible, because they get very complicated once you have multiple switches and they are difficult to debug. Having a conditional DAG is not much different from having an if-else node, i.e.:
if score >= 0.5:
    return do_a()
else:
    return do_b()
The challenge here is that the returned dataset may be different or not compatible at all.
I haven't done it before, but it should be more flexible to do this with the Python API. The code may look like this:
# deployment_script.py
result = session.run(tags=["some_pipeline"])
if result["score"] >= 0.5:
    session.run(pipeline_name="deploy")
else:
    session.run(pipeline_name="retrain")
It does mean that you may lose certain features and cannot use the Kedro CLI to run this, so use it sparingly.
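For completeness, a fuller version of that sketch might look like the following. The pipeline names "score_pipeline", "deploy" and "retrain", and the free output "score", are assumptions; note also that a KedroSession can only be run once, so a fresh session is created for the follow-up run:
# deployment_script.py - sketch only; pipeline names and the "score" output are assumptions
from pathlib import Path

from kedro.framework.session import KedroSession
from kedro.framework.startup import bootstrap_project

project_path = Path.cwd()
bootstrap_project(project_path)

# First run: produce the score as a free (non-persisted) output
with KedroSession.create(project_path=project_path) as session:
    result = session.run(pipeline_name="score_pipeline")

# A session can only be run once, so open a new one for the conditional follow-up
with KedroSession.create(project_path=project_path) as session:
    if result["score"] >= 0.5:
        session.run(pipeline_name="deploy")
    else:
        session.run(pipeline_name="retrain")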
For me, if we're going to do this properly we need some sort of
- Conditional node
- Some pre-flight checks to validate things
- To push forward with the session store so that the reproducibility concerns can be managed by good logging/time-machine stuff
- [not entirely sure] A way of accessing the catalog in the pipeline registry
Cross-posting a solution some of our users are using:
- Create an after_context_created / after_catalog_created hook
- Replace the register_pipelines() function with a custom register_dynamic_pipelines(catalog: DataCatalog) function
- If you can pass the catalog to create pipelines, you can access datasets and parameters to dynamically build your pipeline! Use create_pipeline(catalog: DataCatalog) functions to create your pipelines!
Same hook as above, but instead of creating pipelines by changing the parameter and dataset names directly, we create namespaced pipelines, along with dataset factories: https://docs.kedro.org/en/stable/data/data_catalog.html#example-3-generalise-datasets-using-namespaces-into-one-dataset-factory - have a look at the catalog.yml. This way, not only is the pipeline dynamic but also your catalog!
A detailed solution can be found here.
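A minimal sketch of the hook half of this idea might look like the following; it is not the exact code from the linked example, and the module-level _catalog_holder variable is an assumption used purely for illustration:
# hooks.py - sketch only; the linked example repository differs in its details
from kedro.framework.hooks import hook_impl

# Module-level holder so that pipeline_registry.py can read the catalog later on.
_catalog_holder: dict = {}


class DynamicPipelinesHook:
    @hook_impl
    def after_context_created(self, context) -> None:
        # context.catalog builds the catalog early, before the pipeline registry
        # is resolved, so register_pipelines() can construct pipelines from it.
        _catalog_holder["catalog"] = context.catalog
The hook would be registered via HOOKS = (DynamicPipelinesHook(),) in settings.py, and register_pipelines() could then read _catalog_holder["catalog"] and pass it to a create_pipeline(catalog) function instead of keeping the fixed signature.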
@noklam To pitch in here, thank you for sharing my solution to this issue tracker! You asked for an example repository, you can find that here: https://github.com/Lodewic/kedro-dynamic-pipeline-hook-example
- [not entirely sure] A way of accessing the catalog in the pipeline registry
@datajoely we are currently looking for a workaround to be able to access parameters from the pipeline registry. In one of our projects (not sure exactly which Kedro version was current when it was implemented, definitely <0.16) we had this as a workaround:
class ProjectContext(KedroContext):
    """Users can override the remaining methods from the parent class here,
    or create new ones (e.g. as required by plugins), indicate in settings.py
    """

    def _get_pipelines(self) -> Dict[str, Pipeline]:
        params = {} if self._extra_params is None else self._extra_params
        return create_pipelines(**params)
We are looking for a current "kedroic" way of handling this. I am looking into @Lodewic's implementation to see how well it fits our use case; it looks very promising, but a more out-of-the-box solution might be preferable, as it requires a certain amount of knowledge about Kedro's inner workings which we probably shouldn't expect from general users.
This is neat @inigohidalgo!
This looks like the 0.16.x or 0.17.x style of creating pipelines; I actually don't know what happened and why we moved away from this. It used to be possible to access parameters in create_pipeline, which is why the template is still create_pipeline(**kwargs).
Relevant: https://getindata.com/blog/kedro-dynamic-pipelines/ by @marrrcin
In other news, found this on the Dagster Slack:
Seems that the cost of repetitive tedious logic around i/o is much less than trying to piece together the dagster abstraction, given the examples/docs. Feels like every time I try to use frameworks like airflow, kedro, dagster, they all let me down as soon as I start doing anything dynamic.
https://discuss.dagster.io/t/13882234/hello-world-for-dynamic-partition-etl-is-a-nightmare-the-ina#78fea568-9198-48a0-a0ba-42a141f440be
After sharing the blog post on #1606, I was thinking that we should find a more descriptive name for the use case addressed in it. "Dynamic pipelines" seems to imply that the pipelines themselves have some data-dependent runtime behavior or data-dependent structure (the 2 buckets originally devised by @noklam), but taking a pipeline and reusing it with different configurations is hardly "dynamic". We should call this "pipeline reuse" or investigate how other projects (DVC Pipelines, MLFlow recipes, Airflow DAGs) call this concept.
In the CI/CD world this sort of thing is often called a matrix job; in our examples we want to run something like an "experiment array".
We should note Hydra calls this a multi-run, but they also have integration with "Sweepers", which maps more intuitively to what we're doing here. The next question raised is how we can make something like Optuna work for our purposes.
The next question raised is how we can make something like Optuna work for our purposes.
When I've used Optuna within Kedro I've defined the search space from config, but the output was basically just a pickle of the finished Optuna run alongside a JSON with the optimal hyperparameters. Which of Optuna's features would you see as useful for Kedro?
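For reference, a minimal sketch of that pattern - an Optuna study run inside a Kedro node, with the search space bounds and trial budget coming from parameters - might look like this (the parameter names and the toy objective are made up for illustration):
# nodes.py - sketch only; parameter names and the objective are illustrative
from typing import Any, Dict, Tuple

import optuna


def tune_hyperparameters(params: Dict[str, Any]) -> Tuple[optuna.Study, Dict[str, Any]]:
    """Run an Optuna study; return the finished study (to pickle) and the best params (to save as JSON)."""

    def objective(trial: optuna.Trial) -> float:
        # Search space bounds are read from Kedro parameters rather than hard-coded.
        alpha = trial.suggest_float("alpha", params["alpha_min"], params["alpha_max"], log=True)
        max_depth = trial.suggest_int("max_depth", 2, params["max_depth_max"])
        # Stand-in objective; a real pipeline would train and score a model here.
        return (alpha - 0.01) ** 2 + max_depth * 0.001

    study = optuna.create_study(direction="minimize")
    study.optimize(objective, n_trials=params["n_trials"])
    return study, study.best_params
The two outputs map naturally onto a pickle dataset and a JSON dataset in the catalog, which matches the "pickle plus JSON" outputs described above.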
@inigohidalgo it's more that Hydra counts Optuna, Ax and Nevergrad in the 'sweeper' category
Today @datajoely recommended @marrrcin's approach as an alternative to Ray Tune for parameter sweep https://linen-slack.kedro.org/t/16014653/hello-very-much-new-to-the-ml-world-i-m-trying-to-setup-a-fr#e111a9d2-188c-4cb3-8a64-37f938ad21ff
Are we confident that the DX offered by this approach can compete with this?
search_space = {
    "a": tune.grid_search([0.001, 0.01, 0.1, 1.0]),
    "b": tune.choice([1, 2, 3]),
}
tuner = tune.Tuner(objective, param_space=search_space)
No, but it does provide a budget version of it - this is what I'm saying about the lack of integration with dedicated "sweepers" in this comment.
I'm now convinced that we should cleanly separate the "dynamic pipelines" conversation as originally stated by @noklam from the parameter sweeping/experimentation/multi-run use case, which is conceptually way simpler and has very clear boundaries and expectations. I propose we continue in https://github.com/kedro-org/kedro/issues/1606
Yup - I think there are two categories as Nok says at the top:
- "Deterministic pipeline structure generation" (sweeps fall into this, but selection may be the latter)
- "Runtime dynamic pipeline structure" (conditional logic introduces combinatorial complexity and possibly makes Kedro Turing complete)
There is significant user validation in terms of demand and competitor validation since we see other tools in the space offering this functionality.
I've been a bit outside this discussion, although I'm super interested in the topic. To make sure I understand the two options, I have the following use case:
I have a pipeline which predicts total demand for a product on a given day, with the day specified as an input parameter to the pipeline.
Some days, due to data issues, the prediction will fail, but once the issues are solved we would like to see how the model would have performed on those past days. To do this, we have a backfill pipeline set up which loads the predictions dataset, checks for gaps, and launches a pipeline for each missing day. This pipeline, as I've described it, is more of an example of the second (harder) bucket, right? Since the structure of the final pipeline depends on the state of a dataset.
But if, on the other hand, I simply wanted to define a pipeline which loops through the last 10 days and runs the pipeline for each of those days, regardless of the status of the predictions dataset, would that be an example of 1, where I am just defining a pipeline in a for loop, potentially constructing that pipeline in code from today's date and however many days backwards (10 in the example) I want to go, defined through config?
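For what it's worth, a sketch of that second case - purely construction-time logic, no data dependency - might look like this (the window size, dataset names and predict_for_date function are placeholders):
# pipeline.py - sketch only; names are placeholders
from datetime import date, timedelta

from kedro.pipeline import Pipeline, node, pipeline

N_DAYS_BACK = 10  # in practice read from config at pipeline-construction time


def predict_for_date(features, run_date: str):
    """Placeholder prediction step for a single day."""
    return {"run_date": run_date, "n_rows_used": len(features)}


def create_pipeline(**kwargs) -> Pipeline:
    today = date.today()
    days = [(today - timedelta(days=i)).isoformat() for i in range(1, N_DAYS_BACK + 1)]
    return pipeline(
        [
            node(
                func=lambda features, d=day: predict_for_date(features, d),
                inputs="features",
                outputs=f"prediction_{day}",
                name=f"predict_{day}",
            )
            for day in days
        ]
    )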
Nice talk on how to do hyperparameter tuning and selection in Flyte https://www.youtube.com/watch?v=UO1gsXuSTzg (key bit starts around 12 mins in)
Optuna + W&B https://colab.research.google.com/drive/1WxLKaJlltThgZyhc7dcZhDQ6cjVQDfil#scrollTo=sHcr30CKybN7
with your permission @datajoely I'm going to copy your comments to #1606, since they're relevant there, and mark them as off-topic here
Docs/content update:
- Creating a separate ticket for the documentation needed for the first part of this storytelling (the pipeline reuse rather than runtime dynamic pipeline usage): https://github.com/kedro-org/kedro/issues/3282
- I've updated https://github.com/kedro-org/kedro-devrel/issues/7 which was to create a blog post but I am deprioritising it in favour of docs changes above.
Another use case which I'm not sure where it would fall:
I have a time-series problem where I compute a lot of lags, rolling statistics, etc. When designing my training pipeline, I have a target number of days I want my master table to include.
Due to the way lags are carried out in pandas, we need to pad our initial queries by the maximum lag length, as otherwise we would get nulls at the start. This maximum would then be an input to some initial nodes which filter SQL tables.
Technically there is no "data" dependency, since it would purely be based on prespecified parameters, but there is a point where a "max" or something needs to be calculated.
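A sketch of how that can stay in the first bucket, since the padding is a pure function of parameters known up front (the lag list, target length and filter step are invented for illustration):
# pipeline.py - sketch only; the lag values and the filter step are illustrative
from kedro.pipeline import Pipeline, node, pipeline

LAGS = [1, 7, 28]   # in practice read from parameters.yml
TARGET_DAYS = 365   # desired length of the master table


def filter_source_table(raw_table, n_days: int):
    """Placeholder for the initial SQL/table filter, keeping only the last n_days rows."""
    return raw_table.tail(n_days)


def create_pipeline(**kwargs) -> Pipeline:
    # The padding only depends on configuration, so it is computed at
    # pipeline-construction time rather than from data.
    padded_days = TARGET_DAYS + max(LAGS)
    return pipeline(
        [
            node(
                func=lambda raw: filter_source_table(raw, padded_days),
                inputs="raw_table",
                outputs="filtered_table",
                name="filter_source_table",
            )
        ]
    )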
On this last point @inigohidalgo, a lot of users ask "can I run something like kedro run --params target_date:2023-11-01?", and whilst it's technically possible, it's not nice to feed runtime arguments into catalog definitions to dynamically change load behaviour.
Hi everyone,
I wanted to share a somewhat hacky method we implemented for creating a dynamic pipeline. Our pipeline required reprocessing a dataset for previous dates based on the runtime parameter run_date. Here's a simplified representation of the process:
I'll describe what we ended up doing below. I would appreciate any feedback or recommendations you might have.
Modular Pipelines and Namespaces
First, we leveraged modular pipelines and namespaces to create a dynamic reprocessing capability. The goal was to reprocess datasets for previous dates without rerunning certain parts of the pipeline (specifically the feature engineering boxes, labeled as FE1, FE2, and FE3).
The Reprocess pipelines were instantiated as follows:
pipes = []
for i in range(1, 6):
    t_version = pipeline(
        pipe=check_requirements + shape_target + shape_master_table,
        namespace=f"t-{i}",
        tags=["delta_t"],
    )
    pipes.append(t_version)
t_n_pipelines = sum(pipes)
In this setup, each reprocessing pipeline (t-1 to t-5) is created with a unique namespace. This allows us to isolate the processing for different time periods. Notably, the feature engineering steps (FE1, FE2, FE3) do not run in the reprocess parts of the pipeline, as they are only relevant for the initial processing (t=0).
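For context, the combined object can then be exposed from the pipeline registry like any other pipeline; a minimal sketch, where the module paths and the base processing pipeline are assumptions rather than our actual project layout:
# pipeline_registry.py - sketch only; module paths and the base pipeline are assumed
from typing import Dict

from kedro.pipeline import Pipeline

from my_project.pipelines.processing import processing_pipeline  # assumed base pipeline
from my_project.pipelines.reprocessing import t_n_pipelines  # built as shown above


def register_pipelines() -> Dict[str, Pipeline]:
    return {
        "__default__": processing_pipeline + t_n_pipelines,
        "reprocess": t_n_pipelines,
    }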
Next, we created these entries in the catalog.yml for the Δ_t versions of the dataset:
# A type of SQLQueryDataset used in "Some ETL" box
## t=0 / Original Version
EX_DATASET:
  type: "${_datasets.sqlscript}"
  credentials: oracle_credentials
  filepath: queries/EX_DATASET.sql
  query_args:
    run_date: ${runtime_params:run_date}
    use_case: ${runtime_params:use_case}

## t=Δ Version
"{namespace}.EX_DATASET":
  type: "${_datasets.sqlscript}"
  credentials: oracle_credentials
  filepath: queries/EX_DATASET.sql
  query_args:
    run_date: "{namespace}"
    use_case: ${runtime_params:use_case}

# The same for other types like D_n or resulting GenMT
## t=0 / Original Version
D1:
  type: "${_datasets.parquet}"
  filepath: "${_azure_base_path}/04_feature/${runtime_params:use_case}/${runtime_params:run_date}/D1.parquet"
  credentials: azure_credentials

## t=Δ Version
"{namespace}.D1":
  type: "${_datasets.parquet}"
  filepath: "${_azure_base_path}/04_feature/${runtime_params:use_case}/{namespace}/D1.parquet"
  credentials: azure_credentials
We initially thought this approach would suffice if we could somehow perform a nested interpolation of the namespace to its value. However, the resolution of the config happens when the config is loaded, before a session is run, while the dataset factory placeholders are resolved later, when the pipeline is being executed (see Kedro issue 3086).
So Hooks🪝...
Since hooks are stateful objects (see Kedro issue 2690), we created a DatesDeltaToContextHook to handle the dynamic aspects. Here's what it does:
- after_context_created: creates and stores the namespaced run_date parameters.

    def after_context_created(self, context) -> None:
        """Create t-0 -> t-5 of run_date and add to context to generate catalog."""
        run_date = context.params.get("run_date", None)
        self.delta_pattern = r"t-\d+"
        if run_date:
            # Calculate t-0 to t-5 of run_date to use in catalog generation
            self.formatted_dates = self._gen_time_delta(run_date)
            context.config_loader["parameters"] = {
                **context.config_loader["parameters"],
                **self.formatted_dates,
            }
- after_catalog_created: modifies the dataset instances that match the namespace pattern.

    def after_catalog_created(self, catalog: DataCatalog) -> None:
        """
        Modify dataset filepaths/sql in the catalog based on the delta run_dates
        from the parameters.
        """
        _pipelines: Dict[str, Pipeline] = dict(pipelines)
        LOGGER.info("Enforcing data set pattern discovery...")
        # Capture all data set names from all pipelines
        data_set_names = {
            data_set_name
            for pipeline in _pipelines.values()
            for data_set_name in pipeline.datasets()
        }
        # Filter based on the pattern t-number (e.g. t-1, t-2, t-3 ...),
        # excluding the ones with `params:`
        data_set_to_alter = {
            data_set_name
            for data_set_name in data_set_names
            if re.search(self.delta_pattern, data_set_name)
            and "params:" not in data_set_name
        }
        for data_set_name in data_set_to_alter:
            try:
                t_delta, _ = data_set_name.split(".")
                # Enforce data set pattern discovery
                dataset = catalog._get_dataset(data_set_name)  # pylint: disable=protected-access
                run_date = self.formatted_dates.get(f"{t_delta}.run_date")
                match dataset:
                    case SQLScriptDataset():
                        self._update_dataset_sql_query(dataset, run_date)
                    case MemoryDataset():
                        pass
                    case _:
                        self._update_dataset_filepath(dataset, run_date)
            except DatasetNotFoundError:
                continue
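For completeness, the _gen_time_delta helper referenced above is not shown; a hypothetical sketch of what it computes - assuming daily deltas and an ISO-formatted run_date - could look like this (the real implementation may differ):
# Hypothetical sketch of the helper method used in after_context_created; not the actual implementation
from datetime import datetime, timedelta
from typing import Dict


class DatesDeltaToContextHook:
    def _gen_time_delta(self, run_date: str, n_deltas: int = 5) -> Dict[str, str]:
        """Return namespaced run_date parameters for t-0 ... t-5, e.g. {"t-1.run_date": "..."}."""
        base = datetime.strptime(run_date, "%Y-%m-%d")
        return {
            f"t-{i}.run_date": (base - timedelta(days=i)).strftime("%Y-%m-%d")
            for i in range(n_deltas + 1)
        }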
Feel free to provide any feedback or suggestions. Thank you!
Thank you for such a clear write up @gitgud5000 - I'm so keen to make this use-case ergonomic and this is so helpful