Source code for scheduler
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import print_function
import six
import os
import inspect
import importlib
import threading
[docs]
class Scheduler(object):
    '''
    Allows submitting, killing and getting the status of jobs.

    The Scheduler class is an abstract class which specifies the jobs
    management API. It has several implementations, located in
    ``soma_workflow.schedulers.*_scheduler``.

    A scheduler implementation class can be retrieved using the global
    function :func:`get_scheduler_implementation`, or instantiated using
    :func:`build_scheduler`.

    New schedulers can be written to support computing resources types that
    are currently not supported (typically a cluster with a DRMS which has no
    DRMAA implementation). The various methods of the Scheduler API have
    to be overloaded in this case.

    The methods which are not overloaded raise :class:`NotImplementedError`
    (a subclass of :class:`Exception`).
    '''

    # Class-level defaults. __init__ rebinds all of them on each instance,
    # except ``logger`` which this base class never assigns.
    parallel_job_submission_info = None
    logger = None
    is_sleeping = None
    jobs_finished_event = None

    def __init__(self):
        self.parallel_job_submission_info = None
        self.is_sleeping = False
        # created unset; nothing in this base class sets or waits on it
        self.jobs_finished_event = threading.Event()

    def sleep(self):
        ''' Mark the scheduler as sleeping (``is_sleeping`` becomes True).
        '''
        self.is_sleeping = True

    def wake(self):
        ''' Mark the scheduler as awake (``is_sleeping`` becomes False).
        '''
        self.is_sleeping = False

    def clean(self):
        ''' Do nothing in the base class; subclasses may overload it.
        '''
        pass

    def job_submission(self, jobs):
        '''
        Submit a Soma-Workflow job

        Parameters
        ----------
        jobs: EngineJob or list[EngineJob]
            Job to be submitted

        Returns
        -------
        job_id: list[str]
            Job id for the scheduling system (DRMAA for example, or native
            DRMS identifier).
            If some submissions failed, None is in the list instead of the
            job id.

        Raises
        ------
        NotImplementedError
            always, in this abstract base class
        '''
        raise NotImplementedError("Scheduler is an abstract class!")

    def get_job_status(self, scheduler_job_id):
        '''
        Parameters
        ----------
        scheduler_job_id: string
            Job id for the scheduling system (DRMAA for example)

        Returns
        -------
        status: string
            Job status as defined in constants.JOB_STATUS

        Raises
        ------
        NotImplementedError
            always, in this abstract base class
        '''
        raise NotImplementedError("Scheduler is an abstract class!")

    def get_job_exit_info(self, scheduler_job_id):
        '''
        The exit info consists of 4 values returned in a tuple:

        **exit_status**: string
            one of the constants.JOB_EXIT_STATUS values
        **exit_value**: int
            exit code of the command (normally 0 in case of success)
        **term_sig**: int
            termination signal, 0 if OK
        **resource_usage**: bytes
            bytes string in the shape
            ``b'cpupercent=60 mem=13530kb cput=00:00:12'`` etc. Items may
            include:

            * cpupercent
            * cput
            * mem
            * vmem
            * ncpus
            * walltime

        Parameters
        ----------
        scheduler_job_id: string
            Job id for the scheduling system (DRMAA for example)

        Returns
        -------
        exit_info: tuple
            exit_status, exit_value, term_sig, resource_usage

        Raises
        ------
        NotImplementedError
            always, in this abstract base class
        '''
        raise NotImplementedError("Scheduler is an abstract class!")

    def kill_job(self, scheduler_job_id):
        '''
        Parameters
        ----------
        scheduler_job_id: string
            Job id for the scheduling system (DRMAA for example)

        Raises
        ------
        NotImplementedError
            always, in this abstract base class
        '''
        raise NotImplementedError("Scheduler is an abstract class!")

    @classmethod
    def build_scheduler(cls, config):
        ''' Create a scheduler of the expected type, using configuration to
        parameterize it.

        Parameters
        ----------
        config: Configuration
            configuration object instance

        Raises
        ------
        NotImplementedError
            always, in this abstract base class
        '''
        raise NotImplementedError("Scheduler is an abstract class!")
[docs]
def get_scheduler_implementation(scheduler_type):
    ''' Get the scheduler class implementation corresponding to the expected
    one.

    The class is looked up in the module
    ``soma_workflow.schedulers.<scheduler_type>_scheduler``, which is
    imported only if the matching ``.py`` file exists in the ``schedulers``
    package directory. Inside the module, the selection is, in this order:

    * the module attribute ``__main_scheduler__``, if it is not None
    * the first Scheduler subclass found whose lowercased name is
      ``<scheduler_type>scheduler``
    * the Scheduler subclass of the module, if there is exactly one

    Parameters
    ----------
    scheduler_type: str
        scheduler type: 'drmaa', 'drmaa2', 'local_basic', 'mpi', or other
        custom scheduler

    Returns
    -------
    scheduler_class: Scheduler subclass

    Raises
    ------
    NameError
        if the module file does not exist, or if the module contains no
        Scheduler subclass, or several ones with none matching by name (a
        warning is printed in the two latter cases)
    '''
    from . import schedulers

    # the 'local_basic' type is looked up as 'local' (local_scheduler.py)
    if scheduler_type == 'local_basic':
        scheduler_type = 'local'
    sched_mod = '%s_scheduler' % scheduler_type
    sched_dir = os.path.dirname(schedulers.__file__)
    if os.path.exists(os.path.join(sched_dir, '%s.py' % sched_mod)):
        module = importlib.import_module('.%s' % sched_mod,
                                         'soma_workflow.schedulers')
        # if there is a __main_scheduler__, just use it
        scheduler = getattr(module, '__main_scheduler__', None)
        if scheduler is not None:
            return scheduler

        expected_name = ('%sscheduler' % scheduler_type).lower()
        sched_list = []
        for element in six.itervalues(module.__dict__):
            # Filter on the type first, so that the duplicates test below
            # only compares classes: applying ``in`` to any module global
            # would call ``==`` on arbitrary objects, which may raise or
            # wrongly match.
            if not inspect.isclass(element) or element is Scheduler \
                    or not issubclass(element, Scheduler):
                continue
            if element in sched_list:
                continue  # avoid duplicates (same class under two names)
            sched_list.append(element)
            if element.__name__.lower() == expected_name:
                # fully matching
                return element

        if len(sched_list) == 1:
            # unambiguous
            return sched_list[0]
        if not sched_list:
            print('Warning: module soma_workflow.schedulers.%s contains '
                  'no scheduler:' % sched_mod)
        else:
            print('Warning: module soma_workflow.schedulers.%s contains '
                  'several schedulers:' % sched_mod)
            print([s.__name__ for s in sched_list])
    raise NameError('scheduler type %s is not found' % scheduler_type)
[docs]
def build_scheduler(scheduler_type, config):
    ''' Instantiate the scheduler matching a scheduler type.

    The implementation class is obtained from
    :func:`get_scheduler_implementation`, then its ``build_scheduler``
    class method is called with the configuration.

    Parameters
    ----------
    scheduler_type: string
        type of scheduler to be built
    config: Configuration
        configuration object

    Returns
    -------
    scheduler: Scheduler
        Scheduler instance
    '''
    return get_scheduler_implementation(scheduler_type).build_scheduler(
        config)
[docs]
def get_schedulers_list():
    '''
    List all available installed schedulers

    Every ``*_scheduler.py`` file of the ``schedulers`` package directory is
    a candidate. Its module is imported to test it: a module raising
    NotImplementedError at import is left out of the list, one raising any
    other exception is listed as disabled.

    Returns
    -------
    schedulers: list
        schedulers list. Each item is a tuple (name, enabled). The name is
        the module name without its ``_scheduler`` suffix, except
        ``local_scheduler`` which is reported as ``local_basic``. Items are
        sorted by module file name.
    '''
    # imported under another name so that it is not mixed up with the
    # returned list
    from . import schedulers as schedulers_package

    suffix = '_scheduler'
    sched_dir = os.path.dirname(schedulers_package.__file__)
    found = []
    # os.listdir() order is arbitrary: sort to get a reproducible result
    for sched_file in sorted(os.listdir(sched_dir)):
        if not sched_file.endswith(suffix + '.py'):
            continue
        sched_mod = sched_file[:-3]  # strip '.py'
        enabled = True
        try:
            importlib.import_module('.%s' % sched_mod,
                                    'soma_workflow.schedulers')
        except NotImplementedError:
            continue  # skip not implemented / unfinished ones
        except Exception:
            # the module exists but cannot be imported here: keep it in
            # the list, flagged as disabled
            enabled = False
        if sched_mod == 'local_scheduler':
            sched_mod = 'local_basic_scheduler'
        found.append((sched_mod[:-len(suffix)], enabled))
    return found