Source code for mavis.config

from configparser import ConfigParser, ExtendedInterpolation
import os
import TSV
import re
from . import __version__
from .constants import PROTOCOL, DISEASE_STATUS
from .util import devnull, MavisNamespace, bash_expands
from .validate.constants import DEFAULTS as VALIDATION_DEFAULTS
from .pairing.constants import DEFAULTS as PAIRING_DEFAULTS
from .cluster.constants import DEFAULTS as CLUSTER_DEFAULTS
from .annotate.constants import DEFAULTS as ANNOTATION_DEFAULTS
from .illustrate.constants import DEFAULTS as ILLUSTRATION_DEFAULTS
from .summary.constants import DEFAULTS as SUMMARY_DEFAULTS
from .tools import SUPPORTED_TOOL
from .align import SUPPORTED_ALIGNER

from .bam.stats import compute_genome_bam_stats, compute_transcriptome_bam_stats
from .bam.cache import BamCache

ENV_VAR_PREFIX = 'MAVIS_'


def cast(value, cast_func):
    """Cast ``value`` with ``cast_func``, special-casing booleans.

    ``bool('False')`` is truthy, so boolean casting is delegated to
    ``TSV.tsv_boolean`` which understands string representations.
    """
    if cast_func is not bool:
        return cast_func(value)
    return TSV.tsv_boolean(value)
class LibraryConfig:
    """
    Configuration for a single sequencing library: identity (name, protocol,
    disease status), bam-derived fragment statistics, the input files, and any
    validation-setting overrides passed as keyword arguments.
    """

    def __init__(
        self, library, protocol, disease_status, bam_file, inputs,
        read_length, median_fragment_size, stdev_fragment_size, stranded_bam,
        strand_determining_read=2, **kwargs
    ):
        """
        Args:
            library (str): library name
            protocol: a PROTOCOL value (genome or transcriptome)
            disease_status: a DISEASE_STATUS value
            bam_file (str): path to the bam file for this library
            inputs: list of input files, or a single ';'/whitespace delimited string
            read_length (int): length of the reads in the bam file
            median_fragment_size (int): median insert size
            stdev_fragment_size (int): standard deviation of the insert sizes
            stranded_bam: True-ish if the bam file is strand specific
            strand_determining_read (int): read in the pair that determines the strand
            **kwargs: overrides for validation settings; each key must exist in
                VALIDATION_DEFAULTS and the value is cast to the default's type

        Raises:
            KeyError: a kwarg is not a recognized validation setting
        """
        self.library = library
        self.protocol = PROTOCOL.enforce(protocol)
        self.bam_file = bam_file
        self.read_length = int(read_length)
        self.median_fragment_size = int(median_fragment_size)
        self.stdev_fragment_size = int(stdev_fragment_size)
        self.stranded_bam = cast(stranded_bam, bool)
        self.strand_determining_read = int(strand_determining_read)
        self.disease_status = DISEASE_STATUS.enforce(disease_status)
        try:
            # fix: raw string — '[;\s]+' previously relied on a deprecated escape
            self.inputs = [f for f in re.split(r'[;\s]+', inputs) if f]
        except TypeError:  # already a list, not a delimited string
            self.inputs = inputs

        # only validation settings may be overridden through kwargs
        acceptable = {}
        acceptable.update(VALIDATION_DEFAULTS.__dict__)

        for attr, value in kwargs.items():
            if attr == 'assembly_max_kmer_size' and value in [None, 'None', '']:  # special case
                setattr(self, attr, None)
            else:
                setattr(self, attr, cast(value, type(acceptable[attr])))

        # default: fetch reads per-event for genomes but not transcriptomes,
        # unless explicitly configured by kwarg or environment variable
        if 'MAVIS_FETCH_METHOD_INDIVIDUAL' not in os.environ and 'fetch_method_individual' not in kwargs:
            self.fetch_method_individual = self.protocol != PROTOCOL.TRANS

    def flatten(self):
        """Return the configuration as a dict; inputs collapse to a newline-delimited string."""
        result = {}
        result.update(self.__dict__)
        result['inputs'] = '\n'.join(result['inputs'])
        return result

    @classmethod
    def build(
        cls, library, protocol, bam_file, inputs,
        annotations=None, log=devnull, distribution_fraction=0.98,
        sample_cap=3000, sample_bin_size=1000, sample_size=500, **kwargs
    ):
        """
        Alternate constructor: compute the fragment statistics directly from the
        bam file instead of requiring them as arguments.

        Args:
            library (str): library name
            protocol: a PROTOCOL value (genome or transcriptome)
            bam_file (str): path to the bam file
            inputs: input files (as accepted by __init__)
            annotations: reference annotations; required for transcriptome protocols
            log (function): logging callback
            distribution_fraction (float): fraction of the fragment-size distribution kept
            sample_cap (int): maximum reads sampled per bin
            sample_bin_size (int): genome-sampling bin size
            sample_size (int): number of samples taken

        Raises:
            AttributeError: transcriptome protocol given without annotations
            ValueError: unrecognized protocol
        """
        # fix: first parameter renamed self -> cls and the return uses cls(...)
        # so subclasses build instances of their own type
        PROTOCOL.enforce(protocol)
        if protocol == PROTOCOL.TRANS and annotations is None:
            raise AttributeError(
                'missing required attribute: annotations. Annotations must be given for transcriptomes')
        bam = BamCache(bam_file)
        if protocol == PROTOCOL.TRANS:
            bamstats = compute_transcriptome_bam_stats(
                bam,
                annotations=annotations,
                sample_size=sample_size,
                sample_cap=sample_cap,
                distribution_fraction=distribution_fraction,
                log=log
            )
        elif protocol == PROTOCOL.GENOME:
            bamstats = compute_genome_bam_stats(
                bam,
                sample_size=sample_size,
                sample_bin_size=sample_bin_size,
                sample_cap=sample_cap,
                distribution_fraction=distribution_fraction,
                log=log
            )
        else:
            raise ValueError('unrecognized value for protocol', protocol)
        log(bamstats)

        return cls(
            library=library, protocol=protocol, bam_file=bam_file, inputs=inputs,
            median_fragment_size=bamstats.median_fragment_size,
            stdev_fragment_size=bamstats.stdev_fragment_size,
            read_length=bamstats.read_length,
            strand_determining_read=bamstats.strand_determining_read,
            **kwargs
        )
class JobSchedulingConfig:
    """Job scheduler (qsub) settings: per-stage memory limits and the queue name."""

    def __init__(self, validate_memory_gb=12, default_memory_gb=10, queue='transabyss.q'):
        # memory (GB) requested for validation jobs vs all other jobs
        self.validate_memory_gb = validate_memory_gb
        self.default_memory_gb = default_memory_gb
        # cluster queue jobs are submitted to
        self.queue = queue

    def flatten(self):
        """Return the settings as a plain dict (used when writing config files)."""
        return dict(self.__dict__)
class ReferenceFilesConfig:
    """Paths to the reference files; each setting falls back to its MAVIS_* environment variable."""

    def __init__(
        self, annotations=None, reference_genome=None, template_metadata=None,
        masking=None, aligner_reference=None, dgv_annotation=None, low_memory=False
    ):
        # an explicit (truthy) argument wins; otherwise read MAVIS_<NAME> from the environment
        env = os.environ.get
        self.annotations = annotations or env(ENV_VAR_PREFIX + 'ANNOTATIONS', None)
        self.reference_genome = reference_genome or env(ENV_VAR_PREFIX + 'REFERENCE_GENOME', None)
        self.template_metadata = template_metadata or env(ENV_VAR_PREFIX + 'TEMPLATE_METADATA', None)
        self.masking = masking or env(ENV_VAR_PREFIX + 'MASKING', None)
        self.dgv_annotation = dgv_annotation or env(ENV_VAR_PREFIX + 'DGV_ANNOTATION', None)
        self.low_memory = low_memory or env(ENV_VAR_PREFIX + 'LOW_MEMORY', None)
        self.aligner_reference = aligner_reference or env(ENV_VAR_PREFIX + 'ALIGNER_REFERENCE', None)

    def flatten(self):
        """Return the settings as a plain dict (used when writing config files)."""
        return dict(self.__dict__)
class PairingConfig:
    """Distance thresholds used when pairing breakpoint calls between libraries."""

    def __init__(
        self,
        split_call_distance=PAIRING_DEFAULTS.split_call_distance,
        contig_call_distance=PAIRING_DEFAULTS.contig_call_distance,
        flanking_call_distance=PAIRING_DEFAULTS.flanking_call_distance,
        spanning_call_distance=PAIRING_DEFAULTS.spanning_call_distance,
        max_proximity=CLUSTER_DEFAULTS.max_proximity,
        low_memory=False
    ):
        # NOTE(review): max_proximity and low_memory are accepted but never
        # stored — preserved as-is since flatten() feeds read_config's
        # global_args and storing them would change that output; confirm intent
        distances = [
            ('split_call_distance', split_call_distance),
            ('contig_call_distance', contig_call_distance),
            ('flanking_call_distance', flanking_call_distance),
            ('spanning_call_distance', spanning_call_distance),
        ]
        for name, dist in distances:
            setattr(self, name, int(dist))

    def flatten(self):
        """Return the stored settings as a plain dict."""
        return dict(self.__dict__)
class SummaryConfig:
    """Read-count filtering thresholds applied when summarizing the final calls."""

    def __init__(
        self,
        filter_min_remapped_reads=SUMMARY_DEFAULTS.filter_min_remapped_reads,
        filter_min_spanning_reads=SUMMARY_DEFAULTS.filter_min_spanning_reads,
        filter_min_flanking_reads=SUMMARY_DEFAULTS.filter_min_flanking_reads,
        filter_min_flanking_only_reads=SUMMARY_DEFAULTS.filter_min_flanking_only_reads,
        filter_min_split_reads=SUMMARY_DEFAULTS.filter_min_split_reads,
        filter_min_linking_split_reads=SUMMARY_DEFAULTS.filter_min_linking_split_reads,
        flanking_call_distance=PAIRING_DEFAULTS.flanking_call_distance,
        split_call_distance=PAIRING_DEFAULTS.split_call_distance,
        contig_call_distance=PAIRING_DEFAULTS.contig_call_distance,
        spanning_call_distance=PAIRING_DEFAULTS.spanning_call_distance,
    ):
        # NOTE(review): the *_call_distance arguments are accepted but never
        # stored — preserved as-is to keep flatten() output unchanged; confirm intent
        thresholds = [
            ('filter_min_remapped_reads', filter_min_remapped_reads),
            ('filter_min_spanning_reads', filter_min_spanning_reads),
            ('filter_min_flanking_reads', filter_min_flanking_reads),
            ('filter_min_flanking_only_reads', filter_min_flanking_only_reads),
            ('filter_min_split_reads', filter_min_split_reads),
            ('filter_min_linking_split_reads', filter_min_linking_split_reads),
        ]
        for name, count in thresholds:
            setattr(self, name, int(count))

    def flatten(self):
        """Return the stored settings as a plain dict."""
        return dict(self.__dict__)
def write_config(filename, include_defaults=False, libraries=None, conversions=None, log=devnull):
    """
    Write a MAVIS configuration file.

    Args:
        filename (str): path to the output file
        include_defaults (bool): True if default parameters should be written to the config, False otherwise
        libraries (list of LibraryConfig): library configuration sections
        conversions (dict of list by str): conversion commands by alias name
        log (function): function to pass output logging to
    """
    # fix: the originals were mutable default arguments (libraries=[], conversions={})
    libraries = libraries if libraries is not None else []
    conversions = conversions if conversions is not None else {}

    config = {}
    config['reference'] = ReferenceFilesConfig().flatten()

    for lib in libraries:
        config[lib.library] = lib.flatten()

    if include_defaults:
        config['qsub'] = JobSchedulingConfig().flatten()
        config['validation'] = dict(VALIDATION_DEFAULTS.__dict__)
        config['cluster'] = dict(CLUSTER_DEFAULTS.__dict__)
        config['annotation'] = dict(ANNOTATION_DEFAULTS.__dict__)
        config['illustrate'] = dict(ILLUSTRATION_DEFAULTS.__dict__)

        # environment variables (MAVIS_<TAG>) override the built-in defaults
        for sec in ['qsub', 'illustrate', 'validation', 'cluster', 'annotation']:
            for tag, val in config[sec].items():
                env = ENV_VAR_PREFIX + tag.upper()
                config[sec][tag] = os.environ.get(env, None) or val

    config['convert'] = {}
    for alias, command in conversions.items():
        config['convert'][alias] = '\n'.join(command)

    # stringify every value for ConfigParser; lists become newline-delimited
    for sec in config:
        for tag, value in config[sec].items():
            if '_regex_' in tag:
                # fix: raw string; escape '$' so ConfigParser interpolation does not consume it
                config[sec][tag] = re.sub(r'\$', '$$', config[sec][tag])
            elif isinstance(value, list):
                config[sec][tag] = '\n'.join([str(v) for v in value])
            else:
                config[sec][tag] = str(value)

    conf = ConfigParser()
    for sec in config:
        conf[sec] = {}
        for tag, val in config[sec].items():
            conf[sec][tag] = val
    with open(filename, 'w') as configfile:
        log('writing:', filename)
        conf.write(configfile)
def validate_and_cast_section(section, defaults):
    """Validate a config section against known defaults and cast its values.

    Each value is cast to the type of the corresponding default.

    Raises:
        KeyError: a tag in the section is not present in the defaults
    """
    casted = {}
    for tag, raw_value in section.items():
        if tag in defaults:
            casted[tag] = cast(raw_value, type(defaults[tag]))
        else:
            raise KeyError('tag not recognized', tag)
    return casted
def read_config(filepath):
    """
    reads the configuration settings from the configuration file

    Args:
        filepath (str): path to the input configuration file

    Returns:
        tuple: a MavisNamespace of global settings, a list of LibraryConfig
        objects (one per library section), and a MavisNamespace of conversion
        commands by alias

    Raises:
        KeyError: a reference file does not exist
        UserWarning: a library section cannot be built, an input file is
            missing, or no library sections are present
    """
    parser = ConfigParser(interpolation=ExtendedInterpolation())
    parser.read(filepath)

    # any section that is not a recognized settings section is a library.
    # fix: 'pairing' and 'summary' are read as settings sections below and must
    # be excluded here; the original list omitted them and repeated 'annotation'
    settings_sections = {
        'validation', 'reference', 'qsub', 'illustrate', 'annotation',
        'cluster', 'convert', 'pairing', 'summary'
    }
    library_sections = []
    for sec in parser.sections():
        if sec not in settings_sections:
            library_sections.append(sec)
            parser[sec]['library'] = sec

    job_sched = JobSchedulingConfig(**(parser['qsub'] if 'qsub' in parser else {}))
    ref = ReferenceFilesConfig(**(parser['reference'] if 'reference' in parser else {}))
    pairing = PairingConfig(**(parser['pairing'] if 'pairing' in parser else {}))
    summary = SummaryConfig(**(parser['summary'] if 'summary' in parser else {}))

    # later updates win: defaults first, then explicitly parsed sections
    global_args = {}
    global_args.update(job_sched.flatten())
    global_args.update(ref.flatten())
    global_args.update(ILLUSTRATION_DEFAULTS.__dict__)
    global_args.update(pairing.flatten())
    global_args.update(summary.flatten())
    global_args.update(ANNOTATION_DEFAULTS.__dict__)
    global_args.update(CLUSTER_DEFAULTS.__dict__)
    for section_name, defaults in [
        ('illustrate', ILLUSTRATION_DEFAULTS),
        ('annotation', ANNOTATION_DEFAULTS),
        ('cluster', CLUSTER_DEFAULTS)
    ]:
        try:
            global_args.update(validate_and_cast_section(parser[section_name], defaults))
        except KeyError:  # section absent (or unknown tag) — keep defaults
            pass

    args = {}
    args.update(VALIDATION_DEFAULTS.__dict__)
    try:
        args.update(parser['validation'] if 'validation' in parser else {})
        SUPPORTED_ALIGNER.enforce(args['aligner'])
    except KeyError:
        pass

    # check that the reference files all exist
    for attr, fname in parser['reference'].items():
        if not os.path.exists(fname) and attr != 'low_memory':
            raise KeyError(attr, 'file at', fname, 'does not exist')
        global_args[attr] = fname

    convert = {}
    if 'convert' in parser:
        for attr, val in parser['convert'].items():
            # fix: raw string — '[;\s]+' previously relied on a deprecated escape
            val = [v for v in re.split(r'[;\s]+', val) if v]
            if val[0] == 'convert_tool_output':
                if len(val) < 3 or val[2] not in SUPPORTED_TOOL:
                    raise UserWarning(
                        'conversion using the built-in convert_tool_output requires specifying the input file and '
                        'tool name currently supported tools include:', SUPPORTED_TOOL.values())
                inputs = bash_expands(val[1])
                if len(inputs) < 1:
                    raise OSError('input file(s) do not exist', val[1])
                if len(val) == 4:
                    val[3] = TSV.tsv_boolean(val[3])  # optional stranded flag
                elif len(val) > 4:
                    raise UserWarning(
                        'conversion using the built-in convert_tool_output takes at most 3 arguments')
            convert[attr] = val

    sections = []
    for sec in library_sections:
        d = {}
        d.update(args)
        d.update(parser[sec])
        # try building the LibraryConfig directly; on a missing required
        # argument fall back to computing the bam stats via build()
        try:
            lc = LibraryConfig(**d)
            sections.append(lc)
            continue
        except TypeError as terr:  # missing required argument
            try:
                lc = LibraryConfig.build(**d)
                sections.append(lc)
            except Exception as err:
                raise UserWarning('could not build configuration file', terr, err)

    for sec in sections:
        for infile in sec.inputs:
            if not os.path.exists(infile) and infile not in convert:
                raise UserWarning('input file does not exist and is not a conversion', infile)

    if len(library_sections) < 1:
        raise UserWarning('configuration file must have 1 or more library sections')

    return MavisNamespace(**global_args), sections, MavisNamespace(**convert)
def add_semi_optional_argument(argname, success_parser, failure_parser, help_msg=''):
    """
    for an argument tries to get the argument default from the environment variable

    If MAVIS_<ARGNAME> is set (non-empty), the flag is added as optional on
    *success_parser* with that value as its default; otherwise it is added as
    required on *failure_parser*.
    """
    env_name = ENV_VAR_PREFIX + argname.upper()
    help_msg += ' The default for this argument is configured by setting the environment variable {}'.format(env_name)
    flag = '--{}'.format(argname)
    # truthiness matters here: an empty env var still makes the flag required
    if os.environ.get(env_name, None):
        success_parser.add_argument(flag, required=False, default=os.environ[env_name], help=help_msg)
    else:
        failure_parser.add_argument(flag, required=True, help=help_msg)
def get_env_variable(arg, default, cast_type=None):
    """
    Args:
        arg (str): the argument/variable name
        default: value returned when the environment variable is unset
        cast_type (type): cast applied to the env value; defaults to type(default)

    Returns:
        the setting from the environment variable if given, otherwise the default value
    """
    env_value = os.environ.get(ENV_VAR_PREFIX + arg.upper(), None)
    if env_value is None:
        return default
    return cast(env_value, cast_type if cast_type is not None else type(default))
def _add_default_argument(optparser, arg, value, help_msg='see user manual for desc', vtype=None):
    # helper: add an optional '--arg' whose default may be overridden by MAVIS_<ARG>;
    # bools are parsed via TSV.tsv_boolean since bool('False') is truthy
    if vtype is None:
        vtype = type(value) if not isinstance(value, bool) else TSV.tsv_boolean
    optparser.add_argument(
        '--{}'.format(arg), default=get_env_variable(arg, value), type=vtype, help=help_msg)


def augment_parser(parser, optparser, arguments):
    """
    Add the named arguments to the given argparse parsers.

    Args:
        parser: parser (or group) for required arguments
        optparser: parser (or group) for optional arguments
        arguments (list of str): names of the arguments to add

    Raises:
        KeyError: an argument name is not recognized
    """
    for arg in arguments:
        if arg == 'help':
            optparser.add_argument('-h', '--help', action='help', help='show this help message and exit')
        elif arg == 'version':
            optparser.add_argument(
                '-v', '--version', action='version', version='%(prog)s version ' + __version__,
                help='Outputs the version number')
        elif arg == 'annotations':
            add_semi_optional_argument(
                arg, optparser, parser,
                'Path to the reference annotations of genes, transcript, exons, domains, etc.')
        elif arg == 'reference_genome':
            add_semi_optional_argument(arg, optparser, parser, 'Path to the human reference genome fasta file.')
            optparser.add_argument(
                '--low_memory', default=get_env_variable('low_memory', False), type=TSV.tsv_boolean,
                help='if true defaults to indexing vs loading the reference genome')
        elif arg == 'template_metadata':
            add_semi_optional_argument(arg, optparser, parser, 'File containing the cytoband template information.')
        elif arg == 'masking':
            add_semi_optional_argument(arg, optparser, parser)
        elif arg == 'aligner':
            optparser.add_argument(
                '--' + arg, default=get_env_variable(arg, VALIDATION_DEFAULTS[arg]),
                choices=SUPPORTED_ALIGNER.values(), help='aligner to use for aligning contigs')
        elif arg == 'aligner_reference':
            add_semi_optional_argument(
                arg, optparser, parser, 'path to the aligner reference file used for aligning the contig sequences.')
        elif arg == 'dgv_annotation':
            add_semi_optional_argument(
                arg, optparser, parser, 'Path to the dgv reference processed to look like the cytoband file.')
        elif arg == 'config':
            # fix: the help text must be the 'help' keyword; the original passed it as a
            # second positional name string, which argparse rejects with a ValueError
            parser.add_argument('config', help='path to the config file')
        elif arg == 'stranded_bam':
            optparser.add_argument(
                '--stranded_bam', required=True, type=TSV.tsv_boolean,
                help='indicates that the input bam file is strand specific')
        elif arg == 'force_overwrite':
            optparser.add_argument(
                '-f', '--force_overwrite', default=get_env_variable(arg, False), type=TSV.tsv_boolean,
                help='set flag to overwrite existing reviewed files')
        elif arg == 'output_svgs':
            optparser.add_argument(
                '--output_svgs', default=get_env_variable(arg, True), type=TSV.tsv_boolean,
                help='set flag to suppress svg drawings of putative annotations')
        elif arg == 'uninformative_filter':
            optparser.add_argument(
                '--uninformative_filter', default=get_env_variable(arg, CLUSTER_DEFAULTS.uninformative_filter),
                type=TSV.tsv_boolean,
                help='If flag is False then the clusters will not be filtered based on lack of annotation')
        elif arg in CLUSTER_DEFAULTS:
            _add_default_argument(optparser, arg, CLUSTER_DEFAULTS[arg])
        elif arg in PAIRING_DEFAULTS:
            _add_default_argument(
                optparser, arg, PAIRING_DEFAULTS[arg], vtype=int,
                help_msg='distance allowed between breakpoint calls when pairing breakpoints of this call method')
        elif arg in VALIDATION_DEFAULTS:
            _add_default_argument(optparser, arg, VALIDATION_DEFAULTS[arg])
        elif arg in ILLUSTRATION_DEFAULTS:
            _add_default_argument(optparser, arg, ILLUSTRATION_DEFAULTS[arg])
        elif arg in ANNOTATION_DEFAULTS:
            _add_default_argument(optparser, arg, ANNOTATION_DEFAULTS[arg])
        elif arg in SUMMARY_DEFAULTS:
            _add_default_argument(optparser, arg, SUMMARY_DEFAULTS[arg])
        elif arg == 'max_proximity':
            # NOTE(review): unreachable — 'max_proximity' is in CLUSTER_DEFAULTS and is
            # caught by that branch above; kept in place to preserve the original chain
            optparser.add_argument(
                '--{}'.format(arg), default=get_env_variable(arg, CLUSTER_DEFAULTS[arg]), type=int,
                help='maximum distance away from an annotation before the uninformative filter is applied or the'
                'annotation is not considered for a given event')
        elif arg == 'bam_file':
            parser.add_argument('--bam_file', help='path to the input bam file', required=True)
        elif arg == 'read_length':
            parser.add_argument(
                '--read_length', type=int, help='the length of the reads in the bam file', required=True)
        elif arg == 'stdev_fragment_size':
            parser.add_argument(
                '--stdev_fragment_size', type=int, help='expected standard deviation in insert sizes', required=True)
        elif arg == 'median_fragment_size':
            parser.add_argument(
                '--median_fragment_size', type=int, help='median inset size for pairs in the bam file', required=True)
        elif arg == 'library':
            parser.add_argument('--library', help='library name', required=True)
        elif arg == 'protocol':
            parser.add_argument('--protocol', choices=PROTOCOL.values(), help='library protocol', required=True)
        elif arg == 'disease_status':
            parser.add_argument(
                '--disease_status', choices=DISEASE_STATUS.values(), help='library disease status', required=True)
        else:
            raise KeyError('invalid argument', arg)