
Simplify writing of sharded VCFs

Open Hoeze opened this issue 2 years ago • 3 comments

From the docs:

For the sharded VCF writer, the sample IDs are inferred from the first row of each partition and must be the same for each row. If the rows do not contain the same samples, provide a complete header or a filesystem path to a VCF file.

Because of this, the sharded VCF writer cannot be used to merge VCF files: it fails on non-matching sample IDs. Would it be possible to change the behavior so that it takes the sorted union of all samples in the partition instead?
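
For reference, the workaround I can think of today looks roughly like this (just a sketch; I'm assuming the vcfHeader option accepts a path to a VCF file whose header already lists the union of all samples, and the paths are placeholders):

import pyspark.sql.functions as f

# union of all sample IDs across the DataFrame, e.g. to build a template header
all_sample_ids = sorted(
    r["sampleId"]
    for r in df.select(f.explode("genotypes.sampleId").alias("sampleId")).distinct().collect()
)

(
    df.write
    .format("vcf")
    # header file listing every sample in all_sample_ids (assumed option name)
    .option("vcfHeader", "/path/to/header_with_all_samples.vcf")
    .save("/path/to/output_dir")
)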


Another cool feature would be the ability to specify a mapping from sampleId to sample_partition:

import pyspark.sql.functions as f

# split the samples into 10 groups
sampleId_mapping = (
    df
    .select(f.explode("genotypes.sampleId").alias("sampleId"))
    .distinct()
    .repartitionByRange(10, "sampleId")
    .withColumn("sample_group", f.spark_partition_id())
)

# now write to sharded VCF, partitioned by sampleId and range
(
    df
    .sort("contigName", "start")
    .write
    .format("vcf")
    # explicitly specify the sampleIds
    .save(path, partitionBy=["sample_group"], sampleIds=sampleId_mapping)
)

Resulting folder structure:

output_folder/
|- sample_group=group0/
       |-partxxxx0.vcf
       |-partxxxx1.vcf
       |-...
|- sample_group=group1/
       |-...

Hoeze avatar Jan 13 '22 16:01 Hoeze

I do think this problem can be solved very efficiently with Delta Lake streaming; however, it requires significant development effort to develop a formal merging algorithm. The issue you are describing is the N+1 problem in genomics.

For now, the best practice for merging samples is either to impute variants (for array data) or to use GATK's GenotypeGVCFs algorithm (for exome / whole-genome data).

williambrandler avatar Jan 14 '22 18:01 williambrandler

@williambrandler I do not mean the N+1 problem here.

I have ~6000 single-sample VCF files and I'd like to merge them into VCFs per chromosome and per batch of 1000 samples. For each batch, I would therefore need to (see the sketch after this list):

  1. select the samples in the batch
  2. explode the genotypes
  3. outer join to manually fill in missing genotypes
  4. sort by variant, group by variant, collect the genotypes into a list
  5. write to VCF
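
Roughly like this, as a sketch (batch_sample_ids, batch_id, and the output path are placeholders, and replacing the null genotypes with a proper no-call struct matching the genotypes schema is omitted for brevity):

import pyspark.sql.functions as f

variant_cols = ["contigName", "start", "referenceAllele", "alternateAlleles"]

# 1. + 2. keep only this batch's samples and explode the genotypes array
exploded = (
    df
    .select(*variant_cols, f.explode("genotypes").alias("gt"))
    .where(f.col("gt.sampleId").isin(batch_sample_ids))
    .select(*variant_cols, f.col("gt.sampleId").alias("sampleId"), "gt")
)

# 3. outer join every variant against every sample in the batch; a null "gt"
#    marks a missing genotype that still needs to be filled in (not shown)
all_pairs = (
    exploded.select(*variant_cols).distinct()
    .crossJoin(exploded.select("sampleId").distinct())
)
filled = all_pairs.join(exploded, variant_cols + ["sampleId"], "left")

# 4. re-assemble one genotypes array per variant and sort by position
merged = (
    filled
    .groupBy(*variant_cols)
    .agg(f.collect_list("gt").alias("genotypes"))
    .sort("contigName", "start")
)

# 5. write this batch as a sharded VCF
merged.write.format("vcf").save(f"/path/to/output/batch_{batch_id}")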

This task would be massively simplified if Glow automatically filled in missing genotypes based on the sample IDs present in the partition. In fact, the bigvcf writer does exactly that, but on the whole dataset instead of per partition:

For the big VCF writer, the inferred sample IDs are the distinct set of all sample IDs from the DataFrame.

However, writing to bigvcf does not scale very well for large numbers of samples.
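
For context, by bigvcf I mean the single-file writer, roughly:

# writes the entire DataFrame into one VCF; sample IDs are the distinct set
# across the whole DataFrame (the output path and suffix are just an example)
df.write.format("bigvcf").save("/path/to/merged.vcf.bgz")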

Hoeze avatar Jan 14 '22 19:01 Hoeze

This is a tricky problem

Ideally, a process like this requires a formal joint-genotyping algorithm (such as the Broad Institute's GenotypeGVCFs if you are working with whole-genome data).

Check out this blog post and the docs for how the GenotypeGVCFs algorithm was parallelized with Apache Spark: https://databricks.com/blog/2019/06/19/accurately-building-genomic-cohorts-at-scale-with-delta-lake-and-spark-sql.html

Or you can run it on a single node, but be warned: it takes about 6 weeks to process 6k genomes that way.

If you do not have access to gVCFs (only VCF), you have to reprocess from the BAM files to gVCF.

As a last resort, you could use merging logic similar to what you have described: https://glow.readthedocs.io/en/latest/etl/merge.html#joint-genotyping

Also, yes: at that scale it no longer makes sense to work with flat files such as VCF. Keep the pipeline in Parquet/Delta until you have results that can be exported (summary-level stats, GWAS results).
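
For example, something along these lines (paths are placeholders and assume Delta Lake is available on the cluster):

# keep intermediate genotype data in Delta rather than VCF
df.write.format("delta").mode("overwrite").save("/path/to/genotypes_delta")
genotypes = spark.read.format("delta").load("/path/to/genotypes_delta")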

williambrandler avatar Jan 14 '22 20:01 williambrandler

@Hoeze Unfortunately Spark's datasource API doesn't really allow us to run a query over the dataset or a partition before a write AFAIK, and I don't want to add a new requirement that each partition fit in memory for the sharded writer. However, I could provide you with an option to manually specify the sample ids. Technically you can already do that by passing in a full VCF header, but I understand that's a little cumbersome. Would this meet your needs? https://github.com/projectglow/glow/pull/650

The second unit test demonstrates how to get all the sample IDs. If I understand your use case correctly, you could repeat the write per sample batch.
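
Something along these lines, as a sketch (the sampleIds option name is hypothetical until the PR is merged and documented, and the paths and batch size are placeholders):

import pyspark.sql.functions as f

# collect the distinct sample IDs and split them into batches of 1000
all_ids = sorted(
    r["sampleId"]
    for r in df.select(f.explode("genotypes.sampleId").alias("sampleId")).distinct().collect()
)
batches = [all_ids[i:i + 1000] for i in range(0, len(all_ids), 1000)]

for i, batch in enumerate(batches):
    # keep only this batch's genotypes (requires Spark >= 3.1 for f.filter)
    batch_df = df.withColumn(
        "genotypes",
        f.filter("genotypes", lambda g: g["sampleId"].isin(batch)),
    )
    (
        batch_df.write
        .format("vcf")
        .option("sampleIds", ",".join(batch))  # hypothetical option name
        .save(f"/path/to/output/batch_{i}")
    )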

henrydavidge avatar Mar 21 '24 10:03 henrydavidge

@henrydavidge Yes, this would be a really helpful addition!

Hoeze avatar Mar 21 '24 11:03 Hoeze

Great. I will get the PR merged and documented.

henrydavidge avatar Mar 22 '24 02:03 henrydavidge