Source code for geno2phenotb.preprocess

"""
Wrapper for all preprocessing steps.

These functions include the assembly and variant calling using MTBseq as well as the
genotype and lineage collection.
"""

import logging
import os
import shutil
import warnings
from subprocess import check_call
from tempfile import TemporaryDirectory
from typing import Optional, Set, Tuple

import numpy as np
import pandas as pd

import geno2phenotb.utils as utils
from geno2phenotb.annotate_vcf import annotate_vcf
from geno2phenotb.vcf_columns_extractor import vcf_columns_extractor
from geno2phenotb.vcf_columns_extractor_geno import vcf_columns_extractor_geno

__author__ = "Bernhard Reuter, Jules Kreuer"
__copyright__ = "Bernhard Reuter, Jules Kreuer"
__license__ = "LGPL-3.0-only"

_logger = logging.getLogger(__name__)


[docs]def run_mtbseq(fastq_dir: str, sample_id: str) -> None: """ Execute MTBseq for a single isolate. Parameters ---------- fastq_dir : str Path to the directory were the FASTQ file(s) belonging to a single isolate are located. sample_id : str SampleID, i.e. run accession (ERR/SRR). Returns ------- None """ _logger.info("Running MTBSeq.") exec_mtbseq_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), "exec_mtbseq.sh") _logger.debug(f"MTBSeq Exec Path: {exec_mtbseq_path}") _logger.debug(f"MTBSeq Args:\n\t\tFastQ Dir:\t{fastq_dir}\n\t\tSample " f"ID:\t{sample_id}") check_call([f"{exec_mtbseq_path} {fastq_dir} {sample_id} 1"], shell=True) _logger.debug("MTBSeq Done.")
[docs]def determine_genotype( catalog_variants: pd.DataFrame, resistance_variants: Set[str] ) -> Tuple[pd.Series, pd.Series]: r""" Determine the genotype of an isolate. Use resistance variants from the FZB catalog plus all InDels and early stop condons (e.g. :spelling:ignore:`A123_`) in the following genes (not mentioned in the catalog, i.e., additionally to the known resistance variants from the Masterlist) to assign a resistant genotype: ethA=Rv3854c (ETH), pncA=Rv2043c (PZA), gidB/gid=Rv3919c (STR), rpoB=Rv0667 (RIF), Rv0678 (BDQ/CFZ), ald=Rv2780 (DCS), katG=Rv1908c (INH), ddn=Rv3547 (DLM), tlyA=Rv1694 (CAP). Parameters ---------- catalog_variants : pd.DataFrame DataFrame of resistance-related variants from the FZB catalog. There is one column per drug. The column contains the variants from the FZB catalog that are related with resistance against the drug. resistance_variants : Set[str] Resistance-related variants that were found for an isolate. Returns ------- genotypes : pd.Series[float] Series of per-drug genotypes. geno_variants : pd.Series[str] Series of resistance-related variants per drug. """ _logger.info("Determining genotype.") key_genes = utils.get_key_genes() drugs = catalog_variants.columns variants_series: pd.Series = pd.Series(list(resistance_variants), dtype=str) genotypes: pd.Series = pd.Series( np.zeros(len(drugs)), index=[f"{x}_geno" for x in drugs], dtype=float, ) geno_variants: pd.Series = pd.Series(index=[f"{x}_geno" for x in drugs], dtype=str) if not geno_variants.isna().all(): raise ValueError("found_resistance_variants initialization error.") for drug in drugs: drug_geno = f"{drug}_geno" set_ = resistance_variants & set(catalog_variants.loc[:, drug]).difference({""}) if set_: genotypes.at[drug_geno] = 1.0 geno_variants.at[drug_geno] = ",".join(list(set_)) if drug in key_genes.keys(): mask = variants_series.str.match("|".join(key_genes[drug]), na=False) if np.any(mask): genotypes.at[drug_geno] = 1.0 list_ = variants_series.loc[mask].to_list() if isinstance(geno_variants.at[drug_geno], str): list_.append(geno_variants.at[drug_geno]) geno_variants.at[drug_geno] = ",".join(list_) else: geno_variants.at[drug_geno] = ",".join(list_) return genotypes, geno_variants
[docs]def collect_lineages(classification_file: str) -> pd.Series: """ Collect lineage classification. Parameters ---------- classification_file : str Full path to the Strain_Classification.tab file. Returns ------- lineage_classification : pd.Series Series of floats from {0.0, 1.0} denoting, if the isolate is classified to belong to a lineage, i.e. `classification["lineage X"]=1.0`, or not, i.e. `classification["lineage X"]=0.0`. """ _logger.info("Collecting lineages.") _logger.debug(f"Classification file: {classification_file}") # Get list of lineages lineages = utils.get_lineages() # Reads table ans strips values temp = pd.read_table( classification_file, dtype="object", keep_default_na=False, na_values=["", "'-"] ).applymap(lambda x: utils.stripper(x)) if not temp.shape[0] == 1: raise ValueError("classification table has more than 1 row.") classification: pd.Series = temp.squeeze() del temp lineage_classification: pd.Series = pd.Series( np.zeros(len(lineages)), index=lineages, dtype=float ) if not classification.empty: lineage: float = classification.at["Coll lineage (easy)"] if f"Lineage {lineage}" in lineage_classification.index: lineage_classification.at[f"Lineage {lineage}"] = 1.0 else: warnings.warn(f"Lineage {lineage} extracted from {classification_file} is unknown.") else: raise ValueError(f"{classification_file} is empty.") return lineage_classification
[docs]def preprocess( fastq_dir: str, output_dir: str, sample_id: str, skip_mtbseq: bool = False, ) -> Optional[pd.Series]: """ Runs all preprocessing steps. Parameters ---------- fastq_dir : str Path to directory containing the FASTQ files. output_dir : str Path to output directory. Two files are written to this directory. A file named '<sample_id>_resistant_genotype_variants.tsv' with resistance-related variants per drug and a file named '<sample_id>_extracted_features.tsv' with per-drug genotypes. sample_id : str Sample ID. skip_mtbseq : bool, default=False Do not run MTBSeq but use preprocessed data. Returns ------- features : pd.Series, default=None The features (incl. called variants, lineage classification, and genotypes) extracted from the supplied FASTQ file(s). """ # Align/Analyze FASTQ (this results in variant call files (VCFs)): # Use temp directory with TemporaryDirectory(prefix="geno2phenotb_pre_") as mtbseq_output_dir: fastq_files = [] if skip_mtbseq: # Use folder with preprocessed MTBSeq results instead of computing them. mtbseq_output_dir = fastq_dir else: # Use temporary directory and run MTBSeq # Check for correct naming and existence of fastq files. fastq_files = utils.check_fastq_filenames(fastq_dir, sample_id) # Copy fastq files to temp dir. _logger.debug(f"Copying fastq files to tempdir: {mtbseq_output_dir}") for file_name in fastq_files: _logger.debug(f"Copying: {file_name}") file_path = os.path.join(fastq_dir, file_name) shutil.copy(file_path, mtbseq_output_dir) # Execute MTBSeq. run_mtbseq(mtbseq_output_dir, sample_id) # Resistance Variants resi_vars_file = os.path.join( os.path.dirname(os.path.realpath(__file__)), "static", "FZB_catalogue_2020-05-10_BRg_new_abbrevs_resistance_variants_only.tsv", ) if not os.path.isfile(resi_vars_file): raise FileNotFoundError(f"{resi_vars_file} isn't a regular file or doesn't exist.") low_freq_vcf_dir = os.path.join(mtbseq_output_dir, "Called_low_freq") _logger.debug(f"Low-Frequency VCF Dir: {low_freq_vcf_dir}") # Low-Frequency VCF low_freq_vcf_path = os.path.join( low_freq_vcf_dir, f"{sample_id}_X.gatk_position_variants_cf2_cr2_fr15_ph4_outmode001.tab", ) if not os.path.isfile(low_freq_vcf_path): raise FileNotFoundError(f"{low_freq_vcf_path} isn't a regular file or doesn't exist.") # Annotate low-frequency VCF: annotate_vcf(low_freq_vcf_path) # Collect all variants from the annotated low-frequency VCF: annotated_vcf_path = os.path.join( low_freq_vcf_dir, f"{sample_id}_X.gatk_position_variants_cf2_cr2_fr15_ph4_outmode001_mod.tsv", ) if not os.path.isfile(annotated_vcf_path): raise FileNotFoundError(f"{annotated_vcf_path} isn't a regular file or doesn't exist.") variants_lf, _ = vcf_columns_extractor(annotated_vcf_path) features: Optional[pd.Series] if variants_lf: features = pd.Series(np.ones(len(variants_lf)), index=sorted(variants_lf), dtype=float) else: features = None warnings.warn(f"No variants were extracted from {annotated_vcf_path}") # Collect lineage classification info: classific_path = os.path.join( mtbseq_output_dir, "Classification", "Strain_Classification.tab" ) if not os.path.isfile(classific_path): raise ValueError(f"{classific_path} isn't a regular file or doesn't exist.") lineage_classification = collect_lineages(classific_path) if features is not None: features = features.append( lineage_classification, ignore_index=False, verify_integrity=True ) else: features = lineage_classification # Annotate super-low-frequency VCF: super_freq_vcf_dir = os.path.join(mtbseq_output_dir, "Called_super_low_freq") super_freq_vcf_path = os.path.join( super_freq_vcf_dir, f"{sample_id}_X.gatk_position_variants_cf2_cr2_fr1_ph1_outmode001.tab", ) if not os.path.isfile(super_freq_vcf_path): raise FileNotFoundError(f"{super_freq_vcf_path} isn't a regular file or doesn't exist.") annotate_vcf(super_freq_vcf_path) # Load the resistance associated variants from the catalog: catalog_variants = pd.read_table(resi_vars_file, sep="\t", dtype=str).fillna("") catalog_variants_set = set() for col in catalog_variants.columns: catalog_variants_set.update(catalog_variants.loc[:, col].to_list()) catalog_variants_set = catalog_variants_set.difference({""}) # Collect only resistance-related variants from the annotated super-low-frequency VCF: super_annotated_vcf_path = os.path.join( super_freq_vcf_dir, f"{sample_id}_X.gatk_position_variants_cf2_cr2_fr1_ph1_outmode001_mod.tsv", ) if not os.path.isfile(super_annotated_vcf_path): raise FileNotFoundError( f"{super_annotated_vcf_path} isn't a regular file or doesn't exist." ) resistance_variants_slf, _ = vcf_columns_extractor_geno( super_annotated_vcf_path, catalog_variants_set ) if resistance_variants_slf is None: resistance_variants_slf = set() # Create output folder os.makedirs(output_dir, exist_ok=True) # Determine the genotype from the super-low-frequency variants: genotypes, geno_variants = determine_genotype(catalog_variants, resistance_variants_slf) super_res_geno_var = os.path.join( output_dir, f"{sample_id}_resistant_genotype_variants.tsv" ) geno_variants.to_csv(super_res_geno_var, sep="\t", header=False) if features is not None: features = features.append(genotypes, ignore_index=False, verify_integrity=True) fn = os.path.join(output_dir, f"{sample_id}_extracted_features.tsv") features.to_csv(fn, sep="\t", header=False) else: raise ValueError(f"No features for {sample_id}") return features