from datetime import timedelta
from .constants import MavisNamespace
from .util import log, WeakMavisNamespace
SCHEDULER = MavisNamespace(SGE='SGE', SLURM='SLURM', __name__='~mavis.submit.SCHEDULER')
""":class:`~mavis.constants.MavisNamespace`: scheduler types
- :term:`SGE`
- :term:`SLURM`
"""
STD_OPTIONS = ['memory_limit', 'queue', 'time_limit', 'import_env']
OPTIONS = WeakMavisNamespace(__name__='~mavis.submit.options')
""":class:`~mavis.constants.MavisNamespace`: submission options
- :term:`queue`
- :term:`memory_limit`
- :term:`import_env`
- :term:`time_limit`
- :term:`validation_memory`
- :term:`trans_validation_memory`
- :term:`annotation_memory`
- :term:`scheduler`
"""
OPTIONS.add('queue', '', cast_type=str, defn='the queue jobs are to be submitted to')
OPTIONS.add('memory_limit', 16000, defn='the maximum number of megabytes (MB) any given job is allowed') # 16 GB
OPTIONS.add('import_env', True, defn='flag to import environment variables')
OPTIONS.add('time_limit', 10 * 60 * 60, defn='the time in seconds any given jobs is allowed') # 10 hours
OPTIONS.add('validation_memory', 16000, defn='default memory limit (MB) for the validation stage')
OPTIONS.add('trans_validation_memory', 18000, defn='default memory limit (MB) for the validation stage (for transcriptomes)')
OPTIONS.add('annotation_memory', 12000, defn='default memory limit (MB) for the annotation stage')
OPTIONS.add('scheduler', SCHEDULER.SLURM, defn='The scheduler being used', cast_type=SCHEDULER)
[docs]def build_dependency_string(command, delim, jobs):
if isinstance(jobs, str):
return command.format(jobs)
return command.format(delim.join([str(j) for j in jobs]))
SCHEDULER_CONFIG = MavisNamespace(
SGE=MavisNamespace(
shebang='#!/bin/bash',
submit='qsub -terse',
option_prefix='#$',
jobname='-N {}'.format,
dependency=lambda x: build_dependency_string('-hold_jid {}', ',', x),
queue='-q {}'.format,
memory_limit=lambda x: '-l mem_free={0}G,mem_token={0}G,h_vmem={0}G'.format(x // 1000),
join_output=lambda x: '-j {}'.format('y' if x else 'n'),
import_env=lambda x: '-V',
stdout='-o {}/sge-$JOB_NAME-$JOB_ID.log'.format,
time_limit=lambda x: '-l h_rt={}'.format(str(timedelta(seconds=x)))
),
SLURM=MavisNamespace(
shebang='#!/bin/bash -l',
submit='sbatch',
option_prefix='#SBATCH',
jobname='-J {}'.format,
memory_limit='--mem {}M'.format,
time_limit=lambda x: '-t {}'.format(str(timedelta(seconds=x))),
stdout='-o {}/slurm-%x-%j.log'.format,
dependency=lambda x: build_dependency_string('--dependency=afterok:{}', ':', x),
import_env=lambda x: '--export=ALL',
queue='--partition={}'.format
)
)
[docs]class SubmissionScript:
"""
holds scheduler options and build submissions scripts
"""
def __init__(self, content, scheduler, **kwargs):
self.scheduler = scheduler
self.options = {k: kwargs.pop(k, OPTIONS.get(k, None)) for k in ['jobname', 'stdout'] + STD_OPTIONS}
if kwargs:
raise TypeError('unexpected argument(s):', list(kwargs.keys()))
if self.scheduler not in SCHEDULER_CONFIG:
raise ValueError('invalid scheduler', self.scheduler, 'expected', SCHEDULER_CONFIG.keys())
for option, value in self.options.items():
if value and option not in SCHEDULER_CONFIG[self.scheduler]:
raise ValueError('scheduler', self.scheduler, 'does not support the option', option)
self.content = content
def __getattribute__(self, key):
if key == 'options' or key not in self.options:
return object.__getattribute__(self, key)
return self.options[key]
[docs] def write(self, filepath):
"""
write a submission script to the input path
"""
log('writing:', filepath)
with open(filepath, 'w') as fh:
for line in self.build_header():
fh.write(line + '\n')
fh.write('\n' + self.content + '\n')
return filepath