
Is it possible to use @metaflow_ray with foreach on AWS Batch?

vymao opened this issue · 7 comments

I noticed that the multinode case for AWS Batch (which I believe is what @metaflow_ray relies on) seems to use UBF_CONTROL, which makes me think that we can't call a multinode Batch step in a foreach task (because then presumably we would be using UBF_TASK). Is this correct?

I'm wondering because my use case has multiple job groups that I would like to submit jobs for, where each job group is a parallelizable set of subjobs. I wanted to keep the individual job groups distinct, so I thought combining foreach with the @metaflow_ray decorator would be useful.

vymao · Aug 25 '25

Yes, @metaflow_ray does work on AWS Batch using multi-node, and you can nest @metaflow_ray within a foreach.
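
Roughly, the structure looks like this. This is an untested sketch, not code from this issue: the @batch settings are placeholders, and it assumes the metaflow-ray extension is installed so that metaflow_ray can be imported from the top-level metaflow package (as in the extension's examples).

from metaflow import FlowSpec, batch, current, metaflow_ray, step


class NestedRayFlow(FlowSpec):

    @step
    def start(self):
        # one entry per job group; each group gets its own Ray cluster
        self.job_groups = ["group_a", "group_b"]
        self.next(self.init_group, foreach="job_groups")

    @step
    def init_group(self):
        self.group = self.input
        # num_parallel = number of Batch nodes forming this group's Ray cluster
        self.next(self.run_ray, num_parallel=2)

    @batch(gpu=1)  # placeholder resources; add image=/queue= as needed
    @metaflow_ray(all_nodes_started_timeout=1200)
    @step
    def run_ray(self):
        import ray

        ray.init()  # join the cluster that metaflow-ray brings up for this group
        print(f"{self.group}: node {current.parallel.node_index} "
              f"of {current.parallel.num_nodes}")
        self.next(self.join_nodes)

    @step
    def join_nodes(self, inputs):
        # joins the parallel (multi-node) tasks of one group
        self.group = inputs[0].group
        self.next(self.join_groups)

    @step
    def join_groups(self, inputs):
        # joins the foreach branches
        self.next(self.end)

    @step
    def end(self):
        pass


if __name__ == "__main__":
    NestedRayFlow()

The key point is that the multi-node join (join_nodes) and the foreach join (join_groups) are separate steps, so each foreach branch brings up its own num_parallel-node Ray cluster.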

savingoyal · Aug 25 '25

Got it. Do you know what might be causing this error then? It seems to be referencing a path called s3://valthos-compute//jobs/DesignFlow/data/65/654-node-$AWS_BATCH_JOB_NODE_INDEXfc01a321fbdc7665d25d54-node-$AWS_BATCH_JOB_NODE_INDEXd654-node-$AWS_BATCH_JOB_NODE_INDEXe7164-node-$AWS_BATCH_JOB_NODE_INDEX0a4-node-$AWS_BATCH_JOB_NODE_INDEX912 but the only path that exists is s3://valthos-compute/jobs/DesignFlow/data/65/654fc01a321fbdc7665d25d54d654e71640a4912, which itself is empty.


Downloading code package...
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/local/lib/python3.11/site-packages/botocore/context.py", line 123, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/boto3/s3/inject.py", line 223, in download_file
    return transfer.download_file(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/boto3/s3/transfer.py", line 406, in download_file
    future.result()
  File "/usr/local/lib/python3.11/site-packages/s3transfer/futures.py", line 111, in result
    return self._coordinator.result()
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/s3transfer/futures.py", line 287, in result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/s3transfer/tasks.py", line 272, in _main
    self._submit(transfer_future=transfer_future, **kwargs)
  File "/usr/local/lib/python3.11/site-packages/s3transfer/download.py", line 355, in _submit
    response = client.head_object(
               ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/botocore/client.py", line 601, in _api_call
    return self._make_api_call(operation_name, kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/botocore/context.py", line 123, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/botocore/client.py", line 1074, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (404) when calling the HeadObject operation: Not Found
Failed to download code package from s3://valthos-compute/jobs/DesignFlow/data/65/654-node-$AWS_BATCH_JOB_NODE_INDEXfc01a321fbdc7665d25d54-node-$AWS_BATCH_JOB_NODE_INDEXd654-node-$AWS_BATCH_JOB_NODE_INDEXe7164-node-$AWS_BATCH_JOB_NODE_INDEX0a4-node-$AWS_BATCH_JOB_NODE_INDEX912 after 6 tries. Exiting...

vymao · Aug 25 '25

Can you share a reproducible example? The path it is trying to read from is incorrect.

savingoyal · Aug 25 '25

Sure, this is the portion of my code that creates these jobs.

    @step
    def prepare_design_inputs(self):
        # Load base defaults
        base_filters = read_defaults(self.default_filter_file)
        print("Done reading base filters.")
        base_advanced_settings = read_defaults(self.default_advanced_settings_file)
        print("Done reading base advanced settings.")
        base_target_settings = read_defaults(self.default_target_settings_file)
        print("Done reading base target settings.")

        # First expand job combinations
        expanded_config = expand_all_job_combinations(self.job_config)

        print(expanded_config)

        # Create individual overrides for each job combination
        self.job_overrides = {}

        # Handle the new nested structure: original_job_name -> {expanded_job_name: job_config}
        for original_job_name, job_combinations in expanded_config["jobs"].items():
            self.job_overrides[original_job_name] = {}
            for expanded_job_name, job_config in job_combinations.items():
                # Use helper functions to validate and override settings

                job_advanced_settings = override_advanced_settings(
                    base_advanced_settings, job_config, expanded_job_name
                )
                print("Done overriding advanced settings: ", job_advanced_settings)
                job_target_settings = override_target_settings(
                    base_target_settings, job_config, expanded_job_name
                )
                print("Done overriding target settings: ", job_target_settings)

                # Store the overrides for this specific job combination
                self.job_overrides[original_job_name][expanded_job_name] = {
                    "filters": base_filters.copy(),  # Don't override the base filters for now.
                    "advanced_settings": job_advanced_settings,
                    "target_settings": job_target_settings,
                    "original_job_name": original_job_name,  # Keep reference to original job
                }

        # Prepare job data for foreach - each original job name with its expanded jobs
        self.job_groups = []
        for original_job_name, job_combinations in self.job_overrides.items():
            self.job_groups.append(
                {
                    "original_job_name": original_job_name,
                    "expanded_jobs": job_combinations,
                    "num_jobs": len(job_combinations),
                }
            )

        if not hasattr(self, "job_groups"):
            raise RuntimeError("job_groups not computed; earlier code failed")

        self.next(self.design_job_group, foreach="job_groups")

    @step
    def design_job_group(self):
        """Prepare for Ray execution by determining cluster size."""
        job_group = self.input
        original_job_name = job_group["original_job_name"]
        expanded_jobs = job_group["expanded_jobs"]
        num_jobs = job_group["num_jobs"]

        # Store job group data for the Ray step
        self.original_job_name = original_job_name
        self.expanded_jobs = expanded_jobs
        self.num_jobs = num_jobs

        print(f"Preparing Ray cluster for job group: {original_job_name}")
        print(f"Number of expanded jobs in this group: {num_jobs}")

        # Pass the number of jobs as num_parallel to the Ray step. We add 1 to account for the
        # head node.
        self.next(self.execute_ray_jobs, num_parallel=num_jobs)

    # @secrets(sources=[EnvConfig.get("github_secret_arn")])
    @batch(
        image=ImageConfig.get_image_url("gpu-base"),
        queue=InstanceConfig.get_queue_name("L40S"),
        gpu=1,
    )
    @metaflow_ray(
        # 20 mins timeout for all workers to join
        all_nodes_started_timeout=1200
    )
    @environment(
        vars={
            "GIT_BRANCH": get_current_branch(),
            "GIT_REPO": VALTHOS_MAIN_REPO,
        }
    )
    @step
    def execute_ray_jobs(self):
        """Execute jobs using Ray with dynamically sized cluster."""
        original_job_name = self.original_job_name
        expanded_jobs = self.expanded_jobs
        num_jobs = self.num_jobs

        print(f"Starting Ray cluster for job group: {original_job_name}")
        print(f"Number of expanded jobs in this group: {num_jobs}")

        # Get the specific job for this parallel node
        expanded_job_names = list(expanded_jobs.keys())
        node_index = current.parallel.node_index

        future_results = []
        self.job_results = []

vymao · Aug 25 '25

Digging into this myself, I can only find a reference to the code package path in question here, where $AWS_BATCH_JOB_NODE_INDEX is used.

vymao · Aug 25 '25

I believe there is a mismatch between what is passed into the job through METAFLOW_CODE_URL and the actual command generated for the secondary nodes. One of my nodes (the control task, ubf_control) has the first command below, and another (a worker task, ubf_task) has the second:

["bash","-c","true && mkdir -p $PWD/.logs && export PYTHONUNBUFFERED=x MF_PATHSPEC=DesignFlow/1756138917013236/execute_ray_jobs/4 MF_DATASTORE=s3 MF_ATTEMPT=0 MFLOG_STDOUT=$PWD/.logs/mflog_stdout MFLOG_STDERR=$PWD/.logs/mflog_stderr && mflog(){ T=$(date -u -Ins|tr , .); echo \"[MFLOG|0|${T:0:26}Z|task|$T]$1\" >> $MFLOG_STDOUT; echo $1;  } && flush_mflogs(){ python -m metaflow.mflog.save_logs; } && mflog 'Setting up task environment.' && if [ -z $METAFLOW_SKIP_INSTALL_DEPENDENCIES ]; then python -m pip install -qqq --no-compile --no-cache-dir --disable-pip-version-check boto3 requests; fi && mkdir metaflow && cd metaflow && mkdir .metaflow && i=0; while [ $i -le 5 ]; do mflog 'Downloading code package...'; python -c 'import boto3, os; ep=os.getenv(\"METAFLOW_S3_ENDPOINT_URL\"); boto3.client(\"s3\", **({\"endpoint_url\":ep} if ep else {})).download_file(\"valthos-compute\", \"jobs/DesignFlow/data/65/654fc01a321fbdc7665d25d54d654e71640a4912\", \"job.tar\")' && mflog 'Code package downloaded.' && break; sleep 10; i=$((i+1)); done && if [ $i -gt 5 ]; then mflog 'Failed to download code package from s3://valthos-compute/jobs/DesignFlow/data/65/654fc01a321fbdc7665d25d54d654e71640a4912 after 6 tries. Exiting...' && exit 1; fi && TAR_OPTIONS='--warning=no-timestamp' tar -xzf job.tar -C . && export PYTHONPATH=$(pwd)/.mf_code:$(printenv PYTHONPATH) && export METAFLOW_EXTRACTED_ROOT=$(pwd) && mflog 'Task is starting.' && flush_mflogs && (python -u design.py --quiet --datastore s3 --metadata local --environment local --event-logger nullSidecarLogger --monitor nullSidecarMonitor --datastore-root s3://valthos-compute/jobs --package-suffixes .py,.R,.RDS --pylint step execute_ray_jobs --run-id 1756138917013236 --task-id 4 --input-paths ${METAFLOW_INPUT_PATHS_0} --retry-count 0 --max-user-code-retries 0 --namespace user:victormaomain --ubf-context ubf_control --split-index 0 --split-index 0) 1>> >(python -m metaflow.mflog.tee task $MFLOG_STDOUT) 2>> >(python -m metaflow.mflog.tee task $MFLOG_STDERR >&2); c=$?; python -m metaflow.mflog.save_logs; exit $c"]
["bash","-c","true && mkdir -p $PWD/.logs && export PYTHONUNBUFFERED=x MF_PATHSPEC=DesignFlow/1756138917013236/execute_ray_jobs/4-node-$AWS_BATCH_JOB_NODE_INDEX MF_DATASTORE=s3 MF_ATTEMPT=0 MFLOG_STDOUT=$PWD/.logs/mflog_stdout MFLOG_STDERR=$PWD/.logs/mflog_stderr && mflog(){ T=$(date -u -Ins|tr , .); echo \"[MFLOG|0|${T:0:26}Z|task|$T]$1\" >> $MFLOG_STDOUT; echo $1;  } && flush_mflogs(){ python -m metaflow.mflog.save_logs; } && mflog 'Setting up task environment.' && if [ -z $METAFLOW_SKIP_INSTALL_DEPENDENCIES ]; then python -m pip install -qqq --no-compile --no-cache-dir --disable-pip-version-check boto3 requests; fi && mkdir metaflow && cd metaflow && mkdir .metaflow && i=0; while [ $i -le 5 ]; do mflog 'Downloading code package...'; python -c 'import boto3, os; ep=os.getenv(\"METAFLOW_S3_ENDPOINT_URL\"); boto3.client(\"s3\", **({\"endpoint_url\":ep} if ep else {})).download_file(\"valthos-compute\", \"jobs/DesignFlow/data/65/654-node-$AWS_BATCH_JOB_NODE_INDEXfc01a321fbdc7665d25d54-node-$AWS_BATCH_JOB_NODE_INDEXd654-node-$AWS_BATCH_JOB_NODE_INDEXe7164-node-$AWS_BATCH_JOB_NODE_INDEX0a4-node-$AWS_BATCH_JOB_NODE_INDEX912\", \"job.tar\")' && mflog 'Code package downloaded.' && break; sleep 10; i=$((i+1)); done && if [ $i -gt 5 ]; then mflog 'Failed to download code package from s3://valthos-compute/jobs/DesignFlow/data/65/654-node-$AWS_BATCH_JOB_NODE_INDEXfc01a321fbdc7665d25d54-node-$AWS_BATCH_JOB_NODE_INDEXd654-node-$AWS_BATCH_JOB_NODE_INDEXe7164-node-$AWS_BATCH_JOB_NODE_INDEX0a4-node-$AWS_BATCH_JOB_NODE_INDEX912 after 6 tries. Exiting...' && exit 1; fi && TAR_OPTIONS='--warning=no-timestamp' tar -xzf job.tar -C . && export PYTHONPATH=$(pwd)/.mf_code:$(printenv PYTHONPATH) && export METAFLOW_EXTRACTED_ROOT=$(pwd) && mflog 'Task is starting.' && flush_mflogs && (python -u design.py --quiet --datastore s3 --metadata local --environment local --event-logger nullSidecarLogger --monitor nullSidecarMonitor --datastore-root s3://valthos-compute/obs --package-suffixes .py,.R,.RDS --pylint step execute_ray_jobs --run-id 1756138917013236 --task-id 4-node-$AWS_BATCH_JOB_NODE_INDEX --input-paths ${METAFLOW_INPUT_PATHS_0} --retry-count 0 --max-user-code-retries 0 --namespace user:victormaomain --ubf-context ubf_task --split-index 0 --split-index $AWS_BATCH_JOB_NODE_INDEX) 1>> >(python -m metaflow.mflog.tee task $MFLOG_STDOUT) 2>> >(python -m metaflow.mflog.tee task $MFLOG_STDERR >&2); c=$?; python -m metaflow.mflog.save_logs; exit $c"]

However, both nodes have the same METAFLOW_CODE_URL: s3://valthos-compute/jobs/DesignFlow/data/65/654fc01a321fbdc7665d25d54d654e71640a4912
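
To illustrate my guess at the cause (this is just a sketch, not Metaflow's actual code): if the worker node's command is derived from the control node's command by a plain string replacement of the task id, then a one-character task id like 4 also matches hex digits inside the code package key:

# Sketch of my guess at the cause, not Metaflow's actual code: deriving the worker
# command from the control command via a plain string replacement of the task id.
code_key = "jobs/DesignFlow/data/65/654fc01a321fbdc7665d25d54d654e71640a4912"
control_cmd = f'download_file("valthos-compute", "{code_key}", "job.tar") ... --task-id 4'

task_id = "4"
worker_cmd = control_cmd.replace(task_id, task_id + "-node-$AWS_BATCH_JOB_NODE_INDEX")

# Every "4" in the hex key is rewritten too, reproducing the corrupted path above:
# ...654-node-$AWS_BATCH_JOB_NODE_INDEXfc01a321fbdc7665d25d54-node-$AWS_BATCH_JOB_NODE_INDEX...
print(worker_cmd)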

vymao · Aug 25 '25

I believe I found the source of the error: https://github.com/Netflix/metaflow/pull/2574

vymao · Aug 26 '25