Feat: Add the option to use shell with the metaflow python Runner
Hi, raising a PR to add the option of executing the Runner's subprocesses directly through the shell.
Locally this gave a massive performance improvement for an extremely lightweight flow: roughly 18.62 seconds down to 3.21 seconds end to end.
I'm not 100% sure where the improvement comes from. I suspect it's because the shell is caching python instead of directly executing it each time.
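Roughly, the difference between the two invocation styles is the following (a sketch, not the actual diff; the flow path below is a placeholder):

```python
import subprocess
import sys

flow_file = "workflow.py"  # placeholder path, for illustration only

# Current behaviour: the flow is launched as an argument list, no shell involved.
subprocess.run([sys.executable, flow_file, "--no-pylint", "run"])

# What the opt-in shell mode amounts to: a single command string executed via the shell.
subprocess.run(f"python {flow_file} --no-pylint run", shell=True)
```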
Thank you
@HarryAnkers, do you have a reproducible example that demonstrates the speed-up? We don't set shell=True for many reasons (security, portability, etc.), and I am curious to dig into what is actually causing the slowdown/speedup. That will help us understand whether this PR is in the right direction or not.
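As a side note on the security point: with shell=True and plain string interpolation, anything that reaches the command string can inject extra shell commands. A made-up illustration:

```python
import subprocess

# A parameter value an attacker could control (entirely made up for illustration).
payload = '{"request_id": "x"}\'; touch /tmp/pwned; echo \''

# With shell=True the interpolated string is parsed by the shell: the embedded
# quote closes the argument early and `touch /tmp/pwned` runs as its own command.
subprocess.run(f"python workflow.py run --input_payload '{payload}'", shell=True)

# With the default argument-list form, the payload stays a single argv entry
# and is never interpreted by a shell.
subprocess.run(["python", "workflow.py", "run", "--input_payload", payload])
```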
@savingoyal thanks for the reply.
Yes, here's a two-file setup that demonstrates the issue:
test.py
```python
import json
import os
import subprocess
import tempfile
import time
from typing import Any

from metaflow import Runner


def test_a(
    workflow_module_path: str,
    workflow_params: dict[str, Any],
    workflow_input_payload: str,
) -> None:
    # Approach A: run the flow through the Metaflow Runner API.
    start_time = time.time()
    print(f"Starting runner execution_time={time.time() - start_time}")
    with Runner(workflow_module_path, pylint=False) as runner:
        running = runner.run(**workflow_params, input_payload=workflow_input_payload)
        print(f"Runner run execution_time={time.time() - start_time}")
        if running.status == "successful":
            print(f"Runner completed execution_time={time.time() - start_time}")


def test_b(
    workflow_module_path: str,
    workflow_params: dict[str, Any],
    workflow_input_payload: str,
) -> None:
    # Approach B: invoke the flow directly as a shell command (the workaround).
    start_time = time.time()
    print(f"Starting runner workaround execution_time={time.time() - start_time}")
    with tempfile.TemporaryDirectory(dir="/tmp") as temp_dir:
        tfp_runner_attribute = tempfile.NamedTemporaryFile(dir=temp_dir, delete=False)
        print(f"Started runner execution_time={time.time() - start_time}")
        subprocess.run(
            f"python {workflow_module_path} --no-pylint run --runner-attribute-file '{tfp_runner_attribute.name}' --input_payload '{workflow_input_payload}'",
            shell=True,
        )
    print(f"Runner workaround completed execution_time={time.time() - start_time}")


if __name__ == "__main__":
    cwd = os.path.dirname(os.path.abspath(__file__))
    workflow_module_path = os.path.join(cwd, "workflow.py")
    workflow_input_payload = json.dumps(
        {"request_id": "5c76e196-5ef2-443d-a036-25755a9a5fdd"}
    )
    workflow_params = {"some_key_1": "key_1", "some_key_2": "key_2"}
    print(
        f"Following both test implementations with workflow_module_path={workflow_module_path}"
    )
    test_a(
        workflow_module_path,
        workflow_params,
        workflow_input_payload,
    )
    test_b(
        workflow_module_path,
        workflow_params,
        workflow_input_payload,
    )
```
workflow.py
```python
from metaflow import FlowSpec, JSONType, Parameter, step

# Project-specific response schemas; not essential to reproducing the timing difference.
from appraisal_api.domain.workflows.metaflow.io_schemas import (
    SampleFlowResponse,
    SampleFlowResult,
)


class SimpleFlow(FlowSpec):
    input_payload = Parameter(
        "input_payload",
        help="The payload containing the images to be processed.",
        required=True,
        type=JSONType,
    )
    some_key_1 = Parameter(
        "some_key_1",
        help="Some extra parameter provided to the workflow",
        type=str,
        default="key_1",
    )
    some_key_2 = Parameter(
        "some_key_2",
        help="Some extra parameter provided to the workflow",
        type=str,
        default="key_2",
    )

    @step
    def start(self) -> None:
        print("This is the start step.")
        self.next(self.process_data)

    @step
    def process_data(self) -> None:
        print("This is the data processing step.")
        self.next(self.end)

    @step
    def end(self) -> None:
        result = SampleFlowResult(img_path="path/to/image.jpg", result="success")
        self.output_payload = SampleFlowResponse(
            operation="sample_flow",
            response=[result],
        )
        print("This is the end step.")


if __name__ == "__main__":
    SimpleFlow()
```
To run this setup, run test.py.
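To help narrow down whether the difference really comes from how python is launched, here's a smaller Metaflow-independent check (a sketch; both forms just start a bare interpreter):

```python
import subprocess
import sys
import time


def time_calls(label: str, launch, repeats: int = 5) -> None:
    # Average wall-clock time for launching a bare interpreter a few times.
    start = time.perf_counter()
    for _ in range(repeats):
        launch()
    print(f"{label}: {(time.perf_counter() - start) / repeats:.3f}s per call")


# Argument-list form, as the Runner uses today (no shell).
time_calls("argv list", lambda: subprocess.run([sys.executable, "-c", "pass"]))

# Shell form, as in the workaround above.
time_calls("shell=True", lambda: subprocess.run(f"{sys.executable} -c pass", shell=True))
```

If both numbers come out similar, the gap seen above is more likely coming from what the Runner does around the subprocess call than from the shell itself.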