
cellranger_count.py: capture stderr and stdout

Open nick-youngblut opened this issue 1 year ago • 4 comments

Description of feature

cellranger_count.py currently uses subprocess.run to run cellranger count, but it does not capture or write out the subprocess's stdout and stderr, so all the user sees for a failed job is:

Traceback (most recent call last):
  File "/home/nickyoungblut/dev/nextflow/scrnaseq/work/8a/e399f792d79e21d811f4d2698751fa/.command.sh", line 57, in <module>
    run(
  File "/opt/conda/lib/python3.10/subprocess.py", line 526, in run
    raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['cellranger', 'count', '--id', 'pbmc8k_s1', '--fastqs', 'fastq_all', '--transcriptome', 'refdata-gex-GRCh38-2024-A', '--localcores', '12', '--localmem', '72', '--chemistry', 'SC3Pv2', '--create-bam', 'true', '--expect-cells', '10000']' returned non-zero exit status 1.

It would be helpful if stderr and stdout were captured and returned. For example:

result = run(
    # fmt: off
    [
        "cellranger", "count",
        "--id", "${prefix}",
        "--fastqs", str(fastq_all),
        "--transcriptome", "${reference.name}",
        "--localcores", "${task.cpus}",
        "--localmem", "${task.memory.toGiga()}",
        *shlex.split("""${args}""")
    ],
    # fmt: on
    check=True,
    capture_output=True,
    text=True  # to get the output as strings
)

# Access the stdout and stderr
stdout = result.stdout
stderr = result.stderr

# Write the stdout and stderr
[...]
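As an illustration of the elided write step, the captured streams could be written to per-process log files. This is only a sketch; the filenames here are hypothetical and not part of the module:

```python
from pathlib import Path

# Hypothetical captured output; in the module these would be
# result.stdout / result.stderr from the subprocess.run call above.
stdout = "cellranger count run summary...\n"
stderr = "cellranger count warnings and errors...\n"

# Write each stream to its own log file so it survives a failed job.
# The filenames are illustrative only.
Path("cellranger_count.stdout.log").write_text(stdout)
Path("cellranger_count.stderr.log").write_text(stderr)
```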

An alternative:

from subprocess import run, CalledProcessError
import shlex

def run_subprocess(command):
    try:
        result = run(
            shlex.split(command),
            check=True,
            capture_output=True,
            text=True
        )
        return result.stdout, result.stderr
    except CalledProcessError as e:
        print(f"Command '{e.cmd}' failed with return code {e.returncode}")
        print(f"stdout: {e.stdout}")
        print(f"stderr: {e.stderr}")
        raise

# Example usage
command = "cellranger count --id sample_id --fastqs ./fastq_all --transcriptome reference --localcores 4 --localmem 16"
stdout, stderr = run_subprocess(command)

nick-youngblut avatar Jun 24 '24 18:06 nick-youngblut

Also, because cellranger count is invoked from within the cellranger_count.py job, a 137 exit status (out of memory) from cellranger count is "reported" by the cellranger_count.py job as a plain exit status of 1.

This is important for retrying processes with escalated resources:

    errorStrategy = { task.exitStatus in ((130..145) + 104) ? 'retry' : 'finish' }
    maxRetries    = 1
    maxErrors     = '-1'

...since exit values of 1 will not trigger a retry.
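The Groovy retry condition above can be mirrored in Python to show why the masked exit code matters (a sketch for illustration, not part of the module):

```python
def should_retry(exit_status: int) -> bool:
    # Mirrors the Nextflow errorStrategy above:
    # retry on 130..145 (inclusive) or 104, finish otherwise.
    return exit_status in set(range(130, 146)) | {104}

# A real OOM kill (137) would trigger a retry with escalated resources,
# but the masked exit status of 1 finishes the run instead.
print(should_retry(137), should_retry(1))
```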

nick-youngblut avatar Jun 24 '24 18:06 nick-youngblut

Thanks for raising the issue, agree stderr/stdout and exit code should be forwarded. This should be fixed at the nf-core/modules level and likely also affects the spaceranger and cellranger multi modules that share the python script.

I will follow up on this eventually, but I have only very limited time I can put into nf-core at the moment -- so if you want to speed it up a PR to modules would be appreciated :)

grst avatar Jun 25 '24 06:06 grst

@grst do you know whether cellranger count actually returns a 137 exit status when the job runs out of memory?

I am using -process.scratch ram-disk, which requires more memory for the job, but the current release of the cellranger-count nf-core module just returns an exit status of 1, so the job will never retry with more memory:

    withLabel:process_high {
        cpus   = { check_max( 12    * task.attempt, 'cpus'    ) }
        memory = { check_max( 72.GB * task.attempt, 'memory'  ) }
        time   = { check_max( 16.h  * task.attempt, 'time'    ) }
    }

I tried updating the cellranger_count.py template:

cellranger_count.py
#!/usr/bin/env python3
"""
Automatically rename staged files for input into cellranger count.

Copyright (c) Gregor Sturm 2023 - MIT License
"""
from subprocess import run, CalledProcessError, SubprocessError
from pathlib import Path
from textwrap import dedent
import shlex
import re
import sys


def chunk_iter(seq, size):
    """
    Iterate over `seq` in chunks of `size`
    Args:
        seq: iterable, the sequence to chunk
        size: int, the size of the chunks
    Returns:
        generator: the chunks of `seq`
    """
    return (seq[pos : pos + size] for pos in range(0, len(seq), size))


def run_subprocess(command):
    """
    Run a subprocess command and return stdout and stderr as strings.
    Args:
        command: list of strings, the command to run
    Returns:
        tuple of strings: (stdout, stderr)
    """
    try:
        # Run the command and capture stdout and stderr as strings
        result = run(
            command,
            check=True,
            capture_output=True,
            text=True
        )
        return result.stdout, result.stderr
    except CalledProcessError as e:
        # Print the error message and exit with the return code of the subprocess
        print(f"Command '{e.cmd}' failed with return code {e.returncode}")
        print(f"#--- STDOUT ---#\\n{e.stdout}")
        print(f"#--- STDERR ---#\\n{e.stderr}")
        sys.exit(e.returncode)
    except SubprocessError as e:
        # Print the error message and exit with return code 1
        print(f"Subprocess error: {str(e)}")
        sys.exit(1)

# Set the sample ID to the pipeline meta.id
sample_id = "${meta.id}"

# Get fastqs, ordered by path. Files are staged into
#   - "fastq_001/{original_name.fastq.gz}"
#   - "fastq_002/{original_name.fastq.gz}"
#   - ...
# Since we require fastq files in the input channel to be ordered such that a R1/R2 pair
# of files follows each other, ordering will get us a sequence of [R1, R2, R1, R2, ...]
fastqs = sorted(Path(".").glob("fastq_*/*"))
assert len(fastqs) % 2 == 0

# Target directory in which the renamed fastqs will be placed
fastq_all = Path("./fastq_all")
fastq_all.mkdir(exist_ok=True)

# Match R1 in the filename, but only if it is followed by a non-digit or non-character
# match "file_R1.fastq.gz", "file.R1_000.fastq.gz", etc. but
# do not match "SRR12345", "file_INFIXR12", etc
filename_pattern = r'([^a-zA-Z0-9])R1([^a-zA-Z0-9])'

for i, (r1, r2) in enumerate(chunk_iter(fastqs, 2), start=1):
    # double escapes are required because nextflow processes this python 'template'
    if re.sub(filename_pattern, r'\\1R2\\2', r1.name) != r2.name:
        raise AssertionError(
            dedent(
                f"""\
                We expect R1 and R2 of the same sample to have the same filename except for R1/R2.
                This has been checked by replacing "R1" with "R2" in the first filename and comparing it to the second filename.
                If you believe this check shouldn't have failed on your filenames, please report an issue on GitHub!

                Files involved:
                    - {r1}
                    - {r2}
                """
            )
        )
    r1.rename(fastq_all / f"{sample_id}_S1_L{i:03d}_R1_001.fastq.gz")
    r2.rename(fastq_all / f"{sample_id}_S1_L{i:03d}_R2_001.fastq.gz")

# Run `cellranger count`
run_subprocess(
    [
        "cellranger", "count",
        "--id", "${prefix}",
        "--fastqs", str(fastq_all),
        "--transcriptome", "${reference.name}",
        "--localcores", "${task.cpus}",
        "--localmem", "${task.memory.toGiga()}",
        *shlex.split("""${args}""")
    ]
)

# Output `cellranger count` version information
proc_stdout, proc_stderr = run_subprocess(
    ["cellranger", "-V"]
)
version = proc_stdout.replace("cellranger cellranger-", "")

# Write the version information to a file
with open("versions.yml", "w") as f:
    f.write('"${task.process}":\\n')
    f.write(f'  cellranger: "{version}"\\n')

...but the cellranger count process in scrnaseq still returns an exit status of 1 when there is insufficient memory.

nick-youngblut avatar Jun 27 '24 02:06 nick-youngblut

No idea. But it shouldn't be hard to capture the exit code from the subprocess call and then do sys.exit(exitcode) in Python.
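The suggestion above can be sketched as a small wrapper that re-raises the child's own exit status, so an OOM kill (137) reaches Nextflow instead of being collapsed to 1. A minimal sketch; the function name is hypothetical:

```python
import sys
from subprocess import run, CalledProcessError


def run_and_forward(cmd):
    """Run cmd; on failure, print the captured output and exit the
    wrapper with the child's own return code, so e.g. a 137 OOM kill
    is visible to Nextflow's errorStrategy instead of a generic 1."""
    try:
        result = run(cmd, check=True, capture_output=True, text=True)
        return result.stdout, result.stderr
    except CalledProcessError as e:
        print(e.stdout)
        print(e.stderr, file=sys.stderr)
        sys.exit(e.returncode)  # forward the child's exit status
```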

grst avatar Jun 28 '24 05:06 grst