pipelines icon indicating copy to clipboard operation
pipelines copied to clipboard

inputPath data passing with large (100+GB) datasets

Open boarder7395 opened this issue 3 years ago • 10 comments

In the best practices the recommended workflow is each component of the pipeline is self-contained and data is passed through usage of inputPath and outputPath. If external data needs to be pulled then a component should be created that only downloads the data. This is shown in the simple pipeline below: Screen Shot 2020-09-29 at 9 29 11 AM

When designing a pipeline with a component utilizing dask_kubernetes using inputPath and outputPath pattern has a few issues:

  1. Without a storage volume the components are dependent upon the storage of the node.
  2. Data is copied from outputPath of previous pod to inputPath next pod. When using a storage volume this results in data being copied from one location on the volume to another location on the volume.
  3. ? Not sure if data can be passed to multiple pods using inputPath, and outputPath and if so does this copy the data x times?

I came across an experimental feature to use volumes for data passing, but I am not sure the status of this feature since the code is not available in the kfp package unless I update the init.py file in kfp.dsl to include data_passing_methods. https://github.com/kubeflow/pipelines/commit/54a596abd837586e294b03c31d10ac61d9273f1a#diff-19db57446e9b12ad2beb5df35a5094eb Although when I run this I keep getting the following error: invalid spec: templates.download.outputs.parameters.download-location-subpath.valueFrom not specified

Alternative approach: Use volume, pass strings between components outlining where on the volume the data is stored. When using this approach it requires the components to write a file location to an outputPath, and then the next component in the pipeline will use that file location as the path to input data from.

Screen Shot 2020-09-30 at 4 36 23 PM

This solution solves the issue of data storage since the volume can be sized based on the component’s needs. But does not work well with the api.

  1. Add an inputValue that is the output location on the volume.
  2. Write a file with that inputValue inside to pass to the next component.

A third scenario when working with dask_kubernetes or other distributed frameworks: Screen Shot 2020-09-30 at 4 32 42 PM

In this scenario the volume supports ReadWriteMany. The first component downloads data from s3 (or other storage) onto the volume. The second component expects the data to be available on the volume. This component launches some number of dask worker pods that access the data from the volume and do some preprocessing and then writes output to the volume. The final component takes the data written by the dask worker pods and copies that data back to s3 (or other storage). This pattern would reduce the total number of IO operations to (s3 or GCP) since the first and last components of a pipeline would be the only components to access (s3 or GCP) instead of having every component access s3 or GCP.

Am I going about this the right way or did you have a different pattern in your mind when designing the data passing mechanisms? If needed I would be happy to help on any work required to build out this functionality!

boarder7395 avatar Sep 30 '20 20:09 boarder7395

@Ark-kun Adding you on this because looks like you built out the experimental feature for data passing with a volume

Some Updates from today's work: It looks using volumes to pass data is the correct method for my workflow. 54a596a#diff-19db57446e9b12ad2beb5df35a5094eb

Although there are several bugs that prevent that experimental feature from working:

  1. kfp.dsl.data_passing_methods is not importable with the current kfp pypi package when git cloning kfp and installing locally it works fine. Continued looking into this and saw that this feature was added after the kfp 1.0.1 release and should be in available in the next release!
  2. argo 2.3 does not support using the value field (see PR https://github.com/argoproj/argo/pull/1336/files). When I look at the 2.3.0 tag on argo it looks like this change did not make it in until argo 2.4.0
  3. The example in https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/dsl/_pipeline.py#L169 still uses V1PersistentVolumeClaim instead of V1PersistentVolumeClaimVolumeSource so that should be updated as well.

The third is an easy fix and happy to submit that PR. As for the second I am trying to figure out if I can change the workflow-controller argo version manually on my cluster but I haven't had any luck to this point. (I temporarily changed the version by modifying the pod directly for testing only and confirmed argo 2.5.0 did support the value field in place of the valueFrom field).

boarder7395 avatar Oct 01 '20 19:10 boarder7395

Should not be a problem to upgrade ur Argo. If u look at the manifest for this repo (instead of kubeflow/manifest) it is on Argo 2.7.5.

eterna2 avatar Oct 02 '20 05:10 eterna2

@Ark-kun I have implemented a solution using the data_passing_methods as discussed but ran into some issues with artifacts. The MLPipeline UI metadata my component outputs does not show up in the Kubeflow UI. Is there a way to exclude the MLPipeline UI metadata from using the volume used in data_passing_methods?

boarder7395 avatar Oct 09 '20 20:10 boarder7395

I was thinking of a similar issue for my organization. Kubeflow Pipelines Pure Components use inputPath to copy data. If the dataset size is large, the file might not fit in the ephermeral storage in node.

However all inputPath and outputPath use /tmp as a prefix. In case of very large data, a possible workaround is to mount a PVC to /tmp directory.

munagekar avatar Dec 14 '20 07:12 munagekar

@Ark-kun Just surfacing this again. I am using the experimental feature for data_passing_methods with a Kubernetes Volume but its current implementation prevents the usage of Artifacts such as UI metadata. I have tested without data_passing using a volume and the artifact is available in the UI, but when I enable it the artifact is no longer present.

A bigger question here is what direction is the data passing going when dealing with large datasets being passed between components. From my understanding the current setup by default runs a step; zips each of the outputs; copies that data to minIO; then the next component copies that data from minIO to the next component.

Whereas the data_passing with a volume can skip all the data copies and directly shares data between components without copying the data.

What work remains to move data_passing_methods from an experimental feature? I am happy to pick up some work on this effort!!

boarder7395 avatar Feb 02 '21 17:02 boarder7395

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] avatar Jun 02 '21 17:06 stale[bot]

Hi, is there any update on the issue? Any ETA for the feature or any help needed in some area?

mikwieczorek avatar Jan 12 '22 09:01 mikwieczorek

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] avatar Apr 17 '22 06:04 stale[bot]

Is there any update to this situation I was wondering since I am encountering a similar situation

Davidnet avatar Aug 16 '22 21:08 Davidnet

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

github-actions[bot] avatar Jun 25 '24 07:06 github-actions[bot]