"""
Created: July, 2014
@author: Carolyn Ch\'ng
.. todo::
think about whether logging should be part of module.
- Variant
SNV, Indel, CNV inherits Variant
"""
import collections
import logging
import re
import subprocess
import warnings
from biofx.variants.attributes import Zygosity
DEFAULT_SAMTOOLS = "/gsc/software/linux-x86_64-centos6/samtools-0.1.19/samtools"
DEFAULT_SNV_MPILEUP_PARAMS = ["--ff", "1540", "-BQ0", "-d", "1000000", "-A"]
DEFAULT_INDEL_MPILEUP_PARAMS = [
"--ff",
"1540",
"-BQ0",
"-d",
"1000000",
"-L",
"1000000"]
[docs]class Variant(object):
""" A base class for describing variants. Currently supports small mutations.
Attributes:
chromosome (string)
start (int)
end (int)
ref (string): reference base
alt (string): alternative base
ref_count (int): reference base count
alt_count (int): alternative base count
coverage (int): total coverage
state (string): insertion, deletion, duplication etc.
misc (list):
effects (string)
snpeff_effects
identifier (string): external IDs associated with variant. Usually column 3 in vcf
provenance (string): if applicable, tool used to call variant
cosmic (string): cosmic ID
dbsnp (string): dbSNP ID
bamfile (string): path to bamfile where this variant came from
source (string): source ID where this variant came from
is_special (bool): True if special characters (not ATCG) in ref/alt base
homopolymer (bool): True if homopolymer
clnsig_values (string): bar delimited integers
cgl_pathogenicity (string): PATHOGENIC, VUS, BENIGN
info (?): vcf info
"""
def __init__(self, chromosome, start, end=None):
self.chromosome = chromosome
self.start = int(start)
self.end = int(end) if end is not None else end
self.ref = "*"
self.alt = "*"
self.ref_count = None
self.alt_count = None
self.coverage = None
self.state = None
self.comments = []
self.annotations = None
self.misc = None
self.effects = None
self.snpeff_effects = None
self.identifier = 'NA'
self.provenance = None
self.cosmic = 'NA'
self.dbsnp = 'NA'
self.bamfile = None
self.source = None
self.is_special = False
self.homopolymer = False
self.clnsig_values = False
self.gmaf = None
self.allele_model = None
self.zygosity = None
self.genotype = None
self.clinvar_pathogenicity = None
self.clnsig_values = None
self.cgl_pathogenicity = None
self.info = None
self.base_frequencies = None
# snpeff attributes
self.effect_impact = None
self.functional_class = None
self.codon_change = None
self.gene_id = None
self.gene_symbol = None
self.gene_biotype = None
self.aa_length = None
self.transcript_id = None
self.exon = None
self.hgvs_aa_change = None
self.hgvs_cds_change = None
self.hgvs_effect_type = None
self.classic_aa_change = None
self.classic_effect_type = None
self.alternative_transcripts = None
self.alternative_genes = None
self.transcript_info = None
self.protein_id = None
[docs] def set_start(self, pos):
self.start = int(pos)
[docs] def set_end(self, pos):
self.end = int(pos)
[docs] def is_snp(self):
return False
[docs] def get_chromosome_index(self):
"""1-based"""
chr_ind = {'X': 23, 'Y': 24, 'MT': 25}
if self.chromosome in ['X', 'Y', 'MT']:
return chr_ind[self.chromosome]
else:
return int(self.chromosome)
[docs] def set_effects(self, effects):
"""
Args:
effects (str) : snpeff effect string
"""
self.effects = effects.strip()
[docs] def set_id(self, identifier):
self.identifier = identifier
[docs] def get_key(self):
return "_".join(
map(str, (self.chromosome, self.start, self.ref, self.alt)))
[docs] def get_coordinates(self):
return "{v.chromosome}:{v.start}{v.ref}>{v.alt}".format(v=self)
[docs] def get_pathogenicity(self):
if self.cgl_pathogenicity in ["PATHOGENIC", "PROBABLY_PATHOGENIC"] or \
self.clinvar_pathogenicity == "PATHOGENIC":
pathogenicity = "PATHOGENIC"
elif self.cgl_pathogenicity in ["BENIGN", "PROBABLY_BENIGN"] and self.clinvar_pathogenicity == "BENIGN":
pathogenicity = "BENIGN"
else:
pathogenicity = "VUS"
return pathogenicity
[docs] def set_provenance(self, provenance):
self.provenance = provenance
[docs] def set_cosmic(self, cosmic):
"""
Args:
cosmic (string): cosmic ID
Raises:
ValueError: not a cosmic ID
"""
if cosmic is None:
cosmic = "NA"
if not ((cosmic in ['', 'NA']) or (
all([cid.startswith("COSM") for cid in cosmic.split(",")]))):
raise ValueError("{} is not a valid cosmic ID".format(cosmic))
self.cosmic = cosmic
[docs] def set_dbsnp(self, dbsnp):
"""
Args:
dbsnp (string): dbSNP ID
Raises:
ValueError: not a dbsnp ID
"""
if dbsnp is None:
dbsnp = "NA"
if not ((dbsnp in ['', 'NA']) or (
all([did.startswith("rs") for did in dbsnp.split(",")]))):
raise ValueError("{} is not a valid dbsnp ID".format(dbsnp))
self.dbsnp = dbsnp
[docs] def set_zygosity(self, genotype):
"""
Assign zygosity to variant.
Args:
v (Variant): an instance of :class`biofx.variants.Variants`
genotype (string): genotype from vcf record
Raises:
AssertionError: assume no existing zygosity for variant
AssertionError: assume 1/1 or 0/1 for genotype
"""
self.genotype = genotype
if self.zygosity is not None:
raise AssertionError(
("Variant {0.chromosome}:{0.start}{0.ref}>{0.alt} has an existing zygosity {0.zygosity}. "
"Assumed no existing zygosity, will not set new zygosity.").format(self))
if genotype == "1/1":
self.zygosity = Zygosity.HOMOZYGOUS
elif genotype == "0/1":
self.zygosity = Zygosity.HETEROZYGOUS
else:
raise AssertionError(
("Assumed genotype to be 1/1 or 0/1. "
"Got unexpected value {}. ").format(genotype))
[docs] def set_eff(self, eff_map, hgvs=True, classic=False):
"""
Args:
eff_map (dict): a dictionary with snpeff annotation key values (See output of parse_effect)
hgvs (bool): is hgvs eff_map (parse_effect with hgvs=True)
classic (bool): is classic eff_map
Note: Set both hgvs and classic to True if it is a merged eff map from merge_eff_maps
"""
if eff_map is None:
self.effect_impact = "."
self.functional_class = "."
self.codon_change = "."
self.aa_length = "."
self.gene_symbol = "."
self.gene_biotype = "."
self.transcript_id = "."
self.exon = "."
if hgvs:
self.hgvs_effect_type = "."
self.hgvs_aa_change = "."
self.hgvs_cds_change = "."
if classic:
self.classic_aa_change = "."
self.classic_effect_type = "."
else:
self.effect_impact = eff_map["effect_impact"]
self.functional_class = eff_map["functional_class"]
self.codon_change = eff_map["codon_change"]
self.aa_length = eff_map["amino_acid_length"]
self.gene_symbol = eff_map["gene_symbol"]
self.gene_biotype = eff_map["gene_biotype"]
self.transcript_id = eff_map["transcript"]
self.exon = eff_map["exon"]
if hgvs:
self.hgvs_effect_type = eff_map["hgvs_effect_type"]
self.hgvs_aa_change = eff_map["hgvs_protein_sequence_change"]
self.hgvs_cds_change = eff_map["cds_change"]
if classic:
self.classic_aa_change = eff_map["classic_protein_sequence_change"]
self.classic_effect_type = eff_map["classic_effect_type"]
[docs] def add_misc(self, **kwargs):
"""
add miscellaneous info
Args:
**kwargs: key word args for additional info.
"""
if self.misc is None:
self.misc = kwargs
else:
for k, v in list(kwargs.items()):
if k in self.misc:
raise KeyError("{} previously added!")
self.misc[k] = v
[docs] def normalize_splice_site(self):
"""
Checks hgvs effect type attributes and
change value to a normalized splice site string
if effect is a splice site.
"""
ss = "splice site"
if self.hgvs_effect_type is None:
raise TypeError("hgvs_effect_type is not set!")
elif "splice" in self.hgvs_effect_type.lower():
self.hgvs_cds_change = ss
self.hgvs_aa_change = ss
[docs] def set_alternative(self, eff_maps, model, to_string=True):
"""
Set alternative (not best/canonical) effect descriptions.
Args:
eff_maps (list): a list of dictionaries
model (string): transcript or gene
to_string:
Raises:
ValueError: invalid model
"""
if to_string:
# make eff like string.
# bar-delimited for fields, comma-delimited for each annotation
data = ",".join(["|".join([x["transcript"],
x["gene_symbol"],
x["hgvs_protein_sequence_change"],
x["classic_protein_sequence_change"],
x["effect_impact"],
x["functional_class"],
x["hgvs_effect_type"]]) for x in eff_maps])
else:
data = eff_maps
if model == "transcript":
self.alternative_transcripts = data
elif model == "gene":
self.alternative_genes = data
else:
raise ValueError("Invalid model {} provided. "
"Select one of transcript|gene.".format(model))
[docs] def set_alternative_transcripts(self, eff_maps, to_string=True): # pragma: no cover
"""
.. todo:: merge set alternative transcripts and set alternative genes
Args:
eff_maps (list): a list of dictionaries
to_string:
Returns:
"""
warnings.warn("use set_alternative", PendingDeprecationWarning)
return self.set_alternative(eff_maps, "transcript", to_string)
[docs] def set_alternative_genes(self, eff_maps, to_string=True): # pragma: no cover
"""
.. todo:: merge set alternative transcripts and set alternative genes
Args:
eff_maps:
to_string:
Returns:
"""
warnings.warn("use set_alternative", PendingDeprecationWarning)
return self.set_alternative(eff_maps, "gene", to_string)
[docs] def get_variant_mpileup(
self,
bamfile,
samtools=DEFAULT_SAMTOOLS,
reference="/projects/alignment_references/9606/hg19a/genome/bwa_32/hg19a.fa",
mpileup_params=[
"--ff",
"1540",
"-BQ0"]):
"""
Get mpileup record for variant.
.. todo::
add functionality for region?
move executables to a configs module/file or something like that
Args:
bamfile (string): path to bamfile
samtools (string): samtools exectutable. Default: /gsc/software/linux-x86_64-centos6/samtools-0.1.19
reference (string): path to reference fasta file. Default: /projects/alignment_references/9606/hg19a/genome/bwa_32/GRCh37-lite.fa
mpileup_params (list): list of mpileup parameters
"""
self.bamfile = bamfile
samtools_mpileup = ([samtools,
"mpileup",
"-f",
reference] + mpileup_params + ["-r",
"{}:{}-{}".format(self.chromosome,
self.start,
self.end if self.end else self.start),
self.bamfile])
logging.info(
"Executing command:\n{}".format(
" ".join(samtools_mpileup)))
record = subprocess.check_output(samtools_mpileup)
if not isinstance(record, str):
record = record.decode("utf-8")
logging.debug(record)
if record == "":
logging.warning(
"No coverage at {}:{}-{} in {}".format(
self.chromosome,
self.start,
self.end,
self.bamfile))
return record
[docs] def get_variant_info(self, data, samtools, fasta_file):
"""
Get read support and total coverage for variant.
Args:
self (Variant): an instance of :class:`biofx.variants.Variants.Variant`
data (dict): dictionary with library ID as keys. Required nested keys:
- sample_prefix
- tumour_content
- bam
samtools (basestring): path to samtools executable
fasta_file (basestring): path to fasta file
\
Returns:
variant_info (dict): a dictionary with library IDs as keys and attribute keywords
Raises:
AssertionError: each bam file in bams is assumed to come from a different library
"""
variant_info = {}
ref = self.ref
alt = self.alt
if self.is_snp():
mpileup_params = DEFAULT_SNV_MPILEUP_PARAMS
else: # assume indel
self.vt_normalize() # see variant class for more info
mpileup_params = DEFAULT_INDEL_MPILEUP_PARAMS
for l in data:
sample_prefix = data[l]["sample_prefix"]
if sample_prefix in variant_info:
raise AssertionError(
"One or more bam files is from "
"the same sample {}!".format(sample_prefix))
variant_info[sample_prefix] = {}
mpileup_record = self.get_variant_mpileup(
data[l]["bam"],
samtools=samtools,
mpileup_params=mpileup_params,
reference=fasta_file)
variant_info[sample_prefix]["record"] = mpileup_record
if mpileup_record == "":
logging.warning(
"No mpileup record for {} in {}".format(
self.get_key(), sample_prefix))
if "tumour_content" in data[l]:
variant_info[sample_prefix]["tumour_content"] = data[l]["tumour_content"]
allele_frequencies = self.get_allele_frequencies(mpileup_record)
# use original alt as output
allele_frequencies[alt] = allele_frequencies.pop(self.alt)
variant_info[sample_prefix]["allele_frequencies"] = allele_frequencies
# initialize
Support = collections.namedtuple('Support', ['supported', 'total'])
allele_support = Support(
supported=allele_frequencies[alt],
total=self.coverage)
# use pre-normalized alt as output key
variant_info[sample_prefix]["alt_allele_support"] = {
alt: allele_support}
# set normalized ref and alt back to original ref alt for indels
self.ref = ref
self.alt = alt
return variant_info
[docs] @staticmethod
def has_pathogenicity_conflict(p1, p2):
"""
Check for pathogenicity conflict between two sources.
Args:
p1 (string): clinvar pathogenicity
p2 (string): cgl pathogenicity
Returns:
boolean: True if has conflict
"""
return (p1 == "BENIGN" and p2 == "PATHOGENIC") or (
p1 == "PATHOGENIC" and p2 == "BENIGN")
[docs] def set_allele_model(self, gmaf, maf_cutoff):
"""
Get allele model string from gmaf.
TODO: enumerate this?
Args:
gmaf (float): allele model gmaf
maf_cutoff (float): Cut off for rare/common
Returns:
allele_model (string): allele model string
"""
allele_model = "COMMON"
if gmaf < 0:
allele_model = "UNKNOWN"
elif gmaf <= maf_cutoff:
allele_model = "RARE"
self.allele_model = allele_model
[docs]class Indel(Variant):
"""
Generic Indel class with basic chromosome, start, end attributes, as well as
optional ref/alt alleles, annotation attributes. Inherits methods from Variant.
"""
[docs] def get_length(self):
"""
Get indel length.
.. todo::
think about definitions of start and end
Raises:
ValueError: if reference and/or alternate allele not set
"""
if self.end is None:
if self.alt != "*" and self.ref != "*":
length = abs(len(self.ref) - len(self.alt))
self.end = int(self.start) + length
return length
else:
raise ValueError(
"At least one of ref or alt not set for variant. "
"Cannot compute length with variant end or ref alt alleles")
return int(self.end) - int(self.start)
[docs] def set_state(self, state):
"""
Args:
state (string): indel type, usually from genome validator
Raises:
ValueError: unrecognized state
"""
valid_states = ["ins", "del", "ITD", "dup"]
if state not in valid_states:
raise ValueError(
"{} is not a recognized state (case sensitive).\n"
"Choose one of {} or request a review.".format(
state, ",".join(valid_states)))
self.state = state
[docs] def set_alt(self, alt):
"""
Set alternative allele.
White spaces are stripped off.
Args:
alt (string): alt string
"""
self.end = None
alt = alt.strip()
if any(base not in ['A', 'T', 'C', 'G'] for base in alt):
logging.warning("Special characters seen in {}.".format(alt))
self.is_special = True
self.alt = alt
[docs] def set_ref(self, ref):
self.end = None
ref = ref.strip()
if any(base not in ['A', 'T', 'C', 'G'] for base in ref):
logging.warning("Special characters seen in {}.".format(ref))
self.is_special = True
self.ref = ref
[docs] def vt_normalize(self):
""" Implementation of algorithm in http://genome.sph.umich.edu/wiki/Variant_Normalization.
Modified for samtools/bcftools output - left aligned but not parsimonous.
Raises:
AssertionError: normalization cannot be done if ref alt bases are not set
AssertionError: algorithm failed. shouldn't get here. never observed.
Reference:
http://genome.sph.umich.edu/wiki/Variant_Normalization
See also:
http://bioinformatics.oxfordjournals.org/content/31/13/2202.full.
"""
if self.alt == "*" or self.ref == "*":
raise AssertionError(
"Bases not set. Ref: {0.ref} Alt: {0.alt}".format(self))
normalizing = True
while normalizing:
if len(self.alt) == 1 or len(self.ref) == 1:
normalizing = False
elif self.alt[-1] == self.ref[-1]:
self.alt = self.alt[:-1]
self.ref = self.ref[:-1]
else:
normalizing = False
# assume samtools VCF reports one base before event
if len(self.alt) == 0 or len(self.ref) == 0: # pragma no cover
raise AssertionError(
("Normalization algorithm failed. "
"This can only be used for left aligned, non parsimonous records. "
"Assumed variants are reported one base before event (left)."))
logging.debug("ref: " + self.ref + "alt: " + self.alt)
if (self.ref[0] == self.alt[0]) and (
len(self.ref) >= 2 and len(self.alt) >= 2):
warnings.warn("Removing left most nucleotide")
self.ref = self.ref[1:]
self.alt = self.alt[1:]
if len(set(self.alt[1:] + self.ref[1:])) == 1:
self.homopolymer = True
[docs] def get_allele_frequencies(self, mpileup_record):
"""
Args:
mpileup_record (string): output from :func:`Variants.get_variant_mpileup` or single mpileup record
Returns:
dict: dictionary containing:
ref_count (int): reference allele count. 0 if empty record
alt_count (int): alternate allele count. 0 if empty record
Raises:
ValueError: more than one record provided
ValueError: if reference and/or alternate allele not set
`pileup format specifications <http://samtools.sourceforge.net/pileup.shtml>`_
Note:
Reference counts for indels are currently obtained by subtracting
alternative allele counts from total coverage. Usage of total coverage
(self.coverage) is preferred over reference counts.
"""
if mpileup_record == "":
self.alt_count = 0
self.ref_count = 0
self.coverage = 0
else:
if len(mpileup_record.splitlines()) > 1:
raise ValueError(
("More than one record provided:\n{}\n"
"This function parses one record at a time").format(mpileup_record))
if self.ref == "*" or self.alt == "*":
raise ValueError(
("At least one of ref or alt not set for variant."
"Allele frequencies can only be obtained for indels with specified ref and alts"))
sign = '-' if len(self.ref) > len(self.alt) else '+'
if sign == '-':
base_change = self.ref[len(self.alt):]
else:
base_change = self.alt[len(self.ref):]
entry = mpileup_record
if not isinstance(entry, str):
entry = entry.decode('UTF-8')
entry = entry.strip().split('\t')
chrom = entry[0]
pos = entry[1]
ref = entry[2]
cov = entry[3]
seq = entry[4]
qual = entry[5]
pattern = "\{}{}{}".format(sign, len(base_change), base_change)
matches = re.findall(pattern, seq, flags=re.I)
logging.debug(matches)
if self.alt == ".":
self.alt_count = "NA"
self.ref_count = int(cov)
else:
self.alt_count = len(matches)
self.ref_count = int(cov) - self.alt_count
self.coverage = int(cov)
self.base_frequencies = {
self.ref: self.ref_count,
self.alt: self.alt_count}
return self.base_frequencies
[docs]class SNV(Variant):
"""
Generic SNV class with basic chromosome, start, end attributes, as well as
optional ref/alt alleles, annotation attributes. Inherits from Variant
"""
def __init__(self, chromosome, start, end=None):
if end is not None and end != start:
raise ValueError(
"start and end should be on the same position for {}".format(
self.__class__.__name__))
else:
end = start
Variant.__init__(self, chromosome, start, end)
self.base_frequencies = {}
[docs] def set_alt(self, alt):
"""
Args:
alt (string): alt base
Raises:
ValueError: alt length must be 1 and one of ATCG.
"""
if "," in alt:
if not all(len(x) == 1 for x in alt.split(",")):
raise ValueError(
"alt length must be 1 for {}. {} given.".format(
self.__class__.__name__, alt))
elif not all(x in ['A', 'T', 'C', 'G', "."] for x in alt.split(",")):
raise ValueError(
"alt base must be one of ATCG for {}. {} given.".format(
self.__class__.__name__, alt))
else:
if len(alt) != 1:
raise ValueError(
"alt length must be 1 for {}. {} given.".format(
self.__class__.__name__, alt))
elif alt not in ['A', 'T', 'C', 'G', "."]:
raise ValueError(
"alt base must be one of ATCG for {}. {} given.".format(
self.__class__.__name__, alt))
self.alt = alt
[docs] def set_ref(self, ref):
"""
Args:
ref (string): reference base
Raises:
ValueError: reference length must be 1 and one of ATCG.
"""
if len(ref) != 1:
raise ValueError(
"reference length must be 1 for {}. {} given.".format(
self.__class__.__name__, ref))
elif ref not in ['A', 'T', 'C', 'G']:
raise ValueError("reference base must be one of ATCG for {}. "
"{} given.".format(self.__class__.__name__, ref))
self.ref = ref
[docs] def is_snp(self):
"""
True if is an SNV instance.
Returns:
bool: True on an SNV instance.
"""
return True
[docs] def get_allele_frequencies(self, mpileup_record):
"""
Get allele frequencies for each base at a given position of this SNV.
Currently supports samtools 0.1.18
Args:
mpileup_record (string): output from :func:`Variants.get_variant_mpileup` or single mpileup record
Returns:
base_frequencies (dict): frequency of each base
"""
logging.debug(mpileup_record)
if mpileup_record == "":
self.base_frequencies = {
'A': 0, 'C': 0, 'G': 0, 'T': 0, 'N': 0, ".": "NA"}
self.coverage = 0
return self.base_frequencies
# chunk below from /home/cmayoh/code/PileupParser_CountSnpBases.py,
# with modifications.
numA = 0
numC = 0
numG = 0
numT = 0
numN = 0
entry = mpileup_record.strip().split('\t')
chrom = entry[0]
pos = entry[1]
ref = entry[2]
cov = entry[3]
seq = entry[4]
qual = entry[5]
info = chrom + ':' + pos + ':' + ref
logging.debug(seq)
bases = SNV._parseBases(seq)
logging.debug(bases)
# check to ensure length of parsed bases is correct
if((not(len(bases) == int(cov))) or (not(len(bases) == len(qual)))):
logging.error(
"Error in parsing sequence: {}\n".format(mpileup_record))
return self.base_frequencies
for b in bases:
if(b == '.' or b == ','):
if(ref == 'A'):
numA += 1
if(ref == 'T'):
numT += 1
if(ref == 'C'):
numC += 1
if(ref == 'G'):
numG += 1
if(ref == 'N'):
numN += 1
elif(b == 'A' or b == 'a'):
numA += 1
elif(b == 'C' or b == 'c'):
numC += 1
elif(b == 'G' or b == 'g'):
numG += 1
elif(b == 'T' or b == 't'):
numT += 1
elif(b == 'N' or b == 'n'):
numN += 1
self.coverage = int(cov)
self.base_frequencies = {
'A': numA,
'C': numC,
'G': numG,
'T': numT,
'N': numN,
'.': "NA"}
return self.base_frequencies
[docs] def get_ref_count(self, **kwargs):
"""
Get reference base count.
Args:
**kwargs: used kwargs: mpileup_params
Returns:
int: ref base count
"""
if self.base_frequencies == {}:
if self.bamfile is None:
raise IOError(
"Please get allele frequencies or provide a bam file")
else:
if "mpileup_params" in kwargs:
record = self.get_variant_mpileup(
self.bamfile, mpileup_params=kwargs["mpileup_params"])
else:
record = self.get_variant_mpileup(self.bamfile)
self.get_allele_frequencies(record)
return self.base_frequencies[self.ref]
[docs] def get_alt_count(self, **kwargs):
"""
Args:
**kwargs: used kwargs: mpileup_params
Returns:
int: alt base count
"""
if self.base_frequencies == {}:
if self.bamfile is None:
raise IOError(
"Please get allele frequencies or provide a bam file")
else:
if "mpileup_params" in kwargs:
record = self.get_variant_mpileup(
self.bamfile, mpileup_params=kwargs["mpileup_params"])
else:
record = self.get_variant_mpileup(self.bamfile)
self.get_allele_frequencies(record)
if "," in self.alt:
# return single allele with high frequencies
max_count = max(
self.alt.split(","),
key=lambda x: self.base_frequencies[x])
self.alt = max_count
return self.base_frequencies[self.alt]
return self.base_frequencies[self.alt]
@classmethod
def _parseBases(cls, bases): # pragma: no cover
"""Copied verbatim from /home/cmayoh/code/PileupParser_CountSnpBases.py
Updated with version from https://svn01.bcgsc.ca/svn/omics_custom_oneoffs/trunk/bin/PileupParser_countSnpBases_getBaseQs.py
"""
returnSeq = []
bs = list(bases)
baseList = list('actgnACTGN.,')
count = 0 # used to remove character following ^ and insertions
for i in range(len(bs)):
if(count > 0): # enters if preceding character is ^ or if an insertion occurred
count -= 1
continue
if(bs[i] == '$'):
continue
elif(bs[i] == '>'):
# for handling large gapped alignments in RNAseq
returnSeq.append(bs[i])
continue
elif(bs[i] == '<'):
# for handling large gapped alignments in RNAseq
returnSeq.append(bs[i])
continue
elif(bs[i] == '*'):
returnSeq.append(bs[i])
continue
elif(bs[i] == '+'): # insertion
# determines the length of insertion
n_skip = int(re.search(r'([0-9]+)', ''.join(bs[i:])).group())
count = n_skip + len(str(n_skip))
continue
elif(bs[i] == '-'): # deletion
# determines the length of deletion
n_skip = int(re.search(r'([0-9]+)', ''.join(bs[i:])).group())
count = n_skip + len(str(n_skip))
continue
elif(bs[i] == '^'):
count = 1
continue
elif bs[i] in baseList:
returnSeq.append(bs[i])
else:
returnSeq.append(bs[i])
return returnSeq
[docs]class CNV(Variant):
"""
Generic CNV class with basic chromosome, start, end attributes, as well as
annotation attributes. Inherits from Variant
"""
NEUTRAL = "copy neutral"
GAIN = "copy gain"
LOSS = "copy loss"
def __init__(self, chromosome, start, end):
Variant.__init__(self, chromosome, start, end)
self.copy_number = None
self.overlap = None
self.frequency = None
[docs] def get_type(self, ploidy=2):
""" Infer copy number type
Args:
ploidy (int): ploidy or ploidy model for assessment. Default is 2 (human).
Raise:
ValueError:
"""
if self.copy_number is None:
raise ValueError("Copy number not set")
elif self.copy_number == ploidy:
return CNV.NEUTRAL
elif self.copy_number > ploidy:
return CNV.GAIN
elif self.copy_number < ploidy:
return CNV.LOSS