"""
module which holds all functions relating to loading reference files
"""
import TSV
import re
from .genomic import Gene, Transcript, usTranscript, Exon, Template
from .base import BioInterval, ReferenceName
from .protein import Domain, Translation
from ..interval import Interval
from ..constants import STRAND, GIESMA_STAIN, CODON_SIZE, translate, START_AA, STOP_AA
from ..util import devnull
from Bio import SeqIO
import json
import warnings
[docs]def load_masking_regions(filepath):
"""
reads a file of regions. The expect input format for the file is tab-delimited and
the header should contain the following columns
- chr: the chromosome
- start: start of the region, 1-based inclusive
- end: end of the region, 1-based inclusive
- name: the name/label of the region
For example:
.. code-block:: text
#chr start end name
chr20 25600000 27500000 centromere
Args:
filepath (str): path to the input tab-delimited file
Returns:
:class:`dict` of :class:`list` of :class:`BioInterval` by :class:`str`: a dictionary keyed by chromosome name with values of lists of regions on the chromosome
Example:
>>> m = load_masking_regions('filename')
>>> m['1']
[BioInterval(), BioInterval(), ...]
"""
header, rows = TSV.read_file(
filepath,
require=['chr', 'start', 'end', 'name'],
cast={'start': int, 'end': int, 'chr': ReferenceName}
)
regions = {}
for row in rows:
r = BioInterval(reference_object=row['chr'], start=row['start'], end=row['end'], name=row['name'])
regions.setdefault(r.reference_object, []).append(r)
return regions
[docs]def load_reference_genes(*pos, **kwargs):
warnings.warn('this function has been replaced by load_annotations', DeprecationWarning)
return load_annotations(*pos, **kwargs)
[docs]def load_annotations(filepath, warn=devnull, REFERENCE_GENOME=None, filetype=None, best_transcripts_only=False):
"""
loads gene models from an input file. Expects a tabbed or json file.
Args:
filepath (str): path to the input file
verbose (bool): output extra information to stdout
REFERENCE_GENOME (:class:`dict` of :class:`Bio.SeqRecord` by :class:`str`): dict of reference sequence by
template/chr name
filetype (str): json or tab/tsv. only required if the file type can't be interpolated from the path extenstion
Returns:
:class:`dict` of :class:`list` of :class:`~mavis.annotate.genomic.Gene` by :class:`str`: lists of genes keyed by chromosome name
"""
if filetype is None:
m = re.match('.*\.(?P<ext>tsv|tab|json)$', filepath)
if m:
filetype = m.group('ext')
data = None
if filetype == 'json':
with open(filepath) as fh:
data = json.load(fh)
elif filetype == 'tab' or filetype == 'tsv':
data = convert_tab_to_json(filepath, warn)
else:
raise NotImplementedError('unsupported filetype:', filetype, filepath)
return parse_annotations_json(
data, REFERENCE_GENOME=REFERENCE_GENOME, best_transcripts_only=best_transcripts_only, warn=warn)
[docs]def parse_annotations_json(data, REFERENCE_GENOME=None, best_transcripts_only=False, warn=devnull):
"""
parses a json of annotation information into annotation objects
"""
genes_by_chr = {}
for gene in data['genes']:
if gene['strand'] in ['1', '+']:
gene['strand'] = STRAND.POS
elif gene['strand'] in ['-1', '-']:
gene['strand'] = STRAND.NEG
else:
raise AssertionError('input has unexpected form. strand must be 1 or -1 but found', gene['strand'])
g = Gene(
chr=gene['chr'],
start=gene['start'],
end=gene['end'],
name=gene['name'],
aliases=gene['aliases'],
strand=gene['strand']
)
has_best = False
for transcript in gene['transcripts']:
transcript['is_best_transcript'] = TSV.tsv_boolean(transcript['is_best_transcript'])
transcript.setdefault('exons', [])
exons = [Exon(strand=gene['strand'], **ex) for ex in transcript['exons']]
if len(exons) == 0:
exons = [(transcript['start'], transcript['end'])]
ust = usTranscript(
name=transcript['name'],
gene=g,
exons=exons,
is_best_transcript=transcript['is_best_transcript']
)
if ust.is_best_transcript:
has_best = True
if best_transcripts_only and not ust.is_best_transcript:
continue
g.transcripts.append(ust)
if transcript['cdna_coding_end'] is None or transcript['cdna_coding_start'] is None:
continue
for spl_patt in ust.generate_splicing_patterns():
# make splice transcripts and translations
t = Transcript(ust, spl_patt)
ust.spliced_transcripts.append(t)
tx_length = transcript['cdna_coding_end'] - transcript['cdna_coding_start'] + 1
# check that the translation makes sense before including it
if tx_length % CODON_SIZE != 0:
warn('Ignoring translation. The translated region is not a multiple of three')
continue
tx_length = tx_length // CODON_SIZE
domains = []
for dom in transcript['domains']:
try:
regions = [Interval(r['start'], r['end']) for r in dom['regions']]
regions = Interval.min_nonoverlapping(*regions)
for region in regions:
if region.start < 1 or region.end > tx_length:
raise AssertionError('region cannot be outside the translated length')
domains.append(
Domain(name=dom['name'], data={'desc': dom.get('desc', None)}, regions=regions)
)
except AssertionError as e:
warn(repr(e))
tx = Translation(
transcript['cdna_coding_start'], transcript['cdna_coding_end'], transcript=t, domains=domains
)
if REFERENCE_GENOME and g.chr in REFERENCE_GENOME:
# get the sequence near here to see why these are wrong?
s = ust.get_cdna_seq(t.splicing_pattern, REFERENCE_GENOME)
m = s[tx.start - 1:tx.start + 2]
stop = s[tx.end - CODON_SIZE: tx.end]
if translate(m) != START_AA or translate(stop) != STOP_AA:
warn(
'Sequence error. The sequence computed from the reference does look like '
'a valid translation'
)
continue
t.translations.append(tx)
if not best_transcripts_only or has_best:
genes_by_chr.setdefault(g.chr, []).append(g)
return genes_by_chr
def _load_reference_genes_json(filepath, REFERENCE_GENOME=None, best_transcripts_only=False, warn=devnull):
with open(filepath) as fh:
data = json.load(fh)
return parse_annotations_json(
data, REFERENCE_GENOME=REFERENCE_GENOME, best_transcripts_only=best_transcripts_only, warn=warn)
[docs]def convert_tab_to_json(filepath, warn=devnull):
"""
given a file in the std input format (see below) reads and return a list of genes (and sub-objects)
+-----------------------+---------------------------+-----------------------------------------------------------+
| column name | example | description |
+=======================+===========================+===========================================================+
| ensembl_transcript_id | ENST000001 | |
+-----------------------+---------------------------+-----------------------------------------------------------+
| ensembl_gene_id | ENSG000001 | |
+-----------------------+---------------------------+-----------------------------------------------------------+
| strand | -1 | positive or negative 1 |
+-----------------------+---------------------------+-----------------------------------------------------------+
| cdna_coding_start | 44 | where translation begins relative to the start of the cdna|
+-----------------------+---------------------------+-----------------------------------------------------------+
| cdna_coding_end | 150 | where translation terminates |
+-----------------------+---------------------------+-----------------------------------------------------------+
| genomic_exon_ranges | 100-201;334-412;779-830 | semi-colon demitited exon start/ends |
+-----------------------+---------------------------+-----------------------------------------------------------+
| AA_domain_ranges | DBD:220-251,260-271 | semi-colon delimited list of domains |
+-----------------------+---------------------------+-----------------------------------------------------------+
| hugo_names | KRAS | hugo gene name |
+-----------------------+---------------------------+-----------------------------------------------------------+
Args:
filepath (str): path to the input tab-delimited file
Returns:
:class:`dict` of :class:`list` of :any:`Gene` by :class:`str`: a dictionary keyed by chromosome name with
values of list of genes on the chromosome
Example:
>>> ref = load_reference_genes('filename')
>>> ref['1']
[Gene(), Gene(), ....]
Warning:
does not load translations unless then start with 'M', end with '*' and have a length of multiple 3
"""
def parse_exon_list(row):
if not row:
return []
exons = []
for temp in re.split('[; ]', row):
try:
s, t = temp.split('-')
exons.append({'start': int(s), 'end': int(t)})
except Exception as err:
warn('exon error:', repr(temp), repr(err))
return exons
def parse_domain_list(row):
if not row:
return []
domains = []
for d in row.split(';'):
try:
name, temp = d.rsplit(':')
temp = temp.split(',')
temp = [x.split('-') for x in temp]
regions = [{'start': int(x), 'end': int(y)} for x, y in temp]
domains.append({'name': name, 'regions': regions})
except Exception as err:
warn('error in domain:', d, row, repr(err))
return domains
def nullable_int(row):
try:
row = int(row)
except ValueError:
row = TSV.null(row)
return row
header, rows = TSV.read_file(
filepath,
require=[
'ensembl_gene_id',
'chr',
'ensembl_transcript_id'
],
add={
'cdna_coding_start': 'null',
'cdna_coding_end': 'null',
'AA_domain_ranges': '',
'genomic_exon_ranges': '',
'hugo_names': '',
'transcript_genomic_start': 'null',
'transcript_genomic_end': 'null',
'best_ensembl_transcript_id': 'null'
},
cast={
'genomic_exon_ranges': parse_exon_list,
'AA_domain_ranges': parse_domain_list,
'cdna_coding_end': nullable_int,
'cdna_coding_start': nullable_int,
'transcript_genomic_end': nullable_int,
'transcript_genomic_start': nullable_int,
'gene_start': int,
'gene_end': int
}
)
genes = {}
for row in rows:
g = {
'chr': row['chr'],
'start': row['gene_start'],
'end': row['gene_end'],
'name': row['ensembl_gene_id'],
'strand': row['strand'],
'aliases': row['hugo_names'].split(';') if row['hugo_names'] else [],
'transcripts': []
}
if g['name'] not in genes:
genes[g['name']] = g
else:
g = genes[g['name']]
t = {
'is_best_transcript': row['best_ensembl_transcript_id'] == row['ensembl_transcript_id'],
'name': row['ensembl_transcript_id'],
'exons': row['genomic_exon_ranges'],
'domains': row['AA_domain_ranges'],
'start': row['transcript_genomic_start'],
'end': row['transcript_genomic_end'],
'cdna_coding_start': row['cdna_coding_start'],
'cdna_coding_end': row['cdna_coding_end'],
'aliases': []
}
g['transcripts'].append(t)
return {'genes': genes.values()}
[docs]def load_reference_genome(filename, low_mem=False):
"""
Args:
filename (str): the path to the file containing the input fasta genome
Returns:
:class:`dict` of :class:`Bio.SeqRecord` by :class:`str`: a dictionary representing the sequences in the
fasta file
"""
HUMAN_REFERENCE_GENOME = None
if not low_mem:
with open(filename, 'rU') as fh:
HUMAN_REFERENCE_GENOME = SeqIO.to_dict(SeqIO.parse(fh, 'fasta'))
else:
HUMAN_REFERENCE_GENOME = SeqIO.index(filename, "fasta")
names = list(HUMAN_REFERENCE_GENOME.keys())
# to fix hg38 issues
for template_name in names:
if template_name.startswith('chr'):
truncated = re.sub('^chr', '', template_name)
if truncated in HUMAN_REFERENCE_GENOME:
raise KeyError(
'template names {} and {} are considered equal but both have been defined in the reference'
'loaded'.format(template_name, truncated))
HUMAN_REFERENCE_GENOME.setdefault(truncated, HUMAN_REFERENCE_GENOME[template_name])
else:
prefixed = 'chr' + template_name
if prefixed in HUMAN_REFERENCE_GENOME:
raise KeyError(
'template names {} and {} are considered equal but both have been defined in the reference'
'loaded'.format(template_name, prefixed))
HUMAN_REFERENCE_GENOME.setdefault(prefixed, HUMAN_REFERENCE_GENOME[template_name])
HUMAN_REFERENCE_GENOME[template_name] = HUMAN_REFERENCE_GENOME[template_name].upper()
return HUMAN_REFERENCE_GENOME
[docs]def load_templates(filename):
"""
primarily useful if template drawings are required and is not necessary otherwise
assumes the input file is 0-indexed with [start,end) style. Columns are expected in
the following order, tab-delimited. A header should not be given
1. name
2. start
3. end
4. band_name
5. giesma_stain
for example
.. code-block:: text
chr1 0 2300000 p36.33 gneg
chr1 2300000 5400000 p36.32 gpos25
Args:
filename (str): the path to the file with the cytoband template information
Returns:
:class:`list` of :class:`Template`: list of the templates loaded
"""
header = ['name', 'start', 'end', 'band_name', 'giesma_stain']
header, rows = TSV.read_file(
filename,
header=header,
cast={'start': int, 'end': int},
in_={'giesma_stain': GIESMA_STAIN}
)
bands_by_template = {}
for row in rows:
b = BioInterval(None, row['start'] + 1, row['end'], name=row['band_name'], data=row)
bands_by_template.setdefault(row['name'], []).append(b)
templates = dict()
for tname, bands in bands_by_template.items():
s = min([b.start for b in bands])
t = max([b.end for b in bands])
t = Template(tname, s, t, bands=bands)
templates[t.name] = t
return templates