Source code for geno2phenotb.annotate_vcf

"""Annotates VCF."""

import logging
import re
import warnings
from typing import List, Optional

import pandas as pd

__author__ = "Bernhard Reuter, Jules Kreuer"
__copyright__ = "Bernhard Reuter, Jules Kreuer"
__license__ = "LGPL-3.0-only"

_logger = logging.getLogger(__name__)


[docs]def annotate_vcf(in_path: str) -> None: """Complete missing annotations in the 'Subst' column of a MTBseq variant call file (VCF). Parameters ---------- in_path : str Path to the VCF file to annotate/complete. Returns ------- None Writes output to new file in the same directory. """ _logger.debug(f"Annotating VCF: {in_path}") vcf = pd.read_table(in_path, sep="\t", dtype=str) vcf_g = vcf.copy(deep=True) i = 0 while i < vcf.shape[0]: genome_pos = int(vcf.at[i, "#Pos"].strip()) if not 0 < genome_pos <= 4411532: warnings.warn(f"Position {genome_pos} is not in (0,4411532] for {in_path}.") ann: str = vcf.at[i, "Subst"].strip().split(" ")[0] amino_ann = ( r"(^Ala|^Arg|^Asn|^Asp|^Cys|^Gln|^Glu|^Gly|^His|^Ile|^Leu|^Lys|^Met|^Phe|^Pro|^Ser|" r"^Thr|^Trp|^Tyr|^Val|^_)[0-9]+(Ala$|Arg$|Asn$|Asp$|Cys$|Gln$|Glu$|Gly$|His$|Ile$|" r"Leu$|Lys$|Met$|Phe$|Pro$|Ser$|Thr$|Trp$|Tyr$|Val$|_$)" ) if len(ann) > 0: if not bool(re.fullmatch(amino_ann, ann)): warnings.warn( f"Annotation is not regular, but {ann} at position {genome_pos} for {in_path}." ) allel_type: str = vcf.at[i, "Type"].strip() if not bool(re.fullmatch(r"^Ins|Del|SNP$", allel_type)): warnings.warn( "ref_allele is not in [Ins,Del,SNP], " f"but {allel_type} at position {genome_pos} for {in_path}." ) ref_allele: str = vcf.at[i, "Ref"].strip() if not bool(re.fullmatch(r"^[ACGT]$", ref_allele)): warnings.warn( f"ref_allele is not in [A,C,G,T], but {ref_allele} " f"at position {genome_pos} for {in_path}." ) mut_allele: str = vcf.at[i, "Allel"] if not bool(re.fullmatch(r"^[ACGT]|GAP$", mut_allele)): warnings.warn( f"mut_allele is not in [A,C,G,T,GAP], but {mut_allele} " f"at position {genome_pos} for {in_path}." ) if allel_type == "Ins" and not bool(re.fullmatch(amino_ann, ann)): ins_pos_list: List[int] = [] mut_allele_list: List[str] = [] i_list: List[int] = [] while ( i < vcf.shape[0] and vcf.at[i, "Type"].strip() == "Ins" and int(vcf.at[i, "#Pos"].strip()) == genome_pos ): i_list.append(i) ins_pos_list.append(int(vcf.at[i, "Insindex"].strip())) mut_allele = vcf.at[i, "Allel"].strip() if not bool(re.fullmatch(r"^[ACGT]$", mut_allele)): warnings.warn( "mut_allele is not in [A,C,G,T], but %s at position %s for %s." % (mut_allele, vcf.at[i, "#Pos"].strip(), in_path) ) mut_allele_list.append(mut_allele) if bool(re.fullmatch(amino_ann, vcf.at[i, "Subst"].strip().split(" ")[0])): warnings.warn( "Subst is %s for Ins at position %s for %s." % (str(ann), vcf.at[i, "#Pos"].strip(), in_path) ) i += 1 combi = zip(ins_pos_list, mut_allele_list, i_list) zipped_sorted = sorted(combi, key=lambda x: x[0]) ins_pos_list, mut_allele_list, i_list = [list(x) for x in zip(*zipped_sorted)] vcf_g.at[i_list[0], "Subst"] = f"{genome_pos}_ins_{''.join(mut_allele_list)}" elif allel_type == "Del" and not bool(re.fullmatch(amino_ann, ann)): del_len = 0 init_i = i ppp: Optional[int] = None while i < vcf.shape[0] and vcf.at[i, "Type"].strip() == "Del": del_len += 1 if not vcf.at[i, "Allel"] == "GAP": warnings.warn( "Mutated allele isn't GAP for Del at position %s for %s." % (vcf.at[i, "#Pos"].strip(), in_path) ) if del_len > 1: if ppp is None: raise ValueError("prior genome position is undefined.") elif not int(vcf.at[i, "#Pos"].strip()) - 1 == ppp: del_len -= 1 break ppp = int(vcf.at[i, "#Pos"].strip()) i += 1 if del_len > 1: vcf_g.at[init_i, "Subst"] = f"{genome_pos}_{ppp}_del" else: vcf_g.at[init_i, "Subst"] = f"{genome_pos}_del" elif allel_type == "SNP" and not bool(re.fullmatch(amino_ann, ann)): vcf_g.at[i, "Subst"] = f"{genome_pos}{ref_allele}>{mut_allele}" i += 1 elif allel_type == "SNP" and bool(re.fullmatch(amino_ann, ann)): vcf_g.at[i, "Subst"] = ann i += 1 else: warnings.warn( f"Annotation of type {allel_type} at position {genome_pos} is {ann} for {in_path}." ) i += 1 if in_path.endswith(".tab"): out_path = in_path.rstrip(".tab") + "_mod.tsv" vcf_g.to_csv(out_path, sep="\t", index=False) else: warnings.warn(f"Filename issue for vcf file {in_path}.")