metaflow icon indicating copy to clipboard operation
metaflow copied to clipboard

Feat: Add the option to use shell with the metaflow python Runner

Open HarryAnkers opened this issue 1 year ago • 2 comments

Hi, I'm raising a PR to allow the subprocesses to be executed directly through the shell.

Locally, this gave a massive performance improvement for an extremely lightweight flow: the run time dropped from 18.618823051452637 s to 3.2076590061187744 s.

I'm not 100% sure where the speed-up comes from. I suspect it is because the shell caches the Python executable, instead of launching it directly each time.

Thank you

HarryAnkers avatar Sep 30 '24 18:09 HarryAnkers

@HarryAnkers, do you have a reproducible example that demonstrates the speed-up? We don't set shell=True for many reasons (security, portability, etc.), and I am curious to dig into what is actually causing the slowdown/speedup. That will help us understand whether this PR is in the right direction or not.

savingoyal avatar Oct 01 '24 02:10 savingoyal

@savingoyal thanks for the reply.

Yes, here's a two-file setup that reproduces this issue:

test.py

import json
import os
import shlex
import subprocess
import sys
import tempfile
import time
from typing import Any

from metaflow import Runner


def test_a(
    workflow_module_path: str,
    workflow_params: dict[str, Any],
    workflow_input_payload: str,
) -> None:
    """Launch the flow through Metaflow's ``Runner`` API and print elapsed times.

    Args:
        workflow_module_path: Path of the flow file to execute.
        workflow_params: Flow parameters, forwarded as keyword arguments to
            ``runner.run``.
        workflow_input_payload: Value passed as the ``input_payload`` parameter.

    The final "completed" timing line is printed only when the run's
    ``status`` equals ``"successful"``.
    """
    t0 = time.time()

    def elapsed() -> float:
        # Seconds since t0, sampled fresh at every call.
        return time.time() - t0

    print(f"Starting runner execution_time={elapsed()}")
    # pylint=False disables the lint pass, like --no-pylint in test_b.
    with Runner(workflow_module_path, pylint=False) as runner:
        result = runner.run(**workflow_params, input_payload=workflow_input_payload)
        print(f"Runner run execution_time={elapsed()}")

        if result.status != "successful":
            return
        print(f"Runner completed execution_time={elapsed()}")


def test_b(
    workflow_module_path: str,
    workflow_params: dict[str, Any],
    workflow_input_payload: str,
    use_shell: bool = True,
) -> None:
    """Launch the flow as a plain subprocess (the "workaround") and print timings.

    Mirrors ``test_a`` so that the two timings are comparable:

    * the flow runs under the current interpreter (``sys.executable``), not
      whatever ``python`` happens to resolve to on PATH;
    * ``workflow_params`` are forwarded as ``--<name> <value>`` options,
      just as ``test_a`` forwards them to ``runner.run``;
    * the "completed" line is printed only when the flow exits successfully.

    Args:
        workflow_module_path: Path of the flow file to execute.
        workflow_params: Flow parameters; each value is converted with ``str()``.
        workflow_input_payload: Value passed as ``--input_payload``.
        use_shell: When True (the default, as in the original repro), the
            command is run through the shell with every argument quoted.
            When False, it is executed directly from an argv list. This lets
            you check whether the shell itself affects the timing.
    """
    start_time = time.time()
    print(f"Starting runner workaround execution_time={time.time() - start_time}")
    with tempfile.TemporaryDirectory(dir="/tmp") as temp_dir:
        # Only the path is needed for --runner-attribute-file, so close the
        # handle right away instead of holding it open for the whole run.
        with tempfile.NamedTemporaryFile(dir=temp_dir, delete=False) as tfp:
            runner_attribute_file = tfp.name

        argv = [
            sys.executable,
            workflow_module_path,
            "--no-pylint",
            "run",
            "--runner-attribute-file",
            runner_attribute_file,
            "--input_payload",
            workflow_input_payload,
        ]
        for name, value in workflow_params.items():
            argv += [f"--{name}", str(value)]

        print(f"Started runner execution_time={time.time() - start_time}")
        if use_shell:
            # shlex.join quotes every argument, so a payload containing quotes
            # or shell metacharacters reaches the flow verbatim instead of
            # being re-parsed (or executed) by the shell.
            completed = subprocess.run(shlex.join(argv), shell=True, check=False)
        else:
            completed = subprocess.run(argv, check=False)

        # Same convention as test_a: only report completion on success.
        if completed.returncode == 0:
            print(
                "Runner workaround completed "
                f"execution_time={time.time() - start_time}"
            )


if __name__ == "__main__":
    # Resolve workflow.py relative to this script, not the current directory.
    script_dir = os.path.dirname(os.path.abspath(__file__))
    flow_path = os.path.join(script_dir, "workflow.py")

    # Both benchmarks receive identical inputs so their timings are comparable.
    params = {"some_key_1": "key_1", "some_key_2": "key_2"}
    payload = json.dumps({"request_id": "5c76e196-5ef2-443d-a036-25755a9a5fdd"})

    print(
        f"Following both test implementations with workflow_module_path={flow_path}"
    )
    for benchmark in (test_a, test_b):
        benchmark(flow_path, params, payload)

workflow.py

from metaflow import FlowSpec, JSONType, Parameter, step

from appraisal_api.domain.workflows.metaflow.io_schemas import (
    SampleFlowResponse,
    SampleFlowResult,
)


class SimpleFlow(FlowSpec):
    """Minimal three-step flow used by test.py to compare launch overhead.

    Steps run linearly: ``start`` -> ``process_data`` -> ``end``. Every step
    prints a message. ``end`` additionally builds a sample response object.

    NOTE(review): SampleFlowResult / SampleFlowResponse come from the
    project-local ``appraisal_api`` package, which is not part of this repro.
    Others will need a stand-in to run it.
    """

    # Required JSON parameter. test.py passes a JSON string with a "request_id" key.
    # NOTE(review): the help text mentions images, but no step reads this value.
    input_payload = Parameter(
        "input_payload",
        help="The payload containing the images to be processed.",
        required=True,
        type=JSONType,
    )

    # Optional string parameter (default "key_1"). It is not read by any step.
    some_key_1 = Parameter(
        "some_key_1",
        help="Some Extra Parameter provided to the workflow",
        type=str,
        default="key_1",
    )

    # Optional string parameter (default "key_2"). It is not read by any step.
    some_key_2 = Parameter(
        "some_key_2",
        help="Some Extra Parameter provided to the workflow",
        type=str,
        default="key_2",
    )

    @step
    def start(self) -> None:
        """Print a marker and transition to ``process_data``."""
        print("This is the start step.")
        self.next(self.process_data)

    @step
    def process_data(self) -> None:
        """Print a marker and transition to ``end`` (no real processing happens)."""
        print("This is the data processing step.")
        self.next(self.end)

    @step
    def end(self) -> None:
        """Build a sample response, store it on ``self.output_payload``, and print a marker.

        This step has no ``self.next`` call, so it is the last step of the flow.
        The assignment to ``self`` is presumably persisted as a Metaflow
        artifact (TODO confirm it is read anywhere).
        """
        result = SampleFlowResult(img_path="path/to/image.jpg", result="success")
        self.output_payload = SampleFlowResponse(
            operation="sample_flow",
            response=[result],
        )
        print("This is the end step.")


if __name__ == "__main__":
    # Script entry point. test.py invokes this file as
    # `python workflow.py --no-pylint run ...`. Instantiating the flow
    # presumably hands control to Metaflow's command-line handling.
    SimpleFlow()

To reproduce, run test.py.

HarryAnkers avatar Oct 01 '24 08:10 HarryAnkers