diff --git a/gnomad/utils/sparse_mt.py b/gnomad/utils/sparse_mt.py index 6714bb629..4696027dc 100644 --- a/gnomad/utils/sparse_mt.py +++ b/gnomad/utils/sparse_mt.py @@ -169,6 +169,8 @@ def _get_info_agg_expr( ] = INFO_AGG_FIELDS["array_sum_agg_fields"], prefix: str = "", treat_fields_as_allele_specific: bool = False, + retain_cdfs: bool = False, + cdf_k: int = 200, ) -> Dict[str, hl.expr.Aggregation]: """ Create Aggregators for both site or AS info expression aggregations. @@ -194,7 +196,15 @@ def _get_info_agg_expr( :param array_sum_agg_fields: Fields to aggregate using element-wise summing over an array. :param prefix: Optional prefix for the fields. Used for adding 'AS_' in the AS case. - :param treat_fields_as_allele_specific: Treat info fields as allele-specific. Defaults to False. + :param treat_fields_as_allele_specific: Treat info fields as allele-specific. + Defaults to False. + :param retain_cdfs: If True, retains the cumulative distribution functions (CDFs) + as an annotation for `median_agg_fields`. Keeping the CDFs is useful for + annotations that require calculating the median across combined datasets at a + later stage. Default is False. + :param cdf_k: Parameter controlling the accuracy vs. memory usage tradeoff when + retaining CDFs. A higher value of `cdf_k` results in a more accurate CDF + approximation but increases memory usage and computation time. Default is 200. :return: Dictionary of expression names and their corresponding aggregation Expression. """ @@ -253,6 +263,21 @@ def _agg_list_to_dict( (array_sum_agg_fields, hl.agg.array_sum), ] + if retain_cdfs: + # Note: hl.agg.approx_cdf is a non-deterministic method and cannot be seeded. + # Results may vary with each rerun. + cdf_median_agg_fields = {} + # Store values for each median agg fields in a new dictionary with "_cdf" + # appended to the annotation name. + for k, v in median_agg_fields.items(): + cdf_median_agg_fields[f"{k}_cdf"] = v + # Append the cdf annotations to the aggs list. Set '_raw' to True to return + # a representation of the internal state of the CDF, which allows for mergining + # with other CDFs downstream. + aggs.append( + (cdf_median_agg_fields, lambda x: hl.agg.approx_cdf(x, k=cdf_k, _raw=True)) + ) + # Create aggregators. agg_expr = {} for agg_fields, agg_func in aggs: @@ -263,7 +288,9 @@ def _agg_list_to_dict( # decimal place followed by a comma and the corresponding count for # that value, so we want to sum the rank sum value (first element). # Rename annotation in the form 'AS_RAW_*_RankSum' to 'AS_*_RankSum'. - if k.startswith("AS_RAW_") and k.endswith("RankSum"): + if k.startswith("AS_RAW_") and ( + k.endswith("RankSum") or k.endswith("RankSum_cdf") + ): agg_expr[f"{prefix}{k.replace('_RAW', '')}"] = hl.agg.array_agg( lambda x: agg_func(hl.or_missing(hl.is_defined(x), x[0])), expr ) @@ -360,6 +387,8 @@ def get_as_info_expr( ] = INFO_AGG_FIELDS["array_sum_agg_fields"], alt_alleles_range_array_field: str = "alt_alleles_range_array", treat_fields_as_allele_specific: bool = False, + retain_cdfs: bool = False, + cdf_k: int = 200, ) -> hl.expr.StructExpression: """ Return an allele-specific annotation Struct containing typical VCF INFO fields from GVCF INFO fields stored in the MT entries. @@ -394,6 +423,13 @@ def get_as_info_expr( of alternate alleles e.g., `hl.range(1, hl.len(mt.alleles))` :param treat_fields_as_allele_specific: Treat info fields as allele-specific. Defaults to False. + :param retain_cdfs: If True, retains the cumulative distribution functions (CDFs) + as an annotation for `median_agg_fields`. Keeping the CDFs is useful for + annotations that require calculating the median across combined datasets at a + later stage. Default is False. + :param cdf_k: Parameter controlling the accuracy vs. memory usage tradeoff when + retaining CDFs. A higher value of `cdf_k` results in a more accurate CDF + approximation but increases memory usage and computation time. Default is 200. :return: Expression containing the AS info fields """ if "DP" in list(sum_agg_fields) + list(int32_sum_agg_fields): @@ -411,6 +447,8 @@ def get_as_info_expr( array_sum_agg_fields=array_sum_agg_fields, prefix="" if treat_fields_as_allele_specific else "AS_", treat_fields_as_allele_specific=treat_fields_as_allele_specific, + retain_cdfs=retain_cdfs, + cdf_k=cdf_k, ) if alt_alleles_range_array_field not in mt.row or mt[ @@ -485,6 +523,8 @@ def get_site_info_expr( array_sum_agg_fields: Union[ List[str], Dict[str, hl.expr.ArrayNumericExpression] ] = INFO_AGG_FIELDS["array_sum_agg_fields"], + retain_cdfs: bool = False, + cdf_k: int = 200, ) -> hl.expr.StructExpression: """ Create a site-level annotation Struct aggregating typical VCF INFO fields from GVCF INFO fields stored in the MT entries. @@ -505,6 +545,13 @@ def get_site_info_expr( :param sum_agg_fields: Fields to aggregate using sum. :param int32_sum_agg_fields: Fields to aggregate using sum using int32. :param median_agg_fields: Fields to aggregate using (approximate) median. + :param retain_cdfs: If True, retains the cumulative distribution functions (CDFs) + as an annotation for `median_agg_fields`. Keeping the CDFs is useful for + annotations that require calculating the median across combined datasets at a + later stage. Default is False. + :param cdf_k: Parameter controlling the accuracy vs. memory usage tradeoff when + retaining CDFs. A higher value of `cdf_k` results in a more accurate CDF + approximation but increases memory usage and computation time. Default is 200. :return: Expression containing the site-level info fields """ if "DP" in list(sum_agg_fields) + list(int32_sum_agg_fields): @@ -519,6 +566,8 @@ def get_site_info_expr( int32_sum_agg_fields=int32_sum_agg_fields, median_agg_fields=median_agg_fields, array_sum_agg_fields=array_sum_agg_fields, + retain_cdfs=retain_cdfs, + cdf_k=cdf_k, ) # Add FS and SOR if SB is present @@ -550,6 +599,8 @@ def default_compute_info( n_partitions: Optional[int] = 5000, lowqual_indel_phred_het_prior: int = 40, ac_filter_groups: Optional[Dict[str, hl.Expression]] = None, + retain_cdfs: bool = False, + cdf_k: int = 200, ) -> hl.Table: """ Compute a HT with the typical GATK allele-specific (AS) info fields as well as ACs and lowqual fields. @@ -577,6 +628,13 @@ def default_compute_info( for VQSR. :param ac_filter_groups: Optional dictionary of sample filter expressions to compute additional groupings of ACs. Default is None. + :param retain_cdfs: If True, retains the cumulative distribution functions (CDFs) + as an annotation for `median_agg_fields`. Keeping the CDFs is useful for + annotations that require calculating the median across combined datasets at a + later stage. Default is False. + :param cdf_k: Parameter controlling the accuracy vs. memory usage tradeoff when + retaining CDFs. A higher value of `cdf_k` results in a more accurate CDF + approximation but increases memory usage and computation time. Default is 200. :return: Table with info fields :rtype: Table """ @@ -602,7 +660,7 @@ def default_compute_info( # Compute quasi-AS info expr. if quasi_as_annotations: - info_expr = get_as_info_expr(mt) + info_expr = get_as_info_expr(mt, retain_cdfs=retain_cdfs, cdf_k=cdf_k) # Compute AS info expr using gvcf_info allele specific annotations. if as_annotations: @@ -612,6 +670,8 @@ def default_compute_info( mt, **AS_INFO_AGG_FIELDS, treat_fields_as_allele_specific=True, + retain_cdfs=retain_cdfs, + cdf_k=cdf_k, ) if info_expr is not None: @@ -621,7 +681,7 @@ def default_compute_info( ) if site_annotations: - site_expr = get_site_info_expr(mt) + site_expr = get_site_info_expr(mt, retain_cdfs=retain_cdfs, cdf_k=cdf_k) if info_expr is None: info_expr = site_expr else: