kestra icon indicating copy to clipboard operation
kestra copied to clipboard

Script tasks cannot directly access output files from `io.kestra.plugin.core.flow.ForEachItem` Subflows

Open yorickdevries opened this issue 5 months ago • 13 comments

Describe the issue

This is the yaml file of my flow using the plugin io.kestra.plugin.core.flow.ForEachItem. It calls the flow comp.batch which processes a subset of 100 rows of the input file.

id: main_parallel
namespace: comp.main_parallel
tasks:
  - id: "connector"
    type: io.kestra.plugin.core.flow.Subflow
    namespace: comp.connector
    flowId: connector

  - id: json_to_ion
    type: io.kestra.plugin.serdes.json.JsonToIon
    from: '{{ outputs.connector.outputs.output_file }}'
    newLine: false # regular json
  - id: ion_to_jsonl
    type: io.kestra.plugin.serdes.json.IonToJson
    from: "{{ outputs.json_to_ion.uri }}"
    newLine: true # JSON-L
  
  - id: for_each_item
    type: io.kestra.plugin.core.flow.ForEachItem
    items: "{{ outputs.ion_to_jsonl.uri }}"
    batch:
      rows: 100
    namespace: comp.batch
    flowId: batch
    wait: true
    transmitFailed: true
    inputs:
      input_file: '{{ taskrun.items }}'

  - id: merge_outputs
    type: io.kestra.plugin.scripts.shell.Script
    taskRunner:
      type: io.kestra.plugin.core.runner.Process
    outputFiles:
      - output.jsonl
    script: |
      subflow_output_paths="{{ outputs.for_each_item_merge.subflowOutputs }}"
      merged_output="output.jsonl"
      echo "$subflow_output_paths"
      
      > "$merged_output"
      while IFS= read -r subflow_output_line
      do
        subflow_output_path="$(echo $subflow_output_line | cut -d '"' -f 2)"
        cat "$subflow_output_path" >> "$merged_output"
      done < "$subflow_output_paths"

I am able to split my input into multiple batches and process these in subprocesses. I see that the subprocesses properly output files. Hereafter I am able to read the file containing the paths to the different batches. However, I am not able to read those files:

The merge_outputs step crashes with the following error: 2024-09-20 19:33:54.563 cat: ‘kestra:///comp/batch/batch/executions/3Ft8Yi2jdLF9pfZnhkdb8Z/tasks/ion-to-jsonl/6zicRSyNtzeMuL9WodJNGc/2161082710634191013.jsonl’: No such file or directory.

Could it be that the batch outputs arent properly made available in the main task?

Furthermore, I was wondering whether there is an built-in way in kestra which enables (optional) concatenation of the batch-output files. This would save quite some hassle writing this from scratch as I am doing now.

If you need additional information, I'd gladly hear it. Thanks in advance!

Environment

  • Kestra Version: 0.18.8
  • Operating System: self hosted Kubernetes/local docker container

yorickdevries avatar Sep 20 '24 17:09 yorickdevries