[query] Hana / SEQR need support optimizing Hail Query code
What happened?
Hana Snow is the engineer for SEQR.
Previously, SEQR used Elasticsearch as its datastore. Unfortunately, Elasticsearch was very expensive because, to get reasonable performance, SEQR indexed nearly every field. The ES index was huge and the VM resources needed to run an ES instance on that index cost thousands of USD per month.
I've been supporting Hana as much as I can, but she needs someone who can be more dedicated and responsive than me.
She uses a k8s cluster. She has a SEQR frontend deployment. She also has a Hail deployment (statefulset maybe?). The Hail pod has an SSD mounted read-only. That SSD has all the SEQR data in Hail Table form. There are many tables with annotations (variant metadata, like "probability this variant is damaging" or "likely causes this to happen to the protein"). There are also "per-family" tables which contain all the sequences within a single family. Many queries are directly against a particular family. Those tables are small and quick to read.
There's also one giant table containing all the sequences from all the families. That table is large and expensive to read. A lot of our engineering work has been around making sure queries against that table are fast.
Tim, at one point, had enough of her system locally that he could experiment with running queries on his laptop against his SSD. He hacked on the queries themselves and on Hail itself until the read bandwidth was high enough that the queries should complete acceptably fast on the full dataset. "Acceptably fast" varies, but a couple tens of seconds is generally OK.
The work here is to pair with Hana to diagnose performance issues and make changes until the queries are acceptably fast. The first thing I would do is update her to the latest Hail (with the array decoder improvement as well as the memory overhead stuff on which Daniel is working). Then, with Hana's help, test the timing of some queries. If the queries are still too slow, your options are:
- Check the log files and the IR. Are there unnecessary shuffles? Is the code really large? Can we do less work maybe?
- Have Hana help you replicate her setup locally. You just need a slice of the data and enough of SEQR to run a query. Now hook up a profiler. What's slow? Can we do something about that?
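For the log/IR and profiler checks above, a minimal timing sketch, assuming spark-in-local-mode and a local slice of the data (the table and log paths here are illustrative, not the real layout):

import time

import hail as hl

# Dedicated log file so the logged IR and any shuffle/sort messages are easy to find afterwards.
hl.init(master='local[*]', log='/tmp/hail-seqr-debug.log')

ht = hl.read_table('/seqr-data/GRCh38/SNV_INDEL/annotations.ht')  # illustrative path

start = time.time()
n = ht.count()  # stand-in for the real SEQR query
print(f'{n} rows in {time.time() - start:.1f}s')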
Version
0.2.124
Relevant log output
No response
Dan to connect Hana with Ed. Reassign once that's done.
One obvious point of optimization that Dan already identified: the way we were keying our data was causing full shuffles, because we were keying by a string variant ID of the form 1-32683987-ACTCTT-A instead of locus-allele. Changing back to keying on locus-allele fixes this issue for our more straightforward searches, but we have a search that looks for pairs of possible compound heterozygous variants in the same gene, and that still results in 2 full shuffles. I'm a little at a loss for how to fix this, because we are grouping by an unsorted field, so I'm not sure how to prevent us from working with an unsorted dataset. The offending code right now is as follows (somewhat simplified for readability):
# Collect, per gene, the variants matching the primary and secondary annotations
primary_variants = hl.agg.filter(ch_ht[HAS_ALLOWED_ANNOTATION], hl.agg.collect(ch_ht.row))
secondary_variants = hl.agg.filter(ch_ht[HAS_ALLOWED_SECONDARY_ANNOTATION], hl.agg.collect(ch_ht.row))
ch_ht = ch_ht.group_by('gene_ids').aggregate(v1=primary_variants, v2=secondary_variants)

# Explode to one row per (primary, secondary) candidate pair within the gene
ch_ht = ch_ht.explode(ch_ht.v1)
ch_ht = ch_ht.explode(ch_ht.v2)

# Key by the sorted pair so (x, y) and (y, x) collapse to a single row
ch_ht = ch_ht.annotate(grouped_variants=hl.sorted([ch_ht.v1, ch_ht.v2], key=lambda v: (v.locus, v.alleles)))
ch_ht = ch_ht.key_by(
    locus=ch_ht.grouped_variants[0].locus,
    alleles=ch_ht.grouped_variants[0].alleles,
    locus2=ch_ht.grouped_variants[1].locus,
    alleles2=ch_ht.grouped_variants[1].alleles,
)
ch_ht = ch_ht.distinct()
...
# more filtering and annotating
...
return ch_ht._key_by_assert_sorted('locus', 'alleles')
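As an aside, a minimal sketch of the locus/alleles re-keying point above (the table path and the variant_id field name are illustrative):

import hail as hl

ht = hl.read_table('/seqr-data/GRCh38/SNV_INDEL/annotations.ht')

# Re-keying by a string variant ID like "1-32683987-ACTCTT-A" means any ordered
# operation downstream has to re-sort the table (a full shuffle), because string
# order does not match the on-disk (locus, alleles) order.
ht_string_keyed = ht.key_by(ht.variant_id)

# Keying by (locus, alleles) matches the native ordering, so joins and
# key-based filters can avoid the shuffle.
ht_native_keyed = ht.key_by('locus', 'alleles')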
Hmm. OK, this is an interesting one. You're basically trying to do:
- For each gene-interval, get all the variants in that gene.
- Filter to variants having the primary or secondary allowed annotation.
- Compute the cross-product of variants matching the primary annotation and variants matching the secondary annotation.
- Filter the cross-product to the upper-right triangle including the diagonal (aka only store one of (x, y) and (y, x)).
- More filtering.
Let me see what the team thinks.
Filter the cross-product to the upper-right triangle including the diagonal (aka only store one of (x, y) and (y, x)).
To be a little pedantic, this filtering is not always filtering to the upper-right triangle, as the primary and secondary variants are not necessarily the same set, and are sometimes even fully distinct. But "only store one of (x, y) and (y, x)" is accurate.
Just an update: when upgrading to 0.2.125 I discovered a bug :( https://hail.zulipchat.com/#narrow/stream/123010-Hail-Query-0.2E2-support/topic/array.20index.20out.20of.20bounds.20error.20on.20dict.2Eget
We now have our dev test environment running with hail 0.2.126 and this query took ~92 seconds. So faster, but we do still need some performance enhancements. @ehigham let me know what would be helpful to get you started on this effort.
@hanars, for my own curiosity, what was the timing on this query before 0.2.126?
104s, so like a 10% improvement.
I was able to figure out how to remove all the "network shuffle"s and "coerced sorted dataset"s, and that improved the time down to 73 seconds, so a big improvement! I would still hope to improve performance a bit more; being reliably under a minute would be helpful.
Here are the logs from that search; let me know what else I can do to help improve the performance or to help you figure it out: hail-search.log
PR is here if you are interested: https://github.com/broadinstitute/seqr/pull/3717
cc: @ehigham
seqr hail search code is here: https://github.com/broadinstitute/seqr/tree/master/hail_search
Running the aiohttp service is just python -m hail_search. It runs on hail 0.2.126.
The data you need is gs://seqr-datasets/v03/GRCh38/SNV_INDEL/runs/manual__2023-11-07T23-31-23.149902+00-00
For running the service, you need a DATASETS_DIR env variable defined, and that data should be available at $DATASETS_DIR/GRCh38/SNV_INDEL
Post body for a relatively quick search:
{
"genome_version": "GRCh38",
"num_results": 100,
"annotations": {
"in_frame": [
"inframe_insertion",
"inframe_deletion"
],
"missense": [
"stop_lost",
"initiator_codon_variant",
"start_lost",
"protein_altering_variant",
"missense_variant"
],
"nonsense": [
"stop_gained"
],
"splice_ai": "0.2",
"frameshift": [
"frameshift_variant"
],
"structural": [],
"extended_splice_site": [],
"essential_splice_site": [
"splice_donor_variant",
"splice_acceptor_variant"
],
"structural_consequence": [
"LOF",
"DUP_LOF",
"INV_SPAN",
"COPY_GAIN"
]
},
"datasetType": "VARIANTS",
"pathogenicity": {
"hgmd": [
"disease_causing"
],
"clinvar": [
"pathogenic",
"likely_pathogenic"
]
},
"dataset_type": "ALL",
"secondary_dataset_type": null,
"inheritance_mode": "de_novo",
"inheritance_filter": {
"A": "has_alt",
"N": "ref_ref"
},
"sample_data": {
"SNV_INDEL": [
{
"sample_id": "RGP_2436_2_D1",
"individual_guid": "I0097169_rgp_2436_2",
"family_guid": "F041731_rgp_2436",
"project_guid": "R0594_rare_genomes_project_gen",
"affected": "N"
},
{
"sample_id": "RGP_2436_3_D1",
"individual_guid": "I0097170_rgp_2436_3",
"family_guid": "F041731_rgp_2436",
"project_guid": "R0594_rare_genomes_project_gen",
"affected": "A"
},
{
"sample_id": "RGP_2436_1_D1",
"individual_guid": "I0097168_rgp_2436_1",
"family_guid": "F041731_rgp_2436",
"project_guid": "R0594_rare_genomes_project_gen",
"affected": "N"
}
]
},
"sort": "xpos",
"sort_metadata": null,
"frequencies": {
"g1k": {
"ac": null,
"af": 1
},
"exac": {
"ac": null,
"af": 1
},
"topmed": {
"ac": null,
"af": 1
},
"gnomad_svs": {
"ac": null,
"af": 0.001
},
"sv_callset": {
"ac": null,
"af": 0.001
},
"gnomad_exomes": {
"ac": null,
"af": 0.001
},
"gnomad_genomes": {
"ac": null,
"af": 0.001
},
"seqr": {
"ac": null,
"af": 0.01
}
},
"quality_filter": {
"min_ab": 20,
"min_gq": 30,
"min_qs": 50,
"vcf_filter": "pass"
},
"custom_query": null,
"intervals": null,
"exclude_intervals": null,
"gene_ids": null,
"variant_ids": null,
"rs_ids": null
}
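For reference, a hedged sketch of timing the body above against a locally running python -m hail_search service; the host, port, and route are assumptions here, so check the hail_search source for the real values:

import json
import time

import requests

with open('quick_search.json') as f:  # the post body above, saved to a file
    body = json.load(f)

start = time.time()
resp = requests.post('http://localhost:5000/search', json=body)  # URL is a guess
print(resp.status_code, f'{time.time() - start:.1f}s')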
Example of a slow search that needs improvement:
{
"genome_version": "GRCh38",
"num_results": 100,
"annotations": {
"other": [
"non_coding_exon_variant"
],
"in_frame": [
"inframe_insertion",
"inframe_deletion"
],
"missense": [
"stop_lost",
"initiator_codon_variant",
"start_lost",
"protein_altering_variant",
"missense_variant"
],
"nonsense": [
"stop_gained"
],
"splice_ai": "0.1",
"frameshift": [
"frameshift_variant"
],
"structural": [
"gCNV_DUP",
"gCNV_DEL"
],
"synonymous": [],
"extended_splice_site": [],
"essential_splice_site": [
"splice_donor_variant",
"splice_acceptor_variant"
],
"structural_consequence": [
"LOF",
"DUP_LOF",
"MSV_EXON_OVR",
"INTRAGENIC_EXON_DUP",
"PARTIAL_EXON_DUP"
]
},
"pathogenicity": {
"hgmd": [
"disease_causing"
],
"clinvar": [
"pathogenic",
"likely_pathogenic",
"vus_or_conflicting"
]
},
"annotations_secondary": {
"other": [
"transcript_ablation",
"transcript_amplification",
"5_prime_UTR_variant",
"3_prime_UTR_variant",
"TFBS_ablation",
"TFBS_amplification",
"TF_binding_site_variant",
"regulatory_region_variant",
"regulatory_region_ablation",
"regulatory_region_amplification",
"non_coding_transcript_exon_variant__canonical"
],
"in_frame": [
"inframe_insertion",
"inframe_deletion"
],
"missense": [
"stop_lost",
"initiator_codon_variant",
"start_lost",
"protein_altering_variant",
"missense_variant"
],
"nonsense": [
"stop_gained"
],
"frameshift": [
"frameshift_variant"
],
"structural": [
"gCNV_DEL",
"gCNV_DUP"
],
"synonymous": [
"synonymous_variant",
"stop_retained_variant"
],
"extended_splice_site": [
"splice_region_variant"
],
"essential_splice_site": [
"splice_donor_variant",
"splice_acceptor_variant"
],
"structural_consequence": [
"LOF",
"DUP_LOF",
"INTRONIC",
"UTR",
"PROMOTER",
"INTRAGENIC_EXON_DUP",
"PARTIAL_EXON_DUP"
]
},
"dataset_type": "ALL",
"secondary_dataset_type": "ALL",
"inheritance_mode": "recessive",
"inheritance_filter": {
"A": null,
"N": null
},
"sample_data": {
"SNV_INDEL": [
{
"sample_id": "RGP_2436_2_D1",
"individual_guid": "I0097169_rgp_2436_2",
"family_guid": "F041731_rgp_2436",
"project_guid": "R0594_rare_genomes_project_gen",
"affected": "N"
},
{
"sample_id": "RGP_2436_3_D1",
"individual_guid": "I0097170_rgp_2436_3",
"family_guid": "F041731_rgp_2436",
"project_guid": "R0594_rare_genomes_project_gen",
"affected": "A"
},
{
"sample_id": "RGP_2436_1_D1",
"individual_guid": "I0097168_rgp_2436_1",
"family_guid": "F041731_rgp_2436",
"project_guid": "R0594_rare_genomes_project_gen",
"affected": "N"
}
]
},
"sort": "xpos",
"sort_metadata": null,
"frequencies": {
"g1k": {
"ac": null,
"af": 1
},
"exac": {
"ac": null,
"af": 1
},
"topmed": {
"ac": null,
"af": 1
},
"gnomad_svs": {
"ac": null,
"af": 0.01
},
"sv_callset": {
"ac": null,
"af": 0.01
},
"gnomad_exomes": {
"ac": null,
"af": 0.01
},
"gnomad_genomes": {
"ac": null,
"af": 0.01
},
"seqr": {
"ac": null,
"af": 0.03
}
},
"quality_filter": {
"min_ab": 10,
"min_gq": 40,
"min_qs": 50
},
"custom_query": null,
"intervals": null,
"exclude_intervals": null,
"gene_ids": null,
"variant_ids": null,
"rs_ids": null
}
Let me know if there is anything else you need that I can help with!
Here are the mean elapsed times for the two queries above as measured on my machine using the local and spark backends with localised data.
query | local | spark |
---|---|---|
0 | 7s | 5s |
1 | 20s | 13s |
@hanars, do these relative differences between queries match your observations?
In our dev environment the first query runs in 10s, but the second one runs in 77s; if it ran in 20s we'd be fine with it. As a sanity check, the first query should have 3 results and the second should have 85.
One possible point of confusion: there's a Hail Query "LocalBackend", which is a mostly internal thing and different from spark-in-local-mode (aka hl.init(..., master='local[*]')); I'm pretty sure we're using spark-in-local-mode for SEQR things.
Ah, the timings were slightly off as I had not downloaded all the data and was using some from hail_search/fixtures. After pulling down the SNV_INDEL data, my updated timings are:
query | results | elapsed |
---|---|---|
0 | 4 | 7s |
1 | 83 | 50s |
I'm pretty sure we're using spark-in-local-mode for SEQR things.
We run hl.init(idempotent=True) on the hail docker image for seqr. Not sure whether that is running spark-in-local-mode or not.
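If it helps, one way to check which Spark master you actually end up with (assuming the Spark backend is in use):

import hail as hl

hl.init(idempotent=True)  # what the seqr image runs; no explicit master
print(hl.spark_context().master)  # 'local[*]' would mean spark-in-local-mode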
The data should be really deterministic so I'm not sure why those counts are off, but at least those counts and times are more in line with what we are seeing, so your test environment is probably reasonable. I would recommend against using the hail_search/fixtures data for anything other than running the test suite; it's not designed to play nicely with real data.
The data should be really deterministic so I'm not sure why those counts are off, but at least those counts and times are more in line with what we are seeing, so your test environment is probably reasonable.
I guess the exact query isn't important, more the code shape and the kinds of operations Hail's doing. Given the timings, it may be more or less representative.
Dan and I have profiled the second query and have a couple of thoughts.
We've identified one source of slowness in the compiler and generated code that relates to how we handle I/O. This is under active development, and we hope it will lead to decent performance improvements. The change itself is significant, however, so I can't commit to a timeline for when the work will be complete.
There may be some code tweaks that do less work as discussed. I'll have a play with the source code in hail_search and report back.
awesome, thanks for looking into it!
The current logic of key_by and distinct to remove duplicate pairs is actually not properly deduplicating pairs in some cases. Since I know that's not really the approach we want in the long run, I didn't bother figuring out why, and instead tried implementing a version of the code that filters out duplicate pairs before exploding, which should result in less work overall. However, I somehow introduced an extra "Ordering unsorted dataset with network shuffle" and the overall performance of that search got slower by 15 seconds.
Here is the change I made. Let me know if you have a better approach for filtering out duplicates, or if you see any ways to reorganize this code to make it less shuffle-y: https://github.com/broadinstitute/seqr/commit/2e45403efc159b58cec723f86e6de7653d64cf5f
I've taken a similar approach to try to remove unwanted elements before exploding. Sadly, I haven't seen any noticeable improvement. I'm also not sure about correctness, as I couldn't get your changes in 2e45403 to work (got errors about missing gene_ids). I saw ~20 fewer results in the second query with the code below.
Here's what I wrote, based off master.
def _filter_compound_hets(self):
    ch_ht = self._ht
    if self._is_recessive_search:
        ch_ht = ch_ht.filter(ch_ht.comp_het_family_entries.any(hl.is_defined))

    # Get possible pairs of variants within the same gene
    ch_ht = ch_ht.annotate(gene_ids=self._gene_ids_expr(ch_ht, comp_het=True))
    ch_ht = ch_ht.explode(ch_ht.gene_ids)

    # Filter allowed transcripts to the grouped gene
    transcript_annotations = {
        k: ch_ht[k].filter(lambda t: t.gene_id == ch_ht.gene_ids)
        for k in [ALLOWED_TRANSCRIPTS, ALLOWED_SECONDARY_TRANSCRIPTS] if k in ch_ht.row
    }
    if transcript_annotations:
        ch_ht = ch_ht.annotate(**transcript_annotations)

    primary_filters = self._get_annotation_filters(ch_ht)
    secondary_filters = self._get_annotation_filters(ch_ht, is_secondary=True)
    self.unfiltered_comp_het_ht = ch_ht.filter(hl.any(primary_filters + secondary_filters))
    if self._has_secondary_annotations and not (primary_filters and secondary_filters):
        # In cases where comp het pairs must have different data types, there are no single data type results
        return None

    primary_variants = hl.agg.filter(hl.any(primary_filters), hl.agg.collect(ch_ht.row))
    if secondary_filters:
        row_agg = ch_ht.row
        if ALLOWED_TRANSCRIPTS in row_agg and ALLOWED_SECONDARY_TRANSCRIPTS in row_agg:
            # Ensure main transcripts are properly selected for primary/secondary annotations in variant pairs
            row_agg = row_agg.annotate(**{ALLOWED_TRANSCRIPTS: row_agg[ALLOWED_SECONDARY_TRANSCRIPTS]})
        secondary_variants = hl.agg.filter(hl.any(secondary_filters), hl.agg.collect(row_agg))
    else:
        secondary_variants = primary_variants

    # >>> start of changes:
    def unique_combinations(v1s, v2s):
        return hl.rbind(
            hl.sorted(
                hl.array(hl.set(v2s).difference(hl.set(v1s))),
                key=lambda x: x.variant_id
            ),
            lambda v2s_uniq: v1s.flatmap(
                lambda v1: v2s_uniq.map(lambda v2: hl.array([v1, v2]))
            )
        )

    ch_ht = ch_ht \
        .group_by('gene_ids') \
        .aggregate(vs=unique_combinations(primary_variants, secondary_variants)) \
        .explode('vs') \
        .key_by()

    ch_ht = ch_ht.annotate(
        valid_families=hl.map(
            self._is_valid_comp_het_family,
            ch_ht.vs[0].comp_het_family_entries,
            ch_ht.vs[1].comp_het_family_entries
        )
    )
    ch_ht = ch_ht.filter(ch_ht.valid_families.any(lambda x: x))

    return ch_ht.select(**{
        GROUPED_VARIANTS_FIELD: hl.array([
            self._annotated_comp_het_variant(ch_ht, ch_ht.vs[k])
            for k in [0, 1]
        ])
    })

@staticmethod
def _annotated_comp_het_variant(ch_ht, variant):
    return variant.annotate(
        gene_id=ch_ht.gene_ids,
        family_entries=hl.enumerate(ch_ht.valid_families).filter(
            lambda x: x[1]).map(lambda x: variant.comp_het_family_entries[x[0]]),
    )
Oh yeah, to get my code to work you need to comment out line 778, gene_id=ch_ht.gene_ids, in _annotated_comp_het_variant. It doesn't break search to be missing that annotation, it just has some downstream display effects that I would need to fix if I actually wanted to use the code, but given the performance hit I wasn't sure it was worth figuring that out, as this code may be too slow to use.
I was not able to get the code you provided here to run either, but one concern I have with it is that the unique combinations are computed per gene, so if you have a pair of variants that are both in the same two genes, you would get the pair twice in the results, once for each gene.
The error I get when I run the code you provided is:
"Key type mismatch: cannot index table with given expressions:
Table key: <<<empty key>>>
Index Expressions: locus<GRCh38>, array<str>, set<str>, array<array<struct{GQ: int32, AB: float64, DP: int32, GT: call, sampleId: str, sampleType: str, individualGuid: str, familyGuid: str, affected_id: int32}>>, array<array<struct{GQ: int32, AB: float64, DP: int32, GT: call, sampleId: str, sampleType: str, individualGuid: str, familyGuid: str, affected_id: int32}>>, struct{z_score: float32}, struct{region_type_ids: array<int32>}, locus<GRCh37>, str, array<struct{amino_acids: str, canonical: int32, codons: str, gene_id: str, hgvsc: str, hgvsp: str, transcript_id: str, biotype_id: int32, consequence_term_ids: array<int32>, is_lof_nagnag: bool, lof_filter_ids: array<int32>, transcript_rank: int32}>, str, int64, struct{PHRED: float32}, struct{alleleId: int32, conflictingPathogenicities: array<struct{pathogenicity_id: int32, count: int32}>, goldStars: int32, pathogenicity_id: int32, assertion_ids: array<int32>}, struct{REVEL_score: float32, VEST4_score: float32, MutPred_score: float32, SIFT_pred_id: int32, Polyphen2_HVAR_pred_id: int32, MutationTaster_pred_id: int32, fathmm_MKL_coding_pred_id: int32}, struct{Eigen_phred: float32}, struct{AF_POPMAX: float32, AF: float32, AC_Adj: int32, AC_Het: int32, AC_Hom: int32, AC_Hemi: int32, AN_Adj: int32}, struct{AF: float32, AN: int32, AC: int32, Hom: int32, AF_POPMAX_OR_GLOBAL: float32, FAF_AF: float32, Hemi: int32}, struct{AF: float32, AN: int32, AC: int32, Hom: int32, AF_POPMAX_OR_GLOBAL: float32, FAF_AF: float32, Hemi: int32}, struct{MPC: float32}, struct{score: float32}, struct{delta_score: float32, splice_consequence_id: int32}, struct{AC: int32, AF: float32, AN: int32, Hom: int32, Het: int32}, struct{accession: str, class_id: int32}, struct{AC: int32, AN: int32, AF: float32, hom: int32}, array<struct{amino_acids: str, canonical: int32, codons: str, gene_id: str, hgvsc: str, hgvsp: str, transcript_id: str, biotype_id: int32, consequence_term_ids: array<int32>, is_lof_nagnag: bool, lof_filter_ids: array<int32>, transcript_rank: int32}>, bool, array<struct{amino_acids: str, canonical: int32, codons: str, gene_id: str, hgvsc: str, hgvsp: str, transcript_id: str, biotype_id: int32, consequence_term_ids: array<int32>, is_lof_nagnag: bool, lof_filter_ids: array<int32>, transcript_rank: int32}>, bool, str"
You'll have to provide me with the full error to help; I don't know where that's coming from. Perhaps I edited another function elsewhere and didn't include it above.
that are both in the same two genes
The same two genes? How can a gene appear twice when you've grouped by the gene id?
So let's say the primary variants are A and B and the secondary variants are B and C, and let's say we have genes 1 and 2. The table grouped by gene ids would look like:
gene | primary | secondary
---|---|---
1 | A, B | B, C
2 | B | B, C
The possible unique permutations by gene would then be:
gene | primary | secondary
---|---|---
1 | A | B
1 | A | C
1 | B | C
2 | B | C
So all the permutations by gene are unique, but when you then pull out the pairs themselves, the pair (B, C) would be returned twice.
You're concerned about having the same pair across different genes? That's what the code in master did, right? Which gene do you want the pair to be associated with? Why not add a group-by on the pair to get them all? Something like the rough sketch below.
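A rough sketch of that idea, reusing the field names from the snippets above (untested, so treat it as a shape rather than working code):

# Re-group on an order-independent pair key so a pair that appears in two
# genes collapses to a single row carrying both gene ids.
pair = hl.sorted(ch_ht.vs, key=lambda v: (v.locus, v.alleles))
ch_ht = ch_ht.annotate(pair_id=hl.struct(
    locus1=pair[0].locus, alleles1=pair[0].alleles,
    locus2=pair[1].locus, alleles2=pair[1].alleles,
))
ch_ht = ch_ht.group_by('pair_id').aggregate(
    gene_ids=hl.agg.collect_as_set(ch_ht.gene_ids),
    vs=hl.agg.take(ch_ht.vs, 1)[0],
)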
Yeah, it keyed by the pairs and then called distinct, which should theoretically get you the list of distinct pairs. But it did that at the end, after computing and annotating all the possibilities.
Oh I see. Thanks for clarifying - I wasn't sure what that bit did! That should be a simple fix, though perhaps at this point not worth it as this is not a fruitful optimisation for this query
Next steps:
- Upload the profile, the mt.describe() output, and metadata.json.gz from the MT/HT to the team chat and get feedback (Chris, Patrick take a look).
Decode appears quite slow.
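A minimal sketch for gathering those artifacts (the table path is illustrative):

import hail as hl

ht = hl.read_table('/seqr-data/GRCh38/SNV_INDEL/annotations.ht')
ht.describe()  # schema to paste into the team chat

# metadata.json.gz sits inside the table directory on disk, e.g.
#   /seqr-data/GRCh38/SNV_INDEL/annotations.ht/metadata.json.gz
#   /seqr-data/GRCh38/SNV_INDEL/annotations.ht/rows/metadata.json.gz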
That should be a simple fix, though perhaps at this point not worth it as this is not a fruitful optimization for this query
Agreed, although depending on the timeline for a good optimization for this query I may circle back on this, as there is currently a bug in this deduplication. So if the actual optimization won't be available for a while, it might be worth fixing in the meantime.