Source code for geno2phenotb.vcf_columns_extractor_geno

"""Collects variants relevant to the resistance genotype appearing in a vcf."""


import logging
import os
import re
import warnings
from typing import List, Optional, Set, Tuple

import pandas as pd

import geno2phenotb.utils as utils

__author__ = "Bernhard Reuter, Jules Kreuer"
__copyright__ = "Bernhard Reuter, Jules Kreuer"
__license__ = "LGPL-3.0-only"

_logger = logging.getLogger(__name__)


[docs]def vcf_columns_extractor_geno( in_path: str, resistance_variants_set: Set[str] ) -> Tuple[Optional[Set[str]], str]: r""" Collect known resistance variants from a VCF file into a list and return it. This function serves to collect known resistance variants, which are relevant to determine the resistance genotype, from a variant call file into a list `columns` and return it. The variants are collected from the VCF based on a set `resistance_variants_set` of known resistance variants. Additionally all InDels and early stop condons (e.g. :spelling:ignore:`A123_`) in the following genes (not mentioned in the catalog) are extracted as well, since they result in a resistant genotype: ethA=Rv3854c (ETH), pncA=Rv2043c (PZA), gidB/gid=Rv3919c (STR), rpoB=Rv0667 (RIF), Rv0678 (BDQ/CFZ), ald=Rv2780 (DCS), katG=Rv1908c (INH), ddn=Rv3547 (DLM), tlyA=Rv1694 (CAP). Parameters ---------- in_path : str Path to the VCF file to extract variants from. resistance_variants_set : set Set of all known resistance variants. Returns ------- columns : set or None Set of all variants extracted from the given VCF. If no resistance variants were found `columns` will be `None`. identifier : str Identifier (single ENA run accession number or combination thereof) extracted from the VCF file name. """ _logger.debug(f"Extracting VCF geno columns: {in_path}") # Get regexstring of amino acids aminos = utils.get_aminos() # All InDels and early stop condons (e.g. Ala123) in the following genes (not mentioned in the # catalog) result in a genotype=R (additionally to the known resistance variants from the # Masterlist) and are thus extracted as well: # ethA=Rv3854c (ETH), pncA=Rv2043c (PZA), gidB/gid=Rv3919c (STR), rpoB=Rv0667 (RIF), # Rv0678 (BDQ/CFZ), ald=Rv2780 (DCS), katG=Rv1908c (INH), ddn=Rv3547 (DLM), tlyA=Rv1694 (CAP) key_genes = re.compile( r"(^Rv3854c:[0-9]+_ins_[ACGT]+$)|(^Rv3854c:([0-9]+_){0,1}[0-9]+_del$)|(^Rv3854c:" + aminos + "[0-9]+_$)" + "|(^Rv2043c:[0-9]+_ins_[ACGT]+$)|(^Rv2043c:([0-9]+_){0,1}[0-9]+_del$)|(^Rv2043c:" + aminos + "[0-9]+_$)" + "|(^Rv3919c:[0-9]+_ins_[ACGT]+$)|(^Rv3919c:([0-9]+_){0,1}[0-9]+_del$)|(^Rv3919c:" + aminos + "[0-9]+_$)" + "|(^Rv0667:[0-9]+_ins_[ACGT]+$)|(^Rv0667:([0-9]+_){0,1}[0-9]+_del$)|(^Rv0667:" + aminos + "[0-9]+_$)" + "|(^Rv0678:[0-9]+_ins_[ACGT]+$)|(^Rv0678:([0-9]+_){0,1}[0-9]+_del$)|(^Rv0678:" + aminos + "[0-9]+_$)" + "|(^Rv2780:[0-9]+_ins_[ACGT]+$)|(^Rv2780:([0-9]+_){0,1}[0-9]+_del$)|(^Rv2780:" + aminos + "[0-9]+_$)" + "|(^Rv1908c:[0-9]+_ins_[ACGT]+$)|(^Rv1908c:([0-9]+_){0,1}[0-9]+_del$)|(^Rv1908c:" + aminos + "[0-9]+_$)" + "|(^Rv3547:[0-9]+_ins_[ACGT]+$)|(^Rv3547:([0-9]+_){0,1}[0-9]+_del$)|(^Rv3547:" + aminos + "[0-9]+_$)" + "|(^Rv1694:[0-9]+_ins_[ACGT]+$)|(^Rv1694:([0-9]+_){0,1}[0-9]+_del$)|(^Rv1694:" + aminos + "[0-9]+_$)" ) # extract deletions from resi_variants_set deletions: List[str] = [] for var in resistance_variants_set: if bool(re.match(r".*_del$", var)): deletions.append(var) split = os.path.basename(in_path).split("_") identifier_list = [] for x in split: if bool(re.match(r"(^ERR[0-9]+)|(^SRR[0-9]+)", x)): identifier_list.append(x) identifier = "_".join(identifier_list) vcf = pd.read_table( in_path, sep="\t", usecols=["#Pos", "Type", "Subst", "Gene"], dtype=str, ) amino_ann = re.compile(utils.get_amino_ann()) columns: Set[str] = set() for i in range(vcf.shape[0]): genome_pos = int(vcf.at[i, "#Pos"].strip()) if not 0 < genome_pos <= 4411532: warnings.warn(f"Position {genome_pos} is not in (0,4411532] for {in_path}.") allel_type: str = vcf.at[i, "Type"].strip() if not bool(re.fullmatch(r"^Ins|Del|SNP$", allel_type)): warnings.warn( f"""ref_allele is not in [Ins,Del,SNP], but {allel_type} at position {genome_pos} for {in_path}.""" ) ann: str = vcf.at[i, "Subst"].strip() gene: str = vcf.at[i, "Gene"].strip() if allel_type == "Ins" and not bool(re.fullmatch(amino_ann, ann)): if len(ann) > 0: pos = int(ann.split("_")[0]) if not 0 < pos <= 4411532: warnings.warn(f"Position {pos} is not in (0,4411532] for {in_path}.") if len(gene) > 0: variant = f"{gene}:{ann}" else: variant = f"-:{ann}" if bool(re.fullmatch(key_genes, variant)) or {variant} & resistance_variants_set: columns.update([variant]) elif allel_type == "Del" and not bool(re.fullmatch(amino_ann, ann)): if len(ann) > 0: split = ann.split("_") if len(gene) > 0: variant = f"{gene}:{ann}" else: variant = "-:" + ann if len(split) == 2: if not 0 < int(split[0]) <= 4411532: warnings.warn(f"Position {split[0]} is not in (0,4411532] for {in_path}.") if bool(re.fullmatch(key_genes, variant)): columns.update([variant]) else: pos = int(split[0]) for ref in deletions: split1 = ref.split(":") ref_gene = split1[0] ref_ann = split1[1] split1 = ref_ann.split("_") ref_pos = int(split1[0]) if pos == ref_pos and gene == ref_gene: columns.update([variant]) elif len(split) == 3: if not 0 < int(split[0]) < int(split[1]) <= 4411532: warnings.warn( f"Unexpected positions in Del annotation {ann} for {in_path}." ) if bool(re.fullmatch(key_genes, variant)): columns.update([variant]) else: pos1 = int(split[0]) pos2 = int(split[1]) for ref in deletions: split1 = ref.split(":") ref_gene = split1[0] ref_ann = split1[1] split1 = ref_ann.split("_") ref_pos = int(split1[0]) if pos1 <= ref_pos <= pos2 and gene == ref_gene: columns.update([variant]) else: warnings.warn(f"Irregular Del annotation {ann} for {in_path}.") elif allel_type == "SNP" and not bool(re.fullmatch(amino_ann, ann)): if len(ann) > 0: if len(gene) > 0: variant = f"{gene}:{ann}" else: variant = f"-:{ann}" if {variant} & resistance_variants_set: columns.update([variant]) else: warnings.warn(f"Unexpected SNP annotation {ann} for {in_path}.") elif allel_type == "SNP" and bool(re.fullmatch(amino_ann, ann)): if len(gene) > 0: variant = f"{gene}:{ann}" else: variant = f"-:{ann}" if bool(re.fullmatch(key_genes, variant)) or {variant} & resistance_variants_set: columns.update([variant]) else: warnings.warn( f"Annotation of type {allel_type} at position {genome_pos} is {ann} for {in_path}." ) if len(columns) > 0: return columns, identifier else: return None, identifier