Kale SDK: Graph contains a cycle or graph changed during iteration
python: 3.8.8
pip: 21.3.1
kubeflow-kale: 0.7.0
I am aiming to use the Kale SDK to compile (and run) a pipeline in an on-prem Kubeflow environment, as per the documentation: https://docs.arrikto.com/release-1.4/user/kale/sdk/pipelines.html#procedure
Example kale_sdk.py:
from kale.sdk import pipeline, step
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split


@step(name="data_loading")
def load(random_state):
    """Create a random dataset for binary classification."""
    rs = int(random_state)
    x, y = make_classification(random_state=rs)
    return x, y


@step(name="data_split")
def split(x, y):
    """Split the data into train and test sets."""
    x, x_test, y, y_test = train_test_split(x, y, test_size=0.1)
    return x, x_test, y, y_test


@step(name="model_training")
def train(x, x_test, y, training_iterations):
    """Train a Logistic Regression model."""
    iters = int(training_iterations)
    model = LogisticRegression(max_iter=iters)
    model.fit(x, y)
    print(model.predict(x_test))


@pipeline(name="binary-classification", experiment="kale-tutorial")
def ml_pipeline(rs=42, iters=100):
    """Run the ML pipeline."""
    x, y = load(rs)
    x, x_test, y, y_test = split(x, y)
    train(x, x_test, y, iters)


if __name__ == "__main__":
    ml_pipeline(rs=42, iters=100)
Issue 1: the entry point does not work as expected at the module level.
python -m kale --help
/opt/conda/bin/python: No module named kale.__main__; 'kale' is a package and cannot be directly executed
Invoking the kale binary directly, it seems to work only with a notebook (--nb is a required argument):
kale
usage: kale [-h] --nb NB [--upload_pipeline] [--run_pipeline] [--debug] [--experiment_name EXPERIMENT_NAME] [--pipeline_name PIPELINE_NAME]
[--pipeline_description PIPELINE_DESCRIPTION] [--docker_image DOCKER_IMAGE] [--kfp_host KFP_HOST] [--storage-class-name STORAGE_CLASS_NAME]
[--volume-access-mode VOLUME_ACCESS_MODE]
kale: error: the following arguments are required: --nb
Issue 2: docstrings in the @step and @pipeline functions cause an error (removing the docstrings leads to another issue, see Issue 3).
python kale_sdk.py
Traceback (most recent call last):
File "kale_sdk.py", line 47, in <module>
ml_pipeline(rs=42, iters=100)
File "/opt/conda/lib/python3.8/site-packages/kale/sdk/api.py", line 85, in _do
processor = PythonProcessor(func, config)
File "/opt/conda/lib/python3.8/site-packages/kale/processors/pyprocessor.py", line 57, in __init__
self.validate(fn_source)
File "/opt/conda/lib/python3.8/site-packages/kale/processors/pyprocessor.py", line 62, in validate
self._validate_function_body(fn_source)
File "/opt/conda/lib/python3.8/site-packages/kale/processors/pyprocessor.py", line 133, in _validate_function_body
raise RuntimeError("ast.Expr value is not a ast.Call node")
RuntimeError: ast.Expr value is not a ast.Call node
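The error is reproducible with the stdlib ast module: the validator in pyprocessor.py apparently only accepts expression statements whose value is an ast.Call (i.e. step invocations), and a docstring parses as a constant expression instead. A minimal sketch, not Kale's actual validation code:

```python
import ast

# A pipeline function whose first statement is a docstring.
src = '''
def ml_pipeline():
    """Run the ML pipeline."""
    load(42)
'''

fn = ast.parse(src).body[0]              # the FunctionDef node
first, second = fn.body[0], fn.body[1]

# The docstring is an ast.Expr wrapping a constant, not a call...
assert isinstance(first, ast.Expr)
assert not isinstance(first.value, ast.Call)

# ...while a step invocation is an ast.Expr wrapping an ast.Call.
assert isinstance(second, ast.Expr)
assert isinstance(second.value, ast.Call)
```

This matches the "ast.Expr value is not a ast.Call node" message in the traceback: the docstring node fails the check before any pipeline logic runs.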
Issue 3: with the docstrings removed and more than one step, DAG creation fails.
2022-01-19 17:22:18 Kale podutils:255 [INFO] Getting the base image of container...
2022-01-19 17:22:18 Kale podutils:84 [INFO] Getting the current container name...
2022-01-19 17:22:18 Kale podutils:89 [INFO] <CONTAINER NAME>
2022-01-19 17:22:18 Kale podutils:268 [INFO] Retrieved image: <MT IMAGE>
2022-01-19 17:22:18 Kale kale [INFO] Registering Step 'data_loading'
2022-01-19 17:22:18 Kale kale [INFO] Registering Step 'data_split'
2022-01-19 17:22:18 Kale kale [INFO] Registering Step 'model_training'
Traceback (most recent call last):
File "kale_sdk.py", line 43, in <module>
ml_pipeline(rs=42, iters=100)
File "/opt/conda/lib/python3.8/site-packages/kale/sdk/api.py", line 86, in _do
pipeline_obj = processor.run()
File "/opt/conda/lib/python3.8/site-packages/kale/processors/baseprocessor.py", line 44, in run
self.to_pipeline()
File "/opt/conda/lib/python3.8/site-packages/kale/processors/pyprocessor.py", line 83, in to_pipeline
self.pipeline_fn()
File "kale_sdk.py", line 39, in ml_pipeline
train(x, x_test, y, iters)
File "/opt/conda/lib/python3.8/site-packages/kale/step.py", line 70, in __call__
return execution_handler(self, *args, **kwargs)
File "/opt/conda/lib/python3.8/site-packages/kale/processors/pyprocessor.py", line 90, in _register_step_handler
self.pipeline.add_step(step)
File "/opt/conda/lib/python3.8/site-packages/kale/pipeline.py", line 222, in add_step
if step.name in self.steps_names:
File "/opt/conda/lib/python3.8/site-packages/kale/pipeline.py", line 243, in steps_names
return [step.name for step in self._topological_sort()]
File "/opt/conda/lib/python3.8/site-packages/kale/pipeline.py", line 243, in <listcomp>
return [step.name for step in self._topological_sort()]
File "/opt/conda/lib/python3.8/site-packages/kale/pipeline.py", line 290, in _steps_iterable
for name in step_names:
File "/opt/conda/lib/python3.8/site-packages/networkx/algorithms/dag.py", line 246, in topological_sort
for generation in nx.topological_generations(G):
File "/opt/conda/lib/python3.8/site-packages/networkx/algorithms/dag.py", line 177, in topological_generations
raise nx.NetworkXUnfeasible(
networkx.exception.NetworkXUnfeasible: Graph contains a cycle or graph changed during iteration
The expectation is to compile and run a pipeline defined with the SDK in a Python file, as outlined in the documentation.
I would appreciate some pointers on how to use the SDK correctly and why I am seeing these errors.
I figured out that it breaks if the output variable names of the steps overlap.
Renaming x to x1 and y to y1 lets the DAG build succeed:
@pipeline(name="binary-classification", experiment="kale-tutorial")
def ml_pipeline(rs=42, iters=100):
    x, y = load(rs)
    x1, x_test, y1, y_test = split(x, y)
    train(x1, x_test, y1, iters)
Is this the expected behavior of the SDK? Perhaps the example in the documentation needs to be updated?
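For what it's worth, the cycle error itself can be reproduced directly with networkx (the library Kale uses for the DAG, per the traceback). Reusing an output name as an input of the same step effectively adds a self-edge, and a self-edge is a cycle as far as topological sort is concerned. A minimal sketch, not Kale's code:

```python
import networkx as nx

g = nx.DiGraph()
g.add_edge("data_loading", "data_split")
# Self-loop: data_split's output "x" feeds its own input "x".
g.add_edge("data_split", "data_split")
g.add_edge("data_split", "model_training")

try:
    list(nx.topological_sort(g))
except nx.NetworkXUnfeasible as exc:
    # NetworkXUnfeasible: the graph contains a cycle, as in the
    # traceback above.
    print(exc)
```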
This is caused by the way pipeline steps are added to the DAG in pyprocessor.py:

def _link_step(self, step: Step):
    ins_left = set(step.ins.copy())
    ins_left.difference_update(set(self.pipeline.pipeline_parameters))
    for anc_step in reversed(list(self.pipeline.steps)):
        if ins_left.intersection(set(anc_step.outs)):
            self.pipeline.add_dependency(anc_step, step)
            ins_left.difference_update(set(anc_step.outs))
When _link_step is called on a given step, that step has already been placed in the DAG, but with no edges yet. So when anc_step becomes equal to step, the step gets linked to itself, because ins_left.intersection(set(anc_step.outs)) is non-empty.
I believe that, at a bare minimum, a RuntimeError should be raised in PythonProcessor._register_step_handler if the inputs and outputs of a step have a non-empty intersection.
I can try to add a modification that would allow such a situation to occur.
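The proposed guard could look roughly like the following. This is a hypothetical sketch of the check, not Kale's actual code; the function name check_step_io and its signature are made up for illustration:

```python
def check_step_io(step_name, ins, outs):
    """Reject a step whose inputs reuse one of its own output names.

    Hypothetical validation along the lines of what
    PythonProcessor._register_step_handler could do before linking
    the step into the DAG.
    """
    clash = set(ins) & set(outs)
    if clash:
        raise RuntimeError(
            f"Step '{step_name}' uses the same variable name(s) as both "
            f"input and output: {sorted(clash)}. Rename the outputs in "
            "the pipeline function to avoid a self-dependency in the DAG."
        )


# After renaming the outputs (x1, y1) there is no clash:
check_step_io("data_split", ins=["x", "y"],
              outs=["x1", "x_test", "y1", "y_test"])

# The original pipeline, where split reuses x and y, would be rejected
# with a clear message instead of failing later in topological sort:
try:
    check_step_io("data_split", ins=["x", "y"],
                  outs=["x", "x_test", "y", "y_test"])
except RuntimeError as exc:
    print(exc)
```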
I'm experiencing the first issue as well, did you manage to resolve it?