from datetime import datetime
import errno
import math
import os
import random
import re
import subprocess
from mavis.breakpoint import read_bpp_from_input_file
from mavis.constants import PROTOCOL, COLUMNS, sort_columns
from mavis.interval import Interval
from vocab import Vocab
PIPELINE_STEP = Vocab(
ANNOTATE='annotate',
VALIDATE='validate',
PIPELINE='pipeline',
CLUSTER='cluster',
PAIR='pairing',
SUMMARY='summary'
)
[docs]def build_batch_id(prefix='', suffix='', size=6):
date = datetime.now()
m = int(math.pow(10, size) - 1)
return '{prefix}batch{date.year}{date.month:02d}{date.day:02d}r{r:06d}{suffix}'.format(
prefix=prefix, suffix=suffix, date=date, r=random.randint(1, m))
[docs]def get_blat_version():
proc = subprocess.getoutput(['blat'])
for line in proc.split('\n'):
m = re.search('blat - Standalone BLAT v. (\d+(x\d+)?)', line)
if m:
return m.group(1)
raise ValueError("unable to parse blat version number from:'{}'".format(proc))
[docs]def log(*pos, time_stamp=True):
if time_stamp:
print('[{}]'.format(datetime.now()), *pos)
else:
print(' ' * 28, *pos)
[docs]def mkdirp(dirname):
log("creating output directory: '{}'".format(dirname))
try:
os.makedirs(dirname)
except OSError as exc: # Python >2.5: http://stackoverflow.com/questions/600268/mkdir-p-functionality-in-python
if exc.errno == errno.EEXIST and os.path.isdir(dirname):
pass
else:
raise exc
return dirname
[docs]def filter_on_overlap(bpps, regions_by_reference_name):
log('filtering', len(bpps), 'on overlaps with regions')
failed = []
passed = []
for bpp in bpps:
overlaps = False
for r in regions_by_reference_name.get(bpp.break1.chr, []):
if Interval.overlaps(r, bpp.break1):
overlaps = True
bpp.data['failure_comment'] = 'overlapped masked region: ' + str(r)
break
for r in regions_by_reference_name.get(bpp.break2.chr, []):
if overlaps:
break
if Interval.overlaps(r, bpp.break2):
overlaps = True
bpp.data[COLUMNS.filter_comment] = 'overlapped masked region: ' + str(r)
if overlaps:
failed.append(bpp)
else:
passed.append(bpp)
log('filtered', len(bpps), 'to', len(passed))
return passed, failed
[docs]def output_tabbed_file(bpps, filename):
header = set()
rows = []
for row in bpps:
try:
row = row.flatten()
except AttributeError:
pass
rows.append(row)
header.update(row)
header = sort_columns(header)
with open(filename, 'w') as fh:
log('writing:', filename)
fh.write('#' + '\t'.join(header) + '\n')
for row in rows:
fh.write('\t'.join([str(row.get(c, None)) for c in header]) + '\n')
[docs]def write_bed_file(filename, bed_rows):
log('writing:', filename)
with open(filename, 'w') as fh:
for bed in bed_rows:
fh.write('\t'.join([str(c) for c in bed]) + '\n')