
Added literal offloading for array node map tasks

Open pmahindrakar-oss opened this issue 1 year ago • 1 comments

Tracking issue

RFC: https://github.com/flyteorg/flyte/pull/5103. This PR handles mapped task outputs that exceed the minimum offloading size.

Why are the changes needed?

Without these changes, we hit the gRPC message size limit when propeller tries to create an execution by passing inline the inputs it received from the output of a large map task.

Here you can see the failure where we hit the default 4 MB gRPC message size limit. This is the line where it fails: https://github.com/flyteorg/flyte/blob/7136919f271ec1a5b0fa0ccd29175191d5ceffee/flytepropeller/pkg/controller/nodes/subworkflow/launchplan/admin.go#L151

[UserError] failed to launch workflow, caused by: rpc error: code = ResourceExhausted desc = grpc: received message larger than max (26624663 vs. 4194304)

See the testing details for the workflow used, in particular this section:

def ref_wf(mbs: int):
    processed_results = my_wf(mbs=mbs)  # Promise of List[str]
    big_inputs_wf_lp(input=processed_results) 

Here a large task output is returned in processed_results and passed to the launch plan. Propeller sends these inputs inline, so if the value is too large we exceed the gRPC limit.

What changes were proposed in this pull request?

  • Adds a literal offloading config, which defines the minimum and maximum offloading size for a literal as well as the minimum supported SDK version.
  • After the outputs are gathered from the map tasks and before the combined output literal is written, we write the data to an offloaded location, clear the literal value, and store the offloaded location in the literal. That way, when propeller creates an execution, the inputs are small and contain only references to the data, not the data itself.
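
A rough sketch of the offloading step described above, written in Python for illustration only (the actual change lives in propeller's Go code). The class names, field names, and thresholds here are hypothetical, JSON stands in for protobuf serialization, and a dict stands in for the blob store. Failing when the payload exceeds the maximum size is also an assumption.

```python
import json
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class OffloadingConfig:
    # Hypothetical field names; the real propeller config keys may differ.
    enabled: bool = False
    min_size_bytes: int = 10 * 1024 * 1024
    max_size_bytes: int = 1000 * 1024 * 1024


@dataclass
class ListLiteral:
    # Simplified stand-in for a Flyte collection literal of strings.
    values: List[str] = field(default_factory=list)
    uri: Optional[str] = None  # set only when the value has been offloaded
    size_bytes: int = 0        # size of the offloaded payload


def maybe_offload(lit: ListLiteral, cfg: OffloadingConfig,
                  store: Dict[str, bytes], uri: str) -> ListLiteral:
    """Return the literal unchanged, or a small reference to offloaded data."""
    if not cfg.enabled:
        return lit
    payload = json.dumps(lit.values).encode("utf-8")  # stand-in for protobuf
    if len(payload) > cfg.max_size_bytes:
        # Assumption: oversized literals are rejected rather than offloaded.
        raise ValueError(f"literal of {len(payload)} bytes exceeds max offload size")
    if len(payload) < cfg.min_size_bytes:
        return lit  # small enough to keep passing inline
    store[uri] = payload
    # Keep one zero value per element so the literal's type stays visible.
    return ListLiteral(values=[""] * len(lit.values), uri=uri,
                       size_bytes=len(payload))


store: Dict[str, bytes] = {}
cfg = OffloadingConfig(enabled=True, min_size_bytes=1024,
                       max_size_bytes=1024 * 1024)
big = ListLiteral(values=["x" * 500] * 20)
ref = maybe_offload(big, cfg, store, "mem://outputs/o0_offloaded_metadata.pb")
print(ref.uri, ref.size_bytes, ref.values[:2])
```

The returned literal carries only the URI, the payload size, and empty placeholders, which is what keeps the create-execution request small.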

How was this patch tested?

Tested using the workflow ref_wf below, which generates over 20 MB of map output (about 26.6 MB in the run shown here) when launched with mbs=20.

Workflow used for testing

import logging
from typing import List
from flytekit import map_task, task, workflow, LaunchPlan

# Set the logging level to DEBUG
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger("flytekit")
logger.setLevel(logging.DEBUG)

# Task to generate roughly 1 MB of string data (about 1.3 MB for a one-character input)
@task
def my_1mb_task(i: str) -> str:
    return f"Hello world {i}" * 100 * 1024

# Task to generate a list of strings
@task
def generate_strs(count: int) -> List[str]:
    return ["a"] * count

# Workflow to process the list of strings and return processed results
@workflow
def my_wf(mbs: int) -> List[str]:
    strs = generate_strs(count=mbs)
    return map_task(my_1mb_task)(i=strs)

# Task to handle inputs (with metadata printing)
@task
def noop_with_inputs(input: List[str]):
    # Print metadata about the input
    print("Input Metadata:")
    print(f"Number of items: {len(input)}")
    print(f"Total size: {sum(len(s) for s in input)} bytes")

# Workflow to handle big inputs
@workflow
def big_inputs_wf(input: List[str]):
    noop_with_inputs(input=input)

# LaunchPlan for big inputs workflow
big_inputs_wf_lp = LaunchPlan.get_or_create(name="big_inputs_wf_lp", workflow=big_inputs_wf)

# Main workflow that orchestrates other workflows
@workflow
def ref_wf(mbs: int):
    processed_results = my_wf(mbs=mbs)  # Promise of List[str]
    big_inputs_wf_lp(input=processed_results) 

Before the change, the workflow fails with the following error:

[UserError] failed to launch workflow, caused by: rpc error: code = ResourceExhausted desc = grpc: received message larger than max (26624663 vs. 4194304)
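
The size in this error can be roughly reproduced from the test workflow. The sketch below is only back-of-the-envelope arithmetic: it counts the raw string bytes produced by my_1mb_task for the 20 inputs "a" and ignores protobuf framing, which accounts for the small difference from the 26624663 bytes reported by gRPC. The helper name item_size is made up for this example.

```python
# Default gRPC maximum receive message size: 4 MiB.
GRPC_DEFAULT_MAX_BYTES = 4 * 1024 * 1024


def item_size(i: str) -> int:
    """Bytes produced by one my_1mb_task call for input i."""
    return len((f"Hello world {i}" * 100 * 1024).encode("utf-8"))


count = 20                    # ref_wf launched with mbs=20
raw = count * item_size("a")  # generate_strs returns ["a"] * count
print(item_size("a"))         # 1331200
print(raw)                    # 26624000
print(raw > GRPC_DEFAULT_MAX_BYTES)  # True
```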

After the Change

The workflow succeeds

The following is the subworkflow CRD, which shows the offloaded URI being used as input when propeller launches an execution:

inputs:
  literals:
    input:
      collection:
        literals:
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
        - scalar:
            primitive:
              stringValue: ""
      sizeBytes: "26624325"
      uri: s3://my-s3-bucket/metadata/propeller/flytesnacks-development-ak9rtgqwlwnvhbjqsmrk/n0/data/0/n1/0/o0_offloaded_metadata.pb

Notice the zero values in the literal. We preserve the type so that it can still be validated at compile time, before the execution is launched.

Logs from flytekit reading the offloaded literal and passing it to the subworkflow. This uses the flytekit changes from https://github.com/flyteorg/flytekit/pull/2685, which reads the offloaded literal if present.

Download data to local from s3://my-s3-bucket/metadata/propeller/flytesnacks-development-ak9rtgqwlwnvhbjqsmrk/n0/data/0/n1/0/o0_offloaded_metadata.pb. [Time: 0.081294s]", "taskName": null}
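
On the consuming side, the idea is that a reader checks for an offloaded URI before using the inline value. The following is a minimal sketch of that check and not flytekit's actual implementation: ListLiteral, resolve, and the in-memory store are hypothetical, and JSON stands in for the serialized protobuf.

```python
import json
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class ListLiteral:
    # Simplified stand-in for a collection literal of strings.
    values: List[str] = field(default_factory=list)
    uri: Optional[str] = None  # present only for offloaded literals


def resolve(lit: ListLiteral, store: Dict[str, bytes]) -> List[str]:
    """Return the real values, downloading them if the literal is offloaded."""
    if lit.uri is None:
        return lit.values  # inline literal, use as is
    return json.loads(store[lit.uri].decode("utf-8"))


store = {"mem://o0_offloaded_metadata.pb": json.dumps(["a", "b"]).encode("utf-8")}
offloaded = ListLiteral(values=["", ""], uri="mem://o0_offloaded_metadata.pb")
inline = ListLiteral(values=["c"])
print(resolve(offloaded, store))  # ['a', 'b']
print(resolve(inline, store))     # ['c']
```

An SDK without this check would see only the empty placeholder values, which is why the minimum SDK version matters for downstream tasks.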

Setup process

Screenshots

Check all the applicable boxes

  • [ ] I updated the documentation accordingly.
  • [ ] All new and existing tests passed.
  • [ ] All commits are signed-off.

Related PRs

Docs link

pmahindrakar-oss avatar Aug 27 '24 06:08 pmahindrakar-oss

Codecov Report

Attention: Patch coverage is 80.59701% with 26 lines in your changes missing coverage. Please review.

Project coverage is 36.29%. Comparing base (89efcc6) to head (4a03f91). Report is 165 commits behind head on master.

Files with missing lines                                   Patch %   Lines
...lytepropeller/pkg/controller/nodes/common/utils.go      78.18%    8 Missing and 4 partials :warning:
flytepropeller/pkg/controller/config/config.go             75.00%    4 Missing :warning:
...propeller/pkg/apis/flyteworkflow/v1alpha1/iface.go      0.00%     2 Missing :warning:
...epropeller/pkg/compiler/transformers/k8s/inputs.go      66.66%    2 Missing :warning:
flytepropeller/pkg/controller/controller.go                0.00%     2 Missing :warning:
...ytepropeller/pkg/controller/nodes/array/handler.go      71.42%    1 Missing and 1 partial :warning:
flytepropeller/pkg/controller/nodes/executor.go            50.00%    1 Missing and 1 partial :warning:

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5697      +/-   ##
==========================================
+ Coverage   36.21%   36.29%   +0.07%     
==========================================
  Files        1303     1305       +2     
  Lines      109568   109991     +423     
==========================================
+ Hits        39683    39924     +241     
- Misses      65764    65912     +148     
- Partials     4121     4155      +34     

Flag                       Coverage Δ
unittests-datacatalog      51.37% <ø> (ø)
unittests-flyteadmin       55.62% <100.00%> (-0.01%) :arrow_down:
unittests-flytecopilot     12.17% <ø> (ø)
unittests-flytectl         62.21% <ø> (-0.05%) :arrow_down:
unittests-flyteidl         7.12% <ø> (ø)
unittests-flyteplugins     53.35% <ø> (ø)
unittests-flytepropeller   41.87% <75.92%> (+0.11%) :arrow_up:
unittests-flytestdlib      55.21% <ø> (-0.15%) :arrow_down:

Flags with carried forward coverage won't be shown. Click here to find out more.


codecov[bot] avatar Aug 27 '24 06:08 codecov[bot]

High-level question: it looks like we're using semver comparison on the flytekit version of the specific task that is executing. Is this right? For example, we have a workflow with an ArrayNode (n0) that sends data to a TaskNode (n1). We're checking whether the flytekit version of the ArrayNode task for n0 supports offloading. IIUC that task will execute normally and we will transparently offload, so what we are really concerned with is whether the task for n1 supports offloading.

@hamersaw I agree with this concern. I think we raised a similar concern during the spec review, @katrogan, and we decided to allow things to fail for these heterogeneous scenarios.

I'm still trying to understand the type system and propeller code, but if we need to support this, what do you think would be needed here? Could we fail fast at execution time if we know this is bound to happen, and prevent it from happening?

pmahindrakar-oss avatar Aug 30 '24 16:08 pmahindrakar-oss

@pmahindrakar-oss

I'm still trying to understand the type system and propeller code, but if we need to support this, what do you think would be needed here?

This is really difficult, because it's unclear whether we want to fail just because propeller supports offloading and the task (or downstream tasks) do not. If the outputs aren't large enough to offload, we would want it to run successfully rather than fail even though it could succeed, right?

If we absolutely need to validate the flytekit version of tasks to support offloading the only place that makes sense to me is when propeller begins evaluation (in the handleReadyWorkflow function). We can't do it during registration or compilation because flyteadmin doesn't know if propeller has offloading enabled. In this function we could iterate through all tasks in the WorkflowSpec and validate the flytekit versions the same way you already have. This would have to parse through SubWorkflows and LaunchPlans as well. Of course, this still wouldn't catch issues with dynamics since we do not know what the underlying workflow looks like.
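
The upfront validation described above could look roughly like the following sketch. It is Python for illustration only, with hypothetical Task and Workflow types standing in for the WorkflowSpec. The minimum version "1.2.0" is a placeholder and not the real supported flytekit version, and dynamic workflows are not covered because their structure is unknown at this point.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


def parse_version(v: str) -> Tuple[int, ...]:
    """Parse 'MAJOR.MINOR.PATCH' into a tuple; pre-release tags are not handled."""
    return tuple(int(p) for p in v.split("."))


@dataclass
class Task:
    name: str
    sdk_version: str


@dataclass
class Workflow:
    # Hypothetical, simplified view of a workflow and its nested subworkflows.
    tasks: List[Task] = field(default_factory=list)
    sub_workflows: List["Workflow"] = field(default_factory=list)


def incompatible_tasks(wf: Workflow, min_version: str) -> List[str]:
    """Collect names of tasks whose SDK version is below min_version."""
    minimum = parse_version(min_version)
    found = [t.name for t in wf.tasks if parse_version(t.sdk_version) < minimum]
    for sub in wf.sub_workflows:
        found.extend(incompatible_tasks(sub, min_version))
    return found


wf = Workflow(
    tasks=[Task("a", "1.3.0"), Task("b", "1.1.9")],
    sub_workflows=[Workflow(tasks=[Task("c", "1.10.0"), Task("d", "0.9.0")])],
)
print(incompatible_tasks(wf, "1.2.0"))  # ['b', 'd']
```

Comparing integer tuples rather than strings is what makes 1.10.0 sort above 1.2.0 here.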

Could we fail fast at execution time if we know this is bound to happen, and prevent it from happening?

This would be much easier. In the node executor we copy inputs from previous nodes when transitioning the node from NotYetStarted to Queued. If the inputs are offloaded literals and the node is a TaskNode that does not have a compatible flytekit version, we can fail there. It might make more sense to do this in the TaskHandler or the PodPlugin. Offloaded literals won't work for things like Spark, Dask, Agents, etc., right? I have to admit I wasn't able to read the doc in depth.

hamersaw avatar Aug 30 '24 20:08 hamersaw

Thanks @hamersaw. I would like to avoid complicated logic if possible and rely on a rollout process to solve this. Since we are enabling offloading behind a config, a controlled rollout is possible. By default I will keep this disabled and add notes to enable it only once all workflows, tasks, and launch plans have been upgraded to the new SDK version. This sounds heavy-handed, and I'm not sure how many of our users would be willing to do it. But this solves a very particular problem, where the data output by a task is large, so I am assuming affected users have already worked around it by breaking up their workflows or increasing the gRPC message size, and that anyone enabling this would be aware of the release note actions. Hence my point about allowing things to fail in this heterogeneous case.

If we can't rely on this process and want automated checks, then we should think about how to make this work fully (even in the dynamic case) for old workflows without failing them.

Offloaded literals won't work for things like Spark, Dask, Agents, etc., right?

Yes, this is not supported for plugins. I don't think we have even scoped this for plugins in general, as we are tackling the map task as the first use case of this broader problem with large datasets.

cc : @katrogan

pmahindrakar-oss avatar Aug 30 '24 21:08 pmahindrakar-oss

This would be much easier. In the node executor we copy inputs from previous nodes when transitioning the node from NotYetStarted to Queued. If the inputs are offloaded literals and the node is a TaskNode that does not have a compatible flytekit version, we can fail there. It might make more sense to do this in the TaskHandler or the PodPlugin. Offloaded literals won't work for things like Spark, Dask, Agents, etc., right? I have to admit I wasn't able to read the doc in depth.

This sounds reasonable. We don't attempt to type check at the onset of execution, but we can lazily decide to fail if a downstream task appears incapable of consuming an offloaded output, and only in the scenario where we have actually offloaded an output.
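
A minimal sketch of this lazy check, in Python for illustration only. InputLiteral and check_node_inputs are hypothetical names, and the version strings are placeholders. The check fails only when an input has actually been offloaded and the consuming task's SDK is too old, so a workflow with small outputs keeps running on old SDKs.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


def parse_version(v: str) -> Tuple[int, ...]:
    """Parse 'MAJOR.MINOR.PATCH' into a tuple; pre-release tags are not handled."""
    return tuple(int(p) for p in v.split("."))


@dataclass
class InputLiteral:
    name: str
    uri: Optional[str] = None  # set only when the literal was offloaded


def check_node_inputs(inputs: List[InputLiteral], task_sdk_version: str,
                      min_version: str) -> Optional[str]:
    """Return an error message if the task cannot consume its inputs, else None."""
    offloaded = [lit.name for lit in inputs if lit.uri is not None]
    if not offloaded:
        return None  # nothing was offloaded, so any SDK version works
    if parse_version(task_sdk_version) >= parse_version(min_version):
        return None  # this SDK knows how to read offloaded literals
    return (f"inputs {offloaded} are offloaded but task SDK {task_sdk_version} "
            f"is older than {min_version}")


inline_only = [InputLiteral("input")]
with_offload = [InputLiteral("input", uri="mem://o0_offloaded_metadata.pb")]
print(check_node_inputs(inline_only, "1.0.0", "1.2.0"))   # None
print(check_node_inputs(with_offload, "1.3.0", "1.2.0"))  # None
print(check_node_inputs(with_offload, "1.0.0", "1.2.0"))  # error message
```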

Re: plugins: I believe spark, agents and flytekit plugins should continue to work (they use flytekit type transformers to unmarshal literals which is the SDK check we're gating against). For example, spark calls pyflyte execute: https://github.com/flyteorg/flytekit/blob/master/plugins/flytekit-spark/flytekitplugins/spark/task.py#L221

cc @eapolinario would love for you to double check this

There is follow-up work to make copilot data reading work, which is slated for after this (thank you @pmahindrakar-oss).

katrogan avatar Sep 03 '24 13:09 katrogan

@katrogan @hamersaw I tested the new changes, which now do version checks in the heterogeneous tasks scenario, and here's the result: [Screenshot 2024-09-05 at 5 09 01 PM]

pmahindrakar-oss avatar Sep 06 '24 00:09 pmahindrakar-oss