Source code for mavis.pipeline.util

from datetime import datetime
import errno
import math
import os
import random
import re
import subprocess
from mavis.breakpoint import read_bpp_from_input_file
from mavis.constants import PROTOCOL, COLUMNS, sort_columns
from mavis.interval import Interval
from vocab import Vocab


PIPELINE_STEP = Vocab(
    ANNOTATE='annotate',
    VALIDATE='validate',
    PIPELINE='pipeline',
    CLUSTER='cluster',
    PAIR='pairing',
    SUMMARY='summary'
)


[docs]def build_batch_id(prefix='', suffix='', size=6): date = datetime.now() m = int(math.pow(10, size) - 1) return '{prefix}batch{date.year}{date.month:02d}{date.day:02d}r{r:06d}{suffix}'.format( prefix=prefix, suffix=suffix, date=date, r=random.randint(1, m))
[docs]def get_samtools_version(): proc = subprocess.getoutput(['samtools']) for line in proc.split('\n'): m = re.search('Version: (?P<major>\d+)\.(?P<mid>\d+)\.(?P<minor>\d+)', line) if m: return int(m.group('major')), int(m.group('mid')), int(m.group('minor')) raise ValueError('unable to parse samtools version number')
[docs]def get_blat_version(): proc = subprocess.getoutput(['blat']) for line in proc.split('\n'): m = re.search('blat - Standalone BLAT v. (\d+(x\d+)?)', line) if m: return m.group(1) raise ValueError("unable to parse blat version number from:'{}'".format(proc))
[docs]def log(*pos, time_stamp=True): if time_stamp: print('[{}]'.format(datetime.now()), *pos) else: print(' ' * 28, *pos)
[docs]def mkdirp(dirname): log("creating output directory: '{}'".format(dirname)) try: os.makedirs(dirname) except OSError as exc: # Python >2.5: http://stackoverflow.com/questions/600268/mkdir-p-functionality-in-python if exc.errno == errno.EEXIST and os.path.isdir(dirname): pass else: raise exc return dirname
[docs]def filter_on_overlap(bpps, regions_by_reference_name): log('filtering', len(bpps), 'on overlaps with regions') failed = [] passed = [] for bpp in bpps: overlaps = False for r in regions_by_reference_name.get(bpp.break1.chr, []): if Interval.overlaps(r, bpp.break1): overlaps = True bpp.data['failure_comment'] = 'overlapped masked region: ' + str(r) break for r in regions_by_reference_name.get(bpp.break2.chr, []): if overlaps: break if Interval.overlaps(r, bpp.break2): overlaps = True bpp.data[COLUMNS.filter_comment] = 'overlapped masked region: ' + str(r) if overlaps: failed.append(bpp) else: passed.append(bpp) log('filtered', len(bpps), 'to', len(passed)) return passed, failed
[docs]def read_inputs(inputs, force_stranded=False, **kwargs): bpps = [] kwargs.setdefault('require', []) kwargs['require'] = list(set(kwargs['require'] + [COLUMNS.library, COLUMNS.protocol])) kwargs.setdefault('in_', {}) kwargs['in_'][COLUMNS.protocol] = PROTOCOL for finput in inputs: log('loading:', finput) bpps.extend(read_bpp_from_input_file( finput, force_stranded=force_stranded, **kwargs )) log('loaded', len(bpps), 'breakpoint pairs') return bpps
[docs]def output_tabbed_file(bpps, filename): header = set() rows = [] for row in bpps: try: row = row.flatten() except AttributeError: pass rows.append(row) header.update(row) header = sort_columns(header) with open(filename, 'w') as fh: log('writing:', filename) fh.write('#' + '\t'.join(header) + '\n') for row in rows: fh.write('\t'.join([str(row.get(c, None)) for c in header]) + '\n')
[docs]def write_bed_file(filename, bed_rows): log('writing:', filename) with open(filename, 'w') as fh: for bed in bed_rows: fh.write('\t'.join([str(c) for c in bed]) + '\n')