[Feature Request] Facilitate composition of "sub-pipelines," e.g. component collections
I'm requesting constructs to facilitate the composition of sub-pipelines. This would enable platform owners using TFX to provide predefined sub-pipeline "recipes" that users can then piece together—passing relevant artifact references from one sub-pipeline to another—to form their final pipelines.
I'd be happy to contribute work towards this effort on behalf of my team if the TFX team does not have something incoming.
While it's technically possible to do this, I believe that a native solution would make for a better ease-of-use story. Consider, for instance, the following code snippet, which accomplishes the task I'm describing using vanilla dictionaries:
import itertools
from typing import Dict

def pipe0(cmp0, cmp1) -> Dict[str, BaseComponent]:
    foo = ComponentFoo(cmp0.outputs.output)
    bar = ComponentBar(
        arg0=foo.outputs.something,
        arg1=cmp1.outputs.output,
    )
    return dict(foo=foo, bar=bar)

def pipe1(cmp0) -> Dict[str, BaseComponent]:
    return dict(
        baz=ComponentBaz(arg0=cmp0.outputs.foo)
    )

def new_pipeline() -> pipeline.Pipeline:
    cmp0 = SomeComponent()
    cmp1 = SomeOtherComponent()
    sp0 = pipe0(cmp0, cmp1)
    sp1 = pipe1(sp0['foo'])
    return pipeline.Pipeline(
        pipeline_name='composed pipeline',
        # chain the component values of both sub-pipelines into one flat list
        components=[cmp0, cmp1] + list(itertools.chain(sp0.values(), sp1.values())),
    )
There are several issues with this approach:
- Considerable boilerplate in each of pipe0 and pipe1: each Component needs to be manually assigned a key in the dictionary;
- Resorts to arbitrary mapping between string keys and their associated Component (could use the component_id property, but the point still somewhat holds);
- No good way to aid discoverability of what constitutes any given sub-pipeline;
- Use of dictionaries to provide named accessors requires cumbersome flattening of dict.values() in order to combine everything into the final pipeline.Pipeline Component collection.
Imagine, then, some state where providers of such sub-pipelines need not worry about maintaining such consistency in their boilerplate implementations, and can instead defer to a standard SubPipeline construct that provides easy access to the constituent Components through some combination of component type (if there are no duplicates) and component_id (if there are multiple Components of the same type):
def pipe0(cmp0, cmp1) -> SubPipeline:
    foo = ComponentFoo(
        name='foo',
        arg0=cmp0.outputs.output,
        arg1=10,
    )
    bar = ComponentFoo(
        name='bar',
        arg0=foo.outputs.something,
        arg1=cmp1.outputs.output,
    )
    return SubPipeline(foo, bar)

def pipe1(cmp0) -> SubPipeline:
    return SubPipeline(ComponentBaz(arg0=cmp0.outputs.foo))

def new_pipeline() -> pipeline.Pipeline:
    cmp0 = SomeComponent()
    cmp1 = SomeOtherComponent()
    sp0 = pipe0(cmp0, cmp1)
    sp1 = pipe1(sp0.get_component(ComponentFoo, 'bar'))
    # only one type of ComponentBaz, so no need to use name
    cmp2 = OneMoreComponent(arg=sp1.get_component(ComponentBaz))
    return pipeline.Pipeline(
        pipeline_name='composed pipeline',
        components=[cmp0, cmp1, cmp2] + sp0 + sp1,
    )
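To make the proposal a bit more concrete, here is a minimal sketch of what such a SubPipeline container might look like. Everything in it is hypothetical: the Component stand-in class, the name attribute, and the get_component lookup rules are assumptions made for illustration, not existing TFX APIs.

```python
from typing import Iterator, List, Optional, Type


class Component:
    """Stand-in for a TFX component; hypothetical, for illustration only."""

    def __init__(self, name: Optional[str] = None):
        self.name = name


class ComponentFoo(Component):
    pass


class ComponentBaz(Component):
    pass


class SubPipeline:
    """Hypothetical container for an ordered group of components."""

    def __init__(self, *components: Component):
        self._components: List[Component] = list(components)

    def get_component(self, component_type: Type[Component],
                      name: Optional[str] = None) -> Component:
        """Look up a component by type, and by name if the type is repeated."""
        matches = [
            c for c in self._components
            if type(c) is component_type and (name is None or c.name == name)
        ]
        if len(matches) != 1:
            raise LookupError(
                f"expected exactly one {component_type.__name__} "
                f"(name={name!r}), found {len(matches)}")
        return matches[0]

    def __iter__(self) -> Iterator[Component]:
        return iter(self._components)

    def __add__(self, other) -> List[Component]:
        # SubPipeline + iterable of components -> flat list
        return self._components + list(other)

    def __radd__(self, other) -> List[Component]:
        # list + SubPipeline -> flat list
        return list(other) + self._components
```

Defining both __add__ and __radd__ is what would let an expression such as [cmp0, cmp1, cmp2] + sp0 + sp1 produce a plain list of components. Raising on an ambiguous lookup (several components of the same type and no name given) is one possible design choice; returning the first match would be another.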
Does the TFX team have any thoughts around this particular proposal? I'd be curious to hear if you are all open to such an offering.
Hi, thanks for the proposal. We will discuss this sub-pipeline concept with the team.
Could you provide more context about your use case? Based on my understanding, the 'sub-pipeline' here is not a hierarchical pipeline structure (e.g., a sub-dag in Airflow); it's more like part of the pipeline?
FYI, the component_id is basically the class name plus an optional unique name for multiple instances.
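As a rough illustration of that naming scheme, the sketch below builds an ID from a class name and an optional instance name. The function itself and the '.' separator are assumptions made for this example, not a confirmed description of how TFX computes the value.

```python
from typing import Optional


def component_id(class_name: str, instance_name: Optional[str] = None) -> str:
    """Build an ID from the class name plus an optional unique instance name.

    The '.' separator is an assumption made for this sketch.
    """
    if instance_name:
        return f"{class_name}.{instance_name}"
    return class_name


print(component_id("ComponentBaz"))         # ComponentBaz
print(component_id("ComponentFoo", "bar"))  # ComponentFoo.bar
```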
based on my understanding, the 'sub-pipeline' here is not a hierarchical pipeline structure...it's more like part of the pipeline?
That's what I had in mind, yes. Essentially, it'd be an archetypal part of a pipeline that can be parametrized and combined with other parts to form the final, over-arching pipeline.
Could you provide more context about your use case?
Happy to.
To first provide some context, my team at Twitter maintains an internal ML-centric workflow orchestration platform. Within it, we prescribe specific ways of carrying out certain self-contained tasks, much as TFX does with its own provided Components. We currently do this with pre-built Airflow Operators that our users either use directly or inherit from; with an eventual shift towards a TFX-centric platform, we intend to do the same with pre-built Components.
We'd like to take this approach to the next level to similarly prescribe certain workflows. This would be particularly useful for commonly-encountered, higher-level operations that wouldn't fit neatly into the Component paradigm without resulting in bloated, over-extended scope. That way, instead of my team having to provide documentation along the lines of "to solve this problem, take Components A, B, and C, and wire them up this specific way," we'd then be able to tell users to simply take some_prepackaged_subpipeline(with_args) and wire that up to the rest of their pipeline, like illustrated above.
Hope that helps illustrate our motivations in this feature request. Let me know if there's anything I can expand on.
Hi @jinnovation, thanks for bringing this up as a feature request.
As @1025KB pointed out, your use case seems satisfiable with a good pattern of flattened pipeline constructing methods. We are definitely happy to provide assistance and more helper functions to make this easier, as I think this mostly lands in the pipeline-DSL compilation phase and the complexity should be controllable.
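For a sense of what one such helper could look like, here is a minimal sketch of a function that flattens individual components, lists of components, and dictionaries of components into one list. The name flatten_components is hypothetical and not part of TFX.

```python
from collections.abc import Iterable, Mapping
from typing import Any, List


def flatten_components(*groups: Any) -> List[Any]:
    """Flatten single components, iterables, and dicts of components into one list."""
    flat: List[Any] = []
    for group in groups:
        if isinstance(group, Mapping):
            # dict-style sub-pipeline: keep the components, drop the keys
            flat.extend(group.values())
        elif isinstance(group, Iterable) and not isinstance(group, (str, bytes)):
            # list-style sub-pipeline
            flat.extend(group)
        else:
            # a single component
            flat.append(group)
    return flat
```

With a helper like this, a pipeline author could write something like components=flatten_components(cmp0, cmp1, sp0, sp1) regardless of whether each sub-pipeline function returns a dictionary or a list.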
Meanwhile, the idea of supporting a more hierarchical sub-pipeline has been raised in the past, to support some possibly different set of features, including but not limited to:
- reusing a pipeline someone else built;
- embedding a serialized pipeline into the current pipeline;
- more complex condition and loop support (i.e., looping over a list of components rather than a single one).
However, this also brings considerable complexity around caching, inferring input/output signatures, and cross-orchestrator portability. If your team is interested in this topic, we can do a deep dive and help flesh it out as a formal goal.
Please let me know what you think.
...your use case seems satisfiable with a good pattern of flattened pipeline constructing methods. We are definitely happy to provide assistance and more helper functions to make this easier, as I think this mostly lands in the pipeline-DSL compilation phase and the complexity should be controllable.
Appreciate it. That's pretty much in line with my understanding of the scope here, so I'm glad your team also sees this as relatively manageable.
...the idea of supporting a more hierarchical sub-pipeline has been raised in the past, to support some possibly different set of features...
I'd say that our immediate use case would be plenty satisfied by "flattened pipeline constructing methods," as you put it. The idea of overall pipeline reuse, however, does seem to naturally follow, so I'd be interested in any related issues/proposals, past or future.
To first provide some context, my team at Twitter maintains an internal ML-centric workflow orchestration platform. Within it, we prescribe specific ways of carrying out certain self-contained tasks, much as TFX does with its own provided Components. We currently do this with pre-built Airflow Operators that our users either use directly or inherit from; with an eventual shift towards a TFX-centric platform, we intend to do the same with pre-built Components.
We'd like to take this approach to the next level to similarly prescribe certain workflows. This would be particularly useful for commonly-encountered, higher-level operations that wouldn't fit neatly into the Component paradigm without resulting in bloated, over-extended scope. That way, instead of my team having to provide documentation along the lines of "to solve this problem, take Components A, B, and C, and wire them up this specific way," we'd then be able to tell users to simply take some_prepackaged_subpipeline(with_args) and wire that up to the rest of their pipeline, like illustrated above.
This seems to be pretty close to the concept of "graph components" in Kubeflow Pipelines. To the pipeline author they look the same as single components (they're loaded and instantiated the same way). https://github.com/kubeflow/pipelines/blob/b79c9e15ebf664cca3d5861c74360ec854f9486f/sdk/python/tests/components/test_python_pipeline_to_graph_component.py#L42
@jinnovation @zhitaoli Can you please close the issue, as the discussion is no longer valid? Thanks!
Hi, feel free to close this