Script tasks cannot directly access output files from `io.kestra.plugin.core.flow.ForEachItem` Subflows
Describe the issue
This is the YAML file of my flow, which uses the `io.kestra.plugin.core.flow.ForEachItem` task. It calls the `batch` flow in the `comp.batch` namespace, which processes a batch of 100 rows of the input file.
id: main_parallel
namespace: comp.main_parallel
tasks:
  - id: "connector"
    type: io.kestra.plugin.core.flow.Subflow
    namespace: comp.connector
    flowId: connector
  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: '{{ outputs.connector.outputs.output_file }}'
    newLine: false # regular json
  - id: ion_to_jsonl
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.json_to_ion.uri }}"
    newLine: true # JSON-L
  - id: for_each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.ion_to_jsonl.uri }}"
    batch:
      rows: 100
    namespace: comp.batch
    flowId: batch
    wait: true
    transmitFailed: true
    inputs:
      input_file: '{{ taskrun.items }}'
  - id: merge_outputs
    type: io.kestra.plugin.scripts.shell.Script
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    outputFiles:
      - output.jsonl
    script: |
      subflow_output_paths="{{ outputs.for_each_item_merge.subflowOutputs }}"
      merged_output="output.jsonl"
      echo "$subflow_output_paths"
      > "$merged_output"
      while IFS= read -r subflow_output_line
      do
        subflow_output_path="$(echo $subflow_output_line | cut -d '"' -f 2)"
        cat "$subflow_output_path" >> "$merged_output"
      done < "$subflow_output_paths"
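To isolate the parsing step of `merge_outputs`, the loop can be run outside Kestra. This is a minimal sketch that assumes, as the `cut -d '"' -f 2` in the script implies, that each line of the subflow outputs file holds one double-quoted URI; the `output_file` key, the execution IDs and the file names below are hypothetical placeholders. It shows that the value handed to `cat` is a `kestra://` URI rather than a path on the task's local filesystem.

```shell
#!/bin/sh
# Local reproduction of the merge loop's parsing step (not run inside Kestra).
# ASSUMPTION: one quoted URI per line; key name and URIs are hypothetical.
set -eu

outputs_file="$(mktemp)"
trap 'rm -f "$outputs_file"' EXIT

cat > "$outputs_file" <<'EOF'
{output_file:"kestra:///comp/batch/batch/executions/EXAMPLE1/tasks/ion-to-jsonl/RUN1/part1.jsonl"}
{output_file:"kestra:///comp/batch/batch/executions/EXAMPLE2/tasks/ion-to-jsonl/RUN2/part2.jsonl"}
EOF

extract_uri() {
  # Same logic as the flow: keep the text between the first pair of double quotes.
  printf '%s\n' "$1" | cut -d '"' -f 2
}

count=0
first_uri=""
local_files=0
while IFS= read -r line; do
  uri="$(extract_uri "$line")"
  count=$((count + 1))
  if [ -z "$first_uri" ]; then first_uri="$uri"; fi
  if [ -f "$uri" ]; then
    echo "local file: $uri"
    local_files=$((local_files + 1))
  else
    # The shell treats "kestra:///..." as a relative local path that does
    # not exist, which matches cat's "No such file or directory".
    echo "not a local file: $uri"
  fi
done < "$outputs_file"
```

Both lines are reported as "not a local file", because nothing in this sketch resolves `kestra://` URIs to files; only the text between the quotes is extracted.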
I can split my input into multiple batches and process them in subflow executions, and I can see that the subflows output their files correctly. After that, I can read the file containing the paths to the individual batch outputs. However, I cannot read the batch output files themselves: the merge_outputs step crashes with the following error:
2024-09-20 19:33:54.563 cat: ‘kestra:///comp/batch/batch/executions/3Ft8Yi2jdLF9pfZnhkdb8Z/tasks/ion-to-jsonl/6zicRSyNtzeMuL9WodJNGc/2161082710634191013.jsonl’: No such file or directory
Could it be that the batch outputs aren't properly made available to the tasks of the parent flow?
I was also wondering whether Kestra has a built-in way to (optionally) concatenate the batch output files. That would save me the hassle of writing this from scratch, as I am doing now.
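For reference, the concatenation itself, once the batch outputs are available as local files, amounts to something like the following plain shell sketch. It is not a Kestra feature; the `concat_jsonl` helper and the file names are hypothetical. One detail worth handling is a batch file without a trailing newline: appending it with `cat` alone would glue its last record to the first record of the next batch and produce invalid JSONL.

```shell
#!/bin/sh
# Concatenate JSONL batch files into a single JSONL file, in argument order.
# File names and contents are hypothetical stand-ins for the batch outputs.
set -eu

workdir="$(mktemp -d)"
trap 'rm -rf "$workdir"' EXIT

printf '{"id":1}\n{"id":2}\n' > "$workdir/batch_1.jsonl"
printf '{"id":3}' > "$workdir/batch_2.jsonl"   # deliberately no trailing newline

concat_jsonl() {
  out="$1"
  shift
  : > "$out"   # truncate (or create) the merged file
  for f in "$@"; do
    cat "$f" >> "$out"
    # $(...) strips a trailing newline, so a non-empty result means the
    # file's last byte is not a newline; add one to keep records separate.
    if [ -s "$f" ] && [ -n "$(tail -c 1 "$f")" ]; then
      printf '\n' >> "$out"
    fi
  done
}

concat_jsonl "$workdir/output.jsonl" "$workdir/batch_1.jsonl" "$workdir/batch_2.jsonl"
cat "$workdir/output.jsonl"   # prints {"id":1}, {"id":2}, {"id":3} on three lines
```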
If you need any additional information, please let me know. Thanks in advance!
Environment
- Kestra Version: 0.18.8
- Operating System: self-hosted Kubernetes / local Docker container