Source code for mavis.annotate.main

import json
import os
import re
import time
import warnings

from .constants import DEFAULTS
from .genomic import UsTranscript
from .variant import annotate_events, choose_more_annotated, choose_transcripts_by_priority, determine_prime, flatten_fusion_transcript, flatten_fusion_translation
from ..constants import COLUMNS, PRIME, PROTOCOL, sort_columns
from ..error import DrawingFitError, NotSpecifiedError
from ..illustrate.constants import DEFAULTS as ILLUSTRATION_DEFAULTS
from ..illustrate.constants import DiagramSettings
from ..illustrate.diagram import draw_sv_summary_diagram
from ..util import generate_complete_stamp, log, mkdirp, read_inputs


ACCEPTED_FILTERS = {
    'choose_more_annotated': choose_more_annotated,
    'choose_transcripts_by_priority': choose_transcripts_by_priority
}


[docs]def draw(drawing_config, ann, reference_genome, template_metadata, drawings_directory): """ produces the svg diagram and json legend for a given annotation """ drawing = None legend = None retry_count = 0 draw_fusion_transcript = True draw_reference_transcripts = True initial_width = drawing_config.width while drawing is None: # continue if drawing error and increase width try: canvas, legend_json = draw_sv_summary_diagram( drawing_config, ann, reference_genome=reference_genome, templates=template_metadata, draw_fusion_transcript=draw_fusion_transcript, draw_reference_transcripts=draw_reference_transcripts ) gene_aliases1 = 'NA' gene_aliases2 = 'NA' try: if ann.transcript1.gene.aliases: gene_aliases1 = '-'.join(ann.transcript1.gene.aliases) if ann.transcript1.is_best_transcript: gene_aliases1 = 'b-' + gene_aliases1 except AttributeError: pass try: if ann.transcript2.gene.aliases: gene_aliases2 = '-'.join(ann.transcript2.gene.aliases) if ann.transcript2.is_best_transcript: gene_aliases2 = 'b-' + gene_aliases2 except AttributeError: pass try: if determine_prime(ann.transcript1, ann.break1) == PRIME.THREE: gene_aliases1, gene_aliases2 = gene_aliases2, gene_aliases1 except NotSpecifiedError: pass name = 'mavis_{}-chr{}_chr{}-{}_{}'.format( ann.annotation_id, ann.break1.chr, ann.break2.chr, gene_aliases1, gene_aliases2 ) drawing = os.path.join(drawings_directory, name + '.svg') legend = os.path.join(drawings_directory, name + '.legend.json') log('generating svg:', drawing, time_stamp=False) canvas.saveas(drawing) log('generating legend:', legend, time_stamp=False) with open(legend, 'w') as fh: json.dump(legend_json, fh) break except DrawingFitError as err: drawing_config.width += drawing_config.drawing_width_iter_increase log('extending width by', drawing_config.drawing_width_iter_increase, 'to', drawing_config.width, time_stamp=False) retry_count += 1 if retry_count > drawing_config.max_drawing_retries: if draw_fusion_transcript and draw_reference_transcripts: log('restricting to gene-level only', time_stamp=False) draw_fusion_transcript = False draw_reference_transcripts = False drawing_config.width = initial_width retry_count = 0 else: warnings.warn(str(err)) drawing = True drawing_config.width = initial_width # reset the width return drawing, legend
[docs]def main( inputs, output, library, protocol, reference_genome, annotations, template_metadata, min_domain_mapping_match=DEFAULTS.min_domain_mapping_match, min_orf_size=DEFAULTS.min_orf_size, max_orf_cap=DEFAULTS.max_orf_cap, annotation_filters=DEFAULTS.annotation_filters, start_time=int(time.time()), draw_fusions_only=DEFAULTS.draw_fusions_only, **kwargs ): """ Args: inputs (:class:`List` of :class:`str`): list of input files to read output (str): path to the output directory reference_genome (object): see :func:`~mavis.annotate.file_io.load_reference_genome` annotations(object): see :func:`~mavis.annotate.file_io.load_reference_genes` template_metadata (object): see :func:`~mavis.annotate.file_io.load_templates` min_domain_mapping_match (float): min mapping match percent (0-1) to count a domain as mapped min_orf_size (int): minimum size of an :term:`open reading frame` to keep as a putative translation max_orf_cap (int): the maximum number of :term:`open reading frame` s to collect for any given event """ drawings_directory = os.path.join(output, 'drawings') tabbed_output_file = os.path.join(output, 'annotations.tab') fa_output_file = os.path.join(output, 'annotations.fusion-cdna.fa') annotation_filters = [] if not annotation_filters else annotation_filters.split(',') annotation_filters = [ACCEPTED_FILTERS[a] for a in annotation_filters] mkdirp(drawings_directory) # test that the sequence makes sense for a random transcript bpps = read_inputs( inputs, in_={COLUMNS.protocol: PROTOCOL.values()}, add_default={ COLUMNS.protocol: protocol, COLUMNS.library: library, COLUMNS.stranded: False }, require=[COLUMNS.protocol, COLUMNS.library], expand_strand=False, expand_orient=True, expand_svtype=True ) log('read {} breakpoint pairs'.format(len(bpps))) annotations = annotate_events( bpps, reference_genome=reference_genome, annotations=annotations, min_orf_size=min_orf_size, min_domain_mapping_match=min_domain_mapping_match, max_orf_cap=max_orf_cap, log=log, filters=annotation_filters ) fa_sequence_names = set() # now try generating the svg drawing_config = DiagramSettings(**{k: v for k, v in kwargs.items() if k in ILLUSTRATION_DEFAULTS}) header_req = { COLUMNS.break1_strand, COLUMNS.break2_strand, COLUMNS.fusion_sequence_fasta_file, COLUMNS.fusion_splicing_pattern, COLUMNS.fusion_cdna_coding_start, COLUMNS.fusion_cdna_coding_end, COLUMNS.fusion_sequence_fasta_id, COLUMNS.fusion_mapped_domains, COLUMNS.exon_first_3prime, COLUMNS.exon_last_5prime, COLUMNS.annotation_id, COLUMNS.annotation_figure, COLUMNS.annotation_figure_legend, COLUMNS.cdna_synon, COLUMNS.protein_synon } header = None log('opening for write:', tabbed_output_file) tabbed_fh = open(tabbed_output_file, 'w') log('opening for write:', fa_output_file) fasta_fh = open(fa_output_file, 'w') try: total = len(annotations) for i, ann in enumerate(annotations): ann_row = ann.flatten() ann_row[COLUMNS.fusion_sequence_fasta_file] = fa_output_file if header is None: header_req.update(ann_row.keys()) header = sort_columns(header_req) tabbed_fh.write('\t'.join([str(c) for c in header]) + '\n') log( '({} of {}) current annotation'.format(i + 1, total), ann.annotation_id, ann.transcript1, ann.transcript2, ann.event_type) # get the reference sequences for either transcript ref_cdna_seq = {} ref_protein_seq = {} for ust in [x for x in [ann.transcript1, ann.transcript2] if isinstance(x, UsTranscript)]: name = ust.name for spl_tx in ust.spliced_transcripts: ref_cdna_seq.setdefault(spl_tx.get_seq(reference_genome), set()).add(name) for translation in spl_tx.translations: ref_protein_seq.setdefault(translation.get_aa_seq(reference_genome), set()).add(name) # try building the fusion product rows = [] # add fusion information to the current ann_row for spl_fusion_tx in [] if not ann.fusion else ann.fusion.transcripts: fusion_fa_id = '{}_{}'.format(ann.annotation_id, spl_fusion_tx.splicing_pattern.splice_type) fusion_fa_id = re.sub(r'\s', '-', fusion_fa_id) if fusion_fa_id in fa_sequence_names: raise AssertionError('should not be duplicate fa sequence ids', fusion_fa_id) seq = ann.fusion.get_cdna_seq(spl_fusion_tx.splicing_pattern) fasta_fh.write('> {}\n{}\n'.format(fusion_fa_id, seq)) cdna_synon = ';'.join(sorted(list(ref_cdna_seq.get(seq, set())))) temp_row = {} temp_row.update(ann_row) temp_row.update(flatten_fusion_transcript(spl_fusion_tx)) temp_row[COLUMNS.fusion_sequence_fasta_id] = fusion_fa_id temp_row[COLUMNS.cdna_synon] = cdna_synon if cdna_synon else None if spl_fusion_tx.translations: # duplicate the ann_row for each translation for fusion_translation in spl_fusion_tx.translations: nrow = dict() nrow.update(ann_row) nrow.update(temp_row) aa_seq = fusion_translation.get_aa_seq() protein_synon = ';'.join(sorted(list(ref_protein_seq.get(aa_seq, set())))) nrow[COLUMNS.protein_synon] = protein_synon if protein_synon else None # select the exon nrow.update(flatten_fusion_translation(fusion_translation)) rows.append(nrow) else: temp_row.update(ann_row) rows.append(temp_row) # draw the annotation and add the path to all applicable rows (one drawing for multiple annotations) if ann.fusion or not draw_fusions_only: drawing, legend = draw(drawing_config, ann, reference_genome, template_metadata, drawings_directory) for row in rows + [ann_row]: row[COLUMNS.annotation_figure] = drawing row[COLUMNS.annotation_figure_legend] = legend if not rows: rows = [ann_row] for row in rows: tabbed_fh.write('\t'.join([str(row.get(k, None)) for k in header]) + '\n') generate_complete_stamp(output, log, start_time=start_time) finally: log('closing:', tabbed_output_file) tabbed_fh.close() log('closing:', fa_output_file) fasta_fh.close()