#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import time
import threading
import os
from datetime import datetime
from datetime import timedelta
import socket
import weakref
from soma_workflow import subprocess
from soma_workflow import connection
import sys
import traceback
import shutil
# import cProfile
# import pdb
PYQT4 = "pyqt4"
PYQT5 = "pyqt5"
PYQT6 = "pyqt6"
PYSIDE = "pyside"
QT_BACKEND = None
if 'PyQt4' in sys.modules:
QT_BACKEND = PYQT4
elif 'PyQt6' in sys.modules:
QT_BACKEND = PYQT6
elif 'PyQt5' in sys.modules:
QT_BACKEND = PYQT5
elif 'PySide' in sys.modules:
QT_BACKEND = PYSIDE
if QT_BACKEND is None:
qt_api = os.environ.get('QT_API')
if qt_api == 'pyqt6':
QT_BACKEND = PYQT6
elif qt_api == 'pyqt5':
QT_BACKEND = PYQT5
elif qt_api in ('pyqt4', 'pyqt'):
QT_BACKEND = PYQT4
elif qt_api == 'pyside':
QT_BACKEND = PYSIDE
if QT_BACKEND is None:
try:
from PyQt6 import QtGui, QtCore, QtWidgets
QT_BACKEND = PYQT6
except ImportError as e:
try:
from PyQt5 import QtGui, QtCore, QtWidgets
QT_BACKEND = PYQT5
except ImportError as e:
raise Exception("Soma-workflow Gui requires PyQt4, PyQt5, PyQt6 or PySide.")
if QT_BACKEND is None:
try:
import sip
sip_classes = ['QString', 'QVariant', 'QDate', 'QDateTime',
'QTextStream', 'QTime', 'QUrl', 'QStringList']
for sip_class in sip_classes:
try:
sip.setapi(sip_class, 2)
except ValueError as e:
pass
from PyQt4 import QtGui, QtCore # noqa: F811
QT_BACKEND = PYQT4
except ImportError as e:
pass
if QT_BACKEND is None:
try:
from PySide import QtGui, QtCore # noqa: F811
QT_BACKEND = PYSIDE
except ImportError as e:
pass
use_qvariant = False
if QT_BACKEND == PYQT4:
import sip
from PyQt4 import QtCore, QtGui, uic # noqa: F811
from PyQt4.uic import loadUiType
use_qvariant = False
if sip.getapi('QVariant') < 2:
use_qvariant = True
QtCore.Slot = QtCore.pyqtSlot
QtCore.Signal = QtCore.pyqtSignal
elif QT_BACKEND == PYQT5:
from PyQt5 import QtCore, QtGui, QtWidgets, uic # noqa: F811
from PyQt5.uic import loadUiType
QtCore.Slot = QtCore.pyqtSlot
QtCore.Signal = QtCore.pyqtSignal
# copy QtWidgets contents into QtGui
for key in QtWidgets.__dict__:
if not key.startswith('__') and key not in QtGui.__dict__:
setattr(QtGui, key, getattr(QtWidgets, key))
# more hacks
QtGui.QSortFilterProxyModel = QtCore.QSortFilterProxyModel
QtGui.QItemSelectionModel = QtCore.QItemSelectionModel
try:
from PyQt5 import sip
sys.modules['sip'] = sip
except ImportError:
import sip
elif QT_BACKEND == PYQT6:
from PyQt6 import QtCore, QtGui, QtWidgets, uic # noqa: F811
from PyQt6.uic import loadUiType
QtCore.Slot = QtCore.pyqtSlot
QtCore.Signal = QtCore.pyqtSignal
# copy QtWidgets contents into QtGui
for key in QtWidgets.__dict__:
if not key.startswith('__') and key not in QtGui.__dict__:
setattr(QtGui, key, getattr(QtWidgets, key))
# more hacks
QtGui.QSortFilterProxyModel = QtCore.QSortFilterProxyModel
QtGui.QItemSelectionModel = QtCore.QItemSelectionModel
try:
from PyQt6 import sip
sys.modules['sip'] = sip
except ImportError:
import sip
elif QT_BACKEND == PYSIDE:
from PySide import QtCore, QtGui, QtUiTools
from soma_workflow.client import Workflow, Group, FileTransfer, SharedResourcePath, TemporaryPath, Job, WorkflowController, Helper
from soma_workflow.engine_types import EngineWorkflow, EngineJob, EngineTransfer
import soma_workflow.constants as constants
import soma_workflow.configuration as configuration
from soma_workflow.test.workflow_tests import WorkflowExamples
from soma_workflow.test.workflow_tests import WorkflowExamplesLocal
from soma_workflow.test.workflow_tests import WorkflowExamplesShared
from soma_workflow.test.workflow_tests import WorkflowExamplesSharedTransfer
from soma_workflow.test.workflow_tests import WorkflowExamplesTransfer
from soma_workflow.errors import UnknownObjectError, ConfigurationError, SerializationError, WorkflowError, JobError, ConnectionError, DatabaseError
import soma_workflow.version as version
import six
def utf8(string):
"""Convert a Unicode or utf-8 encoded string to a native Python str.
This function's name is misleading: while on Python 2 it really
returns a utf-8-encoded string, on Python 3 it return a native
Unicode Python str.
This function is robust to incorrectly-encoded utf-8 strings, and
will replace undecodable sequences with the Unicode
U+FFFD REPLACEMENT CHARACTER.
"""
if isinstance(string, bytes):
unicode_string = string.decode('utf-8', errors='replace')
else:
unicode_string = str(string) # to handle QString
return six.ensure_str(unicode_string, 'utf-8')
MATPLOTLIB = True
try:
import matplotlib
if QT_BACKEND == PYQT6:
matplotlib.use('Qt5Agg') # apparently not Qt6Agg
elif QT_BACKEND == PYQT5:
matplotlib.use('Qt5Agg')
elif QT_BACKEND == PYQT4:
matplotlib.use('Qt4Agg')
elif QT_BACKEND == PYSIDE:
matplotlib.use('Qt5Agg')
if 'backend.qt4' in list(matplotlib.rcParams.keys()):
matplotlib.rcParams['backend.qt4'] = 'PySide'
else:
print("Could not use Matplotlib, the backend using PySide "
"is missing.")
MATPLOTLIB = False
if QT_BACKEND in (PYQT4, ):
from matplotlib.backends.backend_qt4agg \
import FigureCanvasQTAgg as FigureCanvas
else:
from matplotlib.backends.backend_qt5agg \
import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
import matplotlib.pyplot
import matplotlib.dates as mdates
except ImportError as e:
print(f"Could not use Matplotlib: {type(e)} {e}")
MATPLOTLIB = False
# from soma.utils.sip_compat:
import inspect
import enum
def sip4_to_sip6_enums(module, recursive=True):
''' Convert Sip4 style enums to sip6 style enums.
Sip4 exports C++ enums as in C++, ex::
instance.ENUM_VALUE # where ENUM_VALUE is an instance of EnumType
Sip6 keeps values inside the enum type::
instance.EnumType.ENUM_VALUE
In order to maintain code compatible, we duplicat these values. If using
sip4, then we copy values inside the enum types as sip6 does.
If recursive (the default), sub-modules are also scanned and modified.
'''
todo = [module]
done = set()
while todo:
module = todo.pop(0)
if id(module) in done:
continue
done.add(id(module))
enums = set()
for iname, item in module.__dict__.items():
if type(item).__name__ == 'enumtype' and id(item) not in done:
enums.add(item)
done.add(id(item))
elif inspect.isclass(item) or (recursive
and inspect.ismodule(item)):
todo.append(item)
if enums:
for iname, item in module.__dict__.items():
if type(item) in enums:
setattr(type(item), iname, item)
def sip6_to_sip4(module, recursive=True):
''' Convert Sip6 style enums to sip4 style enums.
Sip4 exports C++ enums as in C++, ex::
instance.ENUM_VALUE # where ENUM_VALUE is an instance of EnumType
Sip6 keeps values inside the enum type::
instance.EnumType.ENUM_VALUE
In order to maintain code compatible, we duplicat these values. If using
sip6, then we copy values outside the enum types as sip4 does.
If recursive (the default), sub-modules are also scanned and modified.
'''
todo = [module]
done = set()
while todo:
module = todo.pop(0)
if id(module) in done:
continue
done.add(id(module))
enums = []
for iname, item in module.__dict__.items():
if type(item) is enum.EnumMeta and id(item) not in done:
enums.append(item)
done.add(id(item))
elif inspect.isclass(item) or (recursive
and inspect.ismodule(item)):
todo.append(item)
for entype in enums:
for name in entype._member_names_:
setattr(module, name, getattr(entype, name))
def sip_export_enums(module, recursive=True):
''' Convert Sip6 style enums to sip4 style enums, or the contrary.
Sip4 exports C++ enums as in C++, ex::
instance.ENUM_VALUE # where ENUM_VALUE is an instance of EnumType
Sip6 keeps values inside the enum type::
instance.EnumType.ENUM_VALUE
In order to maintain code compatible, we duplicat these values. If using
sip6, then we copy values outside the enum types as sip4 does. If using
sip4, then we copy values inside the enum types as sip6 does.
If recursive (the default), sub-modules are also scanned and modified.
'''
sip = sys.modules['sip']
if sip.SIP_VERSION >= 0x060000:
sip6_to_sip4(module, recursive=recursive)
else:
sip4_to_sip6_enums(module, recursive=recursive)
# ---
sip_export_enums(QtCore)
sip_export_enums(QtGui)
sip_export_enums(QtWidgets)
class ProtocolError(Exception):
pass
class ConnectionClosedError(ProtocolError):
pass
# QFileDialog options:
# in binary packages containing thirdpary libs/python, we must use
# DontUseNativeDialog to prevent loading external libs from the system.
if QT_BACKEND == PYQT6:
filedialog_options = QtGui.QFileDialog.Option.DontUseNativeDialog
else:
filedialog_options = QtGui.QFileDialog.DontUseNativeDialog
#-----------------------------------------------------------------------------
# Globals and constants
#-----------------------------------------------------------------------------
NOT_SUBMITTED_WF_ID = -1
NOT_SUBMITTED_JOB_ID = -1
GRAY = QtGui.QColor(200, 200, 180)
BLUE = QtGui.QColor(0, 200, 255)
RED = QtGui.QColor(255, 100, 50)
GREEN = QtGui.QColor(155, 255, 50)
LIGHT_BLUE = QtGui.QColor(200, 255, 255)
if QT_BACKEND in (PYQT4, PYQT5, PYQT6):
'''
The types can be loaded directly from the ui files (useful during developpement)
'''
Ui_JobInfo = uic.loadUiType(os.path.join(os.path.dirname(__file__),
'JobInfo.ui'))[0]
Ui_GraphWidget = uic.loadUiType(os.path.join(os.path.dirname(__file__),
'graphWidget.ui'))[0]
Ui_PlotWidget = uic.loadUiType(os.path.join(os.path.dirname(__file__),
'PlotWidget.ui'))[0]
Ui_TransferInfo = uic.loadUiType(os.path.join(os.path.dirname(__file__),
'TransferInfo.ui'))[0]
Ui_GroupInfo = uic.loadUiType(os.path.join(os.path.dirname(__file__),
'GroupInfo.ui'))[0]
Ui_ConnectionDlg = uic.loadUiType(os.path.join(os.path.dirname(__file__),
'connection_dlg.ui'))[0]
Ui_WorkflowExampleDlg = uic.loadUiType(
os.path.join(os.path.dirname(__file__),
'workflowExampleDlg.ui'))[0]
Ui_SubmissionDlg = uic.loadUiType(os.path.join(os.path.dirname(__file__),
'submissionDlg.ui'))[0]
Ui_ResourceWfSelect = uic.loadUiType(
os.path.join(os.path.dirname(__file__),
'resource_wf_select.ui'))[0]
Ui_MainWindow = uic.loadUiType(os.path.join(os.path.dirname(__file__),
'main_window.ui'))[0]
Ui_WStatusNameDate = uic.loadUiType(
os.path.join(os.path.dirname(__file__),
'wf_status_name_date.ui'))[0]
Ui_SWMiniWidget = uic.loadUiType(os.path.join(os.path.dirname(__file__),
'sw_mini.ui'))[0]
Ui_SearchWidget = uic.loadUiType(os.path.join(os.path.dirname(__file__),
'search_widget.ui'))[0]
Ui_LocalSchedulerConfigController = uic.loadUiType(
os.path.join(os.path.dirname(__file__),
'local_scheduler_widget.ui'))[0]
Ui_WorkflowEngineConfigController = uic.loadUiType(
os.path.join(os.path.dirname(__file__),
'engine_controller_widget.ui'))[0]
Ui_ServerManagement = uic.loadUiType(
os.path.join(os.path.dirname(__file__),
'ServerManagement.ui'))[0]
Ui_NewServer = uic.loadUiType(os.path.join(os.path.dirname(__file__),
'ServerNewDlg.ui'))[0]
Ui_RequirePW = uic.loadUiType(os.path.join(os.path.dirname(__file__),
'RequirePW.ui'))[0]
else:
from soma_workflow.gui.ui_job_info import Ui_JobInfo
from soma_workflow.gui.ui_graph_widget import Ui_GraphWidget
from soma_workflow.gui.ui_plot_widget import Ui_PlotWidget
from soma_workflow.gui.ui_transfer_info import Ui_TransferInfo
from soma_workflow.gui.ui_group_info import Ui_GroupInfo
from soma_workflow.gui.ui_connection_dlg import Ui_ConnectionDlg
from soma_workflow.gui.ui_workflow_example_dlg import Ui_WorkflowExampleDlg
from soma_workflow.gui.ui_submission_dlg import Ui_SubmissionDlg
from soma_workflow.gui.ui_resource_wf_select import Ui_ResourceWfSelect
from soma_workflow.gui.ui_main_window import Ui_MainWindow
from soma_workflow.gui.ui_wf_status_name_date import Ui_WStatusNameDate
from soma_workflow.gui.ui_sw_mini_widget import Ui_SWMiniWidget
from soma_workflow.gui.ui_search_widget import Ui_SearchWidget
from soma_workflow.gui.ui_local_scheduler_cfg_ctrl import Ui_LocalSchedulerConfigController
from soma_workflow.gui.ui_workflow_engine_cfg_ctrl import Ui_WorkflowEngineConfigController
# from soma_workflow.gui.ui_server_management import Ui_ServerManagement
# from soma_workflow.gui.ui_new_server import Ui_NewServer
# Ui_RequirePW
#-----------------------------------------------------------------------------
# Local utilities
#-----------------------------------------------------------------------------
def setLabelFromString(label, value):
'''
@type value: string
'''
if value == label.text():
return
if value:
label.setText(utf8(value))
else:
label.setText("")
def setTextEditFromString(text_edit, value):
'''
@type value: string
'''
if value == text_edit.toPlainText():
return
if value:
text_edit.setText(utf8(value))
else:
text_edit.setText("")
def setLabelFromInt(label, value):
'''
@type value: int
'''
if not value == None:
label.setText(repr(value))
else:
label.setText("")
def setLabelFromTimeDelta(label, value):
'''
@type value: datetime.timedelta
'''
if not value == None:
hours = value.seconds // 3600
mins = (value.seconds % 3600) // 60
seconds = (value.seconds % 3600) % 60
hours = hours + value.days * 24
label.setText(f"{repr(hours)}:{repr(mins)}:{repr(seconds)}")
else:
label.setText("")
def setLabelFromDateTime(label, value):
'''
@type value: datetime.datetime
'''
if not value == None:
datetime = QtCore.QDateTime(value)
label.setText(datetime.toString("dd/MM/yy HH:mm:ss"))
else:
label.setText("")
def workflow_status_icon(status=None):
'''
return the icon file path
'''
file_path = None
if status == None:
file_path = None
elif status == constants.WORKFLOW_NOT_STARTED:
file_path = os.path.join(
os.path.dirname(__file__), "icon/no_status.png")
elif status == constants.WORKFLOW_IN_PROGRESS:
file_path = os.path.join(os.path.dirname(__file__), "icon/running.png")
elif status == constants.WORKFLOW_DONE:
file_path = os.path.join(os.path.dirname(__file__), "icon/done.png")
elif status == constants.DELETE_PENDING:
file_path = os.path.join(
os.path.dirname(__file__), "icon/kill_delete_pending.png")
elif status == constants.WARNING:
file_path = os.path.join(os.path.dirname(__file__), "icon/warning.png")
return file_path
class QResizeMessageBox(QtGui.QMessageBox):
def __init__(self, *args, **kwargs):
QtGui.QMessageBox.__init__(self, *args, **kwargs)
self.setSizeGripEnabled(True)
def event(self, e):
result = QtGui.QMessageBox.event(self, e)
self.setMinimumHeight(0)
self.setMaximumHeight(16777215)
self.setMinimumWidth(0)
self.setMaximumWidth(16777215)
self.setSizePolicy(QtGui.QSizePolicy.Expanding, QtGui.QSizePolicy.Expanding)
textEdit = self.findChild(QtGui.QTextEdit)
if textEdit != None :
textEdit.setMinimumHeight(0)
textEdit.setMaximumHeight(16777215)
textEdit.setMinimumWidth(0)
textEdit.setMaximumWidth(16777215)
textEdit.setSizePolicy(QtGui.QSizePolicy.Expanding, QtGui.QSizePolicy.Expanding)
return result
def detailed_critical_message_box(msg, title, parent):
long_msg_indic = "**More details:**"
if long_msg_indic in msg:
indic_index = msg.index(long_msg_indic)
long_msg = msg[indic_index + len(long_msg_indic):]
short_msg = msg[:indic_index]
message_box = QResizeMessageBox(parent)
message_box.setIcon(QtGui.QMessageBox.Critical)
message_box.setWindowTitle(title)
message_box.setText(short_msg)
message_box.setDetailedText(long_msg)
message_box.setSizeGripEnabled(True)
message_box.setSizePolicy(QtGui.QSizePolicy.Expanding,
QtGui.QSizePolicy.Expanding)
message_box.exec()
else:
QResizeMessageBox.critical(parent, "error", "%s" % (msg))
#-----------------------------------------------------------------------------
# Classes and functions
#-----------------------------------------------------------------------------
class Controller:
@staticmethod
def delete_workflow(wf_id,
force,
wf_ctrl):
return wf_ctrl.delete_workflow(wf_id, force)
@staticmethod
def delete_all_workflows(force,
wf_ctrl):
return Helper.delete_all_workflows(wf_ctrl, force)
@staticmethod
def stop_workflow(wf_id,
wf_ctrl):
return wf_ctrl.stop_workflow(wf_id)
@staticmethod
def change_workflow_expiration_date(wf_id,
date,
wf_ctrl):
return wf_ctrl.change_workflow_expiration_date(wf_id, date)
@staticmethod
def get_submitted_workflows(wf_ctrl):
if wf_ctrl:
return wf_ctrl.workflows()
else:
return {}
@staticmethod
def restart_workflow(workflow_id, queue, wf_ctrl):
return wf_ctrl.restart_workflow(workflow_id, queue)
@staticmethod
def stop_jobs(wf_id, job_ids, wf_ctrl):
return wf_ctrl.stop_jobs(wf_id, job_ids)
@staticmethod
def restart_jobs(wf_id, job_ids, wf_ctrl):
return wf_ctrl.restart_jobs(wf_id, job_ids)
@staticmethod
def get_connection(resource_id,
login,
password,
rsa_key_pass,
config=None,
isolated_light_mode=None):
print("Lauching workflow controller")
wf_ctrl = WorkflowController(resource_id=resource_id,
login=login,
password=password,
config=config,
rsa_key_pass=rsa_key_pass,
isolated_light_mode=isolated_light_mode)
return wf_ctrl
@staticmethod
def disconnect(wf_ctrl):
wf_ctrl.disconnect()
@staticmethod
def serialize_workflow(file_path, workflow):
Helper.serialize(file_path, workflow)
@staticmethod
def unserialize_workflow(file_path):
workflow = Helper.unserialize(file_path)
return workflow
@staticmethod
def get_queues(wf_ctrl):
return wf_ctrl.config.get_queues()
@staticmethod
def submit_workflow(workflow, expiration_date, name, queue, wf_ctrl):
wf_id = wf_ctrl.submit_workflow(
workflow=workflow,
expiration_date=expiration_date,
name=name,
queue=queue)
# TEST
# return wf_id
workflow = wf_ctrl.workflow(wf_id)
return workflow
@staticmethod
def transfer_input_files(workflow_id, wf_ctrl, buffer_size=256 ** 2):
Helper.transfer_input_files(workflow_id, wf_ctrl, buffer_size)
@staticmethod
def transfer_output_files(workflow_id, wf_ctrl, buffer_size=256 ** 2):
Helper.transfer_output_files(workflow_id, wf_ctrl, buffer_size)
class SomaWorkflowMiniWidget(QtGui.QWidget):
# SomaWorkflowWidget
sw_widget = None
# soma_workflow.gui.WorkflowGui.ApplicationModel
model = None
action_show_more_less = None
def __init__(self, model, sw_widget, parent=None):
super().__init__(parent)
self.ui = Ui_SWMiniWidget()
self.ui.setupUi(self)
self.sw_widget = sw_widget
self.model = model
self.resource_ids = []
self.action_add_resource = QtGui.QAction("Add a resource", self)
self.addAction(self.action_add_resource)
self.action_disconnect_resource = QtGui.QAction(
"Disconnect a resource", self)
self.addAction(self.action_disconnect_resource)
self.setContextMenuPolicy(QtCore.Qt.ActionsContextMenu)
self.ui.table.doubleClicked.connect(self.sw_widget.show)
self.ui.table.doubleClicked.connect(self.raise_sw_widget)
self.action_add_resource.triggered.connect(self.add_resource)
self.action_disconnect_resource.triggered.connect(
self.disconnect_resource)
self.ui.add_resource_tool_button.setDefaultAction(
self.action_add_resource)
self.ui.add_resource_tool_button.setText("Add")
self.ui.disconnect_resource_tool_button.setDefaultAction(
self.action_disconnect_resource)
self.ui.disconnect_resource_tool_button.setText("Disconnect")
self.model.global_workflow_state_changed.connect(self.refresh)
self.model.current_connection_changed.connect(self.connection_changed)
self.ui.table.itemSelectionChanged.connect(
self.resource_selection_changed)
self.connection_changed()
@QtCore.Slot()
def raise_sw_widget(self):
self.sw_widget.raise_()
@QtCore.Slot()
def resource_selection_changed(self):
selected_items = self.ui.table.selectedItems()
if len(selected_items) == 0:
return
item = selected_items[0]
if use_qvariant:
rid = utf8(str(
item.data(QtCore.Qt.UserRole).toString()))
else:
rid = utf8(item.data(QtCore.Qt.UserRole))
# print('resource_selection_changed:', repr(rid))
self.model.set_current_connection(rid)
@QtCore.Slot()
def connection_changed(self):
if self.model.current_resource_id == None:
return
submitted_wf = []
if self.model.current_resource_id not in self.resource_ids:
while True:
try:
submitted_wf = Controller.get_submitted_workflows(
self.model.current_connection)
except ConnectionClosedError as e:
if not self.reconnectAfterConnectionClosed():
return
else:
break
workflow_ids = self.sw_widget.workflow_filter(submitted_wf)
for wf_id in workflow_ids:
if self.model.is_loaded_workflow(wf_id):
self.model.set_current_workflow(wf_id)
else:
workflow_info_dict \
= self.model.current_connection.workflows(
[wf_id])
if len(workflow_info_dict) > 0:
# The workflow exist
workflow_status \
= self.model.current_connection.workflow_status(
wf_id)
workflow_info = workflow_info_dict[wf_id]
self.model.add_to_submitted_workflows(
wf_id,
workflow_exp_date=workflow_info[1],
workflow_name=workflow_info[0],
workflow_status=workflow_status)
self.resource_ids.append(self.model.current_resource_id)
self.refresh()
self.ui.table.selectRow(
self.resource_ids.index(self.model.current_resource_id))
self.model.set_no_current_workflow()
@QtCore.Slot()
def add_resource(self):
(resource_id, new_connection) = self.sw_widget.createConnection()
if new_connection:
self.model.add_connection(resource_id, new_connection)
@QtCore.Slot()
def disconnect_resource(self):
selected = self.ui.table.selectedItems()
resources = []
for item in selected:
if use_qvariant:
rid = utf8(str(
item.data(QtCore.Qt.UserRole).toString()))
else:
rid = utf8(item.data(QtCore.Qt.UserRole))
if rid and rid != "None":
resources.append(rid)
if len(resources) >= len(self.resource_ids):
QResizeMessageBox.warning(
self, "Cannot disconnect all resources",
"We must keep at least one valid resource.")
return
if len(resources) != 0:
resp = QResizeMessageBox.question(
self, "Disconnect the following resources ?",
"\n".join(resources),
QtGui.QMessageBox.Ok | QtGui.QMessageBox.Cancel)
if resp == QtGui.QMessageBox.Ok:
for resource_id in resources:
if resource_id in self.resource_ids:
self.resource_ids.remove(resource_id)
self.model.delete_connection(resource_id)
@QtCore.Slot()
def refresh(self):
try:
self.ui.table.clear()
self.ui.table.setColumnCount(3)
self.ui.table.setRowCount(len(self.resource_ids))
row = 0
for rid in self.resource_ids:
if not rid in self.model.resource_pool.resource_ids():
self.resource_ids.remove(rid)
continue
status_list = self.model.list_workflow_status(rid)
running = status_list.count(constants.WORKFLOW_IN_PROGRESS)
warning = status_list.count(constants.WARNING)
to_display = ""
if running == 0 and warning == 0:
icon = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/done.png"))
elif warning > 0:
icon = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/warning.png"))
if warning == 1:
to_display = to_display + " (1 warning)"
else:
to_display = to_display + \
" (" + repr(warning) + " warnings)"
if running == 1:
to_display = to_display + " (1 is running)"
elif running > 1:
to_display = to_display + \
" (" + repr(running) + " are running)"
else:
icon = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/running.png"))
if running == 1:
to_display = to_display + " (1 is running)"
else:
to_display = to_display + \
" (" + repr(running) + " are running)"
item = QtGui.QTableWidgetItem(rid + " ")
item.setData(QtCore.Qt.UserRole, rid)
self.ui.table.setItem(row, 0, item)
self.ui.table.setItem(row, 1, QtGui.QTableWidgetItem(
icon, repr(len(status_list)) + " workflows" + to_display))
resource = self.model.resource_pool.connection(rid)
if resource.config.get_scheduler_type() \
== configuration.LOCAL_SCHEDULER:
scheduler_config = resource.scheduler_config
if not scheduler_config and resource.engine_config_proxy:
scheduler_config \
= self.model.current_connection.scheduler_config
if scheduler_config:
scheduler_widget = LocalSchedulerConfigController(
scheduler_config,
self)
self.ui.table.setCellWidget(row, 2, scheduler_widget)
self.ui.table.resizeColumnToContents(2)
elif resource.engine_config_proxy.get_queue_limits() \
or resource.engine_config_proxy.get_running_jobs_limits():
controller_widget = WorkflowEngineConfigController(
resource.engine_config_proxy,
self)
self.ui.table.setCellWidget(row, 2, controller_widget)
self.ui.table.resizeColumnToContents(2)
row = row + 1
if self.model.current_resource_id != None and \
self.model.current_resource_id in self.resource_ids:
self.ui.table.selectRow(
self.resource_ids.index(self.model.current_resource_id))
self.ui.table.resizeColumnsToContents()
except Exception as e:
print('exception in Qt slot:', e, file=sys.stderr)
traceback.print_exc()
class LocalSchedulerConfigController(QtGui.QWidget):
scheduler_config = None
def __init__(self, scheduler_config, parent=None):
super().__init__(parent)
self.ui = Ui_LocalSchedulerConfigController()
self.ui.setupUi(self)
self.scheduler_config = scheduler_config
cpu_count = scheduler_config.get_cpu_count()
self.ui.advice_label.setText(" " + repr(cpu_count) + " CPUs detected")
proc_nb = scheduler_config.get_proc_nb()
if proc_nb is None:
self.ui.spin_box.setValue(-1)
else:
self.ui.spin_box.setValue(proc_nb)
max_proc_nb = scheduler_config.get_max_proc_nb()
if max_proc_nb is None:
self.ui.max_spin_box.setValue(0)
else:
self.ui.max_spin_box.setValue(max_proc_nb)
self.ui.spin_box.valueChanged.connect(self.nb_proc_changed)
self.ui.max_spin_box.valueChanged.connect(self.max_proc_changed)
def nb_proc_changed(self, nb_proc):
self.scheduler_config.set_proc_nb(nb_proc)
def max_proc_changed(self, nb_proc):
self.scheduler_config.set_max_proc_nb(nb_proc)
class WorkflowEngineConfigController(QtGui.QWidget):
engine_config = None
queue_limits = None
running_jobs_limits = None
def __init__(self, engine_config, parent=None):
super().__init__(parent)
self.ui = Ui_WorkflowEngineConfigController()
self.ui.setupUi(self)
self.engine_config = engine_config
self.queue_limits = self.engine_config.get_queue_limits()
self.running_jobs_limits = self.engine_config.get_running_jobs_limits()
queues = ['default'] + sorted([x
for x in self.engine_config.get_queues()
if x is not None])
if 'default' in queues[1:]:
del queues[queues.index('default', 1)]
for queue_name in queues:
if queue_name is not None:
self.ui.combo_queue.addItem(queue_name)
self.ui.combo_queue.currentIndexChanged.connect(self.update_limit)
self.ui.limit.valueChanged.connect(self.limit_changed)
self.ui.max_running.valueChanged.connect(self.max_running_changed)
self.update_limit()
def update_limit(self):
self.ui.limit.blockSignals(True)
self.ui.max_running.blockSignals(True)
queue_name = utf8(self.ui.combo_queue.currentText())
if queue_name == "default":
queue_name = None
self.ui.limit.setValue(self.queue_limits.get(queue_name, 0))
self.ui.max_running.setValue(self.running_jobs_limits.get(
queue_name, 0))
self.ui.limit.blockSignals(False)
self.ui.max_running.blockSignals(False)
def limit_changed(self, limit):
queue_name = utf8(self.ui.combo_queue.currentText())
if queue_name == "default":
queue_name = None
self.engine_config.change_queue_limits(queue_name, limit)
self.queue_limits = self.engine_config.get_queue_limits()
def max_running_changed(self, limit):
queue_name = utf8(self.ui.combo_queue.currentText())
if queue_name == "default":
queue_name = None
self.engine_config.change_running_jobs_limits(queue_name, limit)
self.running_jobs_limits = self.engine_config.get_running_jobs_limits()
class RequirePWDialog(QtGui.QDialog):
ui = None
is_install = False
strPW = None
strRSAPW = None
def __init__(self, parent=None):
super().__init__(parent=parent)
self.ui = Ui_RequirePW()
self.ui.setupUi(self)
self.ui.pushButton_ok.clicked.connect(self.EventOK)
def EventOK(self):
self.strPW = self.ui.lineEdit_PW.text()
self.strPW = utf8(self.strPW)
self.strRSAPW = self.ui.lineEdit_RSAPW.text()
self.strRSAPW = utf8(self.strRSAPW)
self.close()
pass
class NewServerDialog(QtGui.QDialog):
ui = None
is_install = False
def __init__(self, parent=None):
super().__init__(parent=parent)
self.ui = Ui_NewServer()
self.ui.setupUi(self)
self.update_schedulers()
self.ui.lineEdit_login.textChanged.connect(self.EventLoginTextChanged)
self.ui.lineEdit_cluster_add.textChanged.connect(self.UpdateResName)
self.ui.pushButton_Install.clicked.connect(self.InstallServer)
self.ui.pushButton_Connect.clicked.connect(
self.SetupServerNoInstallation)
# from soma_workflow.setup_client2server import GetHomeDirOnServer
# GetHomeDirOnServer()
def EventLoginTextChanged(self):
self.UpdateResName()
self.UpdateInstallationPath()
def UpdateResName(self):
strLogin = self.ui.lineEdit_login.text()
strLogin = utf8(strLogin)
strAdd = self.ui.lineEdit_cluster_add.text()
strAdd = utf8(strAdd)
ResName = strLogin + "@" + strAdd
self.ui.lineEdit_ResName.setText(ResName)
def UpdateInstallationPath(self):
strLogin = self.ui.lineEdit_login.text()
strLogin = utf8(strLogin)
self.ui.lineEdit_InstallPath.setText(
"/home/" + strLogin + "/.soma-workflow")
def update_schedulers(self):
from soma_workflow import scheduler
schedulers = scheduler.get_schedulers_list()
for scheduler_inst in schedulers:
# add only non-builtin
if scheduler_inst[0] not in ('local_basic', 'mpi', 'drmaa'):
self.ui.comboBox_schedulerType.addItem(scheduler_inst[0])
def InstallServer(self):
from soma_workflow.setup_client2server import InstallSomaWF2Server, check_if_somawfdb_on_server
strLogin = self.ui.lineEdit_login.text()
strLogin = utf8(strLogin)
strPort = self.ui.lineEdit_Port.text()
strPort = utf8(strPort)
intPort = int(strPort)
strAdd = self.ui.lineEdit_cluster_add.text()
strAdd = utf8(strAdd)
ResName = self.ui.lineEdit_ResName.text()
ResName = utf8(ResName)
strPW = self.ui.lineEdit_PW.text()
strPW = utf8(strPW)
strPWRSA = self.ui.lineEdit_RSAKeyPW.text()
strPWRSA = utf8(strPWRSA)
strInstallPath = self.ui.lineEdit_InstallPath.text()
strInstallPath = utf8(strInstallPath)
options = {}
scheduler_type = self.ui.comboBox_schedulerType.currentText()
scheduler_type = utf8(scheduler_type).lower()
if scheduler_type == 'local basic (SWF)':
options['scheduler_type'] = 'local_basic'
elif scheduler_type != 'drmaa':
options['scheduler_type'] = scheduler_type
if check_if_somawfdb_on_server(ResName, strLogin, strAdd, userpw=strPW, sshport=intPort):
reply = QtGui.QMessageBox.question(self, 'Message',
"Soma-workflow is running on your server. Are you sure to remove it and install it ?",
QtGui.QMessageBox.Yes, QtGui.QMessageBox.No)
if reply == QtGui.QMessageBox.No:
return
try:
InstallSomaWF2Server(
strLogin, strAdd, ResName, userpw=strPW,
install_swf_path_server=strInstallPath, sshport=intPort,
config_options=options)
except Exception as e:
import traceback
tb = traceback.format_tb(sys.exc_info()[2])
tb = '\n'.join(tb)
QtGui.QMessageBox.critical(
self, "Oops...", "Oops...Failed to install soma-workflow.\n"
"%s:\n%s\n\n"
"traceback:\n%s" % (str(type(e)), str(e), tb))
self.is_install = False
else:
self.is_install = True
QtGui.QMessageBox.information(
self, "Information", "Succeed to install soma-workflow on %s" % (strInstallPath))
self.close()
def SetupServerNoInstallation(self):
from soma_workflow.setup_client2server import SetupSomaWF2Server
strLogin = self.ui.lineEdit_login.text()
strLogin = utf8(strLogin)
strPort = self.ui.lineEdit_Port.text()
strPort = utf8(strPort)
intPort = int(strPort)
strAdd = self.ui.lineEdit_cluster_add.text()
strAdd = utf8(strAdd)
ResName = self.ui.lineEdit_ResName.text()
ResName = utf8(ResName)
strPW = self.ui.lineEdit_PW.text()
strPW = utf8(strPW)
strPWRSA = self.ui.lineEdit_RSAKeyPW.text()
strPWRSA = utf8(strPWRSA)
strInstallPath = self.ui.lineEdit_InstallPath.text()
strInstallPath = utf8(strInstallPath)
try:
SetupSomaWF2Server(
strLogin, strAdd, ResName, userpw=strPW, sshport=intPort)
except Exception as e:
QtGui.QMessageBox.critical(self, "Oops...", "Oops...%s" % (e))
self.is_install = False
else:
self.is_install = True
QtGui.QMessageBox.information(
self, "Information", "Succeed to connect soma-workflow")
self.close()
class ServerManagementDialog(QtGui.QDialog):
login_list = ""
resource_list = []
login_list = {}
config_file_path = None
add_widget = None
def __init__(self, parent=None):
super().__init__(parent=parent)
self.ui = Ui_ServerManagement()
self.ui.setupUi(self)
self.ui.btn_add_server.clicked.connect(self.add_server)
self.ui.btn_remove_server.clicked.connect(self.remove_server)
self.ui.btn_rm_serveronclient.clicked.connect(
self.remove_server_on_client)
self.ui.combo_resources.currentIndexChanged.connect(self.update_login)
self.UpdateInterface()
@QtCore.Slot()
def UpdateInterface(self):
try:
self.config_file_path = configuration.Configuration.search_config_path(
)
self.resource_list = configuration.Configuration.get_configured_resources(
self.config_file_path)
self.login_list = configuration.Configuration.get_logins(
self.config_file_path)
except ConfigurationError as e:
QtGui.QMessageBox.critical(
self, "Configuration problem", "%s" % (e))
self.close()
self.ui.combo_resources.clear()
self.ui.combo_resources.addItems(self.resource_list)
self.ui.combo_resources.setEnabled(True)
self.update_login()
@QtCore.Slot()
def update_login(self):
resource_id = str(
self.ui.combo_resources.currentText()).encode('utf-8')
if resource_id == '' or resource_id == None:
return
login = None
if resource_id in self.login_list:
login = self.login_list[resource_id]
if login != None:
self.ui.lineEdit_login.setText(login)
else:
self.ui.lineEdit_login.clear()
self.ui.combo_queues.clear()
self.ui.lineEdit_InstallPath.clear()
config = None
if self.config_file_path != None:
config = configuration.Configuration.load_from_file(
resource_id, self.config_file_path)
self.ui.lineEdit_cluster_add.clear()
if config != None:
queues = sorted(config.get_queues())
cluster_address = config.get_cluster_address()
installpath = config.get_res_install_path()
if queues != None:
self.ui.combo_queues.addItems(queues)
if cluster_address != None:
self.ui.lineEdit_cluster_add.setText(cluster_address)
if installpath != None:
self.ui.lineEdit_InstallPath.setText(installpath)
@QtCore.Slot()
def add_server(self):
self.add_widget = NewServerDialog(self)
self.add_widget.exec()
self.UpdateInterface()
@QtCore.Slot()
def remove_server(self):
from soma_workflow.setup_client2server import RemoveSomaWF2Server
reply = QtGui.QMessageBox.question(self, 'Message',
"Are you sure to remove soma-workflow on your server ? "
"(Please make sure you are **NOT** connecting to the server that is going to be removed)",
QtGui.QMessageBox.Yes, QtGui.QMessageBox.No)
if reply == QtGui.QMessageBox.No:
return
resource_id = str(
self.ui.combo_resources.currentText()).encode('utf-8')
if self.config_file_path != None:
config = configuration.Configuration.load_from_file(
resource_id, self.config_file_path)
sshport = config.get_ssh_port()
installpath = config.get_res_install_path()
login = self.login_list[resource_id]
cluster_add = config.get_cluster_address()
getPWDlg = RequirePWDialog(self)
RemoveSomaWF2Server(
installpath, resource_id, login, cluster_add, getPWDlg.strPW, int(sshport))
self.UpdateInterface()
QtGui.QMessageBox.information(
self, "Information", "Finish to remove soma-workflow on the cluster.")
@QtCore.Slot()
def remove_server_on_client(self):
from soma_workflow.setup_client2server import RemoveResNameOnConfigureFile
resource_id = str(
self.ui.combo_resources.currentText()).encode('utf-8')
if resource_id != None:
RemoveResNameOnConfigureFile(resource_id)
self.UpdateInterface()
[docs]
class ConnectionDialog(QtGui.QDialog):
[docs]
def __init__(self,
login_list,
resource_list,
resource_id=None,
editable_resource=True,
parent=None):
super().__init__(parent=parent)
self.ui = Ui_ConnectionDlg()
self.ui.setupUi(self)
self.login_list = login_list
self.ui.combo_resources.addItems(resource_list)
self.ui.combo_resources.setEnabled(editable_resource)
if resource_id != None:
index = resource_list.index(resource_id)
self.ui.combo_resources.setCurrentIndex(index)
self.ui.combo_resources.currentIndexChanged.connect(self.update_login)
self.update_login()
self.ui.kill_button.clicked.connect(self.kill_servers)
[docs]
@QtCore.Slot()
def update_login(self):
resource_id = str(
self.ui.combo_resources.currentText())
resource_id = six.ensure_str(resource_id, 'utf-8')
login = self.login_list[resource_id]
if login != None:
self.ui.lineEdit_login.setText(login)
else:
self.ui.lineEdit_login.clear()
[docs]
@QtCore.Slot()
def kill_servers(self):
resource_id = str(self.ui.combo_resources.currentText())
erase_db = self.ui.erase_db_checkbox.isChecked()
login = str(self.ui.lineEdit_login.text())
passwd = str(self.ui.lineEdit_password.text())
#rsa_passwd = unicode(self.ui.lineEdit_rsa_password.text())
print('kill_servers', resource_id, erase_db)
connection.RemoteConnection.kill_remote_servers(
resource_id, login=login, passwd=passwd, clear_db=erase_db)
class SomaWorkflowWidget(QtGui.QWidget):
ui = None
model = None
resource_list = None
config_file_path = None
login_list = None
update_workflow_list_from_model = None
workflow_info_widget = None
closing = QtCore.Signal()
def __init__(self,
model,
user=None,
auto_connect=False,
computing_resource=None,
parent=None,
flags=0,
config_file=None,
db_file=None,
interactive=False,
isolated_light_mode=None):
super().__init__(parent)
self.ui = Ui_ResourceWfSelect()
self.ui.setupUi(self)
self.model = model
self.update_workflow_list_from_model = False
self.workflow_info_widget = WorkflowInfoWidget(self.model, parent=self)
self.ui.wf_info_layout.addWidget(self.workflow_info_widget)
self.model.current_connection_changed.connect(
self.currentConnectionChanged)
self.model.current_workflow_changed.connect(
self.current_workflow_changed)
self.model.connection_closed_error[()].connect(
self.reconnectAfterConnectionClosed)
self.model.connection_closed_error[str].connect(
self.reconnectAfterConnectionClosed)
self.model.global_workflow_state_changed.connect(
self.update_workflow_status_icons)
self.config_file_path = config_file
self.UpdateLocalparameters()
self.ui.combo_resources.addItems(self.resource_list)
self.workflow_info_widget.hide()
self.ui.toolButton_button_delete_wf.setDefaultAction(
self.ui.action_delete_workflow)
self.ui.toolButton_delete_all.setDefaultAction(
self.ui.action_delete_all)
self.ui.action_about.setIcon(
QtGui.QIcon(os.path.join(os.path.dirname(__file__), "icon/soma_workflow_icon.png")))
self.ui.action_about.triggered.connect(self.display_about_dlg)
self.ui.action_submit.triggered.connect(self.submit_workflow)
self.ui.action_transfer_infiles.triggered.connect(
self.transferInputFiles)
self.ui.action_transfer_outfiles.triggered.connect(
self.transferOutputFiles)
self.ui.action_open_wf.triggered.connect(self.openWorkflow)
self.ui.action_create_wf_ex.triggered.connect(
self.createWorkflowExample)
self.ui.action_delete_workflow.triggered.connect(self.delete_workflow)
self.ui.action_delete_all.triggered.connect(self.delete_all_workflows)
self.ui.action_change_expiration_date.triggered.connect(
self.change_expiration_date)
self.ui.action_save.triggered.connect(self.saveWorkflow)
self.ui.action_restart.triggered.connect(self.restart_workflow)
self.ui.action_stop_wf.triggered.connect(self.stop_workflow)
self.ui.actionServer_Management.triggered.connect(
self.openServerManagement)
# disable server management until it is fixed
#self.ui.actionServer_Management.setEnabled(False)
#self.ui.actionServer_Management.setVisible(False)
self.ui.list_widget_submitted_wfs.itemSelectionChanged.connect(
self.workflowSelectionChanged)
self.ui.combo_resources.currentIndexChanged.connect(
self.resourceSelectionChanged)
self.ui.wf_list_refresh_button.clicked.connect(
self.refreshWorkflowList)
self.connection_dlg = ConnectionDialog(self.login_list,
self.resource_list,
parent=self)
self.connection_dlg.accepted.connect(self.firstConnection)
self.connection_dlg.rejected.connect(self.close)
self.db_file = db_file
self.isolated_light_mode = isolated_light_mode
# First connection:
# Try to connect directly:
if computing_resource is None and self.config_file_path is not None:
computing_resource \
= configuration.Configuration.get_local_resource_id(
config=None, config_file_path=self.config_file_path)
if interactive:
if computing_resource is not None:
print('interactive, resource:', computing_resource)
self.connection_dlg.ui.combo_resources.setCurrentText(
computing_resource)
if user is not None:
self.connection_dlg.ui.lineEdit_login.setText(user)
self.connection_dlg.show()
else:
if computing_resource:
print('connect to computing resource:', computing_resource)
self.connect_to_controller(computing_resource, user)
else:
if auto_connect and user is not None \
and len(self.resource_list) > 0:
print('connect to computing resource:',
self.resource_list[0])
self.connect_to_controller(self.resource_list[0], user)
else:
print('connect to computing resource:',
socket.gethostname())
self.connect_to_controller(socket.gethostname())
if self.model.current_resource_id != None:
self.currentConnectionChanged()
def UpdateLocalparameters(self):
try:
self.config_file_path \
= configuration.Configuration.search_config_path()
self.resource_list \
= configuration.Configuration.get_configured_resources(
self.config_file_path)
self.login_list = configuration.Configuration.get_logins(
self.config_file_path)
except ConfigurationError as e:
QtGui.QMessageBox.critical(
self, "Configuration problem", "%s" % (e))
self.close()
def closeEvent(self, event):
self.closing.emit()
def display_about_dlg(self):
message_box = QtGui.QMessageBox(QtGui.QMessageBox.NoIcon,
"About Soma-workflow",
"\n\nVersion: %s \n\nDocumentation and examples: http://www.brainvisa.info/soma-workflow" % (
version.shortVersion),
parent=self)
message_box.setIconPixmap(
QtGui.QPixmap(os.path.join(os.path.dirname(__file__),
"icon/logo.png")))
message_box.exec()
def connect_to_controller(self,
resource_id,
login=None,
password=None,
rsa_key_pass=None):
if self.model.resource_pool.resource_exist(resource_id):
self.model.set_current_connection(resource_id)
return
wf_ctrl = None
QtGui.QApplication.setOverrideCursor(QtCore.Qt.WaitCursor)
config = None
local_resource_id = configuration.Configuration.get_local_resource_id(
None, config_file_path=self.config_file_path)
if self.config_file_path is not None \
or (resource_id in ('localhost', socket.gethostname(),
local_resource_id)
and self.db_file is not None):
config = configuration.Configuration.load_from_file(
resource_id=resource_id,
config_file_path=self.config_file_path)
if resource_id in ('localhost', socket.gethostname(),
local_resource_id) \
and self.db_file is not None:
config._database_file = self.db_file
try:
wf_ctrl = Controller.get_connection(
resource_id,
login,
password,
rsa_key_pass,
config=config,
isolated_light_mode=self.isolated_light_mode)
QtGui.QApplication.restoreOverrideCursor()
except ConfigurationError as e:
QtGui.QApplication.restoreOverrideCursor()
QtGui.QMessageBox.critical(
self, "Configuration problem", "%s" % (e))
self.connection_dlg.ui.lineEdit_password.clear()
self.connection_dlg.show()
except Exception as e:
QtGui.QApplication.restoreOverrideCursor()
msg = str(e) + '\n**More details:**\n' + traceback.format_exc()
detailed_critical_message_box(msg=msg,
title="Connection failed",
parent=self)
self.connection_dlg.ui.lineEdit_password.clear()
self.connection_dlg.show()
else:
self.model.add_connection(resource_id, wf_ctrl)
self.connection_dlg.hide()
# pass
@QtCore.Slot()
def firstConnection(self):
resource_id = str(
self.connection_dlg.ui.combo_resources.currentText())
if self.connection_dlg.ui.lineEdit_login.text():
login = str(
self.connection_dlg.ui.lineEdit_login.text()).encode('utf-8')
else:
login = None
if self.connection_dlg.ui.lineEdit_password.text():
password = str(
self.connection_dlg.ui.lineEdit_password.text()).encode(
'utf-8')
else:
password = None
if self.connection_dlg.ui.lineEdit_rsa_password.text():
rsa_key_pass = str(
self.connection_dlg.ui.lineEdit_rsa_password.text()).encode(
'utf-8')
else:
rsa_key_pass = None
self.connect_to_controller(resource_id, login, password, rsa_key_pass)
# pass
@QtCore.Slot()
def openServerManagement(self):
self.server_widget = ServerManagementDialog(self)
self.server_widget.exec()
# pass
@QtCore.Slot()
def openWorkflow(self):
file_path = QtGui.QFileDialog.getOpenFileName(
self, "Open a workflow", "", "", "", filedialog_options)
file_path = file_path[0]
if file_path:
try:
workflow = Controller.unserialize_workflow(file_path)
except SerializationError as e:
QtGui.QMessageBox.warning(
self, "Error opening the workflow", "%s" % (e))
else:
self.model.add_workflow(NOT_SUBMITTED_WF_ID,
datetime.now() + timedelta(days=5),
workflow.name,
constants.WORKFLOW_NOT_STARTED,
workflow)
self.updateWorkflowList()
@QtCore.Slot()
def saveWorkflow(self):
file_path = QtGui.QFileDialog.getSaveFileName(
self, "Save the current workflow", "", "", "",
filedialog_options)
file_path = file_path[0]
if file_path:
try:
Controller.serialize_workflow(
file_path, self.model.current_workflow().server_workflow)
except SerializationError as e:
QtGui.QMessageBox.warning(
self, "Error", f"{type(e)}: {e}")
@QtCore.Slot()
def createWorkflowExample(self):
workflowExample_dlg = QtGui.QDialog(self)
ui = Ui_WorkflowExampleDlg()
ui.setupUi(workflowExample_dlg)
ui.comboBox_example_type.addItems(
WorkflowExamples.get_workflow_example_list())
if workflowExample_dlg.exec() == QtGui.QDialog.Accepted:
with_file_transfer = ui.checkBox_file_transfers.checkState(
) == QtCore.Qt.Checked
with_shared_resource_path = ui.checkBox_shared_resource_path.checkState(
) == QtCore.Qt.Checked
example_type = ui.comboBox_example_type.currentIndex()
file_path = QtGui.QFileDialog.getSaveFileName(self,
"Create a workflow example")
file_path = QtGui.QFileDialog.getSaveFileName(
self, "Create a workflow example", "", "", "",
filedialog_options)
if file_path:
try:
if with_file_transfer and not with_shared_resource_path:
wf_examples = WorkflowExamplesTransfer()
elif with_file_transfer and with_shared_resource_path:
wf_examples = WorkflowExamplesSharedTransfer()
elif not with_file_transfer and with_shared_resource_path:
wf_examples = WorkflowExamplesShared()
else:
wf_examples = WorkflowExamplesLocal()
except ConfigurationError as e:
QtGui.QMessageBox.warning(self, "Error", "%s" % (e))
else:
workflow = wf_examples.get_workflow_example(example_type)
try:
Controller.serialize_workflow(file_path, workflow)
except SerializationError as e:
QtGui.QMessageBox.warning(self, "Error", "%s" % (e))
@QtCore.Slot()
def submit_workflow(self,
date=None,
name=None,
queue=None):
assert(self.model.current_workflow())
# date is the only mandatory argument
# if date is None the submission dialog has to be open.
if date == None:
submission_dlg = QtGui.QDialog(self)
ui = Ui_SubmissionDlg()
ui.setupUi(submission_dlg)
ui.resource_label.setText(self.model.current_resource_id)
if self.model.current_workflow().name == None:
ui.lineedit_wf_name.setText("")
else:
ui.lineedit_wf_name.setText(
self.model.current_workflow().server_workflow.name)
ui.dateTimeEdit_expiration.setDateTime(
datetime.now() + timedelta(days=5))
queues = ["default"]
queues.extend(sorted([q for q in Controller.get_queues(
self.model.current_connection) if q not in (None, 'default')]))
ui.combo_queue.addItems(queues)
if submission_dlg.exec() != QtGui.QDialog.Accepted:
return (None, None)
if ui.lineedit_wf_name.text():
name = utf8(ui.lineedit_wf_name.text())
else:
name = None
qtdt = ui.dateTimeEdit_expiration.dateTime()
date = datetime(
qtdt.date().year(), qtdt.date().month(), qtdt.date().day(),
qtdt.time().hour(), qtdt.time().minute(), qtdt.time().second())
queue = utf8(ui.combo_queue.currentText())
if queue == "default":
queue = None
while True:
try:
workflow = Controller.submit_workflow(
# TEST
# workflow_id = Controller.submit_workflow(
self.model.current_workflow(
).server_workflow,
date,
name,
queue,
self.model.current_connection)
except WorkflowError as e:
QtGui.QMessageBox.warning(self,
"Workflow submission error",
"%s" % (e))
return (None, None)
except JobError as e:
QtGui.QMessageBox.warning(self,
"Workflow submission error",
"%s" % (e))
return (None, None)
except ConnectionClosedError as e:
if not self.reconnectAfterConnectionClosed():
return (None, None)
else:
break
# TEST
# self.model.add_workflow(workflow_id,
# date,
# name,
# constants.WORKFLOW_NOT_STARTED,
# self.model.current_workflow().server_workflow)
self.model.add_workflow(workflow.wf_id,
date,
workflow.name,
constants.WORKFLOW_NOT_STARTED,
workflow)
self.updateWorkflowList()
# TEST
# return (workflow_id, self.model.current_resource_id)
return (workflow.wf_id, self.model.current_resource_id)
@QtCore.Slot()
def restart_workflow(self):
queue = None
date = None
if Controller.get_queues(self.model.current_connection):
submission_dlg = QtGui.QDialog(self)
ui = Ui_SubmissionDlg()
ui.setupUi(submission_dlg)
ui.resource_label.setText(self.model.current_resource_id)
if self.model.workflow_name:
ui.lineedit_wf_name.setText(self.model.workflow_name)
else:
ui.lineedit_wf_name.setText(repr(self.model.current_wf_id))
ui.lineedit_wf_name.setEnabled(False)
ui.dateTimeEdit_expiration.setDateTime(
datetime.now() + timedelta(days=5))
submission_dlg.setWindowTitle("Restart")
queues = ["default"]
queues.extend(sorted([q for q in Controller.get_queues(
self.model.current_connection) if q not in (None, 'default')]))
ui.combo_queue.addItems(queues)
previous_queue = self.model.current_workflow().queue
if previous_queue == None:
previous_queue = "default"
index = queues.index(previous_queue)
ui.combo_queue.setCurrentIndex(index)
if submission_dlg.exec() != QtGui.QDialog.Accepted:
return
queue = utf8(ui.combo_queue.currentText())
if queue == "default":
queue = None
qtdt = ui.dateTimeEdit_expiration.dateTime()
date = datetime(
qtdt.date().year(), qtdt.date().month(), qtdt.date().day(),
qtdt.time().hour(), qtdt.time().minute(), qtdt.time().second())
done = False
try:
done = Controller.restart_workflow(self.model.current_wf_id,
queue,
self.model.current_connection)
if done and date != None:
Controller.change_workflow_expiration_date(
self.model.current_wf_id,
date,
self.model.current_connection)
except ConnectionClosedError as e:
pass
except SystemExit as e:
pass
if not done:
QtGui.QMessageBox.warning(self,
"Restart workflow",
"The workflow is already running.")
else:
self.model.restart_current_workflow()
self.model.update()
@QtCore.Slot()
def transferInputFiles(self):
def transfer(self):
try:
self.ui.action_transfer_infiles.setEnabled(False)
Controller.transfer_input_files(
self.model.current_wf_id,
self.model.current_connection,
buffer_size=256 ** 2)
except ConnectionClosedError as e:
self.ui.action_transfer_infiles.setEnabled(True)
pass
except SystemExit as e:
pass
self.ui.action_transfer_infiles.setEnabled(True)
thread = threading.Thread(name="TransferInputFiles",
target=transfer,
args=([self]))
thread.daemon = True
thread.start()
@QtCore.Slot()
def transferOutputFiles(self):
def transfer(self):
try:
self.ui.action_transfer_outfiles.setEnabled(False)
Controller.transfer_output_files(
self.model.current_wf_id,
self.model.current_connection,
buffer_size=256 ** 2)
except ConnectionClosedError as e:
self.ui.action_transfer_outfiles.setEnabled(True)
except SystemExit as e:
pass
self.ui.action_transfer_outfiles.setEnabled(True)
thread = threading.Thread(name="TransferOuputFiles",
target=transfer,
args=([self]))
thread.daemon = True
thread.start()
@QtCore.Slot()
def workflowSelectionChanged(self):
selected_items = self.ui.list_widget_submitted_wfs.selectedItems()
if not selected_items:
return
QtGui.QApplication.setOverrideCursor(QtCore.Qt.WaitCursor)
try:
if use_qvariant:
wf_id = selected_items[0].data(QtCore.Qt.UserRole).toInt()[0]
else:
wf_id = int(selected_items[0].data(QtCore.Qt.UserRole))
if wf_id != NOT_SUBMITTED_WF_ID:
if self.model.is_loaded_workflow(wf_id):
self.model.set_current_workflow(wf_id)
else:
workflow_info_dict = self.model.current_connection.workflows(
[wf_id])
if len(workflow_info_dict) > 0:
# The workflow exist
workflow_status = self.model.current_connection.workflow_status(
wf_id)
workflow_info = workflow_info_dict[wf_id]
self.model.add_workflow(wf_id,
workflow_exp_date=workflow_info[
1],
workflow_name=workflow_info[0],
workflow_status=workflow_status)
else:
self.model.clear_current_workflow()
self.updateWorkflowList()
else:
self.model.clear_current_workflow()
QtGui.QApplication.restoreOverrideCursor()
except ConnectionClosedError as e:
QtGui.QApplication.restoreOverrideCursor()
self.reconnectAfterConnectionClosed()
except Exception as e:
QtGui.QApplication.restoreOverrideCursor()
raise e
@QtCore.Slot(int)
def resourceSelectionChanged(self, index):
if index < 0 or index >= self.ui.combo_resources.count():
index = self.ui.combo_resources.findText(
self.model.current_resource_id)
self.ui.combo_resources.setCurrentIndex(index)
return
resource_id = utf8(str(
self.ui.combo_resources.itemText(index)))
if resource_id == " ":
index = self.ui.combo_resources.findText(
self.model.current_resource_id)
self.ui.combo_resources.setCurrentIndex(index)
return
if self.model.resource_pool.resource_exist(resource_id):
self.model.set_current_connection(resource_id)
return
else:
(resource_id, new_connection) = self.createConnection(resource_id)
if new_connection:
self.model.add_connection(resource_id, new_connection)
def createConnection(self,
resource_id=None,
editable_resource=True,
replace=False):
'''
returns a tuple (resource_id, connection)
'''
self.UpdateLocalparameters()
connection_invalid = True
try_again = True
while connection_invalid or try_again:
connection_dlg = ConnectionDialog(self.login_list,
self.resource_list,
resource_id,
editable_resource)
connection_dlg.setModal(True)
if connection_dlg.exec() != QtGui.QDialog.Accepted:
try_again = False
index = self.ui.combo_resources.findText(
self.model.current_resource_id)
self.ui.combo_resources.setCurrentIndex(index)
break
index = connection_dlg.ui.combo_resources.currentIndex()
resource_id = self.resource_list[index]
if connection_dlg.ui.lineEdit_login.text():
login = str(
connection_dlg.ui.lineEdit_login.text()).encode('utf-8')
else:
login = None
if connection_dlg.ui.lineEdit_password.text():
password = str(
connection_dlg.ui.lineEdit_password.text()).encode('utf-8')
else:
password = None
if connection_dlg.ui.lineEdit_rsa_password.text():
rsa_key_pass = str(
connection_dlg.ui.lineEdit_rsa_password.text()).encode('utf-8')
else:
rsa_key_pass = None
if not replace and resource_id in self.model.resource_pool.resource_ids():
QtGui.QMessageBox.information(
self, "Connection already exists", "The connection to the resource %s already exists." % (resource_id))
return (resource_id, None)
QtGui.QApplication.setOverrideCursor(QtCore.Qt.WaitCursor)
try:
wf_ctrl = Controller.get_connection(
resource_id,
login,
password,
rsa_key_pass,
isolated_light_mode=self.isolated_light_mode)
QtGui.QApplication.restoreOverrideCursor()
except ConfigurationError as e:
QtGui.QApplication.restoreOverrideCursor()
QtGui.QMessageBox.information(
self, "Configuration error", "%s" % (e))
return (resource_id, None)
except Exception as e:
QtGui.QApplication.restoreOverrideCursor()
msg = str(e) + '\n**More details:**\n' + traceback.format_exc()
detailed_critical_message_box(msg=msg,
title="Connection failed",
parent=self)
else:
return (resource_id, wf_ctrl)
return (resource_id, None)
@QtCore.Slot()
def stop_workflow(self):
assert(self.model.current_workflow() and
self.model.current_wf_id != NOT_SUBMITTED_WF_ID)
if self.model.current_workflow().name:
name = self.model.current_workflow().name
else:
name = repr(self.model.current_wf_id)
if self.model.workflow_status != constants.WARNING:
answer = QtGui.QMessageBox.question(
self, "confirmation", "The running jobs will be killed and the jobs in the queue will be removed. \nDo you want to stop the workflow " + name + " anyway?", QtGui.QMessageBox.Yes, QtGui.QMessageBox.No)
if answer != QtGui.QMessageBox.Yes:
return
stopped_properly = False
while True:
try:
stopped_properly = Controller.stop_workflow(
self.model.current_wf_id,
self.model.current_connection)
except ConnectionClosedError as e:
if not self.reconnectAfterConnectionClosed():
return
else:
if not stopped_properly and \
self.model.current_connection.config.get_mode() != configuration.LIGHT_MODE:
QtGui.QMessageBox.warning(self,
"Stop workflow",
"The workflow was stopped. \n However, some jobs "
"may still be active and burden the computing "
"resource. \n In case of long jobs, please "
"inspect the active jobs (running or in the "
"queue) using the DRMS interface.")
break
self.model.update()
@QtCore.Slot()
def delete_all_workflows(self):
workflows = self.model.workflows(self.model.current_resource_id)
if not workflows:
return
workflow_names = []
for wf_id, (wf_name, exp_date) in workflows.items():
if wf_name:
workflow_names.append(wf_name)
else:
workflow_names.append(str(wf_id))
separator = ", "
names = separator.join(workflow_names)
answer = QtGui.QMessageBox.question(self,
"confirmation",
"Do you want to delete the "
"workflows: \n" + names + "?",
QtGui.QMessageBox.Yes,
QtGui.QMessageBox.No)
if answer != QtGui.QMessageBox.Yes:
return
force = self.ui.check_box_force_delete.isChecked()
while True:
try:
deleled_properly = Controller.delete_all_workflows(
force, self.model.current_connection)
except ConnectionClosedError as e:
if not self.reconnectAfterConnectionClosed():
return
else:
break
if force:
self.model.delete_workflow()
if not deleled_properly and \
self.model.current_connection.config.get_mode() \
!= configuration.LIGHT_MODE:
QtGui.QMessageBox.warning(
self,
"Delete workflow",
"The workflow were deleted. \n However, some jobs "
"may still be active and burden the computing "
"resource. \n In case of long jobs, please "
"inspect the active jobs (running or in the "
"queue) using the DRMS interface.")
self.refreshWorkflowList()
@QtCore.Slot()
def delete_workflow(self):
assert(self.model.current_workflow() and
self.model.current_wf_id != NOT_SUBMITTED_WF_ID)
if self.model.current_workflow().name:
name = self.model.current_workflow().name
else:
name = repr(self.model.current_wf_id)
answer = QtGui.QMessageBox.question(
self, "confirmation", "Do you want to delete the workflow " + name + "?", QtGui.QMessageBox.Yes, QtGui.QMessageBox.No)
if answer != QtGui.QMessageBox.Yes:
return
force = self.ui.check_box_force_delete.isChecked()
while True:
try:
deleled_properly = Controller.delete_workflow(
self.model.current_wf_id,
force,
self.model.current_connection)
except ConnectionClosedError as e:
if not self.reconnectAfterConnectionClosed():
return
else:
break
if force:
self.model.delete_workflow()
self.updateWorkflowList()
if not deleled_properly and \
self.model.current_connection.config.get_mode() \
!= configuration.LIGHT_MODE:
QtGui.QMessageBox.warning(
self,
"Delete workflow",
"The workflow was deleted. \n However, some jobs "
"may still be active and burden the computing "
"resource. \n In case of long jobs, please "
"inspect the active jobs (running or in the "
"queue) using the DRMS interface.")
@QtCore.Slot()
def change_expiration_date(self):
dlg = QtGui.QDialog(self)
ui = Ui_SubmissionDlg()
ui.setupUi(dlg)
ui.resource_label.setText(self.model.current_resource_id)
if self.model.workflow_name:
ui.lineedit_wf_name.setText(self.model.workflow_name)
else:
ui.lineedit_wf_name.setText(repr(self.model.current_wf_id))
ui.lineedit_wf_name.setReadOnly(True)
ui.lineedit_wf_name.setFrame(False)
ui.dateTimeEdit_expiration.setDateTime(self.model.workflow_exp_date)
dlg.setWindowTitle("Change expiration date")
queue = self.model.current_workflow().queue
if queue == None:
queue = "default queue"
ui.combo_queue.addItem(queue)
ui.combo_queue.setEnabled(False)
if dlg.exec() != QtGui.QDialog.Accepted:
return
qtdt = ui.dateTimeEdit_expiration.dateTime()
date = datetime(
qtdt.date().year(), qtdt.date().month(), qtdt.date().day(),
qtdt.time().hour(), qtdt.time().minute(), qtdt.time().second())
while True:
try:
change_occured = Controller.change_workflow_expiration_date(
self.model.current_wf_id,
date,
self.model.current_connection)
except ConnectionClosedError as e:
if not self.reconnectAfterConnectionClosed():
return
else:
break
if not change_occured:
QtGui.QMessageBox.information(
self, "information",
"The workflow expiration date was not changed.")
else:
self.model.change_expiration_date(date)
self.workflow_info_widget.current_workflow_changed()
@QtCore.Slot()
def currentConnectionChanged(self):
self.model.clear_current_workflow()
self.updateWorkflowList()
if self.model.current_resource_id is not None:
index = self.ui.combo_resources.findText(
self.model.current_resource_id)
self.ui.combo_resources.setCurrentIndex(index)
@QtCore.Slot()
def current_workflow_changed(self):
if not self.model.current_connection:
return
scheduler_type = self.model.current_connection.engine_config_proxy.get_scheduler_type(
)
control_authorized = (scheduler_type != configuration.MPI_SCHEDULER)
if self.model.current_wf_id == None:
# No workflow
self.ui.action_submit.setEnabled(False)
self.ui.action_change_expiration_date.setEnabled(False)
self.ui.action_stop_wf.setEnabled(False)
self.ui.action_restart.setEnabled(False)
self.ui.action_delete_workflow.setEnabled(False)
self.ui.action_transfer_infiles.setEnabled(False)
self.ui.action_transfer_outfiles.setEnabled(False)
self.ui.action_save.setEnabled(False)
self.workflow_info_widget.hide()
self.ui.list_widget_submitted_wfs.clearSelection()
else:
if self.model.current_wf_id == NOT_SUBMITTED_WF_ID:
# Workflow not submitted
self.ui.action_submit.setEnabled(control_authorized)
self.ui.action_change_expiration_date.setEnabled(False)
self.ui.action_delete_workflow.setEnabled(False)
self.ui.action_stop_wf.setEnabled(False)
self.ui.action_restart.setEnabled(False)
self.ui.action_transfer_infiles.setEnabled(False)
self.ui.action_transfer_outfiles.setEnabled(False)
self.workflow_info_widget.show()
self.ui.list_widget_submitted_wfs.clearSelection()
else:
# Submitted workflow
self.ui.action_submit.setEnabled(False)
self.ui.action_change_expiration_date.setEnabled(True)
self.ui.action_delete_workflow.setEnabled(True)
self.ui.action_stop_wf.setEnabled(control_authorized)
self.ui.action_restart.setEnabled(control_authorized)
self.ui.action_transfer_infiles.setEnabled(control_authorized)
self.ui.action_transfer_outfiles.setEnabled(control_authorized)
self.ui.action_save.setEnabled(True)
self.workflow_info_widget.show()
index = None
for i in range(0, self.ui.list_widget_submitted_wfs.count()):
if use_qvariant:
wf_id = self.ui.list_widget_submitted_wfs.item(
i).data(QtCore.Qt.UserRole).toInt()[0]
else:
wf_id = int(
self.ui.list_widget_submitted_wfs.item(i).data(QtCore.Qt.UserRole))
if self.model.current_wf_id == wf_id:
self.ui.list_widget_submitted_wfs.setCurrentRow(i)
break
@QtCore.Slot()
def refreshWorkflowList(self):
self.model.clear_current_workflow()
self.updateWorkflowList(force_not_from_model=True)
def workflow_filter(self, workflows):
'''
Reimplement this function to filter the displayed workflows.
'''
return workflows
@QtCore.Slot()
def update_workflow_status_icons(self):
for i in range(0, self.ui.list_widget_submitted_wfs.count()):
item = self.ui.list_widget_submitted_wfs.item(i)
if use_qvariant:
wf_id = item.data(QtCore.Qt.UserRole).toInt()[0]
else:
wf_id = int(item.data(QtCore.Qt.UserRole))
status = self.model.get_workflow_status(
self.model.current_resource_id, wf_id)
icon_path = workflow_status_icon(status)
if status != constants.WORKFLOW_DONE and icon_path != None:
item.setIcon(QtGui.QIcon(icon_path))
else:
item.setIcon(QtGui.QIcon())
def updateWorkflowList(self, force_not_from_model=False):
if force_not_from_model or \
not self.update_workflow_list_from_model or \
len(self.model.list_workflow_names(
self.model.current_resource_id)) == 0:
while True:
try:
submitted_wf = Controller.get_submitted_workflows(
self.model.current_connection)
except ConnectionClosedError as e:
if not self.reconnectAfterConnectionClosed():
return
else:
break
else:
submitted_wf = self.model.workflows(self.model.current_resource_id)
submitted_wf = self.workflow_filter(submitted_wf)
self.ui.list_widget_submitted_wfs.itemSelectionChanged.disconnect(
self.workflowSelectionChanged)
self.ui.list_widget_submitted_wfs.clear()
wf_id_info = sorted(
submitted_wf.items(),
key=lambda elem: tuple(x if x is not None else ''
for x in elem[1]),
reverse=True)
for wf_id, wf_info in wf_id_info:
workflow_name, expiration_date = wf_info
if not workflow_name:
workflow_name = repr(wf_id)
status = self.model.get_workflow_status(
self.model.current_resource_id, wf_id)
icon_path = workflow_status_icon(status)
if status != constants.WORKFLOW_DONE and icon_path != None:
item = QtGui.QListWidgetItem(QtGui.QIcon(icon_path),
workflow_name,
self.ui.list_widget_submitted_wfs)
else:
item = QtGui.QListWidgetItem(
workflow_name, self.ui.list_widget_submitted_wfs)
item.setData(QtCore.Qt.UserRole, wf_id)
self.ui.list_widget_submitted_wfs.addItem(item)
self.ui.list_widget_submitted_wfs.itemSelectionChanged.connect(
self.workflowSelectionChanged)
#@QtCore.Slot(QtCore.QString)
def reconnectAfterConnectionClosed(self, resource_id=None):
if resource_id == None:
resource_id = self.model.current_resource_id
else:
resource_id = six.ensure_str(resource_id)
answer = QtGui.QMessageBox.question(None,
"Connection closed",
"The connection to " + resource_id +
" closed.\n Do you want to try a reconnection?",
QtGui.QMessageBox.Yes, QtGui.QMessageBox.No)
if answer == QtGui.QMessageBox.Yes:
(new_resource_id, new_connection) = self.createConnection(
resource_id, editable_resource=False, replace=True)
if new_connection:
self.model.reinit_connection(new_resource_id, new_connection)
return True
self.model.delete_connection(resource_id)
return False
[docs]
class MainWindow(QtGui.QMainWindow):
ui = None
model = None
sw_widget = None
server_widget = None
[docs]
def __init__(self,
model,
user=None,
auto_connect=False,
computing_resource=None,
parent=None,
flags=0,
config_file=None,
db_file=None,
interactive=False,
isolated_light_mode=None):
'''
Parameters
----------
model: ApplicationModel
'''
super().__init__(parent)
self.ui = Ui_MainWindow()
self.ui.setupUi(self)
self.setWindowIcon(QtGui.QIcon(os.path.join(os.path.dirname(__file__),
"icon/soma_workflow_icon.png")))
self.setCorner(
QtCore.Qt.BottomLeftCorner, QtCore.Qt.LeftDockWidgetArea)
self.setCorner(
QtCore.Qt.BottomRightCorner, QtCore.Qt.RightDockWidgetArea)
self.tabifyDockWidget(self.ui.dock_graph, self.ui.dock_plot)
self.setWindowTitle("soma-workflow")
self.model = model
self.model.current_connection_changed.connect(
self.currentConnectionChanged)
self.sw_widget = SomaWorkflowWidget(self.model,
user,
auto_connect,
computing_resource,
self,
flags,
config_file=config_file,
db_file=db_file,
interactive=interactive,
isolated_light_mode
=isolated_light_mode)
if True:
self.mini_widget = SomaWorkflowMiniWidget(self.model,
self.sw_widget,
self)
self.mini_widget.layout().setContentsMargins(0, 0, 0, 0)
self.sw_widget.ui.combo_resources.hide()
self.sw_widget.ui.resource_selection_frame.layout().addWidget(
self.mini_widget)
resourcesWfLayout = QtGui.QVBoxLayout()
resourcesWfLayout.setContentsMargins(2, 2, 2, 2)
resourcesWfLayout.addWidget(self.sw_widget)
self.ui.dockWidgetContents_resource_wf.setLayout(resourcesWfLayout)
self.sw_widget.closing.connect(self.close)
self.treeWidget = WorkflowTree(self.model, parent=self)
treeWidgetLayout = QtGui.QVBoxLayout()
treeWidgetLayout.setContentsMargins(2, 2, 2, 2)
treeWidgetLayout.addWidget(self.treeWidget)
self.ui.centralwidget.setLayout(treeWidgetLayout)
self.itemInfoWidget = WorkflowElementInfo(
model=self.model, proxy_model=self.treeWidget.proxy_model,
parent=self)
itemInfoLayout = QtGui.QVBoxLayout()
itemInfoLayout.setContentsMargins(2, 2, 2, 2)
itemInfoLayout.addWidget(self.itemInfoWidget)
self.ui.dockWidgetContents_intemInfo.setLayout(itemInfoLayout)
self.itemInfoWidget.job_selected.connect(self.treeWidget.select_job)
self.treeWidget.selection_model_changed.connect(
self.itemInfoWidget.setSelectionModel)
self.itemInfoWidget.connection_closed_error.connect(
self.sw_widget.reconnectAfterConnectionClosed)
self.workflowInfoWidget = WorkflowGroupInfo(self.model, self)
wfInfoLayout = QtGui.QVBoxLayout()
wfInfoLayout.setContentsMargins(2, 2, 2, 2)
wfInfoLayout.addWidget(self.workflowInfoWidget)
self.ui.widget_wf_info.setLayout(wfInfoLayout)
self.graphWidget = WorkflowGraphView(self.model, self)
graphWidgetLayout = QtGui.QVBoxLayout()
graphWidgetLayout.setContentsMargins(2,2,2,2)
graphWidgetLayout.addWidget(self.graphWidget)
self.ui.dockWidgetContents_graph.setLayout(graphWidgetLayout)
# no graph for now
#self.ui.dock_graph.hide()
#self.ui.dock_graph.toggleViewAction().setVisible(False)
self.ui.dock_graph.visibilityChanged.connect(
self.graphWidget.graph_visibility_changed)
self.workflowPlotWidget = WorkflowPlot(self.model, parent=self)
plotLayout = QtGui.QVBoxLayout()
plotLayout.setContentsMargins(2, 2, 2, 2)
plotLayout.addWidget(self.workflowPlotWidget)
self.ui.dockWidgetContents_plot.setLayout(plotLayout)
self.workflowPlotWidget.job_selected.connect(
self.treeWidget.select_job)
if not MATPLOTLIB:
self.ui.dock_plot.hide()
self.ui.dock_plot.toggleViewAction().setVisible(False)
self.ui.menu_file.addAction(self.sw_widget.ui.action_open_wf)
self.ui.menu_file.addAction(self.sw_widget.ui.action_save)
self.ui.menu_file.addSeparator()
self.ui.menu_file.addAction(self.sw_widget.ui.action_create_wf_ex)
self.ui.menu_file.addAction(self.sw_widget.ui.action_create_wf_ex)
self.ui.menu_file.addSeparator()
if QtCore.QT_VERSION >= 0x060000:
self.ui.menu_file.addAction(
'Quit',
QtGui.QKeySequence(QtCore.Qt.Key_Q
| QtCore.Qt.ControlModifier),
self.close)
else:
self.ui.menu_file.addAction(
'Quit',
self.close,
QtGui.QKeySequence(QtCore.Qt.Key_Q
| QtCore.Qt.ControlModifier))
self.ui.menu_workflow.addAction(self.sw_widget.ui.action_submit)
self.ui.menu_workflow.addAction(self.sw_widget.ui.action_stop_wf)
self.ui.menu_workflow.addAction(self.sw_widget.ui.action_restart)
self.ui.menu_workflow.addSeparator()
self.ui.menu_workflow.addAction(
self.sw_widget.ui.action_transfer_infiles)
self.ui.menu_workflow.addAction(
self.sw_widget.ui.action_transfer_outfiles)
self.ui.menu_workflow.addSeparator()
self.ui.menu_workflow.addAction(
self.sw_widget.ui.action_delete_workflow)
self.ui.menu_workflow.addAction(self.sw_widget.ui.action_delete_all)
self.ui.menu_workflow.addAction(
self.sw_widget.ui.action_change_expiration_date)
self.ui.menu_view.addAction(self.ui.dock_plot.toggleViewAction())
self.ui.menu_view.addAction(self.ui.dock_graph.toggleViewAction())
self.ui.menu_help.addAction(self.sw_widget.ui.action_about)
self.ui.tool_bar.addAction(self.sw_widget.ui.action_open_wf)
self.ui.tool_bar.addSeparator()
self.ui.tool_bar.addAction(self.sw_widget.ui.action_submit)
self.ui.tool_bar.addAction(self.sw_widget.ui.action_stop_wf)
self.ui.tool_bar.addAction(self.sw_widget.ui.action_restart)
self.ui.tool_bar.addSeparator()
self.ui.tool_bar.addAction(self.sw_widget.ui.action_transfer_infiles)
self.ui.tool_bar.addAction(self.sw_widget.ui.action_transfer_outfiles)
self.ui.tool_bar.addAction(self.sw_widget.ui.actionServer_Management)
self.ui.tool_bar.addSeparator()
# self.showMaximized()
[docs]
def canExit(self):
for res_id in self.model.resource_pool.resource_ids():
connection = self.model.resource_pool.connection(res_id)
for workflow_id in self.model.workflows(res_id):
try:
wf_elements_status = connection.workflow_elements_status(
workflow_id)
except Exception:
continue # workflow deleted
for transfer_info in wf_elements_status[1]:
status = transfer_info[1][0]
if status == constants.TRANSFERING_FROM_CR_TO_CLIENT or \
status == constants.TRANSFERING_FROM_CLIENT_TO_CR:
reply = QtGui.QMessageBox.question(
None,
'Warning!!',
"Files are transfering, "
"If you close it, the workflow will be broken."
"Do you want to close it?",
QtGui.QMessageBox.Yes, QtGui.QMessageBox.No)
if reply == QtGui.QMessageBox.Yes:
return True
else:
return False
return True
[docs]
@QtCore.Slot()
def currentConnectionChanged(self):
current_resource_id = self.model.current_resource_id
if current_resource_id is None:
current_resource_id = '<none>'
self.setWindowTitle(
"soma-workflow - " + current_resource_id)
[docs]
@QtCore.Slot()
def closeEvent(self, event):
# do stuff
if self.canExit():
event.accept() # let the window close
self.model.resource_pool.delete_all()
else:
event.ignore()
class WorkflowInfoWidget(QtGui.QWidget):
def __init__(self,
model,
assigned_wf_id=None,
assigned_resource_id=None,
parent=None):
super().__init__(parent)
self.ui = Ui_WStatusNameDate()
self.ui.setupUi(self)
self.model = model
self.assigned_wf_id = assigned_wf_id
self.assigned_resource_id = assigned_resource_id
self.model.current_workflow_changed.connect(
self.current_workflow_changed)
self.model.workflow_state_changed.connect(self.update_workflow_status)
def check_workflow(self):
return self.assigned_wf_id == None or \
(self.assigned_wf_id == self.model.current_wf_id and
self.assigned_resource_id == self.model.current_resource_id)
@QtCore.Slot()
def update_workflow_status(self):
if self.check_workflow():
self.update_workflow_status_widgets(self.model.workflow_status,
self.model.current_workflow().queue)
def update_workflow_status_widgets(self, status, queue):
if status == None:
self.ui.wf_status.clear()
self.ui.wf_queue.clear()
else:
self.ui.wf_status.setText(status)
if queue == None:
self.ui.wf_queue.setText("default queue")
else:
self.ui.wf_queue.setText(queue)
icon_file_path = workflow_status_icon(status)
if icon_file_path == None:
pixmap = QtGui.QPixmap()
else:
image = QtGui.QImage(icon_file_path).scaled(30, 30)
pixmap = QtGui.QPixmap.fromImage(image)
self.ui.wf_status_icon.setPixmap(pixmap)
@QtCore.Slot()
def current_workflow_changed(self):
if self.check_workflow():
self.setEnabled(True)
self.ui.wf_name.setEnabled(True)
self.ui.wf_status_icon.setEnabled(True)
self.ui.wf_id.setEnabled(True)
if self.model.current_wf_id == None:
self.ui.wf_name.clear()
self.ui.wf_id.clear()
self.update_workflow_status_widgets(None, None)
self.ui.dateTimeEdit_expiration.setDateTime(datetime.now())
elif self.model.current_wf_id == NOT_SUBMITTED_WF_ID:
if self.model.workflow_name:
self.ui.wf_name.setText(self.model.workflow_name)
else:
self.ui.wf_name.clear()
self.ui.wf_id.clear()
self.ui.wf_status.setText("not submitted")
self.ui.wf_status_icon.setPixmap(QtGui.QPixmap())
self.ui.dateTimeEdit_expiration.setDateTime(
datetime.now() + timedelta(days=5))
else:
if self.model.workflow_name:
self.ui.wf_name.setText(self.model.workflow_name)
else:
self.ui.wf_name.setText(repr(self.model.current_wf_id))
self.ui.wf_id.setText(repr(self.model.current_wf_id))
self.update_workflow_status_widgets(self.model.workflow_status,
self.model.current_workflow().queue)
self.ui.dateTimeEdit_expiration.setDateTime(
self.model.workflow_exp_date)
elif self.assigned_wf_id != None:
self.setEnabled(False)
self.ui.wf_name.setEnabled(False)
self.ui.wf_id.setEnabled(False)
self.ui.wf_status_icon.setEnabled(False)
class SearchWidget(QtGui.QWidget):
workflow_tree = None
def __init__(self, workflow_tree, parent=None):
'''
* workflow_tree *WorkflowTree*
'''
super().__init__(parent)
self.workflow_tree = weakref.ref(workflow_tree)
self.ui = Ui_SearchWidget()
self.ui.setupUi(self)
self.statuses = []
self.statuses.append([])
self.statuses.append([constants.FAILED])
self.statuses.append([constants.RUNNING])
self.statuses.append([constants.QUEUED_ACTIVE])
self.statuses.append([constants.SUBMISSION_PENDING])
self.statuses.append([constants.RUNNING, constants.QUEUED_ACTIVE])
self.statuses.append(
[constants.RUNNING, constants.QUEUED_ACTIVE, constants.SUBMISSION_PENDING])
self.statuses.append([constants.DONE])
self.ui.status_combo_box.addItem("All")
self.ui.status_combo_box.addItem(
QtGui.QIcon(os.path.join(os.path.dirname(__file__), "icon/failed.png")), constants.FAILED)
self.ui.status_combo_box.addItem(
QtGui.QIcon(os.path.join(os.path.dirname(__file__), "icon/running.png")), constants.RUNNING)
self.ui.status_combo_box.addItem(
QtGui.QIcon(os.path.join(os.path.dirname(__file__), "icon/queued.png")), constants.QUEUED_ACTIVE)
self.ui.status_combo_box.addItem(
QtGui.QIcon(os.path.join(os.path.dirname(__file__), "icon/pending.png")), constants.SUBMISSION_PENDING)
self.ui.status_combo_box.addItem(
constants.QUEUED_ACTIVE + " or " + constants.RUNNING)
self.ui.status_combo_box.addItem(
constants.SUBMISSION_PENDING + " or " + constants.QUEUED_ACTIVE + " or " + constants.RUNNING)
self.ui.status_combo_box.addItem(
QtGui.QIcon(os.path.join(os.path.dirname(__file__), "icon/success.png")), constants.DONE)
self.ui.line_edit.textChanged.connect(self.text_changed)
self.ui.expand_button.clicked.connect(
self.workflow_tree().tree_view.expandAll)
self.ui.status_combo_box.currentIndexChanged.connect(
self.status_changed)
@QtCore.Slot(int)
def status_changed(self, index):
if self.workflow_tree().proxy_model != None:
self.workflow_tree().proxy_model.setFilterStatus(
self.statuses[index])
def text_changed(self, text):
if self.workflow_tree().proxy_model != None:
self.workflow_tree().proxy_model.setFilterRegExp(
QtCore.QRegExp(text,
QtCore.Qt.CaseInsensitive,
QtCore.QRegExp.FixedString))
class JobFilterProxyModel(QtGui.QSortFilterProxyModel):
def __init__(self, parent=None):
super().__init__(parent)
self.statuses = []
def setFilterStatus(self, statuses):
self.statuses = statuses
self.invalidateFilter()
def filterAcceptsRow(self, sourceRow, sourceParent):
index = self.sourceModel().index(sourceRow, 0, sourceParent)
if isinstance(index.internalPointer(), GuiGroup):
if len(self.statuses) != 0:
group = index.internalPointer()
if constants.DONE in self.statuses and len(group.done) > 0:
return True
elif constants.FAILED in self.statuses and len(group.failed) > 0:
return True
elif constants.RUNNING in self.statuses and len(group.running) > 0:
return True
elif constants.QUEUED_ACTIVE in self.statuses and len(group.queued) > 0:
return True
elif constants.SUBMISSION_PENDING in self.statuses and len(group.pending) > 0:
return True
else:
return False
return True
elif isinstance(index.internalPointer(), GuiTransfer):
return True
elif isinstance(index.internalPointer(), GuiJob):
if len(self.statuses) != 0:
if constants.FAILED in self.statuses:
job = index.internalPointer()
if job.status == constants.FAILED:
return QtGui.QSortFilterProxyModel.filterAcceptsRow(
self,
sourceRow,
sourceParent)
elif job.status == constants.DONE:
exit_status, exit_value, term_signal, resource_usage = job.exit_info
if exit_status != constants.FINISHED_REGULARLY or exit_value != 0:
return QtGui.QSortFilterProxyModel.filterAcceptsRow(
self,
sourceRow,
sourceParent)
else:
return False
else:
return False
elif constants.DONE in self.statuses:
job = index.internalPointer()
if job.status == constants.DONE:
exit_status, exit_value, term_signal, resource_usage = job.exit_info
if exit_status == constants.FINISHED_REGULARLY and exit_value == 0:
return QtGui.QSortFilterProxyModel.filterAcceptsRow(
self,
sourceRow,
sourceParent)
else:
return False
else:
return False
elif constants.RUNNING in self.statuses:
job = index.internalPointer()
if job.status == constants.RUNNING or job.status == constants.UNDETERMINED:
return QtGui.QSortFilterProxyModel.filterAcceptsRow(
self,
sourceRow,
sourceParent)
if index.internalPointer().status not in self.statuses:
return False
return QtGui.QSortFilterProxyModel.filterAcceptsRow(self,
sourceRow,
sourceParent)
class WorkflowTree(QtGui.QWidget):
'''
'''
assigned_wf_id = None
selection_model_changed = QtCore.Signal(QtGui.QItemSelectionModel)
tree_view = None
# JobFilterProxyModel
proxy_model = None
search_widget = None
def __init__(self,
model,
assigned_wf_id=None,
assigned_resource_id=None,
parent=None):
super().__init__(parent)
self.model = model
self.item_model = None
self.assigned_wf_id = assigned_wf_id
self.assigned_resource_id = assigned_resource_id
self.proxy_model = JobFilterProxyModel(self)
self.model.workflow_state_changed.connect(self.proxy_model.invalidate)
self.tree_view = QtGui.QTreeView(self)
self.tree_view.setHeaderHidden(True)
self.search_widget = SearchWidget(workflow_tree=self, parent=self)
self.vLayout = QtGui.QVBoxLayout(self)
self.vLayout.setContentsMargins(0, 0, 0, 0)
self.vLayout.addWidget(self.tree_view)
self.vLayout.addWidget(self.search_widget)
self.model.current_connection_changed.connect(self.clear)
self.model.current_workflow_about_to_change.connect(
self.currentWorkflowAboutToChange)
self.model.current_workflow_changed.connect(
self.current_workflow_changed)
self.model.workflow_state_changed.connect(self.dataChanged)
# enable customContextMenuRequested signal to be emited
self.tree_view.setContextMenuPolicy(
QtCore.Qt.CustomContextMenu)
self.tree_view.customContextMenuRequested.connect(self.openContextMenu)
def check_workflow(self):
return self.assigned_wf_id == None or \
(self.assigned_wf_id == self.model.current_wf_id and
self.assigned_resource_id == self.model.current_resource_id)
@QtCore.Slot()
def clear(self):
if self.item_model:
if self.assigned_wf_id != None:
self.setEnabled(False)
else:
self.item_model.modelAboutToBeReset.emit()
self.item_model = None
self.tree_view.setModel(None)
@QtCore.Slot()
def currentWorkflowAboutToChange(self):
if self.item_model:
if self.assigned_wf_id != None:
self.setEnabled(False)
else:
self.item_model.modelAboutToBeReset.emit()
self.item_model = None
self.tree_view.setModel(None)
@QtCore.Slot()
def current_workflow_changed(self):
if self.model.current_wf_id != None:
if self.check_workflow():
self.setEnabled(True)
workflow = self.model.current_workflow()
if workflow != None:
self.item_model = WorkflowItemModel(workflow, self)
if self.proxy_model:
self.proxy_model.setSourceModel(self.item_model)
self.tree_view.setModel(self.proxy_model)
else:
self.tree_view.setModel(self.item_model)
self.item_model.modelReset.emit()
self.selection_model_changed.emit(
self.tree_view.selectionModel())
else:
self.setEnabled(False)
elif self.assigned_wf_id != None:
self.setEnabled(False)
@QtCore.Slot()
def dataChanged(self):
if self.item_model:
if self.check_workflow():
self.setEnabled(True)
row = self.item_model.rowCount(QtCore.QModelIndex())
self.item_model.dataChanged.emit(
self.item_model.index(0, 0, QtCore.QModelIndex()),
self.item_model.index(row, 0, QtCore.QModelIndex()))
elif self.assigned_wf_id != None:
self.setEnabled(False)
def select_job(self, job_id):
selection_model = self.tree_view.selectionModel()
model = self.tree_view.model()
found_item = None
item_lists = [QtCore.QModelIndex()]
while item_lists and found_item is None:
parent = item_lists.pop(0)
rows = model.rowCount(parent)
for row in range(rows):
index = model.index(row, 0, parent)
source_index = index
if self.proxy_model is not None:
source_index = model.mapToSource(index)
data = source_index.internalPointer()
if isinstance(data, GuiJob):
if data.job_id == job_id:
found_item = index
break
elif isinstance(data, GuiGroup):
item_lists.append(index)
if found_item is not None:
selection_model.setCurrentIndex(
found_item, QtGui.QItemSelectionModel.SelectCurrent)
def selected_jobs(self, include_groups=True):
selected_items = self.tree_view.selectedIndexes()
if selected_items:
if self.proxy_model is not None:
model = self.tree_view.model()
selected_items = [model.mapToSource(item)
for item in selected_items]
selected_items = [item.internalPointer() for item in selected_items]
if include_groups:
classes = (GuiJob, GuiGroup)
else:
classes = GuiJob
selected_jobs = [item for item in selected_items
if isinstance(item, classes)]
return selected_jobs
return []
def openContextMenu(self, point):
# check that the workflow is running (or stopped?)
wf_status = self.model.get_workflow_status(
self.model.current_resource_id, self.model.current_wf_id)
if wf_status != constants.WORKFLOW_IN_PROGRESS:
print('workflow not running.')
return
selected_jobs = self.selected_jobs()
if selected_jobs:
popup = QtGui.QMenu()
stop = popup.addAction('Stop jobs', self.stop_selected_jobs)
restart = popup.addAction('Restart jobs',
self.restart_selected_jobs)
popup.exec(QtGui.QCursor.pos())
def _expand_groups(self, jobs_groups):
expanded = []
todo = list(jobs_groups)
while todo:
item = todo.pop(0)
if isinstance(item, GuiJob):
expanded.append(item)
elif isinstance(item, GuiGroup):
todo += [item.gui_workflow.items[child]
for child in item.children]
return expanded
def stop_selected_jobs(self):
selected_jobs = self._expand_groups(self.selected_jobs())
if selected_jobs:
reply = QtGui.QMessageBox.question(
None,
'Warning!!',
"Stop selected jobs and dependencies ?",
QtGui.QMessageBox.Yes, QtGui.QMessageBox.No)
if reply == QtGui.QMessageBox.Yes:
Controller.stop_jobs(self.model.current_wf_id,
[j.job_id for j in selected_jobs],
self.model.current_connection)
return True
else:
return False
def restart_selected_jobs(self):
selected_jobs = self._expand_groups(self.selected_jobs())
if selected_jobs:
reply = QtGui.QMessageBox.question(
None,
'Warning!!',
"Stop/restart selected jobs and dependencies ?",
QtGui.QMessageBox.Yes, QtGui.QMessageBox.No)
if reply == QtGui.QMessageBox.Yes:
Controller.restart_jobs(self.model.current_wf_id,
[j.job_id for j in selected_jobs],
self.model.current_connection)
return True
else:
return False
class WorkflowGroupInfo(QtGui.QWidget):
def __init__(self, model, parent=None):
super().__init__(parent)
self.infoWidget = None
self.vLayout = QtGui.QVBoxLayout(self)
self.model = model
self.model.current_connection_changed.connect(self.clear)
self.model.current_workflow_about_to_change.connect(self.clear)
self.model.current_workflow_changed.connect(
self.current_workflow_changed)
self.model.workflow_state_changed.connect(self.dataChanged)
@QtCore.Slot()
def clear(self):
if self.infoWidget:
self.infoWidget.hide()
self.vLayout.removeWidget(self.infoWidget)
self.infoWidget = None
self.dataChanged()
@QtCore.Slot()
def current_workflow_changed(self):
if self.infoWidget:
self.infoWidget.hide()
self.vLayout.removeWidget(self.infoWidget)
if self.model.current_workflow():
self.infoWidget = GroupInfoWidget(
self.model.current_workflow().root_item, self)
if self.infoWidget:
self.vLayout.addWidget(self.infoWidget)
self.update()
@QtCore.Slot()
def dataChanged(self):
if self.infoWidget:
self.infoWidget.dataChanged()
class WorkflowPlot(QtGui.QWidget):
job_selected = QtCore.Signal(int)
def __init__(self,
model,
assigned_wf_id=None,
assigned_resource_id=None,
parent=None):
super().__init__(parent)
self.plotWidget = None
self.vLayout = QtGui.QVBoxLayout(self)
self.model = model
self.assigned_wf_id = assigned_wf_id
self.assigned_resource_id = assigned_resource_id
if MATPLOTLIB:
self.model.current_connection_changed.connect(self.clear)
self.model.current_workflow_about_to_change.connect(self.clear)
self.model.current_workflow_changed.connect(
self.current_workflow_changed)
self.model.workflow_state_changed.connect(self.dataChanged)
def check_workflow(self):
return self.assigned_wf_id == None or \
(self.assigned_wf_id == self.model.current_wf_id and
self.assigned_resource_id == self.model.current_resource_id)
@QtCore.Slot()
def clear(self):
if self.assigned_wf_id != None:
self.setEnabled(False)
else:
if self.plotWidget:
self.plotWidget.hide()
self.vLayout.removeWidget(self.plotWidget)
self.plotWidget = None
self.dataChanged()
@QtCore.Slot()
def current_workflow_changed(self):
if self.check_workflow():
self.setEnabled(True)
if self.plotWidget:
self.plotWidget.hide()
self.vLayout.removeWidget(self.plotWidget)
if self.model.current_workflow():
self.plotWidget = PlotView(
self.model.current_workflow().root_item, self)
self.plotWidget.job_selected.connect(self.job_selected)
if self.plotWidget:
self.vLayout.addWidget(self.plotWidget)
self.update()
elif self.assigned_wf_id != None:
self.setEnabled(False)
@QtCore.Slot()
def dataChanged(self):
if self.plotWidget:
if self.check_workflow():
self.setEnabled(True)
self.plotWidget.dataChanged()
elif self.assigned_wf_id != None:
self.setEnabled(False)
class WorkflowElementInfo(QtGui.QWidget):
# QtGui.QAbstractProxyModel used in the widget providing the selection
# model
proxy_model = None
connection_closed_error = QtCore.Signal()
job_selected = QtCore.Signal(int)
def __init__(self, model, proxy_model=None, parent=None):
super().__init__(parent)
self.selectionModel = None
self.infoWidget = None
self.model = model # used to update stderr and stdout only
self.proxy_model = proxy_model
self.job_info_current_tab = 0
self.model.workflow_state_changed.connect(self.dataChanged)
self.model.current_connection_changed.connect(self.clear)
self.model.current_workflow_changed.connect(self.clear)
self.vLayout = QtGui.QVBoxLayout(self)
@QtCore.Slot(QtGui.QItemSelectionModel)
def setSelectionModel(self, selectionModel):
if self.selectionModel:
self.selectionModel.currentChanged[QtCore.QModelIndex,
QtCore.QModelIndex].disconnect(
self.currentChanged)
self.selectionModel = selectionModel
self.selectionModel.currentChanged[QtCore.QModelIndex,
QtCore.QModelIndex].connect(
self.currentChanged)
if self.infoWidget:
self.infoWidget.hide()
self.vLayout.removeWidget(self.infoWidget)
self.infoWidget = None
@QtCore.Slot()
def clear(self):
if self.infoWidget:
self.infoWidget.hide()
self.vLayout.removeWidget(self.infoWidget)
self.infoWidget = None
self.dataChanged()
@QtCore.Slot(QtCore.QModelIndex, QtCore.QModelIndex)
def currentChanged(self, current, previous):
#print('WorkflowElementInfo.currentChanged')
if self.infoWidget:
if isinstance(self.infoWidget, JobInfoWidget):
self.job_info_current_tab = self.infoWidget.currentIndex()
self.infoWidget.hide()
self.vLayout.removeWidget(self.infoWidget)
if self.proxy_model != None:
current = self.proxy_model.mapToSource(current)
item = current.internalPointer()
if isinstance(item, GuiJob):
self.infoWidget \
= JobInfoWidget(item,
weakref.proxy(self.model.current_connection),
self.job_info_current_tab,
self)
self.infoWidget.source_job_selected.connect(self.job_selected)
elif isinstance(item, GuiTransfer):
self.infoWidget = TransferInfoWidget(item, self)
elif isinstance(item, GuiGroup):
self.infoWidget = GroupInfoWidget(item, self)
else:
self.infoWidget = None
if self.infoWidget:
self.vLayout.addWidget(self.infoWidget)
self.update()
@QtCore.Slot()
def dataChanged(self):
if self.infoWidget:
self.infoWidget.dataChanged()
#
# VIEWS #########################
#
class JobInfoWidget(QtGui.QTabWidget):
source_job_selected = QtCore.Signal(int)
def __init__(self,
job_item,
connection,
current_tab_index=0,
parent=None):
super().__init__(parent)
self.ui = Ui_JobInfo()
self.ui.setupUi(self)
self.job_item = job_item
self.connection = connection
self.tab_index = {i: i for i in range(6)}
if not job_item.data or (not job_item.data.has_outputs
and not job_item.data.param_dict):
self.removeTab(4)
self.tab_index[4] = -1
self.tab_index[5] = 4
if current_tab_index == 4:
current_tab_index = 0
else:
if not job_item.data.param_dict:
self.ui.input_params_contents.hide()
self.ui.input_params_label.hide()
elif not job_item.data.has_outputs:
self.ui.output_params_contents.hide()
self.ui.output_params_label.hide()
if job_item.data.param_dict:
table = self.ui.input_params_contents
table.setRowCount(len(job_item.data.param_dict))
table.horizontalHeader().setSectionResizeMode(
QtGui.QHeaderView.ResizeToContents)
if job_item.gui_workflow():
workflow = job_item.gui_workflow().server_workflow
else:
workflow = None
job_mapping = getattr(workflow, 'job_mapping', None)
engine_wf = True
if job_mapping is None:
# workflow loaded but not submitted
engine_wf = False
job_mapping = {}
for item_id, item \
in job_item.gui_workflow().items.items():
if isinstance(item, GuiJob):
job_mapping[item.data] = item.it_id
for i, param_name in \
enumerate(sorted(job_item.data.param_dict.keys())):
value = job_item.data.param_dict[param_name]
table.setItem(i, 0, QtGui.QTableWidgetItem(param_name))
table.setItem(i, 1, QtGui.QTableWidgetItem(repr(value)))
if workflow:
links = workflow.param_links.get(job_item.data)
if links:
linkl = links.get(param_name, [])
if linkl:
linkstr = []
job_ids = []
for link in linkl:
linkstr.append(
f'{link[0].name}.{link[1]}')
if engine_wf:
job_ids.append(job_mapping.get(
link[0], link[0]).job_id)
else:
job_ids.append(job_mapping.get(
link[0]))
linkstr = ', '.join(linkstr)
item = QtGui.QTableWidgetItem(linkstr)
item.job_ids = job_ids
table.setItem(i, 2, item)
table.cellDoubleClicked.connect(
self.input_param_double_clicked)
self.setCurrentIndex(current_tab_index)
self.dataChanged()
if current_tab_index == self.tab_index[4]:
self.refresh_params()
elif current_tab_index in (self.tab_index[1], self.tab_index[2]):
self.refreshStdErrOut()
elif current_tab_index == self.tab_index[5]:
self.refresh_envar()
self.currentChanged.connect(self.currentTabChanged)
self.ui.stderr_refresh_button.clicked.connect(self.refreshStdErrOut)
self.ui.stdout_refresh_button.clicked.connect(self.refreshStdErrOut)
self.ui.params_refresh_button.clicked.connect(self.refresh_params)
self.ui.envar_copy_btn.clicked.connect(self.copy_envars)
def dataChanged(self):
# reread command if needed
if self.ui.job_status.text() == '' \
or (self.ui.job_status.text() == constants.NOT_SUBMITTED
and self.job_item.status != self.ui.job_status.text()):
self.job_item.update_job_command(self.connection)
setLabelFromString(self.ui.job_name, self.job_item.name)
setLabelFromString(self.ui.job_status, self.job_item.status)
exit_status, exit_value, term_signal, resource_usage = self.job_item.exit_info
setLabelFromString(self.ui.exit_status, exit_status)
setLabelFromInt(self.ui.exit_value, exit_value)
setLabelFromString(self.ui.term_signal, term_signal)
setTextEditFromString(self.ui.command, self.job_item.command)
setLabelFromInt(self.ui.priority, self.job_item.priority)
setLabelFromString(self.ui.queue, self.job_item.queue)
if resource_usage:
if six.PY3 and isinstance(resource_usage, bytes):
resource_usage = resource_usage.decode()
self.ui.resource_usage.clear()
self.ui.resource_usage.insertItems(0, resource_usage.split())
else:
self.ui.resource_usage.clear()
setLabelFromDateTime(
self.ui.submission_date, self.job_item.submission_date)
setLabelFromDateTime(
self.ui.execution_date, self.job_item.execution_date)
setLabelFromDateTime(self.ui.ending_date, self.job_item.ending_date)
setLabelFromInt(self.ui.job_id, self.job_item.job_id)
setLabelFromString(self.ui.drms_job_id, self.job_item.drmaa_id)
if self.job_item.submission_date:
if self.job_item.execution_date:
time_in_queue = self.job_item.execution_date - \
self.job_item.submission_date
setLabelFromTimeDelta(self.ui.time_in_queue, time_in_queue)
if self.job_item.ending_date:
execution_time = self.job_item.serial_duration
setLabelFromTimeDelta(
self.ui.execution_time, execution_time)
setTextEditFromString(
self.ui.stdout_file_contents, self.job_item.stdout)
setTextEditFromString(
self.ui.stderr_file_contents, self.job_item.stderr)
@QtCore.Slot(int)
def currentTabChanged(self, index):
if index == self.tab_index[0]:
try:
self.job_item.update_job_command(self.connection)
except ConnectionClosedError as e:
self.parent.connection_closed_error[()].emit()
else:
self.dataChanged()
elif (index == self.tab_index[1] or index == self.tab_index[2]) \
and self.job_item.stdout == "":
try:
self.job_item.updateStdOutErr(self.connection)
except ConnectionClosedError as e:
self.parent.connection_closed_error[()].emit()
else:
self.dataChanged()
elif index == self.tab_index[4]:
self.refresh_params()
elif index == self.tab_index[5]:
self.refresh_envar()
@QtCore.Slot()
def refreshStdErrOut(self):
try:
self.job_item.updateStdOutErr(self.connection)
except ConnectionClosedError as e:
self.parent().connection_closed_error[()].emit()
self.dataChanged()
@QtCore.Slot()
def refresh_params(self):
if self.job_item.data is None:
return
try:
self.job_item.update_job_params(self.connection)
except ConnectionClosedError as e:
self.parent().connection_closed_error[()].emit()
return
if self.job_item.data.param_dict:
table = self.ui.input_params_contents
param_dict = self.job_item.data.param_dict
for row in range(table.rowCount()):
param_name = table.item(row, 0).text()
if param_name in param_dict:
value = param_dict[param_name]
table.setItem(row, 1, QtGui.QTableWidgetItem(repr(value)))
if self.job_item.data.has_outputs:
table = self.ui.output_params_contents
table.clearContents()
output_params = getattr(self.job_item, 'output_params', None)
if output_params is not None:
table.setRowCount(len(output_params))
table.horizontalHeader().setSectionResizeMode(
QtGui.QHeaderView.ResizeToContents)
for row, param_name in enumerate(sorted(output_params.keys())):
value = output_params[param_name]
table.setItem(row, 0, QtGui.QTableWidgetItem(param_name))
table.setItem(row, 1, QtGui.QTableWidgetItem(repr(value)))
@QtCore.Slot(int, int)
def input_param_double_clicked(self, row, col):
if col == 2:
item = self.ui.input_params_contents.item(row, col)
job_ids = getattr(item, 'job_ids', None)
if job_ids:
self.source_job_selected.emit(job_ids[0])
def get_envars(self):
if self.job_item.data is None:
self.ui.envar_table.clear()
return {}
env = {}
gui_w = self.job_item.gui_workflow()
if gui_w:
workflow = gui_w.server_workflow
else:
workflow = None
if workflow:
job_mapping = getattr(workflow, 'job_mapping', None)
if job_mapping:
ejob = job_mapping[self.job_item.data]
in_param_file = ejob.plain_input_params_file()
if in_param_file:
env['SOMAWF_INPUT_PARAMS'] = in_param_file
out_param_file = ejob.plain_output_params_file()
if out_param_file:
env['SOMAWF_OUTPUT_PARAMS'] = out_param_file
if ejob.env:
env.update(ejob.env)
if self.job_item.data.env:
env.update(self.job_item.data.env)
return env
@QtCore.Slot()
def refresh_envar(self):
env = self.get_envars()
table = self.ui.envar_table
table.clearContents()
table.setRowCount(len(env))
row = 0
for var, value in env.items():
table.setItem(row, 0, QtGui.QTableWidgetItem(var))
table.setItem(row, 1, QtGui.QTableWidgetItem(value))
row += 1
table.horizontalHeader().setSectionResizeMode(
QtGui.QHeaderView.ResizeToContents)
def copy_envars(self):
def _repl(s):
sl = [c if c != '"' else '\\"' for c in s]
return ''.join(sl)
env = self.get_envars()
txt_env = ' '.join([f'{k}="{_repl(v)}"' for k, v in env.items()])
clipboard = QtGui.qApp.clipboard()
clipboard.setText(txt_env)
class TransferInfoWidget(QtGui.QTabWidget):
def __init__(self, transfer_item, parent=None):
super().__init__(parent)
self.ui = Ui_TransferInfo()
self.ui.setupUi(self)
self.transfer_item = transfer_item
self.dataChanged()
def dataChanged(self):
setLabelFromString(self.ui.transfer_name, self.transfer_item.name)
setLabelFromString(self.ui.transfer_status,
self.transfer_item.transfer_status)
if isinstance(self.transfer_item.data, FileTransfer):
setLabelFromString(self.ui.client_path,
self.transfer_item.data.client_path)
if self.transfer_item.data.client_paths:
self.ui.client_paths.insertItems(0,
self.transfer_item.data.client_paths)
else:
self.ui.client_paths.clear()
elif isinstance(self.transfer_item.data, TemporaryPath):
if self.transfer_item.data.is_directory:
setLabelFromString(
self.ui.client_path, '<temporary directory>')
else:
setLabelFromString(self.ui.client_path, '<temporary file>')
self.ui.client_paths.clear()
setLabelFromString(self.ui.cr_path, self.transfer_item.engine_path)
class GroupInfoWidget(QtGui.QWidget):
def __init__(self, group_item, parent=None):
super().__init__(parent)
self.ui = Ui_GroupInfo()
self.ui.setupUi(self)
self.group_item = group_item
self.dataChanged()
def dataChanged(self):
# job_nb = len(self.group_item.not_sub) + len(self.group_item.done) +
# len(self.group_item.failed) + len(self.group_item.running) +
# len(self.group_item.warning) + len(self.group_item.queued)
ended_job_nb = len(self.group_item.done) + len(self.group_item.failed)
total_time = None
if self.group_item.first_sub_date != datetime.max:
if self.group_item.first_sub_date:
if self.group_item.last_end_date:
total_time = self.group_item.last_end_date - \
self.group_item.first_sub_date
else:
total_time = datetime.now() - \
self.group_item.first_sub_date
# input_file_nb = len(self.group_item.input_to_transfer) + len(self.group_item.input_transfer_ended)
# ended_input_transfer_nb = len(self.group_item.input_transfer_ended)
setLabelFromString(self.ui.status, self.group_item.status)
setLabelFromInt(self.ui.job_nb, self.group_item.job_count)
setLabelFromInt(self.ui.ended_job_nb, ended_job_nb)
setLabelFromTimeDelta(self.ui.total_time, total_time)
setLabelFromTimeDelta(
self.ui.theoretical_serial_time, self.group_item.theoretical_serial_time)
# setLabelFromInt(self.ui.input_file_nb, input_file_nb)
# setLabelFromInt(self.ui.ended_input_transfer_nb,
# ended_input_transfer_nb)
# if self.group_item.input_to_transfer:
# self.ui.comboBox_input_to_transfer.insertItems(0, self.group_item.input_to_transfer)
# else:
# self.ui.comboBox_input_to_transfer.clear()
# if self.group_item.output_ready:
# self.ui.comboBox_output_to_transfer.insertItems(0, self.group_item.output_ready)
# else:
# self.ui.comboBox_output_to_transfer.clear()
class PlotView(QtGui.QWidget):
job_selected = QtCore.Signal(int)
def __init__(self, group_item, parent=None):
super().__init__(parent)
self.ui = Ui_PlotWidget()
self.ui.setupUi(self)
self.vlayout = QtGui.QVBoxLayout()
self.ui.frame_plot.setLayout(self.vlayout)
self.ui.combo_plot_type.addItems(["jobs fct time",
"jobs+cpu fct time",
"nb jobs fct time",
"nb cpu fct time"])
self.ui.combo_plot_type.setCurrentIndex(0)
self.plot_type = 0
self.canvas = None
self.group_item = group_item
self.updatePlot()
self.ui.combo_plot_type.currentIndexChanged.connect(
self.plotTypeChanged)
self.ui.button_refresh.clicked.connect(self.refresh)
def __del__(self):
if self.canvas is not None:
del self.axes
matplotlib.pyplot.close(self.figure)
self.canvas.setAttribute(QtCore.Qt.WA_DeleteOnClose, True)
self.canvas.close()
def update_jobs(self):
self.jobs = list(self.group_item.done)
self.jobs.extend(self.group_item.failed)
self.jobs.extend(self.group_item.running)
self.jobs.extend(self.group_item.not_sub)
self.jobs = sorted(self.jobs, key=self.sortkey)
def sortkey(self, j):
if j.execution_date:
return j.execution_date
else:
return datetime.max
@QtCore.Slot(int)
def plotTypeChanged(self, index):
self.plot_type = index
self.updatePlot()
QtCore.Slot()
def refresh(self):
self.updatePlot()
def dataChanged(self):
if self.ui.checkbox_auto_update.isChecked():
self.updatePlot()
def updatePlot(self):
if not self.isVisible():
return
self.update_jobs()
if self.plot_type in (0, 1):
self.jobsFctTime()
if self.plot_type in (2, 3):
self.nbProcFctTime()
def jobsFctTime(self):
if self.canvas:
self.axes.clear()
# self.canvas.hide()
# self.vlayout.removeWidget(self.canvas)
# matplotlib.pyplot.close( self.figure )
# self.canvas.setAttribute( QtCore.Qt.WA_DeleteOnClose, True )
# self.canvas.close()
# self.canvas = None
else:
self.figure = Figure()
self.axes = self.figure.add_subplot(111)
if int(matplotlib.__version__.split('.')[0]) <= 2:
self.axes.hold(True)
self.canvas = FigureCanvas(self.figure)
try:
self.canvas.setParent(self)
except TypeError as e:
print("WARNING: The error might come from a mismatch between the matplotlib qt4 backend and the one used by soma.workflow "
+ repr(QT_BACKEND))
return
self.canvas.updateGeometry()
self.vlayout.addWidget(self.canvas)
self._pick = self.canvas.mpl_connect('pick_event',
self._jobs_mouse_press)
def key(j):
if j.execution_date:
return j.execution_date
else:
return datetime.max
#self.jobs = sorted(self.jobs, key=self.sortkey)
nb_jobs = 0
x_min = datetime.max
x_max = datetime.min
n = 0
#cols = matplotlib.rcParams['axes.prop_cycle']
cols = [[0.0, 0.0, 1.0],
[0.0, 0.0, 0.5],
[0.0, 0.36470588235294116, 0.7725490196078432],
[0.0, 0.18235294117647058, 0.3862745098039216],
[0.0, 0.7098039215686275, 0.5529411764705883],
[0.0, 0.35490196078431374, 0.27647058823529413],
[0.0, 0.9803921568627451, 0.35294117647058826],
[0.0, 0.49019607843137253, 0.17647058823529413],
[0.23529411764705882, 1.0, 0.21568627450980393],
[0.11764705882352941, 0.5, 0.10784313725490197],
[0.5725490196078431, 1.0, 0.11764705882352941],
[0.28627450980392155, 0.5, 0.058823529411764705],
[0.8901960784313725, 1.0, 0.0392156862745098],
[0.44509803921568625, 0.5, 0.0196078431372549],
[1.0, 0.9882352941176471, 0.0],
[0.5, 0.49411764705882355, 0.0],
[1.0, 0.7294117647058823, 0.0],
[0.5, 0.36470588235294116, 0.0],
[1.0, 0.38823529411764707, 0.0],
[0.5, 0.19411764705882353, 0.0],
[1.0, 0.00392156862745098, 0.0],
[0.5, 0.00196078431372549, 0.0]]
# darken a bit
cols = [[x*0.75 for x in c] for c in cols]
for j in self.jobs:
ncpu = 1
if self.plot_type == 1 and j.parallel_job_info:
ncpu = j.parallel_job_info.get('cpu_per_node', 1) \
* j.parallel_job_info.get('nodes_number', 1)
if j.execution_date:
nc = len(cols)
#for c in cols[n % nc:(n % nc) + 1]:
#col = c['color']
#break
col = cols[n % nc]
n += 1
nb_jobs = nb_jobs + 1
if j.execution_date < x_min:
x_min = j.execution_date
kwargs = {'picker': True, 'pickradius': 2}
if j.ending_date:
if ncpu == 1:
self.axes.plot(
[j.execution_date, j.ending_date],
[nb_jobs, nb_jobs], color=col, **kwargs)
# link to job
self.axes.lines[-1].job = j.job_id
else:
self.axes.fill(
[j.execution_date, j.ending_date,
j.ending_date, j.execution_date],
[nb_jobs, nb_jobs, nb_jobs + ncpu - 1,
nb_jobs + ncpu - 1], color=col, **kwargs)
# link to job
self.axes.patches[-1].job = j.job_id
if j.ending_date > x_max:
x_max = j.ending_date
else:
if ncpu == 1:
self.axes.plot(
[j.execution_date, datetime.now()],
[nb_jobs, nb_jobs], color=col, **kwargs)
# link to job
self.axes.lines[-1].job = j.job_id
else:
now = datetime.now()
self.axes.fill(
[j.execution_date, now, now, j.execution_date],
[nb_jobs, nb_jobs, nb_jobs + ncpu - 1,
nb_jobs + ncpu - 1], color=col, **kwargs)
# link to job
self.axes.patches[-1].job = j.job_id
nb_jobs += ncpu - 1
#print('njobs:', n, 'nb_jobs:', nb_jobs)
if nb_jobs:
self.axes.set_ylim(0, nb_jobs + 1)
# self.axes.set_xlabel("Time")
locator = mdates.AutoDateLocator(minticks=3)
# locator = mdates.MinuteLocator(interval=2)
self.axes.xaxis.set_major_locator(locator)
self.axes.xaxis.set_major_formatter(mdates.ConciseDateFormatter(
locator))
if self.plot_type == 1:
self.axes.set_ylabel("CPUs")
else:
self.axes.set_ylabel("Jobs")
self.figure.autofmt_xdate(rotation=80)
self.canvas.draw()
self.update()
def nbProcFctTime(self):
if self.canvas:
self.axes.clear()
# self.canvas.hide()
# self.vlayout.removeWidget(self.canvas)
# self.canvas = None
else:
self.figure = Figure()
self.axes = self.figure.add_subplot(111)
if int(matplotlib.__version__.split('.')[0]) <= 2:
self.axes.hold(True)
self.canvas = FigureCanvas(self.figure)
self.canvas.setParent(self)
self.canvas.updateGeometry()
self.vlayout.addWidget(self.canvas)
dates = []
nb_process_running = []
infos = [] # sequence of tuple (job_item, start, date, ncpu)
# start is a bolean
# if start then date is the execution date
# else date is the ending date
for job_item in self.jobs:
ncpu = 1
if self.plot_type == 3 and job_item.parallel_job_info:
ncpu = job_item.parallel_job_info.get('cpu_per_node', 1) \
* job_item.parallel_job_info.get('nodes_number', 1)
if job_item.execution_date:
infos.append((job_item, True, job_item.execution_date, ncpu))
if job_item.ending_date:
infos.append((job_item, False, job_item.ending_date, ncpu))
else:
infos.append((job_item, False, datetime.now(), ncpu))
infos = sorted(infos, key=lambda info_elem: info_elem[2])
nb_process = 0
previous = None
for info_elem in infos:
ncpu = info_elem[3]
if previous and info_elem[2] == previous[2]:
if info_elem[1]:
nb_process = nb_process + ncpu
else:
nb_process = nb_process - ncpu
nb_process_running[len(nb_process_running) - 1] = nb_process
else:
dates.append(info_elem[2])
if info_elem[1]:
nb_process = nb_process + ncpu
else:
nb_process = nb_process - ncpu
nb_process_running.append(nb_process)
previous = info_elem
nb_proc_max = 0
for i in range(1, len(nb_process_running)):
self.axes.fill_between([dates[i - 1], dates[i]], [
nb_process_running[i - 1], nb_process_running[i - 1]], y2=0, edgecolor='b')
if nb_process_running[i] > nb_proc_max:
nb_proc_max = nb_process_running[i]
if nb_proc_max != 0:
self.axes.set_ylim(0, nb_proc_max + 1)
# self.axes.set_xlabel("Time")
locator = mdates.AutoDateLocator(minticks=3)
# locator = mdates.MinuteLocator() #interval=10)
self.axes.xaxis.set_major_locator(locator)
self.axes.xaxis.set_major_formatter(mdates.ConciseDateFormatter(
locator))
if self.plot_type == 3:
self.axes.set_ylabel("Nb of CPU")
else:
self.axes.set_ylabel("Nb of jobs")
self.figure.autofmt_xdate(rotation=80)
self.canvas.draw()
self.update()
def _jobs_mouse_press(self, event):
'''matplotlib callback for picker event
'''
fig = event.canvas.figure
artist = event.artist
job = getattr(artist, 'job', None)
if job is not None:
self.job_selected.emit(job)
class WorkflowGraphView(QtGui.QWidget):
def __init__(self, model=None, parent=None):
super().__init__(parent)
self.ui = Ui_GraphWidget()
self.ui.setupUi(self)
self.workflow = None
self.connection = None
self.draw_enabled = False
self._data_changed = False
self.image_label = QtGui.QLabel(self)
self.image_label.setBackgroundRole(QtGui.QPalette.Base)
self.image_label.setSizePolicy(
QtGui.QSizePolicy.Ignored, QtGui.QSizePolicy.Ignored)
self.image_label.setScaledContents(True)
self.ui.scrollArea.setBackgroundRole(QtGui.QPalette.Dark)
# self.ui.scrollArea.setWidget(self.image_label)
self.ui.scrollArea.setWidgetResizable(False)
self.ui.zoom_slider.setRange(10, 200)
self.ui.zoom_slider.sliderMoved.connect(self.zoomChanged)
self.ui.zoom_slider.setValue(100)
self.scale_factor = 1.0
self.ui.adjust_size_checkBox.stateChanged.connect(
self.adjustSizeChanged)
self.ui.button_refresh.clicked.connect(self.refresh)
self.model = model
if model is not None:
self.model.current_workflow_changed.connect(
self.current_workflow_changed)
self.model.current_connection_changed.connect(
self.current_workflow_changed)
@QtCore.Slot()
def current_workflow_changed(self):
if self.model is not None:
gui_workflow = self.model.current_workflow()
if gui_workflow is None:
workflow = None
else:
workflow = gui_workflow.server_workflow
self.setWorkflow(workflow,
self.model.current_connection)
self.refresh()
else:
self.setWorkflow(None, None)
def setWorkflow(self, workflow, connection):
self.workflow = workflow
self.connection = connection
self.dataChanged()
def clear(self):
self.workflow = None
self.dataChanged()
@QtCore.Slot(int)
def zoomChanged(self, percentage):
self.scale_factor = percentage / 100.0
if self.workflow:
if self.image_label.pixmap():
self.image_label.resize(
self.image_label.pixmap().size() * self.scale_factor)
@QtCore.Slot(int)
def adjustSizeChanged(self, state):
if self.ui.adjust_size_checkBox.isChecked():
pass
# TBI
@QtCore.Slot()
def refresh(self):
self.dataChanged(force=True)
@QtCore.Slot()
def dataChanged(self, force=False):
if not self.draw_enabled:
self._data_changed = True
return
if self.workflow and (force or
self.ui.checkbox_auto_update.isChecked()):
self.rebuild_graph()
else:
self.ui.scrollArea.takeWidget()
self._data_changed = False
def rebuild_graph(self):
image_file_path = self.printWorkflow()
if image_file_path is not None:
image = QtGui.QImage(image_file_path)
pixmap = QtGui.QPixmap.fromImage(image)
self.image_label.setPixmap(pixmap)
self.ui.scrollArea.setWidget(self.image_label)
self.image_label.resize(
self.image_label.pixmap().size() * self.scale_factor)
self._data_changed = False
def graph_visibility_changed(self, visible):
self.draw_enabled = visible
if visible and self._data_changed:
self.rebuild_graph()
def printWorkflow(self):
import tempfile
if not shutil.which("dot"):
print("Unable to print workflow because dot executable is not",
"available.", file = sys.stderr)
return None
output_dir = tempfile.gettempdir()
GRAY = "\"#C8C8B4\""
BLUE = "\"#00C8FF\""
RED = "\"#FF6432\""
GREEN = "\"#9BFF32\""
LIGHT_BLUE = "\"#C8FFFF\""
names = dict()
current_id = 0
dot_file_path = os.path.join(output_dir, "tmp.dot")
graph_file_path = os.path.join(output_dir, "tmp.png")
if dot_file_path and os.path.isfile(dot_file_path):
os.remove(dot_file_path)
file = open(dot_file_path, "w")
print("digraph G {", file=file)
for node in self.workflow.jobs:
current_id = current_id + 1
names[node] = ("node" + repr(current_id), "\"" + node.name + "\"")
for ar in self.workflow.dependencies:
print(names[ar[0]][0] + " -> " + names[ar[1]][0], file=file)
for node in self.workflow.jobs:
if isinstance(node, Job):
if not hasattr(node, "job_id") \
or node.job_id == NOT_SUBMITTED_JOB_ID:
print(names[node][0] + "[shape=box label="
+ names[node][1] + "];", file=file)
else:
status = self.connection.job_status(node.job_id)
if status == constants.NOT_SUBMITTED:
print(names[node][0] + "[shape=box label="
+ names[node][1] + ", style=filled, color="
+ GRAY + "];", file=file)
elif status == constants.DONE:
exit_status, exit_value, term_signal, resource_usage \
= self.connection.job_termination_status(
node.job_id)
if exit_status == constants.FINISHED_REGULARLY \
and exit_value == 0:
print(names[node][0] + "[shape=box label="
+ names[node][1] + ", style=filled, color="
+ LIGHT_BLUE + "];", file=file)
else:
print(names[node][0] + "[shape=box label="
+ names[node][1] + ", style=filled, color="
+ RED + "];", file=file)
elif status == constants.FAILED:
print(names[node][0] + "[shape=box label="
+ names[node][1] + ", style=filled, color="
+ RED + "];", file=file)
else:
print(names[node][0] + "[shape=box label="
+ names[node][1] + ", style=filled, color="
+ GREEN + "];", file=file)
if isinstance(node, FileTransfer):
if not hasattr(node, "engine_path") or not node.engine_path:
print(names[node][0] + "[label=" + names[node][1] + "];",
file=file)
else:
status = self.connection.transfer_status(
node.engine_path)[0]
if status == constants.FILES_DONT_EXIST:
print(names[node][0] + "[label="
+ names[node][1] + ", style=filled, color="
+ GRAY + "];", file=file)
elif status == constants.FILES_ON_CR \
or status == constants.FILES_ON_CLIENT_AND_CR \
or status == constants.FILES_ON_CLIENT:
print(names[node][0] + "[label="
+ names[node][1] + ", style=filled, color="
+ BLUE + "];", file=file)
elif status == constants.TRANSFERING_FROM_CLIENT_TO_CR or \
status == constants.TRANSFERING_FROM_CR_TO_CLIENT:
print(names[node][0] + "[label="
+ names[node][1] + ", style=filled, color="
+ GREEN + "];", file=file)
elif status == constants.FILES_UNDER_EDITION:
print(names[node][0] + "[label=" + names[node][1]
+ ", style=filled, color=" + LIGHT_BLUE + "];",
file=file)
print("}", file=file)
file.close()
command = ["dot", "-Tpng", dot_file_path, "-o", graph_file_path]
subprocess.check_call(command)
return graph_file_path
#
# MODEL FOR THE TREE VIEW ###############
#
class WorkflowItemModel(QtCore.QAbstractItemModel):
def __init__(self, gui_workflow, parent=None):
super().__init__(parent)
self.workflow = gui_workflow
self.group_done_icon = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/group_done.png"))
self.group_failed_icon = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/group_failed.png"))
self.group_running_icon = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/group_running.png"))
self.group_no_status_icon = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/group_no_status.png"))
self.group_warning_icon = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/group_warning.png"))
self.running_icon = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/running.png"))
self.failed_icon = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/failed.png"))
self.done_icon = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/success.png"))
self.pending_icon = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/pending.png"))
self.queued_icon = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/queued.png"))
self.undetermined_icon = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/undetermined.png"))
self.warning_icon = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/warning.png"))
self.kill_delete_pending_icon = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/kill_delete_pending.png"))
self.no_status_icon = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/no_status.png"))
self.transfer_files_dont_exit = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/transfer_files_dont_exist.png"))
self.transfer_files_on_client = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/transfer_files_on_client.png"))
self.transfer_files_on_both = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/transfer_files_on_both.png"))
self.transfer_files_on_cr = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/transfer_files_on_cr.png"))
self.transfering_from_client_to_cr = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/transfering_from_client_to_cr.png"))
self.transfering_from_cr_to_client = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/transfering_from_cr_to_client.png"))
self.files_under_edition = QtGui.QIcon(
os.path.join(os.path.dirname(__file__), "icon/files_under_edition.png"))
def index(self, row, column, parent=QtCore.QModelIndex()):
if row < 0 or not column == 0:
return QtCore.QModelIndex()
if not parent.isValid():
if row < len(self.workflow.root_item.children):
return self.createIndex(row, column, self.workflow.items[self.workflow.root_item.children[row]])
else:
parent_item = parent.internalPointer()
if row < len(parent_item.children):
if parent_item.children[row] < len(self.workflow.items):
return self.createIndex(row, column, self.workflow.items[parent_item.children[row]])
return QtCore.QModelIndex()
def parent(self, index):
if not index.isValid():
return QtCore.QModelIndex()
item = index.internalPointer()
if not item.parent == -1:
parent_item = self.workflow.items[item.parent]
return self.createIndex(parent_item.row, 0, self.workflow.items[item.parent])
return QtCore.QModelIndex()
def rowCount(self, parent):
if not parent.isValid():
return len(self.workflow.root_item.children)
else:
parent_item = parent.internalPointer()
return len(parent_item.children)
def columnCount(self, parent):
children_nb = 0
if not parent.isValid():
children_nb = len(self.workflow.root_item.children)
else:
children_nb = len(parent.internalPointer().children)
if children_nb == 0:
return 0
else:
return 1
def data(self, index, role):
if not index.isValid():
if use_qvariant:
return QtCore.QVariant()
else:
return None
item = index.internalPointer()
# if not item.initiated:
# WIP
# Groups ####
if isinstance(item, GuiGroup):
if role == QtCore.Qt.FontRole:
font = QtGui.QFont()
font.setBold(True)
return font
if role == QtCore.Qt.DisplayRole:
return item.name
else:
if item.status == GuiGroup.GP_NO_STATUS:
if role == QtCore.Qt.DecorationRole:
return self.group_no_status_icon
if item.status == GuiGroup.GP_NOT_SUBMITTED:
if role == QtCore.Qt.DecorationRole:
return QtGui.QIcon()
if item.status == GuiGroup.GP_DONE:
if role == QtCore.Qt.DecorationRole:
return self.group_done_icon
if item.status == GuiGroup.GP_FAILED:
if role == QtCore.Qt.DecorationRole:
return self.group_failed_icon
if item.status == GuiGroup.GP_RUNNING:
if role == QtCore.Qt.DecorationRole:
return self.group_running_icon
if item.status == GuiGroup.GP_WARNING:
if role == QtCore.Qt.DecorationRole:
return self.group_warning_icon
# Jobs ####
if isinstance(item, GuiJob):
if item.job_id == NOT_SUBMITTED_JOB_ID:
if role == QtCore.Qt.DisplayRole:
return item.name
if role == QtCore.Qt.DecorationRole:
return self.no_status_icon
else:
status = item.status
# Done or Failed
if status == constants.DONE or status == constants.FAILED:
exit_status, exit_value, term_signal, resource_usage = item.exit_info
if role == QtCore.Qt.DisplayRole:
return item.name # + " status " + repr(exit_status) + " exit_value: " + repr(exit_value) + " signal " + repr(term_signal)
if role == QtCore.Qt.DecorationRole:
if status == constants.DONE and exit_status == constants.FINISHED_REGULARLY and exit_value == 0:
return self.done_icon
elif status == constants.DONE and exit_status == None:
return self.undetermined_icon
else:
return self.failed_icon
if role == QtCore.Qt.DisplayRole:
return item.name # + " " + status
if role == QtCore.Qt.DecorationRole:
if status == constants.NOT_SUBMITTED:
return QtGui.QIcon()
if status == constants.UNDETERMINED:
return self.undetermined_icon
elif status == constants.QUEUED_ACTIVE:
return self.queued_icon
elif status == constants.RUNNING:
return self.running_icon
elif status == constants.DELETE_PENDING or status == constants.KILL_PENDING:
return self.kill_delete_pending_icon
elif status == constants.SUBMISSION_PENDING:
return self.pending_icon
elif status == constants.WARNING:
return self.warning_icon
else: # SYSTEM_ON_HOLD USER_ON_HOLD USER_SYSTEM_ON_HOLD
# SYSTEM_SUSPENDED USER_SUSPENDED
# USER_SYSTEM_SUSPENDED
return self.warning_icon
# FileTransfers ####
if isinstance(item, GuiTransfer):
if isinstance(item, GuiInputTransfer):
# if role == QtCore.Qt.ForegroundRole:
# return QtGui.QBrush(RED)
if item.transfer_status == constants.TRANSFERING_FROM_CLIENT_TO_CR or item.transfer_status == constants.TRANSFERING_FROM_CR_TO_CLIENT:
display = "in: " + item.name + " " + \
repr(item.percentage_achievement) + "%"
else:
display = "in: " + item.name
if isinstance(item, GuiOutputTransfer):
# if role == QtCore.Qt.ForegroundRole:
# return QtGui.QBrush(BLUE)
if item.transfer_status == constants.TRANSFERING_FROM_CLIENT_TO_CR or item.transfer_status == constants.TRANSFERING_FROM_CR_TO_CLIENT:
display = "out: " + item.name + " " + \
repr(item.percentage_achievement) + "%"
else:
display = "out: " + item.name
if not item.engine_path:
if role == QtCore.Qt.DisplayRole:
return display
else:
status = item.transfer_status
if role == QtCore.Qt.DisplayRole:
return display # + " => " + status
if status == constants.FILES_DO_NOT_EXIST:
if role == QtCore.Qt.DecorationRole:
return self.transfer_files_dont_exit
if status == constants.FILES_ON_CLIENT:
if role == QtCore.Qt.DecorationRole:
return self.transfer_files_on_client
if status == constants.FILES_ON_CR:
if role == QtCore.Qt.DecorationRole:
return self.transfer_files_on_cr
if status == constants.FILES_ON_CLIENT_AND_CR:
if role == QtCore.Qt.DecorationRole:
return self.transfer_files_on_both
if status == constants.TRANSFERING_FROM_CLIENT_TO_CR:
if role == QtCore.Qt.DecorationRole:
return self.transfering_from_client_to_cr
if status == constants.TRANSFERING_FROM_CR_TO_CLIENT:
if role == QtCore.Qt.DecorationRole:
return self.transfering_from_cr_to_client
if status == constants.FILES_UNDER_EDITION:
if role == QtCore.Qt.DecorationRole:
return self.files_under_edition
if use_qvariant:
return QtCore.QVariant()
else:
return None
#
# MODEL ######################
#
class ComputingResourcePool:
'''
Holds the instances of soma_workflow.client.WorkflowController associated
to a unique resource id.
'''
# dictonary: resource id -> soma_workflow.client.WorkflowController
_connections = None
# dictionary: resource id -> lock
_connection_locks = None
def __init__(self):
self._connections = {}
self._connection_locks = {}
def __del__(self):
self.delete_all()
def add_default_connection(self):
resource_id = socket.gethostname()
if resource_id not in self._connections.keys():
try:
self.add_connection(resource_id, WorkflowController())
except Exception:
print('could not connect to the default connection %s'
% resource_id)
def add_connection(self, resource_id, workflow_controller):
self._connections[resource_id] = workflow_controller
self._connection_locks[resource_id] = threading.RLock()
def delete_connection(self, resource_id):
print('ComputingResourcePool delete_connection:', resource_id)
if resource_id in self._connections:
print('del WFC')
del self._connections[resource_id]
if resource_id in self._connection_locks:
del self._connection_locks[resource_id]
def delete_all(self):
resource_ids = list(self._connections.keys())
for resource_id in resource_ids:
self.delete_connection(resource_id)
try:
import gc
except ImportError:
gc = None
if gc:
gc.collect()
def reinit_connection(self, resource_id, workflow_controller):
with self._connection_locks[resource_id]:
self._connections[resource_id] = workflow_controller
def connection(self, resource_id):
return self._connections[resource_id]
def lock(self, resource_id):
return self._connection_locks[resource_id]
def resource_exist(self, resource_id):
return resource_id in self._connections.keys()
def resource_ids(self):
return list(self._connections.keys())
[docs]
class ApplicationModel(QtCore.QObject):
'''
Model for the application. This model was created to provide faster
application minimizing communications with the server.
The instances of this class hold the connections and the GuiWorkflow instances
created in the current session of the application.
The current workflow is periodically updated and the signal workflow_state_changed is
emitted if necessary.
'''
# Computing resource pool
resource_pool = None
# gui_workflows
# dictionary: resource_id => workflow_id => GuiWorkflow
_workflows = None
# dictionary: resource_id => workflow_ids => expiration_dates
_expiration_dates = None
# dictionary: resource_id => workflow_ids => workflow_names
_workflow_names = None
# dictionary: resource_id => workflow_ids => workflow_status
_workflow_statuses = None
current_connection = None
current_resource_id = None
_current_workflow = None
current_wf_id = None
workflow_exp_date = None
workflow_name = None
workflow_status = None
tmp_stderrout_dir = None
# dictionary: resource_id => boolean
_hold = None
_timer = None
_timeout_duration = None
# signals
connection_closed_error = QtCore.Signal((), (str,))
current_connection_changed = QtCore.Signal()
workflow_state_changed = QtCore.Signal()
current_workflow_about_to_change = QtCore.Signal()
current_workflow_changed = QtCore.Signal()
global_workflow_state_changed = QtCore.Signal()
[docs]
class UpdateThread(QtCore.QThread):
[docs]
def __init__(self, application_model, parent):
super().__init__(parent)
self.application_model = application_model
[docs]
def run(self):
self.application_model.update()
[docs]
def __init__(self, resource_pool=None, parent=None):
'''
**resource_pool**: *ComputingResourcePool*
'''
super().__init__(parent)
home_dir = configuration.Configuration.get_home_dir()
self.tmp_stderrout_dir = os.path.join(home_dir, ".soma-workflow")
if not os.path.isdir(self.tmp_stderrout_dir) \
and os.path.islink(self.tmp_stderrout_dir):
# make ~/.soma-wodkflow as target of the dead link (if any)
swf_dir = os.readlink(self.tmp_stderrout_dir)
os.makedirs(swf_dir)
if not os.path.isdir(self.tmp_stderrout_dir):
os.makedirs(self.tmp_stderrout_dir)
if resource_pool != None:
self.resource_pool = resource_pool
else:
self.resource_pool = ComputingResourcePool()
self._workflows = {}
self._expiration_dates = {}
self._workflow_names = {}
self._workflow_statuses = {}
self.current_connection = None
self.current_resource_id = None
self._current_workflow = None
self.current_wf_id = None
self.workflow_exp_date = None
self.workflow_name = None
self.workflow_status = None
self._timeout_duration = {}
self.update_interval = 3 # update period in seconds
self.auto_update = True
self._hold = {}
self._lock = threading.RLock()
for rid, connection_inst in self.resource_pool._connections.items():
self.add_connection(rid, connection_inst)
self.update_thread = None
self.global_workflow_state_changed.emit()
self._timer = QtCore.QTimer(self)
self._timer.setInterval(self.update_interval * 3000)
self._timer.timeout.connect(self.threaded_update)
self._timer.start()
QtGui.QApplication.instance().lastWindowClosed.connect(
self.wait_for_thread)
[docs]
@QtCore.Slot()
def threaded_update(self):
if self.update_thread != None:
# flush running update
if not self.update_thread.isFinished():
return # still updating, we will do it again later
if self.update_thread == None:
self.update_thread = ApplicationModel.UpdateThread(
application_model=self,
parent=None)
self.update_thread.start(QtCore.QThread.LowPriority)
[docs]
@QtCore.Slot()
def wait_for_thread(self):
if self.update_thread != None:
if self.update_thread.isRunning():
if not self.update_thread.wait(10000):
self.update_thread.terminate()
self.update_thread.wait()
self.update_thread = None
[docs]
def update(self):
with self._lock:
if self.auto_update:
if self.current_wf_id != None and not self._hold[self.current_resource_id]:
try:
if self.current_wf_id == NOT_SUBMITTED_WF_ID:
wf_status = None
elif self._current_workflow:
# print(" ==> communication with the server " + repr(self.wf_id))
# begining = datetime.now()
# wf_complete_status =
# self.current_connection.workflow_elements_status(self.current_wf_id)
wf_complete_status = self.connection_timeout(
WorkflowController.workflow_elements_status,
args=(
self.current_connection, self.current_wf_id),
timeout_duration=self._timeout_duration[self.current_resource_id])
wf_status = wf_complete_status[2]
# end = datetime.now() - begining
# print(" <== end communication" + repr(self.wf_id)
# + " : " + repr(end.seconds))
else:
wf_status = self.connection_timeout(
WorkflowController.workflow_status,
args=(
self.current_connection, self.current_wf_id),
timeout_duration=self._timeout_duration[self.current_resource_id])
# wf_status =
# self.current_connection.workflow_status(self.current_wf_id)
except ConnectionClosedError as e:
self.connection_closed_error[str].emit(
self.current_resource_id)
self._hold[self.current_resource_id] = True
return
except UnknownObjectError as e:
self.delete_workflow()
return
else:
if self._current_workflow and self.current_wf_id != NOT_SUBMITTED_WF_ID:
if self._current_workflow.updateState(
wf_complete_status):
self.workflow_state_changed.emit()
if self.current_wf_id != NOT_SUBMITTED_WF_ID and self.workflow_status != wf_status:
self.workflow_status = wf_status
self._workflow_statuses[self.current_resource_id][
self.current_wf_id] = wf_status
self.workflow_state_changed.emit()
self.global_workflow_state_changed.emit()
if True:
# update the status of every workflow
global_wf_state_changed = False
for rid in self.resource_pool.resource_ids():
if not self._hold[rid]:
for wfid in list(self._workflows[rid].keys()):
if wfid != self.current_wf_id:
try:
connection = self.resource_pool.connection(
rid)
wf_status = self.connection_timeout(
WorkflowController.workflow_status,
args=(
connection, wfid),
timeout_duration=self._timeout_duration[self.current_resource_id])
# wf_status =
# self.resource_pool.connection(rid).workflow_status(wfid)
except ConnectionClosedError as e:
self.connection_closed_error[str].emit(
rid)
self._hold[rid] = True
break
except UnknownObjectError as e:
self.delete_workflow(wfid)
continue
else:
if wf_status != self._workflow_statuses[rid][wfid]:
global_wf_state_changed = True
self._workflow_statuses[
rid][wfid] = wf_status
if global_wf_state_changed:
self.global_workflow_state_changed.emit()
[docs]
def connection_timeout(self,
func,
args=(),
kwargs={},
timeout_duration=30,
default=None):
"""This function will spawn a thread and run the given function
using the args, kwargs and return the given default value if the
timeout_duration is exceeded.
"""
class InterruptibleThread(QtCore.QThread):
def __init__(self):
super().__init__(parent=None)
self.result = default
self.exception = None
def run(self):
try:
self.result = func(*args, **kwargs)
except Exception as e:
self.exception = e
it = InterruptibleThread()
it.start()
it.wait(timeout_duration * 1000)
if it.isRunning():
it.terminate()
raise ConnectionClosedError("Connection time out")
else:
if it.exception is not None:
raise it.exception
return it.result
[docs]
def list_workflow_names(self, resource_id):
return list(self._workflow_names[resource_id].values())
[docs]
def list_workflow_status(self, resource_id):
return list(self._workflow_statuses[resource_id].values())
[docs]
def get_workflow_status(self, resource_id, workflow_id):
if workflow_id in self._workflow_statuses[resource_id]:
return self._workflow_statuses[resource_id][workflow_id]
else:
return None
[docs]
def list_workflow_expiration_dates(self, resource_id):
return list(self._expiration_dates[resource_id].values())
[docs]
def workflows(self, resource_id):
result = {}
for wf_id in self._workflows[resource_id].keys():
result[wf_id] = (self._workflow_names[resource_id][wf_id],
self._expiration_dates[resource_id][wf_id])
return result
[docs]
def add_connection(self, resource_id, connection):
'''
Adds a connection and use it as the current connection
'''
with self._lock:
self.resource_pool.add_connection(resource_id, connection)
self._workflows[resource_id] = {}
self._expiration_dates[resource_id] = {}
self._workflow_names[resource_id] = {}
self._workflow_statuses[resource_id] = {}
self.current_resource_id = resource_id
self.current_connection = connection
self._current_workflow = None
self.current_wf_id = None
self.workflow_exp_date = None
self.workflow_status = None
self.workflow_name = None
if connection.config.get_mode() == configuration.REMOTE_MODE:
self._timeout_duration[resource_id] = 40
else:
self._timeout_duration[resource_id] = 240
self.current_connection_changed.emit()
self.current_workflow_about_to_change.emit()
self.current_workflow_changed.emit()
self._hold[resource_id] = False
[docs]
def delete_connection(self, resource_id):
'''
Delete the connection.
If the resource is the current connection:
If any other connections exist the new current connection will be one of them.
If not the current connection is set to None.
'''
with self._lock:
#ref = self.resource_pool._connections[resource_id]
self.resource_pool.delete_connection(resource_id)
del self._workflows[resource_id]
del self._expiration_dates[resource_id]
del self._workflow_names[resource_id]
del self._workflow_statuses[resource_id]
del self._hold[resource_id]
if resource_id == self.current_resource_id:
self.current_resource_id = None
self.current_connection = None
self._current_workflow = None
self.current_wf_id = None
self.workflow_exp_date = None
self.workflow_name = None
resource_ids = self.resource_pool.resource_ids()
if resource_ids:
self.current_resource_id \
= next(iter(self.resource_pool.resource_ids()))
else:
self.current_resource_id = None
if self.current_resource_id != None:
self.current_connection = self.resource_pool.connection(
self.current_resource_id)
self.current_connection_changed.emit()
self.global_workflow_state_changed.emit()
#ref.disconnect() # should be done by WorkflowController.__del__
#import objgraph
#objgraph.show_backrefs(ref, refcounts=True, max_depth=10)
# we need to use gc.collect() here to make sure the
# WorkflowController actually gets destroyed. It seems that there
# are some internal references (that I could not find) that prevent
# automatic refcount deletion.
import gc
#print('ref:', gc.get_referrers(ref))
#del ref
gc.collect()
[docs]
def set_current_connection(self, resource_id):
if resource_id != self.current_resource_id:
with self._lock:
print('resource_exist:', repr(resource_id), self.resource_pool.resource_exist(resource_id))
assert(self.resource_pool.resource_exist(resource_id))
self.current_resource_id = resource_id
self.current_connection = self.resource_pool.connection(
resource_id)
self.current_connection_changed.emit()
[docs]
def reinit_connection(self, resource_id, connection):
with self._lock:
self.current_resource_id = resource_id
self.current_connection = connection
self.resource_pool.reinit_connection(resource_id, connection)
self.current_connection_changed.emit()
self._hold[resource_id] = False
[docs]
def add_to_submitted_workflows(self,
workflow_id,
workflow_exp_date,
workflow_name,
workflow_status,
workflow=None):
'''
Add a workflow without modifying the current workflow
'''
with self._lock:
if workflow_id != NOT_SUBMITTED_WF_ID:
if workflow:
self._workflows[self.current_resource_id][workflow_id] = GuiWorkflow(
workflow,
self.tmp_stderrout_dir)
else:
self._workflows[self.current_resource_id][
workflow_id] = None
self._expiration_dates[self.current_resource_id][
workflow_id] = workflow_exp_date
self._workflow_names[self.current_resource_id][
workflow_id] = workflow_name
self._workflow_statuses[self.current_resource_id][
workflow_id] = workflow_status
self.global_workflow_state_changed.emit()
[docs]
def add_workflow(self,
workflow_id,
workflow_exp_date,
workflow_name,
workflow_status,
workflow=None):
'''
Build a GuiWorkflow from a soma_workflow.client.Worklfow and
use it as the current workflow.
'''
with self._lock:
self.current_workflow_about_to_change.emit()
if workflow:
self._current_workflow = GuiWorkflow(
workflow, self.tmp_stderrout_dir)
else:
self._current_workflow = None
self.current_wf_id = workflow_id
self.workflow_exp_date = workflow_exp_date
self.workflow_name = workflow_name
self.workflow_status = workflow_status
if self.current_wf_id != NOT_SUBMITTED_WF_ID:
self._workflows[self.current_resource_id][
workflow_id] = self._current_workflow
self._expiration_dates[self.current_resource_id][
workflow_id] = self.workflow_exp_date
self._workflow_names[self.current_resource_id][
workflow_id] = workflow_name
self._workflow_statuses[self.current_resource_id][
workflow_id] = workflow_status
if self._current_workflow != None:
try:
wf_status \
= self.current_connection.workflow_elements_status(
workflow_id)
except ConnectionClosedError as e:
self.connection_closed_error[()].emit()
else:
self._current_workflow.updateState(wf_status)
self.current_workflow_changed.emit()
self.global_workflow_state_changed.emit()
[docs]
def current_workflow(self):
if self.current_wf_id == NOT_SUBMITTED_WF_ID or \
self.current_wf_id == None or \
self._current_workflow != None:
return self._current_workflow
with self._lock:
try:
workflow = self.current_connection.workflow(self.current_wf_id)
except ConnectionClosedError as e:
QtGui.QApplication.restoreOverrideCursor()
self.connection_closed_error[()].emit()
except UnknownObjectError as e:
self.delete_workflow()
return self._current_workflow
else:
try:
self._current_workflow = GuiWorkflow(
workflow, self.tmp_stderrout_dir)
self._workflows[self.current_resource_id][
self._current_workflow.wf_id] = self._current_workflow
wf_status \
= self.current_connection.workflow_elements_status(
self.current_wf_id)
except ConnectionClosedError as e:
QtGui.QApplication.restoreOverrideCursor()
self.connection_closed_error[()].emit()
except Exception:
import traceback
traceback.print_exc()
self._current_workflow = None
else:
self._current_workflow.updateState(wf_status)
return self._current_workflow
[docs]
def restart_current_workflow(self):
self._current_workflow.restart()
[docs]
def delete_workflow(self, workflow_id=None):
with self._lock:
if workflow_id != None and (self._current_workflow == None or workflow_id != self._current_workflow.wf_id):
del self._workflows[self.current_resource_id][workflow_id]
del self._expiration_dates[
self.current_resource_id][workflow_id]
del self._workflow_names[self.current_resource_id][workflow_id]
del self._workflow_statuses[
self.current_resource_id][workflow_id]
else:
self.current_workflow_about_to_change.emit()
if self._current_workflow and self._current_workflow.wf_id \
in self._workflows[self.current_resource_id].keys():
del self._workflows[self.current_resource_id][
self._current_workflow.wf_id]
del self._expiration_dates[self.current_resource_id][
self._current_workflow.wf_id]
del self._workflow_names[self.current_resource_id][
self._current_workflow.wf_id]
del self._workflow_statuses[self.current_resource_id][
self._current_workflow.wf_id]
self._current_workflow = None
self.current_wf_id = None
self.workflow_exp_date = None # datetime.now()
self.workflow_status = None
self.workflow_name = None
self.current_workflow_changed.emit()
self.global_workflow_state_changed.emit()
[docs]
def clear_current_workflow(self):
with self._lock:
if self._current_workflow != None or \
self.current_wf_id != NOT_SUBMITTED_WF_ID:
self.current_workflow_about_to_change.emit()
self._current_workflow = None
self.current_wf_id = None
self.workflow_exp_date = None # datetime.now()
self.workflow_status = None
self.workflow_name = None
self.current_workflow_changed.emit()
[docs]
def set_current_workflow(self, wf_id):
if wf_id != self.current_wf_id:
with self._lock:
assert(wf_id in self._workflows[
self.current_resource_id].keys())
self.current_workflow_about_to_change.emit()
self.current_wf_id = wf_id
self._current_workflow = self._workflows[
self.current_resource_id][self.current_wf_id]
self.workflow_exp_date = self._expiration_dates[
self.current_resource_id][self.current_wf_id]
self.workflow_name = self._workflow_names[
self.current_resource_id][self.current_wf_id]
self.workflow_status = self._workflow_statuses[
self.current_resource_id][self.current_wf_id]
self.current_workflow_changed.emit()
[docs]
def set_no_current_workflow(self):
with self._lock:
self.current_workflow_about_to_change.emit()
self._current_workflow = None
self.current_wf_id = None
self.workflow_exp_date = None # datetime.now()
self.workflow_status = None
self.workflow_name = None
self.current_workflow_changed.emit()
[docs]
def change_expiration_date(self, date):
self.workflow_exp_date = date
self._expiration_dates[self.current_resource_id][
self._current_workflow.wf_id] = self.workflow_exp_date
[docs]
def is_loaded_workflow(self, wf_id):
return wf_id in self._workflows[self.current_resource_id].keys()
class GuiWorkflow:
# id of the workflow in soma-workflow
wf_id = None
#
wf_status = None
#
root_item = None
# dict: id => WorkflowItems
items = None
# Workflow or EngineWorkflow
server_workflow = None
# dict: FileTransfer id => sequence of gui item transfers id
server_file_transfers = None
# dict: TemporaryPath id => sequence of gui item temporary id
server_temporary = None
# dict: Job id => gui item job id
server_jobs = None
queue = None
def __init__(self, workflow, tmp_stderrout_dir):
# print("wf " +repr(workflow))
self.name = workflow.name
if isinstance(workflow, EngineWorkflow):
self.wf_id = workflow.wf_id
self.queue = workflow.queue
else:
self.wf_id = NOT_SUBMITTED_WF_ID
self.queue = None
self.wf_status = None
ids = {} # workflow element => sequence of ids
self.items = {} # id => WorkflowItem
self.root_item = None
id_cnt = 0 # unique id for the items
self.server_workflow = workflow
self.server_jobs = {}
self.server_file_transfers = {}
self.server_temporary = {}
# print(" ==> building the workflow ")
# begining = datetime.now()
# retrieving the set of job and the set of file transfers
w_js = set()
w_fts = set()
for job in workflow.jobs:
w_js.add(job)
# Processing the Jobs to create the corresponding GuiJob instances
for job in w_js:
item_id = id_cnt
id_cnt = id_cnt + 1
if isinstance(workflow, EngineWorkflow):
ejob = workflow.job_mapping[job]
job_id = ejob.job_id
command = ejob.plain_command()
else:
job_id = NOT_SUBMITTED_JOB_ID
command = job.command
gui_job = GuiJob(it_id=item_id,
command=command,
tmp_stderrout_dir=tmp_stderrout_dir,
parent=-1,
row=-1,
data=job,
children_nb=len(job.referenced_input_files) + len(
job.referenced_output_files),
name=job.name,
job_id=job_id,
priority=job.priority,
parallel_job_info=job.parallel_job_info,
gui_workflow=self)
ids[job] = item_id
self.items[item_id] = gui_job
self.server_jobs[gui_job.job_id] = item_id
for ft in job.referenced_input_files:
w_fts.add(ft)
for ft in job.referenced_output_files:
w_fts.add(ft)
# Create the GuiGroup instances
self.root_item = GuiGroup(self,
it_id=-1,
parent=-1,
row=-1,
data=workflow.root_group,
children_nb=len(workflow.root_group))
for group in workflow.groups:
item_id = id_cnt
id_cnt = id_cnt + 1
ids[group] = item_id
self.items[item_id] = GuiGroup(self,
it_id=item_id,
parent=-1,
row=-1,
data=group,
children_nb=len(group.elements),
name=group.name)
# parent and children research for jobs and groups
# build maps to accelerate search
rows = {data: row for row, data in enumerate(workflow.root_group)}
g_rows = {}
for group in workflow.groups:
g_rows[group] = {data: row
for row, data in enumerate(group.elements)}
for item in self.items.values():
if isinstance(item, GuiGroup) or isinstance(item, GuiJob):
row = rows.get(item.data)
if row is not None:
item.parent = -1
item.row = row
self.root_item.children[item.row] = item.it_id
for group in workflow.groups:
row = g_rows[group].get(item.data)
if row is not None:
item.parent = ids[group]
item.row = row
self.items[item.parent].children[item.row] = item.it_id
# processing the file transfers
def file_transfer_key(ft):
if isinstance(ft, FileTransfer):
return ft.name
return ft
for ft in w_fts:
# print(" ft " + repr(ft))
ids[ft] = []
for job in w_js:
ref_in = list(job.referenced_input_files)
ref_in.sort(key=file_transfer_key)
ref_out = list(job.referenced_output_files)
ref_out.sort(key=file_transfer_key)
if ft in ref_in:
item_id = id_cnt
id_cnt = id_cnt + 1
ids[ft].append(item_id)
row = ref_in.index(ft)
if isinstance(workflow, EngineWorkflow):
engine_id = workflow.transfer_mapping[ft].get_id()
engine_path = workflow.transfer_mapping[ft].engine_path
else:
engine_id = None
engine_path = None
gui_transfer = GuiInputTransfer(it_id=item_id,
parent=ids[job],
row=row,
data=ft,
name=ft.name,
engine_path=engine_path,
engine_id=engine_id)
self.items[item_id] = gui_transfer
if isinstance(ft, TemporaryPath):
self.server_temporary.setdefault(
gui_transfer.engine_id, []).append(item_id)
else:
self.server_file_transfers.setdefault(
gui_transfer.engine_id, []).append(item_id)
self.items[ids[job]].children[row] = item_id
# print(repr(job.name) + " " +
# repr(self.items[ids[job]].children))
if ft in ref_out:
item_id = id_cnt
id_cnt = id_cnt + 1
ids[ft].append(item_id)
row = len(ref_in) + ref_out.index(ft)
if isinstance(workflow, EngineWorkflow):
engine_id = workflow.transfer_mapping[ft].get_id()
engine_path = workflow.transfer_mapping[ft].engine_path
else:
engine_id = None
engine_path = None
gui_ft = GuiOutputTransfer(it_id=item_id,
parent=ids[job],
row=row,
data=ft,
name=ft.name,
engine_path=engine_path,
engine_id=engine_id)
self.items[item_id] = gui_ft
if isinstance(ft, TemporaryPath):
self.server_temporary.setdefault(
gui_ft.engine_id, []).append(item_id)
else:
self.server_file_transfers.setdefault(
gui_ft.engine_id, []).append(item_id)
self.items[ids[job]].children[row] = item_id
# print(repr(job.name) + " " +
# repr(self.items[ids[job]].children))
# for item in self.items.itervalues():
# print(repr(item.children))
# end = datetime.now() - begining
# print(" <== end building workflow " + repr(end.seconds))
# print model ####################
# print("dependencies : " + repr(len(workflow.dependencies)))
# if workflow.full_dependencies:
# print("full_dependencies : " + repr(len(workflow.full_dependencies)))
# for dep in workflow.dependencies:
# print(dep[0].name + " -> " + dep[1].name)
# for item in self.items.values():
# print(repr(item.it_id) + " " + repr(item.parent) + " " + repr(item.row) + " " + repr(item.it_type) + " " + repr(item.name) + " " + repr(item.children))
# raw_input()
#
def updateState(self, wf_status):
if self.wf_id == NOT_SUBMITTED_WF_ID:
return False
data_changed = False
self.queue = wf_status[3]
if not wf_status:
return False
# updating jobs:
for job_info in wf_status[0]:
job_id, status, queue, exit_info, date_info, drmaa_id = job_info
# date_info = (None, None, None) # (submission_date,
# execution_date, ending_date)
item = self.items[self.server_jobs[job_id]]
data_changed = item.updateState(
status, queue, exit_info, date_info, drmaa_id) or data_changed
# end = datetime.now() - begining
# print(" <== end updating jobs" + repr(self.wf_id) + " : " +
# repr(end.seconds))
# print(" ==> updating transfers " + repr(self.wf_id))
# begining = datetime.now()
# updating file transfer
for transfer_info in wf_status[1]:
transfer_id, complete_status = transfer_info
for item_id in self.server_file_transfers[transfer_id]:
item = self.items[item_id]
data_changed = item.updateState(
complete_status) or data_changed
# updating temp files
for temp_info in wf_status[4]:
temp_path_id, engine_file_path, status = temp_info
complete_status = (status, None, engine_file_path, None, None)
for item_id in self.server_temporary[temp_path_id]:
item = self.items[item_id]
data_changed = item.updateState(
complete_status) or data_changed
# end = datetime.now() - begining
# print(" <== end updating transfers" + repr(self.wf_id) + " : " +
# repr(end.seconds) + " " + repr(data_changed))
# updateing groups
self.root_item.updateState()
data_changed = data_changed or not self.wf_status == wf_status[2]
self.wf_status = wf_status[2]
return data_changed
def restart(self):
for item in self.items.values():
if isinstance(item, GuiJob):
item.stdout = ""
item.stderr = ""
item.submission_date = None
item.execution_date = None
item.ending_date = None
item.serial_duration = None
class GuiWorkflowItem:
'''
Abstract class for workflow items.
'''
def __init__(self,
it_id,
parent=-1,
row=-1,
data=None,
children_nb=0):
self.it_id = it_id
self.parent = parent # parent_id
self.row = row
self.data = data
self.children = [-1 for i in range(children_nb)]
self.initiated = False
class GuiGroup(GuiWorkflowItem):
GP_NOT_SUBMITTED = "not_submitted"
GP_DONE = "done"
GP_FAILED = "failed"
GP_RUNNING = "running"
GP_NO_STATUS = "no_status"
GP_WARNING = "warning"
def __init__(self,
gui_workflow,
it_id,
parent=-1,
row=-1,
data=None,
children_nb=0,
name="no name"):
super().__init__(it_id, parent, row, data, children_nb)
self.gui_workflow = gui_workflow
self.status = GuiGroup.GP_NO_STATUS
self.name = name
self.not_sub = []
self.done = []
self.failed = []
self.running = []
self.pending = []
self.queued = []
self.warning = []
self.job_count = 0
self.first_sub_date = None
self.last_end_date = None
self.theoretical_serial_time = None
self.input_to_transfer = []
self.input_transfer_ended = []
self.output_ready = []
self.output_transfer_ended = []
def updateState(self):
self.initiated = True
state_changed = False
self.first_sub_date = datetime.max
self.last_end_date = datetime.min
self.theoretical_serial_time = timedelta(0, 0, 0)
self.input_to_transfer = []
self.input_transfer_ended = []
self.output_ready = []
self.output_transfer_ended = []
self.not_sub = []
self.done = []
self.failed = []
self.running = []
self.pending = []
self.queued = []
self.warning = []
no_status = False
self.job_count = 0
for child in self.children:
item = self.gui_workflow.items[child]
# TO DO : explore files
if isinstance(item, GuiJob):
self.job_count = self.job_count + 1
if item.job_id == NOT_SUBMITTED_JOB_ID:
no_status = True
break
if item.status == constants.WARNING:
self.warning.append(item)
elif item.status == constants.NOT_SUBMITTED:
self.not_sub.append(item)
elif item.status == constants.DONE or item.status == constants.FAILED:
exit_status, exit_value, term_signal, resource_usage = item.exit_info
if item.status == constants.DONE and exit_status == constants.FINISHED_REGULARLY and exit_value == 0:
self.done.append(item)
elif item.status == constants.DONE and exit_status == None:
self.running.append(item)
else:
self.failed.append(item)
elif item.status == constants.SUBMISSION_PENDING:
self.pending.append(item)
elif item.status == constants.QUEUED_ACTIVE:
self.queued.append(item)
else:
self.running.append(item)
if item.serial_duration:
self.theoretical_serial_time \
= self.theoretical_serial_time + item.serial_duration
if item.ending_date is not None \
and item.ending_date > self.last_end_date:
self.last_end_date = item.ending_date
if item.submission_date \
and item.submission_date < self.first_sub_date:
self.first_sub_date = item.submission_date
if isinstance(item, GuiGroup):
item.updateState()
self.job_count = self.job_count + item.job_count
self.not_sub.extend(item.not_sub)
self.done.extend(item.done)
self.failed.extend(item.failed)
self.running.extend(item.running)
self.pending.extend(item.pending)
self.queued.extend(item.queued)
self.warning.extend(item.warning)
self.input_to_transfer.extend(item.input_to_transfer)
self.input_transfer_ended.extend(item.input_transfer_ended)
self.output_ready.extend(item.output_ready)
self.output_transfer_ended.extend(item.output_transfer_ended)
if item.first_sub_date and item.first_sub_date < self.first_sub_date:
self.first_sub_date = item.first_sub_date
if item.last_end_date and item.last_end_date > self.last_end_date:
self.last_end_date = item.last_end_date
self.theoretical_serial_time = self.theoretical_serial_time + \
item.theoretical_serial_time
if no_status:
new_status = GuiGroup.GP_NO_STATUS
elif len(self.warning) > 0:
new_status = GuiGroup.GP_WARNING
elif len(self.failed) > 0:
new_status = GuiGroup.GP_FAILED
elif len(self.not_sub) == 0 and len(self.failed) == 0 and len(self.running) + len(self.pending) + len(self.queued) == 0:
new_status = GuiGroup.GP_DONE
elif len(self.running) + len(self.pending) + len(self.queued) == 0 and len(self.done) == 0 and len(self.failed) == 0:
new_status = GuiGroup.GP_NOT_SUBMITTED
self.first_sub_date = None
self.last_end_date = None
else:
new_status = GuiGroup.GP_RUNNING
self.last_end_date = None
state_changed = self.status != new_status
self.status = new_status
return state_changed
class GuiJob(GuiWorkflowItem):
def __init__(self,
it_id,
command,
tmp_stderrout_dir,
parent=-1,
row=-1,
it_type=None,
data=None,
children_nb=0,
name="no name",
job_id=NOT_SUBMITTED_JOB_ID,
priority=None,
parallel_job_info=None,
gui_workflow=None):
super().__init__(it_id, parent, row, data, children_nb)
self.status = "not submitted"
self.exit_info = ("", "", "", "")
self.stdout = ""
self.stderr = ""
self.submission_date = None
self.execution_date = None
self.ending_date = None
self.priority = priority
self.queue = None
self.serial_duration = None
self.name = name
self.job_id = job_id
self.drmaa_id = None
self.tmp_stderrout_dir = tmp_stderrout_dir
self.parallel_job_info = parallel_job_info
if gui_workflow is not None:
self.gui_workflow = weakref.ref(gui_workflow)
else:
self.gui_workflow = None
cmd_seq = []
for command_el in command:
if isinstance(command_el, tuple) and isinstance(command_el[0], FileTransfer):
cmd_seq.append(
"<FileTransfer " + command_el[0].client_path + " >")
elif isinstance(command_el, FileTransfer):
cmd_seq.append(
"<FileTransfer " + command_el.client_path + " >")
elif isinstance(command_el, SharedResourcePath):
cmd_seq.append("<SharedResourcePath " + command_el.namespace +
" " + command_el.uuid + " " + command_el.relative_path + " >")
elif isinstance(command_el, TemporaryPath):
cmd_seq.append("<TemporaryPath " + command_el.name + " >")
elif isinstance(command_el, str):
cmd_seq.append(command_el)
elif isinstance(command_el, str):
cmd_seq.append(command_el.decode('utf-8'))
else:
cmd_seq.append(repr(command_el))
separator = " "
self.command = separator.join(cmd_seq)
def updateState(self, status, queue, exit_info, date_info, drmaa_id):
self.initiated = True
state_changed = False
state_changed = self.status != status or state_changed
self.status = status
state_changed = self.exit_info != exit_info or state_changed
self.exit_info = exit_info
self.submission_date = date_info[0]
self.execution_date = date_info[1]
self.ending_date = date_info[2]
self.drmaa_id = drmaa_id
state_changed = self.queue != queue or state_changed
self.queue = queue
if self.exit_info:
exit_status, exit_value, term_signal, resource_usage = self.exit_info
rud = {}
if resource_usage:
if six.PY3 and isinstance(resource_usage, bytes):
# in py3 RU is bytes, we want unicode/str
resource_usage = resource_usage.decode()
ru = resource_usage.split()
for ruel in ru:
ruel = ruel.split("=")
rud[ruel[0]] = ruel[1]
if ruel[0] == "start_time" and ruel[1] != "0":
t = time.localtime(float(ruel[1].replace(',', '.')))
self.execution_date = datetime(year=t[0], month=t[
1], day=t[2], hour=t[3], minute=t[4], second=t[5])
elif ruel[0] == "end_time" and ruel[1] != "0":
t = time.localtime(float(ruel[1].replace(',', '.')))
self.ending_date = datetime(year=t[0], month=t[
1], day=t[2], hour=t[3], minute=t[4], second=t[5])
elif ruel[0] == "submission_time" and ruel[1] != "0":
t = time.localtime(float(ruel[1].replace(',', '.')))
self.submission_date = datetime(year=t[0], month=t[
1], day=t[2], hour=t[3], minute=t[4], second=t[5])
if self.ending_date:
self.serial_duration \
= self.ending_date - self.execution_date
if "cput" in rud:
tlist = rud["cput"].split(':')
if len(tlist) >= 1 and '.' in tlist[-1]:
# slurm time format: hh:MM.sss
tlist.append(tlist[-1].split('.')[1])
tlist[-2] = tlist[-2].split('.')[0]
tlist = [int(round(float(x))) for x in tlist]
tlist = [0] * (6 - len(tlist)) + tlist + [0, 0, 0]
tlist[0] += 2000 # to avoid error about year range
t = time.struct_time(tlist)
t = datetime.fromtimestamp(time.mktime(t))
t0 = datetime.fromtimestamp(time.mktime(
time.struct_time([2000] + [0] * 8)))
#t = datetime.strptime(rud["cput"], "%H:%M:%S")
#t0 = datetime.strptime("00:00:00", "%H:%M:%S")
self.serial_duration = t - t0
elif "cpupercent" in rud:
duration = self.serial_duration.total_seconds() \
* float(rud["cpupercent"]) / 100.
self.serial_duration = timedelta(seconds=duration)
elif 'ncpus' in rud:
duration = self.serial_duration.total_seconds() \
* int(rud["ncpus"])
self.serial_duration = timedelta(seconds=duration)
return state_changed
def updateStdOutErr(self, connection):
if self.data and self.job_id != NOT_SUBMITTED_JOB_ID:
stdout_path = os.path.join(
self.tmp_stderrout_dir, "tmp_stdout_file")
stderr_path = os.path.join(
self.tmp_stderrout_dir, "tmp_stderr_file")
if os.path.exists(stdout_path):
os.unlink(stdout_path)
if os.path.exists(stderr_path):
os.unlink(stderr_path)
connection.retrieve_job_stdouterr(
self.job_id, stdout_path, stderr_path)
stdout = ""
if os.path.exists(stdout_path):
with open(stdout_path) as f:
line = f.readline()
while line:
stdout = stdout + line + "\n"
line = f.readline()
os.chmod(stdout_path, 0o666)
self.stdout = stdout
stderr = ""
if os.path.exists(stderr_path):
with open(stderr_path) as f:
line = f.readline()
while line:
stderr = stderr + line + "\n"
line = f.readline()
os.chmod(stderr_path, 0o666)
self.stderr = stderr
def update_job_command(self, connection):
if self.job_id != NOT_SUBMITTED_JOB_ID:
try:
command = connection.get_job_command(self.job_id)
self.command = command
except DatabaseError:
# if the job / workflow has been deleted in the meantime,
# then just do nothoing.
pass
def update_job_params(self, connection):
if self.job_id != NOT_SUBMITTED_JOB_ID:
in_params = connection.updated_job_parameters(self.job_id)
self.data.param_dict.update(in_params)
if self.data.has_outputs:
out_params = connection.get_job_output_params(self.job_id)
self.output_params = out_params
class GuiTransfer(GuiWorkflowItem):
DIRECTORY = "directory"
FILE = "file"
def __init__(self,
it_id,
parent=-1,
row=-1,
data=None,
children_nb=0,
name="no name",
engine_path=None,
engine_id=None):
super().__init__(
it_id, parent, row, data, children_nb)
self.transfer_status = " "
self.size = None
self.transmitted = None
self.elements_status = None
self.percentage_achievement = 0
self.transfer_type = None
self.name = name
self.engine_path = engine_path
self.engine_id = engine_id
def updateState(self, transfer_status_info):
self.initiated = True
state_changed = False
transfer_status = transfer_status_info[0]
engine_path = transfer_status_info[2]
client_path = transfer_status_info[3]
client_paths = transfer_status_info[4]
if transfer_status_info[1]:
if len(transfer_status_info[1]) == 2:
self.transfer_type = GuiTransfer.FILE
size, transmitted = transfer_status_info[1]
elements_status = None
elif len(transfer_status_info[1]) == 3:
self.transfer_type = GuiTransfer.DIRECTORY
(size, transmitted, elements_status) = transfer_status_info[1]
if size != 0:
self.percentage_achievement = int(
float(transmitted) / size * 100.0)
else:
self.percentage_achievement = 100
else:
(size, transmitted, elements_status) = (None, None, None)
self.percentage_achievement = 0
state_changed = state_changed or transfer_status != self.transfer_status
state_changed = state_changed or size != self.size
state_changed = state_changed or transmitted != self.transmitted
state_changed = state_changed or elements_status != self.elements_status
state_changed = state_changed or engine_path != self.engine_path
if hasattr(self.data, 'client_path'):
state_changed |= client_path != self.data.client_path
if hasattr(self.data, 'client_paths'):
state_changed |= client_paths != self.data.client_paths
self.transfer_status = transfer_status
self.size = size
self.transmitted = transmitted
self.elements_status = elements_status
self.engine_path = engine_path
if hasattr(self.data, 'client_path'):
self.data.client_path = client_path
self.data.client_paths = client_paths
return state_changed
class GuiInputTransfer(GuiTransfer):
def __init__(self,
it_id,
parent=-1,
row=-1,
data=None,
children_nb=0,
name="no name",
engine_path=None,
engine_id=None):
super().__init__(it_id,
parent,
row,
data,
children_nb,
name,
engine_path,
engine_id)
class GuiOutputTransfer(GuiTransfer):
def __init__(self,
it_id,
parent=-1,
row=-1,
data=None,
children_nb=0,
name="no name",
engine_path=None,
engine_id=None):
super().__init__(it_id,
parent,
row,
data,
children_nb,
name,
engine_path,
engine_id)