Source code for the ``mavis.util`` module: miscellaneous helpers (casting, logging, file expansion, and breakpoint-pair I/O) shared across the MAVIS pipeline.
from argparse import Namespace
from datetime import datetime
import errno
from functools import partial
from glob import glob
import itertools
import os
import re
import time
from braceexpand import braceexpand
from tab import tab
from .breakpoint import Breakpoint, BreakpointPair
from .constants import COLUMNS, ORIENT, PROTOCOL, sort_columns, STRAND, SVTYPE, MavisNamespace
from .error import InvalidRearrangement
from .interval import Interval
ENV_VAR_PREFIX = 'MAVIS_'
def cast(value, cast_func):
    """
    cast a value to a given type

    Booleans are special-cased through ``tab.cast_boolean`` so that strings
    such as ``'true'``/``'false'`` convert correctly.

    Example:
        >>> cast('1', int)
        1
    """
    if cast_func == bool:
        return tab.cast_boolean(value)
    return cast_func(value)
def soft_cast(value, cast_type):
    """
    cast a value to a given type; if the cast fails, cast to null instead

    Example:
        >>> soft_cast(None, int)
        None
        >>> soft_cast('', int)
        None
    """
    try:
        return cast(value, cast_type)
    except (TypeError, ValueError):
        # fall back to interpreting the value as a null marker
        return tab.cast_null(value)
def get_env_variable(arg, default, cast_type=None):
    """
    Look up a MAVIS setting from the environment.

    Args:
        arg (str): the argument/variable name
        default: value returned when the environment variable is unset
        cast_type: type to cast the environment value to; defaults to the type of ``default``

    Returns:
        the setting from the environment variable if given, otherwise the default value
    """
    if cast_type is None:
        cast_type = type(default)
    env_name = ENV_VAR_PREFIX + str(arg).upper()
    env_value = os.environ.get(env_name, None)
    if env_value is None:
        return default
    return cast(env_value, cast_type)
class WeakMavisNamespace(MavisNamespace):
    """Namespace whose attribute reads may be overridden by ``MAVIS_*`` environment variables."""

    def __getattribute__(self, attr):
        # every attribute access consults the environment first; the value stored
        # on the namespace acts only as the default (and supplies the type the
        # environment string is cast to)
        return get_env_variable(attr, object.__getattribute__(self, attr))
class ChrListString(list):
    """
    A list of chromosome names parsed from a delimited string.

    An empty list acts as 'no restriction': membership tests then return
    True for any item.
    """

    def __init__(self, string):
        """
        Args:
            string: a ';'- or whitespace-delimited string of names, or any iterable of names
        """
        if not isinstance(string, str):
            self.extend(string)
        else:
            # use ';' as the delimiter when present, otherwise split on runs of whitespace
            # (raw string avoids the invalid '\s' escape warning)
            delim = r'\s+' if ';' not in string else ';'
            self.extend(item for item in re.split(delim, string) if item)

    def __contains__(self, item):
        # an empty chromosome list means 'match everything'
        if list.__len__(self) == 0:
            return True
        return list.__contains__(self, item)
def bash_expands(expression):
    """
    expand a file glob expression, allowing bash-style brackets.

    Returns:
        list: a list of files

    Example:
        >>> bash_expands('./{test,doc}/*py')
        [...]
    """
    # brace-expand first (e.g. {a,b}) and then glob each resulting pattern
    return [fname for pattern in braceexpand(expression) for fname in glob(pattern)]
def log_arguments(args):
    """
    output the arguments to the console

    Args:
        args (Namespace): the namespace to print arguments for
    """
    simple_types = (str, int, float, bool, tuple)
    log('arguments')
    for name, value in sorted(args.items()):
        if isinstance(value, list):
            if value:
                log(name, '= [', time_stamp=False)
                for element in value:
                    log('\t', repr(element), time_stamp=False)
                log(']', time_stamp=False)
            else:
                log(name, '= []', time_stamp=False)
        elif value is None or isinstance(value, simple_types):
            log(name, '=', repr(value), time_stamp=False)
        else:
            # avoid a potentially noisy custom __repr__ for complex objects
            log(name, '=', object.__repr__(value), time_stamp=False)
def log(*pos, time_stamp=True):
    """Print a message, prefixed either with the current timestamp or with aligning whitespace."""
    prefix = '[{}]'.format(datetime.now()) if time_stamp else ' ' * 28
    print(prefix, *pos)
def mkdirp(dirname):
    """
    Make a directory or path of directories.

    Suppresses the error that is normally raised when the directory already exists.

    Args:
        dirname (str): path of the directory to create

    Returns:
        str: the input path
    """
    log("creating output directory: '{}'".format(dirname))
    # exist_ok replaces the old errno.EEXIST/isdir check; a non-directory already
    # at this path still raises, matching the previous behaviour
    os.makedirs(dirname, exist_ok=True)
    return dirname
def filter_on_overlap(bpps, regions_by_reference_name):
    """
    filter a set of breakpoint pairs based on overlap with a set of genomic regions

    Args:
        bpps (:class:`list` of :class:`~mavis.breakpoint.BreakpointPair`): list of breakpoint pairs to be filtered
        regions_by_reference_name (:class:`dict` of :class:`list` of :class:`~mavis.annotate.base.BioInterval` by :class:`str`): regions to filter against

    Returns:
        tuple: the pairs that passed the filter and the pairs that overlapped a region
    """
    log('filtering from', len(bpps), 'using overlaps with regions filter')
    passed = []
    failed = []
    for bpp in bpps:
        masking_region = None
        # check the first breakpoint, then the second only if the first was clear
        for region in regions_by_reference_name.get(bpp.break1.chr, []):
            if Interval.overlaps(region, bpp.break1):
                masking_region = region
                break
        if masking_region is None:
            for region in regions_by_reference_name.get(bpp.break2.chr, []):
                if Interval.overlaps(region, bpp.break2):
                    masking_region = region
                    break
        if masking_region is None:
            passed.append(bpp)
        else:
            bpp.data[COLUMNS.filter_comment] = 'overlapped masked region: ' + str(masking_region)
            failed.append(bpp)
    log('filtered from', len(bpps), 'down to', len(passed), '(removed {})'.format(len(failed)))
    return passed, failed
def read_inputs(inputs, **kwargs):
    """
    Read breakpoint pairs from any number of input file glob expressions.

    Empty input files are skipped with a log message rather than raising.

    Returns:
        list: all breakpoint pairs loaded from the expanded inputs
    """
    # every input must carry a valid protocol column
    kwargs.setdefault('require', [])
    kwargs['require'] = list(set(kwargs['require'] + [COLUMNS.protocol]))
    kwargs.setdefault('in_', {})
    kwargs['in_'][COLUMNS.protocol] = PROTOCOL.values()
    bpps = []
    for expression in inputs:
        for input_file in bash_expands(expression):
            try:
                log('loading:', input_file)
                bpps.extend(read_bpp_from_input_file(input_file, **kwargs))
            except tab.EmptyFileError:
                log('ignoring empty file:', input_file)
    log('loaded', len(bpps), 'breakpoint pairs')
    return bpps
def output_tabbed_file(bpps, filename, header=None):
    """
    Write breakpoint pairs (or plain row dicts) to a tab-delimited file.

    When no header is given, the union of all row columns is used; the header
    is always put through sort_columns before writing.
    """
    custom_header = header is not None
    if not custom_header:
        header = set()
    rows = []
    for item in bpps:
        row = item if isinstance(item, dict) else item.flatten()
        rows.append(row)
        if not custom_header:
            header.update(row.keys())
    header = sort_columns(header)
    with open(filename, 'w') as fh:
        log('writing:', filename)
        fh.write('#' + '\t'.join(header) + '\n')
        for row in rows:
            fh.write('\t'.join([str(row.get(col, None)) for col in header]) + '\n')
def write_bed_file(filename, bed_rows):
    """Write rows of BED fields to a tab-delimited file."""
    log('writing:', filename)
    with open(filename, 'w') as fh:
        fh.writelines('\t'.join(str(col) for col in row) + '\n' for row in bed_rows)
def generate_complete_stamp(output_dir, log=devnull, prefix='MAVIS.', start_time=None):
    """
    writes a complete stamp, optionally including the run time if start_time is given

    Args:
        output_dir (str): path to the output dir the stamp should be written in
        log (function): function to print logging messages to
        prefix (str): prefix for the stamp name
        start_time (int): the start time

    Return:
        str: path to the complete stamp

    Example:
        >>> generate_complete_stamp('some_output_dir')
        'some_output_dir/MAVIS.COMPLETE'
    """
    stamp = os.path.join(output_dir, str(prefix) + 'COMPLETE')
    log('complete:', stamp)
    with open(stamp, 'w') as fh:
        if start_time is not None:
            duration = int(time.time()) - start_time
            # split total seconds into h/m/s components for display
            hours, remainder = divmod(duration, 3600)
            minutes, seconds = divmod(remainder, 60)
            fh.write('run time (hh/mm/ss): {}:{:02d}:{:02d}\n'.format(hours, minutes, seconds))
            fh.write('run time (s): {}\n'.format(duration))
    return stamp
def filter_uninformative(annotations_by_chr, breakpoint_pairs, max_proximity=5000):
    """
    Split breakpoint pairs into those near at least one annotated gene and those near none.

    Args:
        annotations_by_chr: annotations (genes) grouped by chromosome name
        breakpoint_pairs: pairs to be filtered
        max_proximity (int): maximum distance between a breakpoint and a gene for the pair to be kept

    Returns:
        tuple: the informative pairs, then the filtered-out pairs
    """
    informative = []
    uninformative = []
    for bpp in breakpoint_pairs:
        # pad each breakpoint by the proximity window before testing for gene overlap
        window1 = Interval(bpp.break1.start - max_proximity, bpp.break1.end + max_proximity)
        window2 = Interval(bpp.break2.start - max_proximity, bpp.break2.end + max_proximity)
        near_gene = any(
            Interval.overlaps(gene, window1)
            for gene in annotations_by_chr.get(bpp.break1.chr, [])
        ) or any(
            Interval.overlaps(gene, window2)
            for gene in annotations_by_chr.get(bpp.break2.chr, [])
        )
        (informative if near_gene else uninformative).append(bpp)
    return informative, uninformative
def unique_exists(pattern, allow_none=False, get_newest=False):
    """
    Expand a glob pattern expecting a single match.

    Args:
        pattern (str): glob/brace pattern to expand
        allow_none (bool): when True, return None instead of raising if nothing matches
        get_newest (bool): when True, resolve multiple matches by most recent modification time

    Returns:
        str or None: the matching path

    Raises:
        OSError: on multiple matches (unless get_newest) or no matches (unless allow_none)
    """
    matches = bash_expands(pattern)
    if not matches:
        if allow_none:
            return None
        raise OSError('no result found', pattern)
    if len(matches) == 1:
        return matches[0]
    if get_newest:
        return max(matches, key=lambda fname: os.stat(fname).st_mtime)
    raise OSError('duplicate results:', matches)
def read_bpp_from_input_file(filename, expand_orient=False, expand_strand=False, expand_svtype=False, **kwargs):
    """
    reads a file using the tab module. Each row is converted to a breakpoint pair and
    other column data is stored in the data attribute

    Args:
        filename (str): path to the input file
        expand_orient (bool): expand not-specified orientations to all specific values
        expand_strand (bool): expand not-specified strands to all specific values
            (only applied when the row is marked as stranded)
        expand_svtype (bool): when a row gives no event type, create one pair per
            event type consistent with the breakpoints
    Returns:
        :class:`list` of :any:`BreakpointPair`: a list of pairs
    Example:
        >>> read_bpp_from_input_file('filename')
        [BreakpointPair(), BreakpointPair(), ...]
    One can also validate other expected columns that will go in the data attribute using the usual arguments
    to the tab.read_file function
    .. code-block:: python
        >>> read_bpp_from_input_file('filename', cast={'index': int})
        [BreakpointPair(), BreakpointPair(), ...]
    """
    def soft_null_cast(value):
        # NOTE(review): there is no explicit return on the success path — when
        # tab.cast_null accepts the value this falls through and returns None;
        # presumably tab.cast_null raises TypeError for non-null values, which
        # are then returned unchanged. Verify against tab.cast_null's contract.
        try:
            tab.cast_null(value)
        except TypeError:
            return value
    # required columns, per-column casts, defaults, and allowed values for the tab reader;
    # caller-provided kwargs are merged in (caller entries are overwritten by these)
    kwargs['require'] = set() if 'require' not in kwargs else set(kwargs['require'])
    kwargs['require'].update({COLUMNS.break1_chromosome, COLUMNS.break2_chromosome})
    kwargs.setdefault('cast', {}).update(
        {
            COLUMNS.break1_position_start: int,
            COLUMNS.break1_position_end: int,
            COLUMNS.break2_position_start: int,
            COLUMNS.break2_position_end: int,
            COLUMNS.opposing_strands: partial(soft_cast, cast_type=bool),
            COLUMNS.stranded: tab.cast_boolean,
            COLUMNS.untemplated_seq: soft_null_cast,
            # normalize chromosome names by stripping any leading 'chr'
            COLUMNS.break1_chromosome: lambda x: re.sub('^chr', '', x),
            COLUMNS.break2_chromosome: lambda x: re.sub('^chr', '', x)
        })
    kwargs.setdefault('add_default', {}).update({
        COLUMNS.untemplated_seq: None,
        COLUMNS.break1_orientation: ORIENT.NS,
        COLUMNS.break1_strand: STRAND.NS,
        COLUMNS.break2_orientation: ORIENT.NS,
        COLUMNS.break2_strand: STRAND.NS,
        COLUMNS.opposing_strands: None
    })
    kwargs.setdefault('in_', {}).update(
        {
            COLUMNS.break1_orientation: ORIENT.values(),
            COLUMNS.break1_strand: STRAND.values(),
            COLUMNS.break2_orientation: ORIENT.values(),
            COLUMNS.break2_strand: STRAND.values()
        })
    _, rows = tab.read_file(
        filename, suppress_index=True,
        **kwargs
    )
    # columns consumed by the Breakpoint/BreakpointPair constructors; everything
    # else on the row is carried along in the pair's data attribute
    restricted = [
        COLUMNS.break1_chromosome,
        COLUMNS.break1_position_start,
        COLUMNS.break1_position_end,
        COLUMNS.break1_strand,
        COLUMNS.break1_orientation,
        COLUMNS.break2_chromosome,
        COLUMNS.break2_position_start,
        COLUMNS.break2_position_end,
        COLUMNS.break2_strand,
        COLUMNS.break2_orientation,
        COLUMNS.stranded,
        COLUMNS.opposing_strands,
        COLUMNS.untemplated_seq
    ]
    pairs = []
    for line_index, row in enumerate(rows):
        # 1-based line number, retained for error reporting downstream
        row['line_no'] = line_index + 1
        if '_index' in row:
            del row['_index']
        for attr, val in row.items():
            row[attr] = soft_null_cast(val)
        # any mavis pipeline id columns must be (possibly ';'-separated) id tokens
        for attr in row:
            if attr in [COLUMNS.cluster_id, COLUMNS.annotation_id, COLUMNS.validation_id]:
                if not re.match('^([A-Za-z0-9-]+|)(;[A-Za-z0-9-]+)*$', row[attr]):
                    raise AssertionError(
                        'error in column', attr, 'All mavis pipeline step ids must satisfy the regex:',
                        '^([A-Za-z0-9-]+|)(;[A-Za-z0-9-]+)*$', row[attr])
        stranded = row[COLUMNS.stranded]
        # strand information is only meaningful when the row is stranded
        strand1 = row[COLUMNS.break1_strand] if stranded else STRAND.NS
        strand2 = row[COLUMNS.break2_strand] if stranded else STRAND.NS
        temp = []
        expand_strand = stranded and expand_strand
        event_type = [None]
        if row.get(COLUMNS.event_type, None) not in [None, 'None']:
            try:
                # multiple event types may be given, separated by ';'
                event_type = row[COLUMNS.event_type].split(';')
                for putative_event_type in event_type:
                    SVTYPE.enforce(putative_event_type)
            except KeyError:
                # NOTE(review): presumably SVTYPE.enforce raises KeyError for an
                # unrecognized event type, which is silently ignored here — confirm
                pass
        # one candidate pair per combination of orientation/strand/event type;
        # NS values are expanded to all specific values when requested
        for orient1, orient2, strand1, strand2, putative_event_type in itertools.product(
            ORIENT.expand(row[COLUMNS.break1_orientation]) if expand_orient else [row[COLUMNS.break1_orientation]],
            ORIENT.expand(row[COLUMNS.break2_orientation]) if expand_orient else [row[COLUMNS.break2_orientation]],
            STRAND.expand(strand1) if expand_strand and stranded else [strand1],
            STRAND.expand(strand2) if expand_strand and stranded else [strand2],
            event_type
        ):
            try:
                break1 = Breakpoint(
                    row[COLUMNS.break1_chromosome],
                    row[COLUMNS.break1_position_start],
                    row[COLUMNS.break1_position_end],
                    strand=strand1,
                    orient=orient1
                )
                break2 = Breakpoint(
                    row[COLUMNS.break2_chromosome],
                    row[COLUMNS.break2_position_start],
                    row[COLUMNS.break2_position_end],
                    strand=strand2,
                    orient=orient2
                )
                data = {k: v for k, v in row.items() if k not in restricted}
                bpp = BreakpointPair(
                    break1,
                    break2,
                    opposing_strands=row[COLUMNS.opposing_strands],
                    untemplated_seq=row[COLUMNS.untemplated_seq],
                    stranded=row[COLUMNS.stranded],
                )
                bpp.data.update(data)
                if putative_event_type is not None:
                    # a given event type must be consistent with the breakpoint geometry
                    bpp.data[COLUMNS.event_type] = putative_event_type
                    if putative_event_type not in BreakpointPair.classify(bpp):
                        raise InvalidRearrangement(
                            'error: expected one of', BreakpointPair.classify(bpp),
                            'but found', putative_event_type, str(bpp), row)
                if expand_svtype and putative_event_type is None:
                    # no event type given: emit one copy of the pair per consistent type
                    for svtype in BreakpointPair.classify(bpp, discriminate=True):
                        new_bpp = bpp.copy()
                        new_bpp.data[COLUMNS.event_type] = svtype
                        temp.append(new_bpp)
                else:
                    temp.append(bpp)
            except InvalidRearrangement as err:
                # invalid combinations are expected (and skipped) while expanding;
                # without expansion the row itself is bad, so re-raise
                if not any([expand_strand, expand_svtype, expand_orient]):
                    raise err
            except AssertionError as err:
                if not expand_strand:
                    raise err
        if not temp:
            # every expanded combination was rejected
            raise InvalidRearrangement('could not produce a valid rearrangement', row)
        else:
            pairs.extend(temp)
    return pairs