# -*- coding: utf-8 -*-
from __future__ import with_statement, print_function
from __future__ import absolute_import
'''
author: Soizic Laguitton
organization: I2BM, Neurospin, Gif-sur-Yvette, France
organization: CATI, France
organization: IFR 49, http://www.ifr49.org
license: CeCILL version 2, http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
'''
from ..scheduler import Scheduler
import sys
# try:
#     try:
#         # subprocess32 seems to have its own zmq implementation,
#         # which proves to be incompatible (too old) with some other modules
#         # (such as ipython). We load the "official" one first to avoid
#         # problems
#         import zmq
#     except:
#         pass
#     import subprocess32 as subprocess
#     import subprocess as _subprocess
#     if hasattr(_subprocess, '_args_from_interpreter_flags'):
#         # get this private function which is used somewhere in
#         # multiprocessing
#         subprocess._args_from_interpreter_flags \
#             = _subprocess._args_from_interpreter_flags
#     del _subprocess
#     import sys
#     sys.modules['subprocess'] = sys.modules['subprocess32']
#     print('using subprocess32')
# except ImportError:
#     print('subprocess32 could not be loaded - processes may hangup')
#     import subprocess
# import ..subprocess
from .. import subprocess
import threading
import time
import os
import signal
import ctypes
import atexit
import six
import logging
try:
# psutil is used to correctly kill a job with its children processes
import psutil
have_psutil = True
except ImportError:
have_psutil = False
import soma_workflow.constants as constants
from soma_workflow.configuration import LocalSchedulerCfg
from soma_workflow.configuration import default_cpu_number, cpu_count
class LocalSchedulerError(Exception):
pass
class LocalScheduler(Scheduler):
    '''
    Allows submitting, killing and getting the status of jobs.
    Runs on a single machine, without external dependencies.

    * _proc_nb *int*
    * _queue *list of scheduler job ids*
    * _jobs *dictionary job_id -> soma_workflow.engine_types.EngineJob*
    * _processes *dictionary job_id -> subprocess.Popen*
    * _status *dictionary job_id -> job status as defined in constants*
    * _exit_info *dictionary job_id -> exit info*
    * _loop *thread*
    * _interval *int*
    * _lock *threading.RLock*
    '''
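    # Typical use, as a minimal sketch ("engine_job" is assumed to be an
    # EngineJob prepared by the engine, with its job_id already set; it is
    # not constructed here):
    #
    #     scheduler = LocalScheduler(proc_nb=4)
    #     (drmaa_id,) = scheduler.job_submission([engine_job])
    #     status = scheduler.get_job_status(drmaa_id)
    #     exit_info = scheduler.get_job_exit_info(drmaa_id)
    #     scheduler.end_scheduler_thread()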
parallel_job_submission_info = None
    logger = None  # initialized in __init__ (see below)
_proc_nb = None
_max_proc_nb = None
_queue = None
_jobs = None
_processes = None
_status = None
_exit_info = None
_loop = None
_interval = None
_lock = None
_lasttime = None
_lastidle = None
def __init__(self, proc_nb=default_cpu_number(), interval=0.05,
max_proc_nb=0):
super(LocalScheduler, self).__init__()
self.parallel_job_submission_info = None
self._proc_nb = proc_nb
self._max_proc_nb = max_proc_nb
self._interval = interval
self._queue = []
self._jobs = {}
self._processes = {}
self._status = {}
self._exit_info = {}
self._lock = threading.RLock()
self.stop_thread_loop = False
def loop(self):
while not self.stop_thread_loop:
self._iterate()
self._loop = threading.Thread(name="scheduler_loop",
target=loop,
args=[self])
self._ended_jobs = set()
self._stop_process_loop = False
self._poll_interval = 0.01
self._poll_event = threading.Event()
self._poll_thread = threading.Thread(name='poll_thread',
target=self.poll_processes)
self._poll_thread.daemon = True
self._poll_thread.start()
self._loop.daemon = True
self._loop.start()
atexit.register(LocalScheduler.end_scheduler_thread, self)
if LocalScheduler.logger is None:
LocalScheduler.logger = logging.getLogger('engine.Scheduler')
def change_proc_nb(self, proc_nb):
with self._lock:
self._proc_nb = proc_nb
def change_max_proc_nb(self, proc_nb):
with self._lock:
self._max_proc_nb = proc_nb
def change_interval(self, interval):
with self._lock:
self._interval = interval
def end_scheduler_thread(self):
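        '''
        Stop the scheduler loop and the poll thread and wait for both to
        terminate. This is registered with atexit in __init__.
        '''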
with self._lock:
self.stop_thread_loop = True
self._stop_process_loop = True
self._loop.join()
self._poll_thread.join()
# print("Soma scheduler thread ended nicely.")
def poll_processes(self):
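        '''
        Background thread body: repeatedly poll() every registered Popen
        process; when one has terminated, record its exit info and wake up
        _iterate() through _poll_event.
        '''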
while True:
if self._stop_process_loop:
break
with self._lock:
processes = dict(self._processes)
fire_event = False
for job_id, process in processes.items():
ret_value = process.poll()
            if ret_value is not None:
with self._lock:
self._ended_jobs.add(job_id)
self._exit_info[job_id] = (
constants.FINISHED_REGULARLY,
ret_value,
None,
None)
fire_event = True
if fire_event:
self._poll_event.set()
time.sleep(self._poll_interval)
def _iterate(self):
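        '''
        One pass of the scheduler loop: record the jobs that the poll
        thread has seen finish, then start as many queued jobs as the CPU
        budget allows, pushing the jobs that could not run back onto the
        queue.
        '''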
        ## Nothing to do if the queue is empty and nothing is running
        # with self._lock:
        #     if not self._queue and not self._processes:
        #         return
# print("#############################")
# Control the running jobs
fire_event = False
with self._lock:
ended_jobs = self._ended_jobs
self._ended_jobs = set()
self._poll_event.clear()
        # update the status of the ended jobs
for job_id in ended_jobs:
# print("updated job_id " + repr(job_id) + " status DONE")
self._status[job_id] = constants.DONE
job = self._jobs.get(job_id)
if job is not None and job.signal_end:
fire_event = True
try:
del self._processes[job_id]
except KeyError:
continue # deleted by another means
# run new jobs
skipped_jobs = []
with self._lock:
queue = self._queue
self._queue = []
while queue:
job_id = queue.pop(0)
with self._lock:
job = self._jobs.get(job_id)
if job is None:
continue
# print("new job " + repr(job.job_id))
if job.is_engine_execution:
# barrier jobs are not actually run using Popen:
# they succeed immediately.
self._exit_info[job.drmaa_id] = (constants.FINISHED_REGULARLY,
0,
None,
None)
self._status[job.drmaa_id] = constants.DONE
else:
ncpu = self._cpu_for_job(job)
# print('job:', job.command, ', cpus:', ncpu, file=sys.stderr)
if not self._can_submit_new_job(ncpu):
# print('cannot submit.', file=sys.stderr)
skipped_jobs.append(job_id) # postponed
if ncpu == 1: # no other job will be able to run now
break
else:
continue
# print('submitting.', file=sys.stderr)
process = LocalScheduler.create_process(job)
if process is None:
LocalScheduler.logger.error(
'command process is None:' + job.name)
self._exit_info[job.drmaa_id] = (constants.EXIT_ABORTED,
None,
None,
None)
self._status[job.drmaa_id] = constants.FAILED
else:
self._processes[job.drmaa_id] = process
self._status[job.drmaa_id] = constants.RUNNING
        # At this point:
        # - skipped_jobs contains jobs that could not be started for lack
        #   of available CPUs
        # - queue contains the jobs remaining when all CPUs were loaded
        #   (the rest of the queue is then not processed)
        # - new jobs may have been appended to self._queue in the meantime
with self._lock:
self._queue = skipped_jobs + queue + self._queue
if fire_event:
# signal that we can process more jobs
self.jobs_finished_event.set()
self._poll_event.wait(self._interval)
#time.sleep(1)
def _cpu_for_job(self, job):
parallel_job_info = job.parallel_job_info
if parallel_job_info is None:
return 1
ncpu = parallel_job_info.get('nodes_number', 1) \
* parallel_job_info.get('cpu_per_node', 1)
return ncpu
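    # For illustration (values are hypothetical): a job whose
    # parallel_job_info is {'nodes_number': 2, 'cpu_per_node': 4} is
    # counted by _cpu_for_job() as 2 * 4 = 8 CPUs.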
def _can_submit_new_job(self, ncpu=1):
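        # Two-tier policy: always allow jobs up to the configured soft limit
        # (_proc_nb); beyond it, opportunistically allow up to _max_proc_nb
        # (defaulting to the machine CPU count, minus one without psutil)
        # when enough CPUs sit idle.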
n = sum([self._cpu_for_job(self._jobs[j])
for j in self._processes])
n += ncpu
if n <= self._proc_nb:
return True
max_proc_nb = self._max_proc_nb
if max_proc_nb == 0:
if have_psutil:
max_proc_nb = cpu_count()
else:
max_proc_nb = cpu_count() - 1
if n <= max_proc_nb and self.is_available_cpu(ncpu):
return True
return False
@staticmethod
def is_available_cpu(ncpu=1):
        # OK if enough of a CPU is left idle (see the thresholds below)
if have_psutil:
if LocalScheduler._lasttime is None \
or time.time() - LocalScheduler._lasttime > 0.1:
LocalScheduler._lasttime = time.time()
LocalScheduler._lastidle = psutil.cpu_times_percent().idle \
* psutil.cpu_count() * 0.01
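                # e.g. (illustrative numbers) on an 8-core machine reporting
                # 25% overall idle time,
                # _lastidle = 25 * 8 * 0.01 = 2.0 idle CPUs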
            # We allow a new job to run if:
            # * the job is mono-core and at least 20% of one CPU is idle
            # * at least 80% of one CPU is idle. These thresholds are
            #   arbitrary and need tweaking, but tuning is not easy: if a
            #   job asks for more CPUs than the machine actually has, it
            #   must not be stuck forever anyway, and we cannot really
            #   forecast whether the machine load will decrease in the
            #   future. Still, this heuristic can certainly be improved.
if (ncpu == 1 and LocalScheduler._lastidle > 0.2) \
or (LocalScheduler._lastidle > 0.8):
# decrease artificially idle because we will submit a job,
# and next calls should take it into account
# until another measurement is done.
LocalScheduler._lastidle -= ncpu # increase load artificially
return True
return False
        # without psutil, just go up to the upper limit.
return True
@staticmethod
def create_process(engine_job):
        '''
        * engine_job *EngineJob*
        * returns: *subprocess.Popen process, or None if it could not be
          created*
        '''
command = engine_job.plain_command()
env = engine_job.env
stdout = engine_job.plain_stdout()
stdout_file = None
if stdout:
os.makedirs(os.path.dirname(stdout), exist_ok=True)
try:
stdout_file = open(stdout, "wb")
except Exception as e:
LocalScheduler.logger.error(
'exception while opening command stdout:' + repr(stdout))
LocalScheduler.logger.error('command:' + repr(command))
LocalScheduler.logger.error('exception:' + repr(e))
return None
stderr = engine_job.plain_stderr()
stderr_file = None
if stderr:
os.makedirs(os.path.dirname(stderr), exist_ok=True)
try:
stderr_file = open(stderr, "wb")
except Exception as e:
if stdout_file:
stdout_file.close()
LocalScheduler.logger.error(
'exception while opening command stderr:' + repr(stderr))
LocalScheduler.logger.error('command:' + repr(command))
LocalScheduler.logger.error('exception:' + repr(e))
return None
stdin = engine_job.plain_stdin()
stdin_file = None
if stdin:
try:
stdin_file = open(stdin, "rb")
except Exception as e:
                if stderr:
                    # stderr/stdout files are opened in binary mode:
                    # encode the message before writing
                    s = ('%s: %s \n' % (type(e), e)).encode()
                    stderr_file.write(s)
                elif stdout:
                    s = ('%s: %s \n' % (type(e), e)).encode()
                    stdout_file.write(s)
if stderr_file:
stderr_file.close()
if stdout_file:
stdout_file.close()
LocalScheduler.logger.error(
'exception while opening command stdin:' + repr(stdin))
LocalScheduler.logger.error('command:' + repr(command))
LocalScheduler.logger.error('exception:' + repr(e))
return None
working_directory = engine_job.plain_working_directory()
try:
if not have_psutil and sys.platform != 'win32':
# if psutil is not here, use process group/session, to allow killing
# children processes as well. see
# http://stackoverflow.com/questions/4789837/how-to-terminate-a-python-subprocess-launched-with-shell-true
kwargs = {'preexec_fn': os.setsid}
else:
kwargs = {}
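            # (os.setsid makes the child a session and process-group leader,
            # so its pgid equals its pid and kill_job() can later use
            # os.killpg(process.pid, ...) to kill the whole process tree)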
if env is not None:
env2 = dict(os.environ)
if sys.platform.startswith('win') and six.PY2:
# windows cannot use unicode strings as env values
env2.update([(k.encode('utf-8'), v.encode('utf-8'))
for k, v in six.iteritems(env)])
else:
env2.update(env)
env = env2
LocalScheduler.logger.debug('run command:' + repr(command))
LocalScheduler.logger.debug('with env:' + repr(env))
# ensure all args are strings
command = [str(c) for c in command]
process = subprocess.Popen(command,
stdin=stdin_file,
stdout=stdout_file,
stderr=stderr_file,
cwd=working_directory,
env=env,
**kwargs)
if stderr_file:
stderr_file.close()
if stdout_file:
stdout_file.close()
except Exception as e:
LocalScheduler.logger.error(
'exception while starting command:' + repr(e))
LocalScheduler.logger.error('command:' + repr(command))
if stderr:
s = ('%s: %s \n' % (type(e), e)).encode()
stderr_file.write(s)
elif stdout:
s = ('%s: %s \n' % (type(e), e)).encode()
stdout_file.write(s)
if stderr_file:
stderr_file.close()
if stdout_file:
stdout_file.close()
return None
return process
def job_submission(self, jobs):
        '''
        * jobs *list of EngineJob*
        * return: *list of strings*
            Job ids for the scheduling system (DRMAA for example)
        '''
drmaa_ids = []
for job in jobs:
if not job.job_id or job.job_id == -1:
raise LocalSchedulerError("Invalid job: no id")
with self._lock:
for job in jobs:
# print("job submission " + repr(job.job_id))
drmaa_id = str(job.job_id)
drmaa_ids.append(drmaa_id)
self._queue.append(drmaa_id)
self._jobs[drmaa_id] = job
self._status[drmaa_id] = constants.QUEUED_ACTIVE
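            # keep the queue sorted so that higher-priority jobs are popped,
            # and thus started, first in _iterate()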
self._queue.sort(key=lambda job_id: self._jobs[job_id].priority,
reverse=True)
return drmaa_ids
def get_job_status(self, scheduler_job_id):
'''
* scheduler_job_id *string*
Job id for the scheduling system (DRMAA for example)
* return: *string*
Job status as defined in constants.JOB_STATUS
'''
        if scheduler_job_id not in self._status:
raise LocalSchedulerError("Unknown job %s." % scheduler_job_id)
status = self._status[scheduler_job_id]
return status
def get_job_exit_info(self, scheduler_job_id):
        '''
        This function is called only once per job by the engine, thus it
        also deletes references to the job in internal tables.

        * scheduler_job_id *string*
            Job id for the scheduling system (DRMAA for example)
        * return: *tuple*
            exit_status, exit_value, term_sig, resource_usage
        '''
# TBI errors
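        # e.g. a job which ended normally with exit code 0 yields
        # (constants.FINISHED_REGULARLY, 0, None, None)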
with self._lock:
exit_info = self._exit_info[scheduler_job_id]
del self._exit_info[scheduler_job_id]
del self._status[scheduler_job_id]
del self._jobs[scheduler_job_id]
return exit_info
def kill_job(self, scheduler_job_id):
'''
* scheduler_job_id *string*
Job id for the scheduling system (DRMAA for example)
'''
# TBI Errors
with self._lock:
process = self._processes.get(scheduler_job_id)
if process is not None:
# print(" => kill the process ")
if have_psutil:
kill_process_tree(process.pid)
                    # wait for actual termination, to avoid the process
                    # writing files after we return from here.
process.communicate()
else:
# psutil not available
if sys.platform == 'win32':
# children processes will probably not be killed
# immediately.
process.kill()
else:
# kill process group, to kill children processes as well
# see
# http://stackoverflow.com/questions/4789837/how-to-terminate-a-python-subprocess-launched-with-shell-true
try:
os.killpg(process.pid, signal.SIGKILL)
except ProcessLookupError:
pass # it certainly finished in the meantime
                        # wait for actual termination, to avoid the process
                        # writing files after we return from here.
process.communicate()
del self._processes[scheduler_job_id]
self._status[scheduler_job_id] = constants.FAILED
self._exit_info[scheduler_job_id] = (constants.USER_KILLED,
None,
None,
None)
elif scheduler_job_id in self._queue:
# print(" => removed from queue ")
self._queue.remove(scheduler_job_id)
del self._jobs[scheduler_job_id]
self._status[scheduler_job_id] = constants.FAILED
self._exit_info[scheduler_job_id] = (constants.EXIT_ABORTED,
None,
None,
None)
def kill_process_tree(pid):
"""
Kill a process with its children.
Needs psutil to get children list
"""
process = psutil.Process(pid)
for proc in process.children(recursive=True):
proc.kill()
process.kill()
# set the main scheduler for this module
__main_scheduler__ = ConfiguredLocalScheduler