Source code for mavis.annotate.main

import os
import sys
import json


# local modules
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
from .variant import annotate_events, determine_prime
from ..constants import PROTOCOL, COLUMNS, PRIME
from ..error import DrawingFitError, NotSpecifiedError
from ..illustrate.constants import DiagramSettings
from ..illustrate.constants import DEFAULTS as ILLUSTRATION_DEFAULTS
from ..illustrate.diagram import draw_sv_summary_diagram
from ..pipeline.util import output_tabbed_file, log, build_batch_id, mkdirp, read_inputs


[docs]def main( inputs, output, reference_genome, annotations, template_metadata, min_domain_mapping_match, min_orf_size, max_orf_cap, **kwargs ): """ Args: inputs (:class:`List` of :class:`str`): list of input files to read output (str): path to the output directory reference_genome (object): see :func:`~mavis.annotate.file_io.load_reference_genome` annotations(object): see :func:`~mavis.annotate.file_io.load_reference_genes` template_metadata (object): see :func:`~mavis.annotate.file_io.load_templates` min_domain_mapping_match (float): min mapping match percent (0-1) to count a domain as mapped min_orf_size (int): minimum size of an :term:`open reading frame` to keep as a putative translation max_orf_cap (int): the maximum number of :term:`open reading frame` s to collect for any given event """ DRAWINGS_DIRECTORY = os.path.join(output, 'drawings') TABBED_OUTPUT_FILE = os.path.join(output, 'annotations.tab') FA_OUTPUT_FILE = os.path.join(output, 'annotations.fusion-cdna.fa') mkdirp(DRAWINGS_DIRECTORY) # test that the sequence makes sense for a random transcript bpps = read_inputs(inputs, in_={COLUMNS.protocol: PROTOCOL}) log('read {} breakpoint pairs'.format(len(bpps))) annotations = annotate_events( bpps, reference_genome=reference_genome, annotations=annotations, min_orf_size=min_orf_size, min_domain_mapping_match=min_domain_mapping_match, max_orf_cap=max_orf_cap, log=log ) id_prefix = build_batch_id(prefix='annotation-', suffix='-') rows = [] # hold the row information for the final tsv file fa_sequences = {} # now try generating the svg DS = DiagramSettings(**{k: v for k, v in kwargs.items() if k in ILLUSTRATION_DEFAULTS.__dict__}) for i, ann in enumerate(annotations): ann.data[COLUMNS.annotation_id] = id_prefix + str(i + 1) row = ann.flatten() row[COLUMNS.break1_strand] = ann.transcript1.get_strand() row[COLUMNS.break2_strand] = ann.transcript2.get_strand() row[COLUMNS.fusion_sequence_fasta_file] = FA_OUTPUT_FILE log('current annotation', ann.data[COLUMNS.annotation_id], ann.transcript1, ann.transcript2, ann.event_type) # try building the fusion product ann_rows = [] # add fusion information to the current row transcripts = [] if not ann.fusion else ann.fusion.transcripts for t in transcripts: fusion_fa_id = '{}_{}'.format(ann.data[COLUMNS.annotation_id], t.splicing_pattern.splice_type) if fusion_fa_id in fa_sequences: raise AssertionError('should not be duplicate fa sequence ids', fusion_fa_id) fa_sequences[fusion_fa_id] = ann.fusion.get_cdna_seq(t.splicing_pattern) # duplicate the row for each translation for tl in t.translations: nrow = dict() nrow.update(row) nrow[COLUMNS.fusion_splicing_pattern] = tl.transcript.splicing_pattern.splice_type nrow[COLUMNS.fusion_cdna_coding_start] = tl.start nrow[COLUMNS.fusion_cdna_coding_end] = tl.end nrow[COLUMNS.fusion_sequence_fasta_id] = fusion_fa_id domains = [] for dom in tl.domains: m, t = dom.score_region_mapping() temp = { "name": dom.name, "sequences": dom.get_seqs(), "regions": [ {"start": dr.start, "end": dr.end} for dr in sorted(dom.regions, key=lambda x: x.start) ], "mapping_quality": round(m * 100 / t, 0), "matches": m } domains.append(temp) nrow[COLUMNS.fusion_mapped_domains] = json.dumps(domains) ann_rows.append(nrow) drawing = None retry_count = 0 while drawing is None: # continue if drawing error and increase width try: canvas, legend = draw_sv_summary_diagram( DS, ann, reference_genome=reference_genome, templates=template_metadata) gene_aliases1 = 'NA' gene_aliases2 = 'NA' try: if len(ann.transcript1.gene.aliases) > 0: gene_aliases1 = '-'.join(ann.transcript1.gene.aliases) if ann.transcript1.is_best_transcript: gene_aliases1 = 'b-' + gene_aliases1 except AttributeError: pass try: if len(ann.transcript2.gene.aliases) > 0: gene_aliases2 = '-'.join(ann.transcript2.gene.aliases) if ann.transcript2.is_best_transcript: gene_aliases2 = 'b-' + gene_aliases2 except AttributeError: pass try: if determine_prime(ann.transcript1, ann.break1) == PRIME.THREE: gene_aliases1, gene_aliases2 = gene_aliases2, gene_aliases1 except NotSpecifiedError: pass name = 'mavis_{}_{}.{}_{}.{}'.format( ann.break1.chr, ann.break2.chr, gene_aliases1, gene_aliases2, ann.data[COLUMNS.annotation_id]) drawing = os.path.join(DRAWINGS_DIRECTORY, name + '.svg') l = os.path.join(DRAWINGS_DIRECTORY, name + '.legend.json') for r in ann_rows + [row]: r[COLUMNS.annotation_figure] = drawing r[COLUMNS.annotation_figure_legend] = l log('generating svg:', drawing, time_stamp=False) canvas.saveas(drawing) log('generating legend:', l, time_stamp=False) with open(l, 'w') as fh: json.dump(legend, fh) break except DrawingFitError as err: log('extending width:', DS.width, DS.width + 500, time_stamp=False) DS.width += 500 retry_count += 1 if retry_count > 10: raise err if len(ann_rows) == 0: rows.append(row) else: rows.extend(ann_rows) output_tabbed_file(rows, TABBED_OUTPUT_FILE) with open(FA_OUTPUT_FILE, 'w') as fh: log('writing:', FA_OUTPUT_FILE) for name, seq in sorted(fa_sequences.items()): fh.write('> {}\n'.format(name)) fh.write('{}\n'.format(seq))