Source code for soma_workflow.schedulers.pbspro_scheduler

'''
organization: I2BM, Neurospin, Gif-sur-Yvette, France

license:CeCILL version 2, http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
'''

from ..scheduler import Scheduler
import sys
import logging
import os
import socket
import json
import soma_workflow.constants as constants
from soma_workflow.errors import DRMError, ExitTimeoutException
from soma_workflow import configuration
import tempfile
from soma_workflow import subprocess
import time
import shutil


class JobTemplate:

    def __init__(self, remoteCommand, outputPath=None, errorPath=None, **kwargs):
        self.remoteCommand = remoteCommand
        self.outputPath = outputPath
        self.errorPath = errorPath
        self.inputPath = None
        self.jobName = ''
        self.queue = None
        self.env = None
        self.nativeSpecification = None
        self.joinFiles = False
        self.workingDirectory = None
        self.__dict__.update(kwargs)

    def build_pbs_script(self, script_file=None):
        if not script_file:
            script_file_ = tempfile.mkstemp()
            script_file = script_file_[1]
            os.close(script_file_[0])
        with open(script_file, 'w') as f:
            f.write('#!/bin/sh\n')
            # PBS options
            if self.jobName:
                special_chars = " ()&:.,;/'%[]"
                job_name = self.jobName
                for special_char in special_chars:
                    job_name = job_name.replace(special_char, '_')
                f.write('#PBS -N %s\n' % job_name)
            if self.queue:
                f.write('#PBS -q %s\n' % self.queue)
            # if self.env:
                # var = ','.join(['"%s=%s"' % (k, v.replace('"', '\\"'))
                                # for k, v in self.env.items()])
                # f.write('#PBS -v %s\n' % var)
            if self.nativeSpecification:
                native_spec = self.nativeSpecification
                if not isinstance(native_spec, list):
                    native_spec = [native_spec]
                for spec in native_spec:
                    f.write('#PBS %s\n' % spec)
            if self.joinFiles:
                f.write('#PBS -j oe\n')

            # env variables
            if self.env:
                for var, value in self.env.items():
                    f.write(f'export {var}={value}\n')

            # working directory
            if self.workingDirectory:
                f.write('cd "%s"\n' % self.workingDirectory)

            # commandline, ensure all args are strings
            escaped_command = [str(x).replace('"', '\\"')
                               for x in self.remoteCommand]
            redirect = ''
            if self.inputPath:
                # handle stdin redirection. Use bash redirection, I don't
                # know  another way.
                redirect = ' < "%s"' % self.inputPath

            f.write('"' + '" "'.join(escaped_command) + '"' + redirect + '\n')
        logger = logging.getLogger('ljp.pbspro_scheduler')
        logger.debug('build_pbs_script: %s' % script_file)
        return script_file

    def qsub_command(self, script_file):
        cmd = PBSProScheduler.qsub_command()
        if script_file:
            if self.outputPath:
                cmd += ['-o', self.outputPath]
            if self.errorPath:
                cmd += ['-e', self.errorPath]
            cmd.append(script_file)
        return cmd

    def run_job(self, script_file=None, keep_script_file=False):
        rm_script = False
        if not script_file:
            rm_script = True
            script_file = self.build_pbs_script()
        try:
            cmd = self.qsub_command(script_file)
            logger = logging.getLogger('ljp.pbspro_scheduler')
            logger.debug('run job: %s' % repr(cmd))
            job_id = subprocess.check_output(
                cmd, stderr=subprocess.STDOUT).decode('utf-8').strip()
            logger.debug('job_id: ' + repr(job_id))
            return job_id
        finally:
            if rm_script and not keep_script_file:
                os.unlink(script_file)
            elif rm_script:
                logger.info('job %s script file: %s' %
                            (self.jobName, script_file))


[docs] class PBSProScheduler(Scheduler): ''' Scheduling using a PBS Pro session. ''' # dict parallel_job_submission_info = None logger = None _configured_native_spec = None tmp_file_path = None is_sleeping = False FAKE_JOB = -167 _out_of_container_command = None def __init__(self, parallel_job_submission_info, tmp_file_path=None, configured_native_spec=None): super().__init__() self.logger = logging.getLogger('ljp.pbspro_scheduler') self.wake() # self.hostname = socket.gethostname() # use full qualified hostname, because of a probable bug on our # cluster. self.hostname = socket.getfqdn() self.parallel_job_submission_info = parallel_job_submission_info self._configured_native_spec = configured_native_spec self.logger.debug("Parallel job submission info: %s", repr(parallel_job_submission_info)) if tmp_file_path == None: self.tmp_file_path = os.path.abspath("tmp") else: self.tmp_file_path = os.path.abspath(tmp_file_path) self.setup_pbs_variant() def __del__(self): self.clean()
[docs] @staticmethod def out_of_container_command(): """ In case this server is running inside a contaner, qsat/qsub commands are not available inside the container but are on the host. We need to get out of the container in such a situation. """ out_container = getattr(PBSProScheduler, '_out_of_container_command', None) if out_container is not None: return out_container # cached if shutil.which('qstat') \ or 'CASA_HOST_DIR' not in os.environ: out_container = [] else: out_container = ['ssh', 'localhost'] PBSProScheduler._out_of_container_command = out_container return out_container
@staticmethod def qstat_command(): return PBSProScheduler.out_of_container_command() + ['qstat'] @staticmethod def qsub_command(): return PBSProScheduler.out_of_container_command() + ['qsub'] @staticmethod def qdel_command(): return PBSProScheduler.out_of_container_command() + ['qdel']
[docs] def submit_simple_test_job(self, outstr, out_o_file, out_e_file): ''' Create a job to test ''' outputPath = "{}:{}".format(self.hostname, os.path.join(self.tmp_file_path, "%s" % (out_o_file))) errorPath = "{}:{}".format(self.hostname, os.path.join(self.tmp_file_path, "%s" % (out_e_file))) jobTemplate = JobTemplate(remoteCommand=['echo', outstr], outputPath=outputPath, errorPath=errorPath) # print("jobTemplate="+repr(jobTemplate)) # print("jobTemplate.remoteCommand="+repr(jobTemplate.remoteCommand)) # print("jobTemplate.outputPath="+repr(jobTemplate.outputPath)) # print("jobTemplate.errorPath="+repr(jobTemplate.errorPath)) jobid = jobTemplate.run_job() # print("jobid="+jobid) retval = self.wait_job(jobid)
# print("retval="+repr(retval))
[docs] def get_pbs_version(self): ''' get PBS implementation and version. May be PBS Pro or Torque/PBS Returns ------- pbs_version: tuple (2 strings) (implementation, version) ''' cmd = self.qstat_command() + ['--version'] verstr = subprocess.check_output(cmd, stderr=subprocess.STDOUT).decode( 'utf-8') if verstr.startswith('pbs_version ='): impl = 'pbspro' ver = verstr.split('=')[1].strip() else: impl = 'torque' ver = verstr.split(':')[1].strip() return (impl, ver)
[docs] def setup_pbs_variant(self): ''' determine PBS implementation / version and set internal behaviour accordingly ''' impl, ver = self.get_pbs_version() self._pbs_impl = impl # strip off suffixes like in '20.0.1~ubuntu_16.04' if '~' in ver: ver = ver[:ver.index('~')] self._pbs_version = [int(x) for x in ver.split('.')] self.logger.info('PBS implementation: %s' % self._pbs_impl) self.logger.info('PBS version: %s' % repr(self._pbs_version))
def _setParallelJob(self, job_template, parallel_job_info): ''' Set the job template information for a parallel job submission. The configuration file must provide the parallel job submission information specific to the cluster in use. Parameters ---------- job_template: JobTemplate job template instance parallel_job_info: dict parallel info dict, containing: nodes_number: int maximum node number the job requests (on a unique machine or separated machine depending on the parallel configuration) cpu_per_node: int nomber of CPUs or cores per node ''' if self.is_sleeping: self.wake() self.logger.debug(">> _setParallelJob") configuration_name = parallel_job_info.get('config_name', 'native') cluster_specific_cfg_name = self.parallel_job_submission_info.get( configuration_name) if cluster_specific_cfg_name: for attribute in configuration.PARALLEL_DRMAA_ATTRIBUTES: value = self.parallel_job_submission_info.get(attribute) if value: value = value.replace( "{config_name}", cluster_specific_cfg_name) value = value.replace("{nodes_number}", repr(parallel_job_info.get( 'nodes_number', 1))) setattr(job_template, attribute, value) self.logger.debug( "Parallel job, drmaa attribute = %s, value = %s ", attribute, value) if parallel_job_info: native_spec = job_template.nativeSpecification if native_spec is None: native_spec = [] elif isinstance(native_spec, str): native_spec = [native_spec] native_spec.append( '-l nodes=%(nodes_number)s:ppn=%(cpu_per_node)s' % parallel_job_info) job_template.nativeSpecification = native_spec job_env = {} for parallel_env_v in configuration.PARALLEL_JOB_ENV: value = self.parallel_job_submission_info.get(parallel_env_v) if value: job_env[parallel_env_v] = value.rstrip() if job_template.env is None: job_template.env = {} job_template.env.update(job_env) self.logger.debug("Parallel job environment : " + repr(job_env)) self.logger.debug("<< _setParallelJob") return job_template
[docs] def job_submission(self, jobs, signal_end=True): ''' Parameters ---------- jobs: soma_workflow.client.Job job to be submitted signal_end: bool Returns ------- job_id: string job id ''' drmaa_ids = [] for job in jobs: try: drmaa_id = self.submit_one_job(job) except: drmaa_id = None drmaa_ids.append(drmaa_id) return drmaa_ids
def submit_one_job(self, job): if self.is_sleeping: self.wake() if job.is_engine_execution: # barrier jobs don't actually go through job submission self.logger.debug('job_submission, PBS - barrier job.') job.status = constants.DONE return self.FAKE_JOB command = job.plain_command() self.logger.info("command: " + repr(command)) self.logger.info("job.name=" + repr(job.name)) stdout_file = job.plain_stdout() stderr_file = job.plain_stderr() stdin = job.plain_stdin() try: outputPath = f"{self.hostname}:{stdout_file}" if stderr_file: errorPath = f"{self.hostname}:{stderr_file}" else: errorPath = None jobTemplate = JobTemplate(remoteCommand=command, outputPath=outputPath, errorPath=errorPath) jobTemplate.jobName = job.name self.logger.info("jobTemplate=" + repr(jobTemplate) + " command[0]=" + repr(command[0]) + " command[1:]=" + repr(command[1:])) self.logger.info( f"hostname and stdout_file= [{self.hostname}]:{stdout_file}") # ensure there is a directory for stdout if not os.path.exists(os.path.dirname(stdout_file)): os.makedirs(os.path.dirname(stdout_file)) if not job.join_stderrout: # ensure there is a directory for stderr if not os.path.exists(os.path.dirname(stderr_file)): os.makedirs(os.path.dirname(stderr_file)) if job.stdin: self.logger.debug("stdin: " + repr(stdin)) jobTemplate.inputPath = stdin working_directory = job.plain_working_directory() if working_directory: jobTemplate.workingDirectory = working_directory self.logger.debug( "JOB NATIVE_SPEC " + repr(job.native_specification)) self.logger.debug( "CONFIGURED NATIVE SPEC " + repr(self._configured_native_spec)) if job.queue: jobTemplate.queue = job.queue self.logger.debug("queue: " + str(job.queue)) native_spec = None if job.native_specification: native_spec = job.native_specification elif self._configured_native_spec: native_spec = self._configured_native_spec if native_spec: jobTemplate.nativeSpecification = str(native_spec) self.logger.debug( "NATIVE specification " + str(native_spec)) if job.parallel_job_info: jobTemplate = self._setParallelJob(jobTemplate, job.parallel_job_info) if job.env: jobTemplate.env = dict(job.env) self.logger.debug("before submit command: " + repr(command)) self.logger.debug("before submit job.name=" + repr(job.name)) job_id = jobTemplate.run_job() except subprocess.CalledProcessError as e: # try: # this will be done in the engine # f = open(stderr_file, "a") # f.write("Error in job submission: %s: %s, output: %s" # % (type(e), e, e.output)) # f.close() # except Exception: # pass self.logger.error("Error in job submission: %s: %s\nOutput:\n%s" % (type(e), e, e.output.decode()), exc_info=sys.exc_info()) raise DRMError("Job submission error: %s:\n%s\nOutput:\n%s" % (type(e), e, e.output.decode())) except Exception as e: self.logger.info('exception in PBS job submission:' + repr(e)) try: f = open(stderr_file, "wa") f.write(f"Error in job submission: {type(e)}: {e}") f.close() except Exception: pass self.logger.error(f"Error in job submission: {type(e)}: {e}", exc_info=sys.exc_info()) raise DRMError(f"Job submission error: {type(e)}: {e}") return job_id
[docs] def kill_job(self, scheduler_job_id): ''' Parameters ---------- scheduler_job_id: string Job id for the scheduling system (DRMAA for example) ''' if self.is_sleeping: self.wake() if scheduler_job_id == self.FAKE_JOB: return # barriers are not run, thus cannot be killed. self.logger.debug('kill job: %s' % scheduler_job_id) try: status = self.get_job_status(scheduler_job_id) if status not in (constants.DONE, constants.FAILED): cmd = self.qdel_command() + [scheduler_job_id] subprocess.check_call(cmd) except Exception as e: self.logger.critical(f"{type(e)}: {e}")
# raise
[docs] def get_job_extended_status(self, scheduler_job_id): ''' Get job full status in a dictionary (from qstat in json format) ''' if self.is_sleeping: self.wake() if scheduler_job_id == self.FAKE_JOB: # a barrier job is done as soon as it is started. return constants.DONE try: if self._pbs_impl == 'pbspro': cmd = self.qstat_command() + ['-x', '-f', '-F', 'json', scheduler_job_id] json_str = subprocess.check_output(cmd).decode('utf-8') super_status = json.loads(json_str) else: # torque/pbs import xml.etree.ElementTree as ET cmd = self.qstat_command() + ['-x', scheduler_job_id] xml_str = subprocess.check_output(cmd).decode('utf-8') super_status = {} s_xml = ET.fromstring(xml_str) xjob = s_xml[0] job = {} parsing = [(xjob, job)] while parsing: element, parent = parsing.pop(0) for child in element: tag = child.tag if tag == 'Job_Id': super_status['Jobs'] = {child.text: job} else: if len(child) != 0: current = {} parsing.append((child, current)) else: current = child.text parent[tag] = current current = None except Exception as e: self.logger.critical(f"{type(e)}: {e}") raise status = super_status['Jobs'][scheduler_job_id] self.logger.debug('get_job_extended_status: ' + repr(scheduler_job_id) + ': ' + repr(status)) return status
def get_pbs_status_codes(self): if self._pbs_impl == 'pbspro': class codes: ARRAY_STARTED = 'B' EXITING = 'E' FINISHED = 'F' HELD = 'H' MOVED = 'M' QUEUED = 'Q' RUNNING = 'R' SUSPENDED = 'S' MOVING = 'T' SUSPEND_KB = 'U' WAITING = 'W' SUBJOB_COMPLETE = 'X' else: # torque/pbs class codes: ARRAY_STARTED = 'B' # unused EXITING = 'E' FINISHED = 'C' HELD = 'H' MOVED = 'M' # unused QUEUED = 'Q' RUNNING = 'R' SUSPENDED = 'S' MOVING = 'T' SUSPEND_KB = 'U' # unused WAITING = 'W' SUBJOB_COMPLETE = 'X' # unused return codes
[docs] def get_job_status(self, scheduler_job_id): ''' Parameters ---------- scheduler_job_id: string Job id for the scheduling system (DRMAA for example) Returns ------- status: string Job status as defined in constants.JOB_STATUS ''' if scheduler_job_id == self.FAKE_JOB: # it's a barrier job, doesn't exist in DRMS, and it's always done. return constants.DONE codes = self.get_pbs_status_codes() try: status = self.get_job_extended_status(scheduler_job_id) state = status['job_state'] self.logger.debug( 'get_job_status for: ' + repr(scheduler_job_id) + ': ', repr(state)) except Exception: return constants.UNDETERMINED if state == codes.ARRAY_STARTED: return constants.RUNNING elif state == codes.EXITING: return constants.RUNNING # FIXME not exactly that elif state == codes.FINISHED: exit_value = status.get('Exit_status', -1) if exit_value != 0: return constants.FAILED return constants.DONE elif state == codes.HELD: return constants.USER_SYSTEM_ON_HOLD # FIXME or USER_ON_HOLD ? elif state == codes.MOVED: return constants.USER_SYSTEM_SUSPENDED # FIXME not exactly that elif state == codes.QUEUED: return constants.QUEUED_ACTIVE elif state == codes.RUNNING: return constants.RUNNING elif state == codes.SUSPENDED: return constants.USER_SYSTEM_SUSPENDED # FIXME or SYSTEM_SUSPENDED ? elif state == codes.MOVING: return constants.USER_SUSPENDED # FIXME not exactly that elif state == codes.SUSPEND_KB: return constants.USER_SYSTEM_SUSPENDED # is it that ? elif state == codes.WAITING: return constants.QUEUED_ACTIVE # FIXME not exactly that elif state == codes.SUBJOB_COMPLETE: return constants.DELETE_PENDING # is it that ? else: return constants.UNDETERMINED
[docs] def get_job_exit_info(self, scheduler_job_id): ''' The exit info consists of 4 values returned in a tuple: **exit_status**: string one of the constants.JOB_EXIT_STATUS values **exit_value**: int exit code of the command (normally 0 in case of success) **term_sig**: int termination signal, 0 IF ok **resource_usage**: unicode string in the shape ``'cpupercent=60 mem=13530kb cput=00:00:12'`` etc. Items may include: * cpupercent * cput * mem * vmem * ncpus * walltime Parameters ---------- scheduler_job_id: string Job id for the scheduling system (DRMAA for example) Returns ------- exit_info: tuple exit_status, exit_value, term_sig, resource_usage ''' if self.is_sleeping: self.wake() if scheduler_job_id == self.FAKE_JOB: res_resourceUsage = '' res_status = constants.FINISHED_REGULARLY res_exitValue = 0 res_termSignal = None return (res_status, res_exitValue, res_termSignal, res_resourceUsage) res_resourceUsage = [] res_status = constants.EXIT_UNDETERMINED res_exitValue = 0 res_termSignal = None try: self.logger.debug( " ==> Start to find info of job %s" % (scheduler_job_id)) status = self.get_job_extended_status(scheduler_job_id) jid_out = status['Output_Path'] exit_value = status.get('Exit_status', -1) signaled = False term_sig = 0 if exit_value >= 256: signaled = True term_sig = exit_value % 256 coredumped = (term_sig == 2) aborted = (exit_value <= -4 and exit_value >= -6) exit_status = exit_value resource_usage = status.get('resources_used', {}) # jid_out, exit_value, signaled, term_sig, coredumped, aborted, exit_status, resource_usage = self._drmaa.wait( # scheduler_job_id, self._drmaa.TIMEOUT_NO_WAIT) self.logger.debug(" ==> jid_out=" + repr(jid_out)) self.logger.debug(" ==> exit_value=" + repr(exit_value)) self.logger.debug(" ==> signaled=" + repr(signaled)) self.logger.debug(" ==> term_sig=" + repr(term_sig)) self.logger.debug(" ==> coredumped=" + repr(coredumped)) self.logger.debug(" ==> aborted=" + repr(aborted)) self.logger.debug(" ==> exit_status=" + repr(exit_status)) self.logger.debug( " ==> resource_usage=" + repr(resource_usage)) if aborted: res_status = constants.EXIT_ABORTED else: if exit_value == 0: res_status = constants.FINISHED_REGULARLY res_exitValue = exit_status else: res_exitValue = exit_status if signaled: res_status = constants.FINISHED_TERM_SIG res_termSignal = term_sig else: # in soma-workflow a job with a non-zero exit code # has still finished regularly (ran without being # interrupted) # res_status = constants.EXIT_ABORTED res_status = constants.FINISHED_REGULARLY self.logger.info(" ==> res_status=" + repr(res_status)) res_resourceUsage = '' for k, v in resource_usage.items(): res_resourceUsage = res_resourceUsage + \ k + '=' + str(v) + ' ' except ExitTimeoutException: res_status = constants.EXIT_UNDETERMINED self.logger.debug(" ==> wait time out") return (res_status, res_exitValue, res_termSignal, res_resourceUsage)
[docs] def wait_job(self, job_id, timeout=0, poll_interval=0.3): ''' Wait for a specific job to be terminated. ''' status = self.get_job_extended_status(job_id) stime = time.time() while status['job_state'] not in ('F', ): if timeout > 0 and time() - stime > timeout: raise ExitTimeoutException('wait_job timed out') time.sleep(poll_interval) status = self.get_job_extended_status(job_id) return status
[docs] @classmethod def build_scheduler(cls, config): ''' Build an instance of PBSProScheduler ''' sch = PBSProScheduler( config.get_parallel_job_config(), os.path.expanduser("~"), configured_native_spec=config.get_native_specification()) return sch