Is it possible to use @metaflow_ray with foreach on AWS Batch?
I noticed that the multi-node case for AWS Batch (which I believe is what @metaflow_ray uses) seems to rely on UBF_CONTROL, which makes me think we can't call a multi-node Batch step inside a foreach task (because then presumably we would be using UBF_TASK). Is this correct?
I'm asking because my use case has multiple job groups I would like to submit jobs for, where each job group is a parallelizable set of subjobs. I wanted to keep the individual job groups distinct, so I thought combining foreach with the @metaflow_ray decorator would be useful.
yes, @metaflow-ray does work on AWS Batch using multi-node. you can nest metaflow-ray within a foreach.
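in case the decorator stacking is unclear, a rough pseudocode sketch of the nesting (step names, arguments, and `N` are illustrative, not a runnable flow):

```
@step
def fan_out(self):
    self.groups = [...]                      # one entry per job group
    self.next(self.prep, foreach="groups")   # foreach over job groups

@step
def prep(self):
    self.next(self.train, num_parallel=N)    # N nodes per group

@batch(...)            # each foreach branch becomes its own
@metaflow_ray(...)     # multi-node Batch job running a Ray cluster
@step
def train(self):
    ...
```

each foreach branch launches an independent multi-node Batch job, so the job groups stay isolated from each other.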
Got it. Do you know what might be causing this error then? It seems to be referencing a path called s3://valthos-compute//jobs/DesignFlow/data/65/654-node-$AWS_BATCH_JOB_NODE_INDEXfc01a321fbdc7665d25d54-node-$AWS_BATCH_JOB_NODE_INDEXd654-node-$AWS_BATCH_JOB_NODE_INDEXe7164-node-$AWS_BATCH_JOB_NODE_INDEX0a4-node-$AWS_BATCH_JOB_NODE_INDEX912 but the only path that exists is s3://valthos-compute/jobs/DesignFlow/data/65/654fc01a321fbdc7665d25d54d654e71640a4912, which itself is empty.
Downloading code package...
--
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/local/lib/python3.11/site-packages/botocore/context.py", line 123, in wrapper
    return func(*args, **kwargs)
    ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/boto3/s3/inject.py", line 223, in download_file
    return transfer.download_file(
    ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/boto3/s3/transfer.py", line 406, in download_file
    future.result()
  File "/usr/local/lib/python3.11/site-packages/s3transfer/futures.py", line 111, in result
    return self._coordinator.result()
    ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/s3transfer/futures.py", line 287, in result
    raise self._exception
  File "/usr/local/lib/python3.11/site-packages/s3transfer/tasks.py", line 272, in _main
    self._submit(transfer_future=transfer_future, **kwargs)
  File "/usr/local/lib/python3.11/site-packages/s3transfer/download.py", line 355, in _submit
    response = client.head_object(
    ^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/botocore/client.py", line 601, in _api_call
    return self._make_api_call(operation_name, kwargs)
    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/botocore/context.py", line 123, in wrapper
    return func(*args, **kwargs)
    ^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.11/site-packages/botocore/client.py", line 1074, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (404) when calling the HeadObject operation: Not Found
Failed to download code package from s3://valthos-compute/jobs/DesignFlow/data/65/654-node-$AWS_BATCH_JOB_NODE_INDEXfc01a321fbdc7665d25d54-node-$AWS_BATCH_JOB_NODE_INDEXd654-node-$AWS_BATCH_JOB_NODE_INDEXe7164-node-$AWS_BATCH_JOB_NODE_INDEX0a4-node-$AWS_BATCH_JOB_NODE_INDEX912 after 6 tries. Exiting...
can you share a reproducible example? the path it is trying to read from is incorrect.
Sure, this is the portion of my code that creates these jobs.
@step
def prepare_design_inputs(self):
    # Load base defaults
    base_filters = read_defaults(self.default_filter_file)
    print("Done reading base filters.")
    base_advanced_settings = read_defaults(self.default_advanced_settings_file)
    print("Done reading base advanced settings.")
    base_target_settings = read_defaults(self.default_target_settings_file)
    print("Done reading base target settings.")

    # First expand job combinations
    expanded_config = expand_all_job_combinations(self.job_config)
    print(expanded_config)

    # Create individual overrides for each job combination
    self.job_overrides = {}
    # Handle the new nested structure: original_job_name -> {expanded_job_name: job_config}
    for original_job_name, job_combinations in expanded_config["jobs"].items():
        self.job_overrides[original_job_name] = {}
        for expanded_job_name, job_config in job_combinations.items():
            # Use helper functions to validate and override settings
            job_advanced_settings = override_advanced_settings(
                base_advanced_settings, job_config, expanded_job_name
            )
            print("Done overriding advanced settings: ", job_advanced_settings)
            job_target_settings = override_target_settings(
                base_target_settings, job_config, expanded_job_name
            )
            print("Done overriding target settings: ", job_target_settings)
            # Store the overrides for this specific job combination
            self.job_overrides[original_job_name][expanded_job_name] = {
                "filters": base_filters.copy(),  # Don't override the base filters for now.
                "advanced_settings": job_advanced_settings,
                "target_settings": job_target_settings,
                "original_job_name": original_job_name,  # Keep reference to original job
            }

    # Prepare job data for foreach - each original job name with its expanded jobs
    self.job_groups = []
    for original_job_name, job_combinations in self.job_overrides.items():
        self.job_groups.append(
            {
                "original_job_name": original_job_name,
                "expanded_jobs": job_combinations,
                "num_jobs": len(job_combinations),
            }
        )

    if not hasattr(self, "job_groups"):
        raise RuntimeError("job_groups not computed; earlier code failed")

    self.next(self.design_job_group, foreach="job_groups")

@step
def design_job_group(self):
    """Prepare for Ray execution by determining cluster size."""
    job_group = self.input
    original_job_name = job_group["original_job_name"]
    expanded_jobs = job_group["expanded_jobs"]
    num_jobs = job_group["num_jobs"]

    # Store job group data for the Ray step
    self.original_job_name = original_job_name
    self.expanded_jobs = expanded_jobs
    self.num_jobs = num_jobs

    print(f"Preparing Ray cluster for job group: {original_job_name}")
    print(f"Number of expanded jobs in this group: {num_jobs}")

    # Pass the number of jobs as num_parallel to the Ray step. We add 1 to account for the
    # head node.
    self.next(self.execute_ray_jobs, num_parallel=num_jobs)

# @secrets(sources=[EnvConfig.get("github_secret_arn")])
@batch(
    image=ImageConfig.get_image_url("gpu-base"),
    queue=InstanceConfig.get_queue_name("L40S"),
    gpu=1,
)
@metaflow_ray(
    # 20 mins timeout for all workers to join
    all_nodes_started_timeout=1200
)
@environment(
    vars={
        "GIT_BRANCH": get_current_branch(),
        "GIT_REPO": VALTHOS_MAIN_REPO,
    }
)
@step
def execute_ray_jobs(self):
    """Execute jobs using Ray with dynamically sized cluster."""
    original_job_name = self.original_job_name
    expanded_jobs = self.expanded_jobs
    num_jobs = self.num_jobs

    print(f"Starting Ray cluster for job group: {original_job_name}")
    print(f"Number of expanded jobs in this group: {num_jobs}")

    # Get the specific job for this parallel node
    expanded_job_names = list(expanded_jobs.keys())
    node_index = current.parallel.node_index
    future_results = []
    self.job_results = []
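(The snippet ends just after reading current.parallel.node_index; the per-node job selection it implies can be sketched independently of Ray and Metaflow. Names below are illustrative, not from the flow; since the flow sets num_parallel=num_jobs, each node takes exactly one job, but a round-robin slice also covers the general case where jobs outnumber nodes.)

```python
# Illustrative sketch: each parallel node picks its own job(s) from the
# shared list of expanded job names, keyed by its node index.
expanded_job_names = ["job-a", "job-b", "job-c", "job-d", "job-e"]

def jobs_for_node(node_index: int, num_nodes: int) -> list[str]:
    # Node i takes items i, i + num_nodes, i + 2*num_nodes, ... (round-robin).
    # With num_nodes == len(expanded_job_names), each node gets exactly one job.
    return expanded_job_names[node_index::num_nodes]

print(jobs_for_node(0, 2))  # ['job-a', 'job-c', 'job-e']
print(jobs_for_node(1, 2))  # ['job-b', 'job-d']
```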
Digging in myself, I can only find a reference to the code package path in question here, where $AWS_BATCH_JOB_NODE_INDEX is used.
I believe there is a mismatch between what is passed into the job through METAFLOW_CODE_URL and the actual secondary command. The first command below is from my control node (--ubf-context ubf_control); the second is from a worker node (--ubf-context ubf_task), where the S3 key is mangled:
["bash","-c","true && mkdir -p $PWD/.logs && export PYTHONUNBUFFERED=x MF_PATHSPEC=DesignFlow/1756138917013236/execute_ray_jobs/4 MF_DATASTORE=s3 MF_ATTEMPT=0 MFLOG_STDOUT=$PWD/.logs/mflog_stdout MFLOG_STDERR=$PWD/.logs/mflog_stderr && mflog(){ T=$(date -u -Ins|tr , .); echo \"[MFLOG|0|${T:0:26}Z|task|$T]$1\" >> $MFLOG_STDOUT; echo $1; } && flush_mflogs(){ python -m metaflow.mflog.save_logs; } && mflog 'Setting up task environment.' && if [ -z $METAFLOW_SKIP_INSTALL_DEPENDENCIES ]; then python -m pip install -qqq --no-compile --no-cache-dir --disable-pip-version-check boto3 requests; fi && mkdir metaflow && cd metaflow && mkdir .metaflow && i=0; while [ $i -le 5 ]; do mflog 'Downloading code package...'; python -c 'import boto3, os; ep=os.getenv(\"METAFLOW_S3_ENDPOINT_URL\"); boto3.client(\"s3\", **({\"endpoint_url\":ep} if ep else {})).download_file(\"valthos-compute\", \"jobs/DesignFlow/data/65/654fc01a321fbdc7665d25d54d654e71640a4912\", \"job.tar\")' && mflog 'Code package downloaded.' && break; sleep 10; i=$((i+1)); done && if [ $i -gt 5 ]; then mflog 'Failed to download code package from s3://valthos-compute/jobs/DesignFlow/data/65/654fc01a321fbdc7665d25d54d654e71640a4912 after 6 tries. Exiting...' && exit 1; fi && TAR_OPTIONS='--warning=no-timestamp' tar -xzf job.tar -C . && export PYTHONPATH=$(pwd)/.mf_code:$(printenv PYTHONPATH) && export METAFLOW_EXTRACTED_ROOT=$(pwd) && mflog 'Task is starting.' 
&& flush_mflogs && (python -u design.py --quiet --datastore s3 --metadata local --environment local --event-logger nullSidecarLogger --monitor nullSidecarMonitor --datastore-root s3://valthos-compute/jobs --package-suffixes .py,.R,.RDS --pylint step execute_ray_jobs --run-id 1756138917013236 --task-id 4 --input-paths ${METAFLOW_INPUT_PATHS_0} --retry-count 0 --max-user-code-retries 0 --namespace user:victormaomain --ubf-context ubf_control --split-index 0 --split-index 0) 1>> >(python -m metaflow.mflog.tee task $MFLOG_STDOUT) 2>> >(python -m metaflow.mflog.tee task $MFLOG_STDERR >&2); c=$?; python -m metaflow.mflog.save_logs; exit $c"]
["bash","-c","true && mkdir -p $PWD/.logs && export PYTHONUNBUFFERED=x MF_PATHSPEC=DesignFlow/1756138917013236/execute_ray_jobs/4-node-$AWS_BATCH_JOB_NODE_INDEX MF_DATASTORE=s3 MF_ATTEMPT=0 MFLOG_STDOUT=$PWD/.logs/mflog_stdout MFLOG_STDERR=$PWD/.logs/mflog_stderr && mflog(){ T=$(date -u -Ins|tr , .); echo \"[MFLOG|0|${T:0:26}Z|task|$T]$1\" >> $MFLOG_STDOUT; echo $1; } && flush_mflogs(){ python -m metaflow.mflog.save_logs; } && mflog 'Setting up task environment.' && if [ -z $METAFLOW_SKIP_INSTALL_DEPENDENCIES ]; then python -m pip install -qqq --no-compile --no-cache-dir --disable-pip-version-check boto3 requests; fi && mkdir metaflow && cd metaflow && mkdir .metaflow && i=0; while [ $i -le 5 ]; do mflog 'Downloading code package...'; python -c 'import boto3, os; ep=os.getenv(\"METAFLOW_S3_ENDPOINT_URL\"); boto3.client(\"s3\", **({\"endpoint_url\":ep} if ep else {})).download_file(\"valthos-compute\", \"jobs/DesignFlow/data/65/654-node-$AWS_BATCH_JOB_NODE_INDEXfc01a321fbdc7665d25d54-node-$AWS_BATCH_JOB_NODE_INDEXd654-node-$AWS_BATCH_JOB_NODE_INDEXe7164-node-$AWS_BATCH_JOB_NODE_INDEX0a4-node-$AWS_BATCH_JOB_NODE_INDEX912\", \"job.tar\")' && mflog 'Code package downloaded.' && break; sleep 10; i=$((i+1)); done && if [ $i -gt 5 ]; then mflog 'Failed to download code package from s3://valthos-compute/jobs/DesignFlow/data/65/654-node-$AWS_BATCH_JOB_NODE_INDEXfc01a321fbdc7665d25d54-node-$AWS_BATCH_JOB_NODE_INDEXd654-node-$AWS_BATCH_JOB_NODE_INDEXe7164-node-$AWS_BATCH_JOB_NODE_INDEX0a4-node-$AWS_BATCH_JOB_NODE_INDEX912 after 6 tries. Exiting...' && exit 1; fi && TAR_OPTIONS='--warning=no-timestamp' tar -xzf job.tar -C . && export PYTHONPATH=$(pwd)/.mf_code:$(printenv PYTHONPATH) && export METAFLOW_EXTRACTED_ROOT=$(pwd) && mflog 'Task is starting.' 
&& flush_mflogs && (python -u design.py --quiet --datastore s3 --metadata local --environment local --event-logger nullSidecarLogger --monitor nullSidecarMonitor --datastore-root s3://valthos-compute/jobs --package-suffixes .py,.R,.RDS --pylint step execute_ray_jobs --run-id 1756138917013236 --task-id 4-node-$AWS_BATCH_JOB_NODE_INDEX --input-paths ${METAFLOW_INPUT_PATHS_0} --retry-count 0 --max-user-code-retries 0 --namespace user:victormaomain --ubf-context ubf_task --split-index 0 --split-index $AWS_BATCH_JOB_NODE_INDEX) 1>> >(python -m metaflow.mflog.tee task $MFLOG_STDOUT) 2>> >(python -m metaflow.mflog.tee task $MFLOG_STDERR >&2); c=$?; python -m metaflow.mflog.save_logs; exit $c"]
However, both nodes have the same METAFLOW_CODE_URL: s3://valthos-compute/jobs/DesignFlow/data/65/654fc01a321fbdc7665d25d54d654e71640a4912
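The mangling pattern is consistent with a blind string substitution: every occurrence of the task id (here "4") in the generated command, including every "4" inside the code-package hash, gets "-node-$AWS_BATCH_JOB_NODE_INDEX" appended. A minimal reproduction of that hypothesis (this is not Metaflow's actual code, just the operation it effectively performs):

```python
# Hypothesis: the worker command is derived from the control command by
# replacing every occurrence of the task id ("4") with
# "4-node-$AWS_BATCH_JOB_NODE_INDEX", which also corrupts the S3 key.
task_id = "4"
suffix = "-node-$AWS_BATCH_JOB_NODE_INDEX"
key = "jobs/DesignFlow/data/65/654fc01a321fbdc7665d25d54d654e71640a4912"

# Naive replacement, applied to the whole command string, hits the key too:
mangled = key.replace(task_id, task_id + suffix)
print(mangled)
# -> jobs/DesignFlow/data/65/654-node-$AWS_BATCH_JOB_NODE_INDEXfc01a321fbdc7665d25d54-node-$AWS_BATCH_JOB_NODE_INDEXd654-node-$AWS_BATCH_JOB_NODE_INDEXe7164-node-$AWS_BATCH_JOB_NODE_INDEX0a4-node-$AWS_BATCH_JOB_NODE_INDEX912
```

This reproduces the broken path from the error message exactly, which points at the command-rewriting step rather than at anything in the flow code.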
I believe I found the error: https://github.com/Netflix/metaflow/pull/2574