# -*- coding: utf-8 -*-
'''
A Pipeline structure viewer widget, which displays pipeline nodes as boxes and links as lines, and provides pipeline editor features.
The only main class you should look at is the :class:`PipelineDeveloperView` widget, the remaining is internal infrastructure::
pv = PipelineDeveloperView(pipeline, allow_open_controller=True,
enable_edition=True,show_sub_pipelines=True)
pv.show()
Classes
=======
:class:`ColorType`
------------------
:class:`Plug`
-------------
:class:`EmbeddedSubPipelineItem`
--------------------------------
:class:`boxItem`
----------------
:class:`NodeGWidget`
--------------------
:class:`HandleItem`
-------------------
:class:`Link`
-------------
:class:`PipelineScene`
----------------------
:class:`PipelineDeveloperView`
-------------------------------
'''
# System import
import os
from pprint import pprint
import weakref
import tempfile
import soma.subprocess
import shutil
import importlib
import sys
import types
import inspect
import six
import json
import io
import traceback
import html
# Capsul import
from soma.qt_gui.qt_backend import QtCore, QtGui, Qt
from soma.qt_gui.qt_backend.Qt import QMessageBox
from soma.sorted_dictionary import SortedDictionary
from capsul.api import Switch, PipelineNode, OptionalOutputSwitch
from capsul.pipeline import pipeline_tools
from capsul.api import Pipeline
from capsul.api import Process
from capsul.api import get_process_instance
from capsul.pipeline.pipeline_nodes import Node, ProcessNode
from soma.qt_gui.qt_backend.Qt import QGraphicsView
from capsul.qt_gui.widgets.pipeline_file_warning_widget \
import PipelineFileWarningWidget
import capsul.pipeline.xml as capsulxml
from capsul.study_config import process_instance
from capsul.pipeline.process_iteration import ProcessIteration
from soma.controller import Controller
from soma.utils.functiontools import SomaPartial
from six.moves import range
from six.moves import zip
from soma.utils.weak_proxy import proxy_method
from traits import api as traits
from soma.qt_gui import qt_backend
qt_backend.init_traitsui_handler()
from soma.qt_gui.controller_widget import ScrollControllerWidget
from capsul.qt_gui.widgets.attributed_process_widget \
import AttributedProcessWidget
# -----------------------------------------------------------------------------
# Globals and constants
# -----------------------------------------------------------------------------
GRAY_1 = QtGui.QColor.fromRgbF(0.7, 0.7, 0.8, 0.1)
GRAY_2 = QtGui.QColor.fromRgbF(0.4, 0.4, 0.4, 1)
LIGHT_GRAY_1 = QtGui.QColor.fromRgbF(0.2, 0.2, 0.2, 1)
LIGHT_GRAY_2 = QtGui.QColor.fromRgbF(0.2, 0.2, 0.2, 1)
# Colors for links and plugs
ORANGE_1 = QtGui.QColor.fromRgb(220, 80, 20)
ORANGE_2 = QtGui.QColor.fromRgb(220, 120, 20)
BLUE_1 = QtGui.QColor.fromRgb(50, 150, 250)
BLUE_2 = QtGui.QColor.fromRgb(50, 50, 250)
PURPLE_2 = QtGui.QColor.fromRgb(200, 0, 200)
RED_2 = QtGui.QColor.fromRgb(200, 0, 0)
GREEN_2 = QtGui.QColor.fromRgb(0, 100, 0)
BLACK_2 = QtGui.QColor.fromRgb(10, 10, 10)
WHITE_2 = QtGui.QColor.fromRgb(255, 255, 255)
ANTHRACITE_1 = QtGui.QColor.fromRgbF(0.05, 0.05, 0.05)
LIGHT_ANTHRACITE_1 = QtGui.QColor.fromRgbF(0.25, 0.25, 0.25)
# -----------------------------------------------------------------------------
# Classes and functions
# -----------------------------------------------------------------------------
class ColorType(object):
def __init__(self):
pass
def colorLink(self, x):
if not isinstance(x, str):
# x is a trait
trait_type_str = x.trait_type.__class__.__name__
if x.output and x.input_filename is False:
trait_type_str = 'File_out'
x = trait_type_str
return {
'Str': PURPLE_2,
'Float': ORANGE_1,
'Int': BLUE_2,
'List': RED_2,
'File': ORANGE_2,
'File_out': GREEN_2,
}[x]
[docs]
class Plug(QtGui.QGraphicsPolygonItem):
def __init__(self, color, name, height, width, activated=True,
optional=False, parent=None):
super(Plug, self).__init__(parent)
self.name = name
# self.color = self._color(activated, optional)
self.color = color
if optional:
brush = QtGui.QBrush(QtCore.Qt.SolidPattern)
brush.setColor(self.color)
polygon = QtGui.QPolygonF([QtCore.QPointF(0, 0),
QtCore.QPointF(width / 1.5, 0),
QtCore.QPointF(width / 1.5,
(height - 5)),
QtCore.QPointF(0, (height - 5))
])
# self.setPen(QtGui.QPen(QtCore.Qt.NoPen))
else:
brush = QtGui.QBrush(QtCore.Qt.SolidPattern)
brush.setColor(self.color)
polygon = QtGui.QPolygonF([QtCore.QPointF(0, 0),
QtCore.QPointF(
width, (height - 5) / 2.0),
QtCore.QPointF(0, height - 5)
])
self.setPolygon(polygon)
self.setBrush(brush)
self.setZValue(3)
self.setAcceptedMouseButtons(QtCore.Qt.LeftButton)
# def _color(self, activated, optional):
# if optional:
# if activated:
# color = QtCore.Qt.darkGreen
# else:
# color = QtGui.QColor('#BFDB91')
# else:
# if activated:
# color = QtCore.Qt.black
# else:
# color = QtCore.Qt.gray
# return color
# def update_plug(self, activated, optional):
# color = self._color(activated, optional)
# brush = QtGui.QBrush(QtCore.Qt.SolidPattern)
# brush.setColor(color)
# self.setBrush(brush)
def update_plug(self, color):
brush = QtGui.QBrush(QtCore.Qt.SolidPattern)
brush.setColor(color)
self.setBrush(brush)
def get_plug_point(self):
point = QtCore.QPointF(
self.boundingRect().size().width() / 2.0,
self.boundingRect().size().height() / 2.0)
return self.mapToParent(point)
[docs]
def mousePressEvent(self, event):
super(Plug, self).mousePressEvent(event)
if event.button() == QtCore.Qt.LeftButton:
self.scene().plug_clicked.emit(self.name)
event.accept()
elif event.button() == QtCore.Qt.RightButton:
# print('plug: right click')
self.scene().plug_right_clicked.emit(self.name)
event.accept()
[docs]
class EmbeddedSubPipelineItem(QtGui.QGraphicsProxyWidget):
'''
QGraphicsItem containing a sub-pipeline view
'''
def __init__(self, sub_pipeline_wid):
super(EmbeddedSubPipelineItem, self).__init__()
old_height = sub_pipeline_wid.sizeHint().height()
sizegrip = QtGui.QSizeGrip(None)
new_height = old_height \
+ sub_pipeline_wid.horizontalScrollBar().height()
sub_pipeline_wid.setCornerWidget(sizegrip)
sub_pipeline_wid.setHorizontalScrollBarPolicy(
QtCore.Qt.ScrollBarAlwaysOn)
sub_pipeline_wid.resize(sub_pipeline_wid.sizeHint().width(), new_height)
self.setWidget(sub_pipeline_wid)
[docs]
class boxItem(QtGui.QGraphicsRectItem):
def __init__(self, parent=None):
super(boxItem, self).__init__(parent)
# self.setFlags(self.ItemIsFocusable)
self.penBox = 0;
self.name = ""
[docs]
def focusInEvent(self, event):
self.setPen(QtGui.QPen(QtGui.QColor(150, 150, 250), 3, QtCore.Qt.DashDotLine))
return QtGui.QGraphicsRectItem.focusInEvent(self, event)
[docs]
def focusOutEvent(self, event):
self.setPen(self.penBox)
return QtGui.QGraphicsRectItem.focusOutEvent(self, event)
[docs]
def keyPressEvent(self, event):
if event.key() == QtCore.Qt.Key_Delete:
self.scene()._node_keydelete_clicked(self)
event.accept()
else:
super(boxItem, self).keyPressEvent(event)
[docs]
class HandleItem(QtGui.QGraphicsRectItem):
""" A handle that can be moved by the mouse """
def __init__(self, parent=None):
super(HandleItem, self).__init__(Qt.QRectF(-10.0, -10.0, 10.0, 10.0), parent)
# self.setRect(Qt.QRectF(-4.0,-4.0,4.0,4.0))
self.posChangeCallbacks = []
self.setPen(QtGui.QPen(QtCore.Qt.NoPen))
self.setBrush(QtGui.QBrush(QtCore.Qt.yellow))
self.setFlag(self.ItemIsMovable, True)
self.setFlag(self.ItemSendsScenePositionChanges, True)
self.setCursor(QtGui.QCursor(QtCore.Qt.SizeFDiagCursor))
self.wmin = 0.0
self.hmin = 0.0
self.hmax = 0.0
self.effectiveOpacity()
self.setOpacity(0.01)
[docs]
def itemChange(self, change, value):
if change == self.ItemPositionChange:
self.x, self.y = value.x(), value.y()
if self.x < self.wmin:
self.x = self.wmin
if self.y < self.hmin:
self.y = self.hmin
# TODO: make this a signal?
# This cannot be a signal because this is not a QObject
for cb in self.posChangeCallbacks:
res = cb(self.x, self.y)
if res:
self.x, self.y = res
if self.x < self.wmin:
self.x = self.wmin
if self.y < self.hmin:
self.y = self.hmin
value = QtCore.QPointF(self.x, self.y)
# value = Qt.QPointF(x, y) #### ??
self.hmax = value.y()
return value
# Call superclass method:
return super(HandleItem, self).itemChange(change, value)
[docs]
def mouseReleaseEvent(self, mouseEvent):
self.setSelected(False)
self.setPos(self.x, self.y)
return QtGui.QGraphicsRectItem.mouseReleaseEvent(self, mouseEvent)
[docs]
class Link(QtGui.QGraphicsPathItem):
def __init__(self, origin, target, active, weak, color, parent=None):
super(Link, self).__init__(parent)
self._set_pen(active, weak, color)
self.setFlag(QtGui.QGraphicsItem.ItemIsSelectable, False)
self.setFlag(QtGui.QGraphicsItem.ItemIsFocusable, True)
path = QtGui.QPainterPath()
path.moveTo(origin.x(), origin.y())
path.cubicTo(origin.x() + 90, origin.y(),
target.x() - 90, target.y(),
target.x() - 5, target.y())
self.setPath(path)
self.setZValue(0.5)
self.active = active
self.weak = weak
self.color = color
self.effectiveOpacity()
def _set_pen(self, active, weak, color):
self.pen = QtGui.QPen()
self.pen.setWidth(3)
if active:
self.pen.setBrush(color)
else:
self.pen.setBrush(QtCore.Qt.gray)
if weak:
self.pen.setStyle(QtCore.Qt.DashLine)
self.pen.setCapStyle(QtCore.Qt.RoundCap)
self.pen.setJoinStyle(QtCore.Qt.RoundJoin)
self.setPen(self.pen)
[docs]
def update(self, origin, target):
path = QtGui.QPainterPath()
path.moveTo(origin.x(), origin.y())
path.cubicTo(origin.x() + 90, origin.y(),
target.x() - 90, target.y(),
target.x() - 5, target.y())
self.setPath(path)
def update_activation(self, active, weak, color):
if color == 'current':
color = self.color
self._set_pen(active, weak, color)
self.active = active
self.weak = weak
def fonced_viewer(self, det):
if det:
# color=QtGui.QColor(150, 150, 250)
self.setOpacity(0.2)
else:
# color=self.color
self.setOpacity(1)
# self._set_pen(self.active, self.weak, color)
[docs]
def mousePressEvent(self, event):
item = self.scene().itemAt(event.scenePos(), Qt.QTransform())
# print('Link click, item:', item)
if event.button() == QtCore.Qt.RightButton:
# not a signal since we don't jhave enough identity information in
# self: the scene has to help us.
self.scene()._link_right_clicked(self)
else:
super(Link, self).mousePressEvent(event)
event.accept()
[docs]
def focusInEvent(self, event):
self.setPen(QtGui.QPen(QtGui.QColor(150, 150, 250), 3, QtCore.Qt.DashDotDotLine))
return QtGui.QGraphicsPathItem.focusInEvent(self, event)
[docs]
def focusOutEvent(self, event):
self.setPen(self.pen)
return QtGui.QGraphicsPathItem.focusOutEvent(self, event)
[docs]
def keyPressEvent(self, event):
if event.key() == QtCore.Qt.Key_Delete:
self.scene()._link_keydelete_clicked(self)
event.accept()
else:
super(Link, self).keyPressEvent(event)
[docs]
class PipelineScene(QtGui.QGraphicsScene):
# Signal emitted when a sub pipeline has to be open.
subpipeline_clicked = QtCore.Signal(str, Process,
QtCore.Qt.KeyboardModifiers)
# Signal emitted when a node box is clicked
process_clicked = QtCore.Signal(str, Process)
node_clicked = QtCore.Signal(str, Node)
# Signal emitted when a node box is clicked with ctrl
node_clicked_ctrl = QtCore.Signal(str, Process)
# Signal emitted when a switch box is clicked
switch_clicked = QtCore.Signal(str, Switch)
# Signal emitted when a node box is right-clicked
node_right_clicked = QtCore.Signal(str, Controller)
# Signal emitted when a plug is clicked
plug_clicked = QtCore.Signal(str)
# Signal emitted when a plug is clicked with the right mouse button
plug_right_clicked = QtCore.Signal(str)
# Signal emitted when a link is right-clicked
link_right_clicked = QtCore.Signal(str, str, str, str)
link_keydelete_clicked = QtCore.Signal(str, str, str, str)
node_keydelete_clicked = QtCore.Signal(str)
def __init__(self, parent=None, userlevel=0):
super(PipelineScene, self).__init__(parent)
self.gnodes = {}
self.glinks = {}
self._pos = 50
self.pos = {}
self.dim = {} # add by Irmage OM for recorded dimension of Nodes
self.colored_parameters = True
self.logical_view = False
self._enable_edition = False
self.labels = []
self._userlevel = userlevel
# pen = QtGui.QPen(QtGui.QColor(250,100,0),2)
# self.l = QtCore.QLineF(-10,0,10,0)
# self.addLine(self.l,pen)
# self.l = QtCore.QLineF(0,-10,0,10)
# self.addLine(self.l,pen)
self.colType = ColorType()
self._update_pipeline_timer = Qt.QTimer()
self._update_pipeline_timer.setSingleShot(True)
self._update_pipeline_timer.timeout.connect(
proxy_method(self, 'update_pipeline_now'))
self.changed.connect(self.update_paths)
def __del__(self):
# print('PipelineScene.__del__')
try:
self._release()
except RuntimeError:
pass # C++ object deleted, attributes are already destroyed
def _release(self):
# print('PipelineScene._release')
if hasattr(self, 'pos'):
del self.pos
if hasattr(self, 'dim'):
del self.dim
if hasattr(self, 'labels'):
del self.labels
if hasattr(self, 'glinks'):
del self.glinks
if 'gnodes' in self.__dict__:
from soma.qt_gui.qt_backend import sip
gnode = None
for gnode in self.gnodes.values():
gnode._release()
self.removeItem(gnode)
sip.transferback(gnode)
del gnode
del self.gnodes
try:
self.changed.disconnect()
except TypeError:
pass # already done
# force delete gnodes: needs to use gc.collect()
import gc
gc.collect()
@property
def userlevel(self):
return self._userlevel
@userlevel.setter
def userlevel(self, value):
if self._userlevel != value:
self._userlevel = value
for name, gnode in self.gnodes.items():
gnode.userlevel = value
self.update_pipeline()
def _add_node(self, name, gnode):
self.addItem(gnode)
################# add by Irmage OM ####################
dim = self.dim.get(name)
# print("_add_node : dim : ",dim," , type =",type(dim).__name__)
if dim is not None:
if isinstance(dim, Qt.QPointF):
dim=(dim.x(),dim.y())
gnode.updateSize(dim[0],dim[1])
#gnode.sizer.setPos(dim[0],dim[1])
# gnode.update_node()
######################################################
pos = self.pos.get(name)
if pos is None:
gnode.setPos(2 * self._pos, self._pos)
self._pos += 100
else:
if not isinstance(pos, Qt.QPointF):
pos = Qt.QPointF(pos[0], pos[1])
gnode.setPos(pos)
self.gnodes[name] = gnode
# gnode.update_node()
#repositioning 'inputs' node
if name == 'inputs':
pos_left_most=(0,0)
for el in self.gnodes:
if el!='inputs' and el!='outputs':
if pos_left_most[0] > self.gnodes[el].pos().x():
pos_left_most=(self.gnodes[el].pos().x(),self.gnodes[el].pos().y())
xl = pos_left_most[0]-(2*self.gnodes[name].boundingRect().size().width())
yl = pos_left_most[1]
self.gnodes[name].setPos(xl,yl)
# gnode.update_node()
#repositioning 'outputs' node
if name == 'outputs':
pos_right_most=(0,0)
for el in self.gnodes:
if el!='inputs' and el!='outputs':
if pos_right_most[0] < self.gnodes[el].pos().x() + self.gnodes[el].boundingRect().size().width() :
pos_right_most=(self.gnodes[el].pos().x() + self.gnodes[el].boundingRect().size().width(),self.gnodes[el].pos().y())
xl = pos_right_most[0]+self.gnodes[name].boundingRect().size().width()
yl = pos_right_most[1]
self.gnodes[name].setPos(xl,yl)
# gnode.update_node()
################" add by Irmage #############################################
self.setSceneRect(QtCore.QRectF())
#############################################################################_node_keydelete_clicked
def add_node(self, node_name, node):
if not isinstance(node, ProcessNode):
process = node
if hasattr(node, 'process'):
process = node.process
if isinstance(node, PipelineNode):
sub_pipeline = process
elif process and isinstance(process, ProcessIteration):
sub_pipeline = process.process
else:
sub_pipeline = None
gnode = NodeGWidget(
node_name, node.plugs, self.pipeline,
sub_pipeline=sub_pipeline, process=process,
colored_parameters=self.colored_parameters,
logical_view=self.logical_view, labels=self.labels,
userlevel=self.userlevel)
self._add_node(node_name, gnode)
gnode.update_node()
return gnode
def add_link(self, source, dest, active, weak):
# print("add link ", source, dest)
source_gnode_name, source_param = source
if not source_gnode_name:
source_gnode_name = 'inputs'
dest_gnode_name, dest_param = dest
if not dest_gnode_name:
dest_gnode_name = 'outputs'
if self.logical_view:
source_param = 'outputs'
dest_param = 'inputs'
try:
typeq = self.typeLink(source_gnode_name, source_param)
# color = self.colorLink(typeq)
color = self.colType.colorLink(typeq)
except Exception:
color = ORANGE_2
# verif=((str(dest_gnode_name), str(dest_param)))
# print(str(verif) in str(self.glinks.keys()))
source_dest = ((str(source_gnode_name), str(source_param)),
(str(dest_gnode_name), str(dest_param)))
if source_dest in self.glinks:
# already done
if self.logical_view:
# keep strongest link representation
glink = self.glinks[source_dest]
if active or glink.active:
active = True
if not weak or not glink.weak:
weak = False
if glink.weak != weak or glink.active != active:
glink.update_activation(active, weak, "current")
return # already done
source_gnode = self.gnodes[source_gnode_name]
dest_gnode = self.gnodes.get(dest_gnode_name)
if dest_gnode is not None:
if dest_param in dest_gnode.in_plugs \
and source_param in source_gnode.out_plugs:
glink = Link(
source_gnode.mapToScene(
source_gnode.out_plugs[source_param].get_plug_point()),
dest_gnode.mapToScene(
dest_gnode.in_plugs[dest_param].get_plug_point()),
active, weak, color)
self.glinks[source_dest] = glink
self.addItem(glink)
def _remove_link(self, source_dest):
source, dest = source_dest
source_gnode_name, source_param = source
if not source_gnode_name:
source_gnode_name = 'inputs'
source_gnode = self.gnodes.get(source_gnode_name)
dest_gnode_name, dest_param = dest
if not dest_gnode_name:
dest_gnode_name = 'outputs'
if self.logical_view:
# is it useful ?
source_param = 'outputs'
dest_param = 'inputs'
dest_gnode = self.gnodes.get(dest_gnode_name)
new_source_dest = ((str(source_gnode_name), str(source_param)),
(str(dest_gnode_name), str(dest_param)))
glink = self.glinks.get(new_source_dest)
if glink is not None:
self.removeItem(glink)
del self.glinks[new_source_dest]
def update_paths(self, regions=[]):
for name, i in six.iteritems(self.gnodes):
self.pos[i.name] = i.pos()
br = i.box.boundingRect()
self.dim[i.name] = (br.width(), br.height())
dropped = []
for source_dest, glink in six.iteritems(self.glinks):
source, dest = source_dest
source_gnode_name, source_param = source
dest_gnode_name, dest_param = dest
source_gnode = self.gnodes[source_gnode_name]
dest_gnode = self.gnodes[dest_gnode_name]
if source_param not in source_gnode.out_plugs \
or dest_param not in dest_gnode.in_plugs:
dropped.append(source_dest)
else:
glink.update(source_gnode.mapToScene(
source_gnode.out_plugs[source_param].get_plug_point()),
dest_gnode.mapToScene(
dest_gnode.in_plugs[dest_param].get_plug_point()))
for source_dest in dropped:
self._remove_link(source_dest)
def set_pipeline(self, pipeline):
self.pipeline = pipeline
self.labels = []
pipeline_inputs = SortedDictionary()
pipeline_outputs = SortedDictionary()
if pipeline is not None:
for name, plug in six.iteritems(pipeline.nodes[''].plugs):
if plug.output:
pipeline_outputs[name] = plug
else:
pipeline_inputs[name] = plug
if pipeline_inputs:
self._add_node(
'inputs', NodeGWidget(
'inputs', pipeline_inputs, pipeline,
process=pipeline,
colored_parameters=self.colored_parameters,
logical_view=self.logical_view,
userlevel=self.userlevel))
for node_name, node in six.iteritems(pipeline.nodes):
if not node_name:
continue
self.add_node(node_name, node)
if pipeline_outputs:
self._add_node(
'outputs', NodeGWidget(
'outputs', pipeline_outputs, pipeline,
process=pipeline,
colored_parameters=self.colored_parameters,
logical_view=self.logical_view,
userlevel=self.userlevel))
for source_node_name, source_node in six.iteritems(pipeline.nodes):
for source_parameter, source_plug \
in six.iteritems(source_node.plugs):
for (dest_node_name, dest_parameter, dest_node, dest_plug,
weak_link) in source_plug.links_to:
if dest_node is pipeline.nodes.get(dest_node_name):
self.add_link(
(source_node_name, source_parameter),
(dest_node_name, dest_parameter),
active=source_plug.activated \
and dest_plug.activated,
weak=weak_link)
def update_pipeline(self):
self._update_pipeline_timer.start(20)
def update_pipeline_now(self):
if self.logical_view:
self._update_logical_pipeline()
else:
self._update_regular_pipeline()
def _update_regular_pipeline(self):
# normal view
pipeline = self.pipeline
removed_nodes = []
# print(self.gnodes)
for node_name, gnode in six.iteritems(self.gnodes):
removed = False
if gnode.logical_view:
gnode.clear_plugs()
gnode.logical_view = False
if node_name in ('inputs', 'outputs'):
node = pipeline.nodes['']
# in case traits have been added/removed
if node_name == 'inputs':
pipeline_inputs = SortedDictionary()
for name, plug in six.iteritems(node.plugs):
if not plug.output:
trait = node.get_trait(name)
if not trait.hidden \
and (trait.userlevel is None
or trait.userlevel <= self.userlevel):
pipeline_inputs[name] = plug
gnode.parameters = pipeline_inputs
if len(gnode.parameters) == 0:
# no inputs: remove the gnode
removed_nodes.append(node_name)
removed = True
else:
pipeline_outputs = SortedDictionary()
for name, plug in six.iteritems(node.plugs):
if plug.output:
trait = node.get_trait(name)
if not trait.hidden \
and (trait.userlevel is None
or trait.userlevel <= self.userlevel):
pipeline_outputs[name] = plug
gnode.parameters = pipeline_outputs
if len(gnode.parameters) == 0:
# no outputs: remove the gnode
removed_nodes.append(node_name)
removed = True
else:
node = pipeline.nodes.get(node_name)
if node is None: # removed node
removed_nodes.append(node_name)
removed = True
continue
if not removed:
gnode.active = node.activated
gnode.update_node()
# handle removed nodes
for node_name in removed_nodes:
gnode = self.gnodes[node_name]
self.removeItem(gnode)
self.gnodes.pop(node_name, None)
self.dim.pop(node_name, None)
self.pos.pop(node_name, None)
from soma.qt_gui.qt_backend import sip
sip.transferback(gnode)
#import objgraph
#objgraph.show_backrefs(gnode)
del gnode
# check for added nodes
added_nodes = []
for node_name, node in six.iteritems(pipeline.nodes):
if node_name == '':
pipeline_inputs = SortedDictionary()
pipeline_outputs = SortedDictionary()
for name, plug in six.iteritems(node.plugs):
if plug.output:
pipeline_outputs[name] = plug
else:
pipeline_inputs[name] = plug
if pipeline_inputs and 'inputs' not in self.gnodes:
self._add_node(
'inputs', NodeGWidget(
'inputs', pipeline_inputs, pipeline,
process=pipeline,
colored_parameters=self.colored_parameters,
logical_view=self.logical_view,
userlevel=self.userlevel))
if pipeline_outputs and 'outputs' not in self.gnodes:
self._add_node(
'outputs', NodeGWidget(
'outputs', pipeline_outputs, pipeline,
process=pipeline,
colored_parameters=self.colored_parameters,
logical_view=self.logical_view,
userlevel=self.userlevel))
elif node_name not in self.gnodes:
process = None
if isinstance(node, Switch):
process = node
if hasattr(node, 'process'):
process = node.process
if isinstance(node, PipelineNode):
sub_pipeline = node.process
else:
sub_pipeline = None
self.add_node(node_name, node)
# links
to_remove = []
for source_dest, glink in six.iteritems(self.glinks):
source, dest = source_dest
source_node_name, source_param = source
dest_node_name, dest_param = dest
if source_node_name == 'inputs':
source_node_name = ''
if dest_node_name == 'outputs':
dest_node_name = ''
source_node = pipeline.nodes.get(source_node_name)
if source_node is None:
to_remove.append(source_dest)
continue
source_plug = source_node.plugs.get(source_param)
dest_node = pipeline.nodes.get(dest_node_name)
if dest_node is None:
to_remove.append(source_dest)
continue
dest_plug = dest_node.plugs.get(dest_param)
remove_glink = False
if source_plug is None or dest_plug is None:
# plug[s] removed
remove_glink = True
else:
active = source_plug.activated and dest_plug.activated
weak = [x[4] for x in source_plug.links_to \
if x[:2] == (dest_node_name, dest_param)]
if len(weak) == 0:
# link removed
remove_glink = True
else:
weak = weak[0]
if remove_glink:
to_remove.append(source_dest)
else:
glink.update_activation(active, weak, "current")
for source_dest in to_remove:
self._remove_link(source_dest)
# check added links
for source_node_name, source_node in six.iteritems(pipeline.nodes):
for source_parameter, source_plug \
in six.iteritems(source_node.plugs):
for (dest_node_name, dest_parameter, dest_node, dest_plug,
weak_link) in source_plug.links_to:
if dest_node is pipeline.nodes.get(dest_node_name):
self.add_link(
(source_node_name, source_parameter),
(dest_node_name, dest_parameter),
active=source_plug.activated \
and dest_plug.activated,
weak=weak_link)
self._update_steps()
def _update_steps(self):
pipeline = self.pipeline
if not hasattr(pipeline, 'pipeline_steps'):
return
steps = pipeline.pipeline_steps
if steps is None:
return
for node_name, node in six.iteritems(pipeline.nodes):
gnode = self.gnodes.get(node_name)
if gnode is None:
continue
labels = ['step: %s' % n for n in steps.user_traits()
if node_name in steps.trait(n).nodes]
#print('update step labels on', node_name, ':', labels)
gnode.update_labels(labels)
def _update_logical_pipeline(self):
# update nodes plugs and links in logical view mode
pipeline = self.pipeline
# nodes state
removed_nodes = []
for node_name, gnode in six.iteritems(self.gnodes):
if not gnode.logical_view:
gnode.clear_plugs()
gnode.logical_view = True
if node_name in ('inputs', 'outputs'):
node = pipeline.nodes['']
else:
node = pipeline.nodes.get(node_name)
if node is None: # removed node
removed_nodes.append(node_name)
continue
gnode.active = node.activated
gnode.update_node()
# handle removed nodes
for node_name in removed_nodes:
self.removeItem(self.gnodes[node_name])
from soma.qt_gui.qt_backend import sip
sip.transferback(self.gnodes[node_name])
del self.gnodes[node_name]
# check for added nodes
added_nodes = []
for node_name, node in six.iteritems(pipeline.nodes):
if node_name == '':
pipeline_inputs = SortedDictionary()
pipeline_outputs = SortedDictionary()
for name, plug in six.iteritems(node.plugs):
if plug.output:
pipeline_outputs['outputs'] = plug
else:
pipeline_inputs['inputs'] = plug
if pipeline_inputs and 'inputs' not in self.gnodes:
self._add_node(
'inputs', NodeGWidget(
'inputs', pipeline_inputs, pipeline,
process=pipeline,
colored_parameters=self.colored_parameters,
logical_view=self.logical_view,
userlevel=self.userlevel))
if pipeline_outputs and 'outputs' not in self.gnodes:
self._add_node(
'outputs', NodeGWidget(
'outputs', pipeline_outputs, pipeline,
process=pipeline,
colored_parameters=self.colored_parameters,
logical_view=self.logical_view,
userlevel=self.userlevel))
elif node_name not in self.gnodes:
process = None
if isinstance(node, Switch):
process = node
if hasattr(node, 'process'):
process = node.process
if isinstance(node, PipelineNode):
sub_pipeline = node.process
else:
sub_pipeline = None
self.add_node(node_name, node)
# links
# delete all links
for source_dest, glink in six.iteritems(self.glinks):
self.removeItem(glink)
self.glinks = {}
# recreate links
for source_node_name, source_node in six.iteritems(pipeline.nodes):
for source_parameter, source_plug \
in six.iteritems(source_node.plugs):
for (dest_node_name, dest_parameter, dest_node, dest_plug,
weak_link) in source_plug.links_to:
if dest_node is pipeline.nodes.get(dest_node_name):
self.add_link(
(source_node_name, source_parameter),
(dest_node_name, dest_parameter),
active=source_plug.activated \
and dest_plug.activated,
weak=weak_link)
self._update_steps()
def set_enable_edition(self, state=True):
self._enable_edition = state
def edition_enabled(self):
return self._enable_edition
[docs]
def keyPressEvent(self, event):
super(PipelineScene, self).keyPressEvent(event)
if not event.isAccepted():
if event.key() == QtCore.Qt.Key_P:
# print position of boxes
event.accept()
pview = self.parent()
pview.print_node_positions()
elif event.key() == QtCore.Qt.Key_T:
for item in self.items():
if isinstance(item, boxItem):
item.focusOutEvent(Qt.QFocusEvent(Qt.QEvent.FocusOut))
# toggle logical / full view
pview = self.parent()
pview.switch_logical_view()
event.accept()
elif event.key() == QtCore.Qt.Key_A:
# auto-set nodes positions
pview = self.parent()
pview.auto_dot_node_positions()
# elif Qt.QKeySequence(event.key()+int(event.modifiers())) == Qt.QKeySequence("Ctrl+Z"):
# self.undoTyping_clicked.emit()
[docs]
def link_tooltip_text(self, source_dest):
'''Tooltip text for the fiven link
Parameters
----------
source_dest: tuple (2 tuples of 2 strings)
link description:
((source_node, source_param), (dest_node, dest_param))
'''
source_node_name = source_dest[0][0]
dest_node_name = source_dest[1][0]
if source_node_name in ('inputs', 'outputs'):
proc = self.pipeline
source_node_name = ''
source_node = self.pipeline.nodes[source_node_name]
else:
source_node = self.pipeline.nodes[source_node_name]
proc = source_node
if hasattr(source_node, 'process'):
proc = source_node.process
if dest_node_name in ('inputs', 'outputs'):
dest_node_name = ''
splug = source_node.plugs[source_dest[0][1]]
link = [l for l in splug.links_to \
if l[0] == dest_node_name and l[1] == source_dest[1][1]][0]
if splug.activated and link[3].activated:
active = '<font color="#ffa000">activated</font>'
else:
active = '<font color="#a0a0a0">inactive</font>'
if link[4]:
weak = '<font color="#e0c0c0">weak</font>'
else:
weak = '<b>strong</b>'
name = source_dest[0][1]
value = getattr(proc, name)
# trait = proc.user_traits()[name]
trait_type = proc.user_traits()[name].trait_type
trait_type_str = str(trait_type)
trait_type_str = trait_type_str[: trait_type_str.find(' object ')]
trait_type_str = trait_type_str[trait_type_str.rfind('.') + 1:]
inst_type = self.get_instance_type_string(value)
typestr = ('%s (%s)' % (inst_type, trait_type_str)).replace(
'<', '').replace('>', '')
msg = '''<h3>%s</h3>
<table cellspacing="6">
<tr>
<td><b>Link:</b></td>
<td>%s</td>
<td>%s</td>
</tr>
</table>
<table>
<tr>
<td><b>type:</b></td>
<td>%s</td>
</tr>
<tr>
<td><b>value:</b></td>
<td>%s</td>
</tr>
''' \
% (source_dest[0][1], active, weak, typestr,
html.escape(str(value)))
if isinstance(trait_type, traits.File) \
or isinstance(trait_type, traits.Directory) \
or isinstance(trait_type, traits.Any):
if self.is_existing_path(value):
msg += ''' <tr>
<td></td>
<td>existing path</td>
</tr>
'''
elif not isinstance(trait_type, traits.Any):
msg += ''' <tr>
<td></td>
<td><font color="#a0a0a0">non-existing path</font></td>
</tr>
'''
msg += '</table>'
return msg
@staticmethod
def get_instance_type_string(value):
if value is None:
return 'None'
if value is traits.Undefined:
return 'Undefined'
if isinstance(value, (list, traits.TraitListObject)):
return 'list'
return type(value).__name__
@staticmethod
def is_existing_path(value):
if value not in (None, traits.Undefined) \
and type(value) in (str, six.text_type) and os.path.exists(value):
return True
return False
@staticmethod
def html_doc(doc_text):
# TODO: sphinx transform
text = doc_text.replace('<', '<')
text = text.replace('>', '>')
return text
[docs]
def plug_tooltip_text(self, node, name):
'''Tooltip text for a node plug
'''
if node.name in ('inputs', 'outputs'):
proc = self.pipeline
splug = self.pipeline.pipeline_node.plugs[name]
else:
src = self.pipeline.nodes[node.name]
splug = src.plugs.get(name)
if not splug:
return None
proc = src
if hasattr(src, 'process'):
proc = src.process
if splug.output:
output = '<font color="#d00000">output</font>'
else:
output = '<font color="#00d000">input</font>'
if splug.enabled:
enabled = 'enabled'
else:
enabled = '<font color="#a0a0a0">disabled</font>'
if splug.activated:
activated = 'activated'
else:
activated = '<font color="#a0a0a0">inactive</font>'
if splug.optional:
optional = '<font color="#00d000">optional</font>'
else:
optional = 'mandatory'
value = getattr(proc, name)
trait = proc.user_traits()[name]
trait_type = trait.trait_type
trait_type_str = trait_type.__class__.__name__
if trait.output and trait.input_filename is False:
trait_type_str += ', output filename'
typestr = ('%s (%s)' % (self.get_instance_type_string(value),
trait_type_str)).replace(
'<', '').replace('>', '')
msg = '''<h3>%s</h3>
<table cellspacing="6">
<tr>
<td><b>Plug:</b></td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
<td>%s</td>
</tr>
</table>
<table>
<tr>
<td><b>type:</b></td>
<td>%s</td>
</tr>
<tr>
<td><b>value:</b></td>
<td>%s</td>
</tr>
''' \
% (name, output, optional, enabled, activated, typestr,
html.escape(str(value)))
if isinstance(trait_type, traits.File) \
or isinstance(trait_type, traits.Directory) \
or isinstance(trait_type, traits.Any):
if self.is_existing_path(value):
msg += ''' <tr>
<td></td>
<td>existing path</td>
</tr>
'''
elif not isinstance(trait_type, traits.Any):
msg += ''' <tr>
<td></td>
<td><font color="#a0a0a0">non-existing path</font></td>
</tr>
'''
msg += '</table>'
desc = trait.desc
if desc:
msg += '\n<h3>Description:</h3>\n'
msg += self.html_doc(desc)
return msg
def node_tooltip_text(self, node):
process = node.process
msg = getattr(process, '__doc__', '')
# msg = self.html_doc(doc)
return msg
def _parentgnode(self, item):
if qt_backend.get_qt_backend() != 'PyQt5':
return item.parentItem()
# in PyQt5 (certain versions at least, Ubuntu 16.04) parentItem()
# returns something inappropriate, having the wrong type
# QGraphicsVideoItem, probably a cast mistake, and which leads to
# a segfault, so we have to get it a different way.
nodes = [node for node in self.gnodes.values()
if item in node.childItems()]
if len(nodes) == 1:
return nodes[0]
[docs]
def helpEvent(self, event):
'''
Display tooltips on plugs and links
'''
if self.logical_view:
event.setAccepted(False)
super(PipelineScene, self).helpEvent(event)
return
item = self.itemAt(event.scenePos(), Qt.QTransform())
if isinstance(item, Link):
for source_dest, glink in six.iteritems(self.glinks):
if glink is item:
text = self.link_tooltip_text(source_dest)
item.setToolTip(text)
break
elif isinstance(item, Plug):
node = self._parentgnode(item)
found = False
for name, plug in six.iteritems(node.in_plugs):
if plug is item:
found = True
break
if not found:
for name, plug in six.iteritems(node.out_plugs):
if plug is item:
found = True
break
if found:
text = self.plug_tooltip_text(node, name)
item.setToolTip(text)
elif isinstance(item, QtGui.QGraphicsRectItem):
node = self._parentgnode(item)
if isinstance(node, NodeGWidget):
text = self.node_tooltip_text(node)
item.setToolTip(text)
elif isinstance(item, QtGui.QGraphicsProxyWidget):
# PROBLEM: tooltips in child graphics scenes seem not to popup.
#
# to force them we would have to translate the event position to
# the sub-scene position, and call the child scene helpEvent()
# method, with a custom event.
# However this is not possible, since QGraphicsSceneHelpEvent
# does not provide a public (nor even protected) constructor, and
# secondarily helpEvent() is protected.
event.setAccepted(False)
super(PipelineScene, self).helpEvent(event)
def remove_node(self, node_name):
print(self.gnodes)
gnode = self.gnodes.get(node_name)
if gnode is None:
# already done (possibly via a notification)
return
todel = set()
from soma.qt_gui.qt_backend import sip
for link, glink in six.iteritems(self.glinks):
if link[0][0] == node_name or link[1][0] == node_name:
self.removeItem(glink)
todel.add(link)
for link in todel:
del self.glinks[link]
self.removeItem(gnode)
sip.transferback(self.gnodes[node_name])
del self.gnodes[node_name]
def _link_right_clicked(self, link):
# find the link in list
# print('Scene._link_right_clicked:', link)
for source_dest, glink in six.iteritems(self.glinks):
if glink is link:
self.link_right_clicked.emit(
source_dest[0][0], source_dest[0][1],
source_dest[1][0], source_dest[1][1])
break
def _link_keydelete_clicked(self, link):
for source_dest, glink in six.iteritems(self.glinks):
if glink is link:
self.link_keydelete_clicked.emit(
source_dest[0][0], source_dest[0][1],
source_dest[1][0], source_dest[1][1])
break
def _node_keydelete_clicked(self, node):
self.node_keydelete_clicked.emit(node.name)
def typeLink(self, name_node, name_plug):
if name_node in ('inputs', 'outputs'):
proc = self.pipeline
splug = self.pipeline.pipeline_node.plugs[name_plug]
else:
src = self.pipeline.nodes[name_node]
splug = src.plugs[name_plug]
proc = src
if hasattr(src, 'process'):
proc = src.process
value = getattr(proc, name_plug)
trait = proc.user_traits()[name_plug]
trait_type = trait.trait_type
trait_type_str = str(trait_type)
trait_type_str = trait_type_str[: trait_type_str.find(' object ')]
trait_type_str = trait_type_str[trait_type_str.rfind('.') + 1:]
return trait_type_str
[docs]
class PipelineDeveloperView(QGraphicsView):
'''
Pipeline representation as a graph, using boxes and arrows.
Based on Qt QGraphicsView, this can be used as a Qt QWidget.
Qt signals are emitted on a right click on a node box, and on a double
click on a sub-pipeline box, to allow handling at a higher level. Default
behaviors can be enabled using constructor parameters.
Ctrl + double click opens sub-pipelines in embedded views inside their
parent box.
Attributes
----------
subpipeline_clicked
node_right_clicked
process_clicked
node_clicked
node_clicked_ctrl
plug_clicked
plug_right_clicked
link_right_clicked
colored_parameters
scene
Methods
-------
__init__
set_pipeline
is_logical_view
set_logical_view
zoom_in
zoom_out
openProcessController
add_embedded_subpipeline
onLoadSubPipelineClicked
onOpenProcessController
enableNode
enable_step
disable_preceding_steps
disable_following_steps
enable_preceding_steps
enable_following_steps
set_switch_value
disable_done_steps
enable_all_steps
check_files
auto_dot_node_positions
save_dot_image_ui
reset_initial_nodes_positions
window
'''
subpipeline_clicked = QtCore.Signal(str, Process,
QtCore.Qt.KeyboardModifiers)
'''Signal emitted when a sub pipeline has to be open.'''
process_clicked = QtCore.Signal(str, Process)
node_clicked = QtCore.Signal(str, Node)
'''Signal emitted when a node box has to be open.'''
node_clicked_ctrl = QtCore.Signal(str, Process)
'''Signal emitted when a node box has to be in the foreground.'''
switch_clicked = QtCore.Signal(str, Switch)
'''Signal emitted when a switch box has to be open.'''
node_right_clicked = QtCore.Signal(str, Controller)
'''Signal emitted when a node box is right-clicked'''
plug_clicked = QtCore.Signal(str)
'''Signal emitted when a plug is clicked'''
plug_right_clicked = QtCore.Signal(str)
'''Signal emitted when a plug is right-clicked'''
link_right_clicked = QtCore.Signal(str, str, str, str)
'''Signal emitted when a link is right-clicked'''
edit_sub_pipeline = QtCore.Signal(Pipeline)
'''Signal emitted when a sub-pipeline has to be edited'''
open_filter = QtCore.Signal(str)
'''Signal emitted when an Input Filter has to be opened'''
export_to_db_scans = QtCore.Signal(str)
'''Signal emitted when an Input Filter has to be linked to database_scans'''
link_keydelete_clicked = QtCore.Signal(str, str, str, str)
node_keydelete_clicked = QtCore.Signal(str)
scene = None
'''
type: PipelineScene
the main scene.
'''
colored_parameters = True
'''
If enabled (default), parameters in nodes boxes are displayed with color
codes representing their state, and the state of their values: output
parameters, empty values, existing files, non-existing files...
When colored_parameters is set, however, callbacks have to be installed to
track changes in traits values, so this actually has an overhead.
When colored_parameters is used, the color code is as follows:
* black pamameter name: input
* red parameter name: output
* italics parameter name: Undefined, None, or empty string value
* bold parameter name: existing file or directory name
* regular font parameter name: non-existing file, or non-file parameter type
* black plug: mandatory
* green plug: optional
* grey plug: mandatory, inactive
* light green plug: optional, inactive
* grey link: inactive
* orange link: active
* dotted line link: weak link
'''
[docs]
class ProcessNameEdit(Qt.QLineEdit):
''' A specialized QLineEdit with completion for process name
'''
def __init__(self, parent=None,
class_type_check=process_instance.is_process):
super(PipelineDeveloperView.ProcessNameEdit,
self).__init__(parent)
self.compl = QtGui.QCompleter([])
self.setCompleter(self.compl)
self.textEdited.connect(self.on_text_edited)
self.py_cache = {} # cache for loaded python files
self.class_type_check = class_type_check
@staticmethod
def _execfile(filename):
glob_dict = {}
exec(compile(open(filename, "rb").read(), filename, 'exec'),
glob_dict, glob_dict)
return glob_dict
def load_py(self, filename):
if filename not in self.py_cache:
try:
self.py_cache[filename] = self._execfile(filename)
except Exception as e:
print('exception while executing file %s:' % filename, e)
return {}
return self.py_cache[filename]
def get_processes_or_modules(self, filename):
file_dict = self.load_py(filename)
processes = []
for name, item in six.iteritems(file_dict):
if self.class_type_check(item) or inspect.ismodule(item):
processes.append(name)
return processes
def on_text_edited(self, text):
compl = set()
modpath = str(text).split('.')
current_mod = None
paths = []
sel = set()
mod = None
if len(modpath) > 1:
current_mod = '.'.join(modpath[:-1])
try:
mod = importlib.import_module(current_mod)
except ImportError:
mod = None
if mod:
if os.path.basename(mod.__file__).startswith(
'__init__.py'):
paths = [os.path.dirname(mod.__file__)]
# add process/pipeline objects in current_mod
procs = [item for k, item
in six.iteritems(mod.__dict__)
if self.class_type_check(item)
or inspect.ismodule(item)]
compl.update(['.'.join([current_mod, c.__name__])
for c in procs])
if not mod:
# no current module
# is it a path name ?
pathname, filename = os.path.split(str(text))
if os.path.isdir(pathname):
# look for class in python file filename.py#classname
elements = filename.split('.py#')
if len(elements) == 2:
filename = elements[0] + '.py'
object_name = elements[1]
full_path = os.path.join(pathname, filename)
processes = self.get_processes_or_modules(full_path)
if object_name != '':
processes = [p for p in processes
if p.startswith(object_name)]
compl.update(['#'.join((full_path, p))
for p in processes])
else:
# look for matching xml files
for f in os.listdir(pathname):
if (f.endswith('.xml') or f.endswith('.json')
or os.path.isdir(os.path.join(pathname,
f))) \
and f.startswith(filename):
compl.add(os.path.join(pathname, f))
elif f.endswith('.py'):
compl.add(os.path.join(pathname, f))
else:
paths = sys.path
for path in paths:
if path == '':
path = '.'
try:
for f in os.listdir(path):
if f.endswith('.py'):
sel.add(f[:-3])
elif f.endswith('.pyc') or f.endswith('.pyo'):
sel.add(f[:-4])
elif f.endswith('.xml') or f.endswith('.json'):
sel.add(f)
elif '.' not in f \
and os.path.isdir(os.path.join(
path, f)):
sel.add(f)
except OSError:
pass
begin = modpath[-1]
cm = []
if current_mod is not None:
cm = [current_mod]
compl.update(['.'.join(cm + [f]) for f in sel \
if f.startswith(modpath[-1])])
model = self.compl.model()
model.setStringList(list(compl))
[docs]
def __init__(self, pipeline=None, parent=None, show_sub_pipelines=False,
allow_open_controller=False, logical_view=False,
enable_edition=False, userlevel=0):
'''PipelineDeveloperView
Parameters
----------
pipeline: Pipeline (optional)
pipeline object to be displayed
If omitted an empty pipeline will be used, and edition mode will be
activated.
parent: QWidget (optional)
parent widget
show_sub_pipelines: bool (optional)
if set, sub-pipelines will appear as red/pink boxes and a double
click on one of them will open another window with the sub-pipeline
structure in it
allow_open_controller: bool (optional)
if set, a right click on any box will open another window with the
underlying node controller, allowing to see and edit parameters
values, switches states, etc.
logical_view: bool (optional)
if set, plugs and links between plugs are hidden, only links
between nodes are displayed.
enable_edition: bool (optional)
if set, pipeline edition features are available in GUI and menus:
adding process boxes, drawing links etc. If pipeline is not
specified, then edition will be activated anyway.
'''
super(PipelineDeveloperView, self).__init__(parent)
# self.setAlignment(QtCore.Qt.AlignTop | QtCore.Qt.AlignLeft)
self.setAlignment(QtCore.Qt.AlignCenter)
self.centerOn(0,0)
self.setRenderHints(Qt.QPainter.Antialiasing | Qt.QPainter.SmoothPixmapTransform)
self.setBackgroundBrush(QtGui.QColor(60, 60, 60))
self.scene = None
self.colored_parameters = True
self._show_sub_pipelines = show_sub_pipelines
self._allow_open_controller = allow_open_controller
self._logical_view = logical_view
self._enable_edition = enable_edition
self._pipeline_filename = ""
self._restricted_edition = False
self.disable_overwrite = False
self._userlevel = userlevel
self.doc_browser = None
self.set_pipeline(pipeline)
self._grab = False
self._grab_link = False
self.plug_clicked.connect(self._plug_clicked)
self.plug_right_clicked.connect(self._plug_right_clicked)
self.link_right_clicked.connect(self._link_clicked)
self.node_clicked_ctrl.connect(self._node_clicked_ctrl)
self.link_keydelete_clicked.connect(self._link_delete_clicked)
self.node_keydelete_clicked.connect(self._node_delete_clicked)
def __del__(self):
# print('PipelineDeveloperView.__del__')
self.release_pipeline(delete=True)
# super(PipelineDeveloperView, self).__del__()
@property
def userlevel(self):
return self._userlevel
@userlevel.setter
def userlevel(self, value):
self._userlevel = value
if self.scene:
self.scene.userlevel = value
for widget in self.findChildren(QtGui.QWidget):
if hasattr(widget, 'userlevel'):
widget.userlevel = value
[docs]
def ensure_pipeline(self, pipeline):
'''
Check that we have a pipeline or a process
'''
if pipeline is None:
pipeline = Pipeline()
enable_edition = True
if not isinstance(pipeline, Pipeline):
if isinstance(pipeline, Process):
process = pipeline
pipeline = Pipeline()
pipeline.set_study_config(process.get_study_config())
pipeline.add_process(process.name, process)
pipeline.autoexport_nodes_parameters()
pipeline.node_position["inputs"] = (0., 0.)
pipeline.node_position[process.name] = (300., 0.)
pipeline.node_position["outputs"] = (600., 0.)
# pipeline.scene_scale_factor = 0.5
pipeline.node_dimension[process.name] = (300., 200.) #add by Irmage OM
else:
raise Exception("Expect a Pipeline or a Process, not a "
"'{0}'.".format(repr(pipeline)))
return pipeline
def _set_pipeline(self, pipeline):
pos = {}
dim = {}
if self.scene:
pos = self.scene.pos
dim = self.scene.dim #add by Irmage OM
# pprint(dict((i, (j.x(), j.y())) for i, j in six.iteritems(pos)))
if hasattr(pipeline, 'node_position'):
for i, j in six.iteritems(pipeline.node_position):
if isinstance(j, QtCore.QPointF):
pos[i] = j
else:
pos[i] = QtCore.QPointF(*j)
############### add by Irmage OM #######################
if hasattr(pipeline, 'node_dimension'):
for i, j in six.iteritems(pipeline.node_dimension):
if isinstance(j, QtCore.QPointF):
dim[i] = (j.x(), j.y())
else:
dim[i] = j
# print("_set_pipeline : ",pos," ; ",dim)
#######################################################
self.release_pipeline()
self.scene.set_pipeline(pipeline)
self.scene.pos = pos
self.scene.dim = dim
if pipeline is not None:
self.setWindowTitle(pipeline.name)
# Try to initialize the scene scale factor
if hasattr(pipeline, "scene_scale_factor"):
self.scale(
pipeline.scene_scale_factor, pipeline.scene_scale_factor)
self.reset_initial_nodes_positions()
################" add by Irmage #############################################
self.fitInView(self.sceneRect(), QtCore.Qt.KeepAspectRatio)
#############################################################################
[docs]
def set_pipeline(self, pipeline):
'''
Assigns a new pipeline to the view.
'''
pipeline = self.ensure_pipeline(pipeline)
self._set_pipeline(pipeline)
if pipeline is not None:
# Setup callback to update view when pipeline state is modified
pipeline.on_trait_change(self._reset_pipeline, 'selection_changed',
dispatch='ui')
pipeline.on_trait_change(self._reset_pipeline,
'user_traits_changed', dispatch='ui')
if hasattr(pipeline, 'pipeline_steps'):
pipeline.pipeline_steps.on_trait_change(
self._reset_pipeline, dispatch='ui')
[docs]
def release_pipeline(self, delete=False):
'''
Releases the pipeline currently viewed (and remove the callbacks)
If ``delete`` is set, this means the view is within deletion process
and a new scene should not be built
'''
# Setup callback to update view when pipeline state is modified
from soma.qt_gui.qt_backend import sip
pipeline = None
if self.scene is not None and hasattr(self.scene, 'pipeline'):
pipeline = self.scene.pipeline
if pipeline is not None:
if hasattr(pipeline, 'pipeline_steps'):
pipeline.pipeline_steps.on_trait_change(
self._reset_pipeline, remove=True)
pipeline.on_trait_change(self._reset_pipeline, 'selection_changed',
remove=True)
pipeline.on_trait_change(self._reset_pipeline,
'user_traits_changed', remove=True)
if sip.isdeleted(self):
# prevent 'C++ object has been deleted' error
return
self.setScene(None)
if self.scene:
# force destruction of scene internals now that the Qt object
# still exists
self.scene._release()
# the scene is not deleted after all refs are released, even
# after self.setScene(None). This is probably a bug in PyQt:
# the C++ layer keeps ownership of the scene, whereas it should
# not: the Qt doc specifies for QGraphicsView.setScene():
# "The view does not take ownership of scene.", however in PyQt it
# does, and only releases it when the QGraphicsView is deleted.
# Thus we have to force it by hand:
from soma.qt_gui.qt_backend import sip
sip.transferback(self.scene)
self.scene = None
import gc
gc.collect()
if not delete and (pipeline is not None or self.scene is None):
self.scene = PipelineScene(self, userlevel=self.userlevel)
self.scene.set_enable_edition(self._enable_edition)
self.scene.logical_view = self._logical_view
self.scene.colored_parameters = self.colored_parameters
self.scene.subpipeline_clicked.connect(self.subpipeline_clicked)
self.scene.subpipeline_clicked.connect(self.onLoadSubPipelineClicked)
self.scene.process_clicked.connect(self._node_clicked)
self.scene.node_clicked.connect(self._node_clicked)
self.scene.node_clicked_ctrl.connect(self._node_clicked_ctrl)
self.scene.switch_clicked.connect(self.switch_clicked)
self.scene.node_right_clicked.connect(self.node_right_clicked)
self.scene.node_right_clicked.connect(self.onOpenProcessController)
self.scene.plug_clicked.connect(self.plug_clicked)
self.scene.plug_right_clicked.connect(self.plug_right_clicked)
self.scene.link_right_clicked.connect(self.link_right_clicked)
self.scene.link_keydelete_clicked.connect(self.link_keydelete_clicked)
self.scene.node_keydelete_clicked.connect(self.node_keydelete_clicked)
self.scene.pos = {}
self.scene.dim = {}
self.setWindowTitle('<no pipeline>')
self.setScene(self.scene)
[docs]
def is_logical_view(self):
'''
in logical view mode, plugs and links between plugs are hidden, only
links between nodes are displayed.
'''
return self._logical_view
[docs]
def set_logical_view(self, state):
'''
in logical view mode, plugs and links between plugs are hidden, only
links between nodes are displayed.
Parameters
----------
state: bool (mandatory)
to set/unset the logical view mode
'''
self._logical_view = state
self._reset_pipeline()
def _reset_pipeline(self):
# self._set_pipeline(pipeline)
self.scene.logical_view = self._logical_view
self.scene.update_pipeline()
[docs]
def zoom_in(self):
'''
Zoom the view in, applying a 1.2 zoom factor
'''
scf = 1.2
self.scale(scf, scf)
cur_pos = self.mapFromGlobal(Qt.QCursor.pos())
c_pos = Qt.QPointF(self.width() / 2, self.height() / 2)
p = (cur_pos - c_pos) * (scf - 1.)
self.horizontalScrollBar().setValue(
self.horizontalScrollBar().value() + int(p.x()))
self.verticalScrollBar().setValue(
self.verticalScrollBar().value() + int(p.y()))
[docs]
def zoom_out(self):
'''
Zoom the view out, applying a 1/1.2 zool factor
'''
scf = 1. / 1.2
self.scale(scf, scf)
cur_pos = self.mapFromGlobal(Qt.QCursor.pos())
c_pos = Qt.QPointF(self.width() / 2, self.height() / 2)
p = (cur_pos - c_pos) * (scf - 1.)
self.horizontalScrollBar().setValue(
self.horizontalScrollBar().value() + int(p.x()))
self.verticalScrollBar().setValue(
self.verticalScrollBar().value() + int(p.y()))
[docs]
def edition_enabled(self):
'''
Get the editable state
'''
return self._enable_edition
[docs]
def set_enable_edition(self, state=True):
'''
Set the editable state. Edition allows to modify a pipeline: adding /
removing process boxes and switches, drawing links, etc.
'''
self._enable_edition = state
self.scene.set_enable_edition(state)
[docs]
def is_restricted_edition_mode(self):
'''
Get the restricted mode status
Returns
-------
enabled: bool
'''
return self._restricted_edition
[docs]
def set_restricted_edition_mode(self, enabled):
'''
Set the restricted edition mode. In restricted mode, some background
menu actions ("add process", "open node controller"...) are not
available.
Parameters
----------
enabled: bool
'''
self._restricted_edition = enabled
[docs]
def wheelEvent(self, event):
done = False
if event.modifiers() == QtCore.Qt.ControlModifier:
item = self.itemAt(event.pos())
if not isinstance(item, QtGui.QGraphicsProxyWidget):
done = True
if qt_backend.get_qt_backend() == 'PyQt5':
delta = event.angleDelta().y()
else:
delta = event.delta()
if delta < 0:
self.zoom_out()
else:
self.zoom_in()
event.accept()
if not done:
super(PipelineDeveloperView, self).wheelEvent(event)
[docs]
def mousePressEvent(self, event):
super(PipelineDeveloperView, self).mousePressEvent(event)
if not event.isAccepted():
if event.button() == QtCore.Qt.RightButton:
self.open_background_menu()
else:
self._grab = True
self._grabpos = event.pos()
# print("background clicked")
for source_dest, glink in six.iteritems(self.scene.glinks):
glink.fonced_viewer(False)
for node_name, gnode in six.iteritems(self.scene.gnodes):
gnode.fonced_viewer(False)
[docs]
def mouseReleaseEvent(self, event):
self._grab = False
if self._grab_link:
event.accept()
try:
self._release_grab_link(event)
except Exception as e:
print("source to destination types are not compatible")
print(e)
super(PipelineDeveloperView, self).mouseReleaseEvent(event)
self.scene.update()
[docs]
def mouseMoveEvent(self, event):
if self._grab:
event.accept()
translation = event.pos() - self._grabpos
self._grabpos = event.pos()
self.horizontalScrollBar().setValue(
self.horizontalScrollBar().value() - int(translation.x()))
self.verticalScrollBar().setValue(
self.verticalScrollBar().value() - int(translation.y()))
elif self._grab_link:
self._move_grab_link(event)
event.accept()
else:
super(PipelineDeveloperView, self).mouseMoveEvent(event)
[docs]
def dragEnterEvent(self, event):
"""Event handler when the mouse enters the widget.
:param event: event
"""
if event.mimeData().hasFormat('component/name'):
event.accept()
[docs]
def dragMoveEvent(self, event):
"""Event handler when the mouse moves in the widget.
:param event: event
"""
if event.mimeData().hasFormat('component/name'):
event.accept()
[docs]
def dropEvent(self, event):
"""Event handler when something is dropped in the widget.
:param event: event
"""
if event.mimeData().hasFormat('component/name'):
self.click_pos = QtGui.QCursor.pos()
path = bytes(event.mimeData().data('component/name'))
self.drop_process(path.decode('utf8'))
[docs]
def drop_process(self, path):
"""Find the dropped process in the system's paths.
:param path: class's path (e.g. "nipype.interfaces.spm.Smooth") (str)
"""
package_name, process_name = os.path.splitext(path)
process_name = process_name[1:]
__import__(package_name)
pkg = sys.modules[package_name]
for name, instance in sorted(list(pkg.__dict__.items())):
if name == process_name:
if issubclass(instance, Node):
# it's a node
try:
QtGui.QApplication.setOverrideCursor(
QtCore.Qt.WaitCursor)
self.add_named_node(None, instance)
QtGui.QApplication.restoreOverrideCursor()
return
except Exception as e:
print(e)
return
try:
process = get_process_instance(instance)
except Exception as e:
print(e)
return
else:
QtGui.QApplication.setOverrideCursor(QtCore.Qt.WaitCursor)
self.add_named_process(instance)
QtGui.QApplication.restoreOverrideCursor()
[docs]
def add_embedded_subpipeline(self, subpipeline_name, scale=None):
'''
Adds an embedded sub-pipeline inside its parent node.
'''
gnode = self.scene.gnodes.get(str(subpipeline_name))
if gnode is not None:
sub_pipeline \
= self.scene.pipeline.nodes[str(subpipeline_name)].process
gnode.add_subpipeline_view(
sub_pipeline, self._allow_open_controller, scale=scale)
[docs]
def onLoadSubPipelineClicked(self, node_name, sub_pipeline, modifiers):
""" Event to load a open a sub-pipeline view.
If ctrl is pressed the new view will be embedded in its parent node
box.
"""
if self._show_sub_pipelines:
if modifiers & QtCore.Qt.ControlModifier:
try:
self.add_embedded_subpipeline(node_name)
return
except KeyError:
print('node not found in:')
print(list(self.scene.gnodes.keys()))
sub_view = PipelineDeveloperView(
sub_pipeline,
show_sub_pipelines=self._show_sub_pipelines,
allow_open_controller=self._allow_open_controller,
enable_edition=self.edition_enabled(),
logical_view=self._logical_view, userlevel=self.userlevel)
# set self.window() as QObject parent (not QWidget parent) to
# prevent the sub_view to close/delete immediately
QtCore.QObject.setParent(sub_view, self.window())
sub_view.setAttribute(QtCore.Qt.WA_DeleteOnClose)
sub_view.setWindowTitle(node_name)
sub_view.doc_browser = self
self.scene.update()
sub_view.show()
[docs]
def window(self):
'''
window() is overloaded from QWidget.window() to handle embedded views
cases.
A PipelineDeveloperView may be displayed inside a NodeGWidget.
In this case, we want to go up to the parent scene's window to the
"real" top window, where QWidget.window() will end in the current
graphics scene.
'''
if hasattr(self, '_graphics_item'):
return self._graphics_item.scene().views()[0].window()
else:
return super(PipelineDeveloperView, self).window()
[docs]
def onOpenProcessController(self, node_name, process):
""" Event to open a sub-process/sub-pipeline controller
"""
if self._allow_open_controller:
self.open_node_menu(node_name, process)
[docs]
def openProcessController(self):
sub_view = QtGui.QScrollArea()
node_name = self.current_node_name
if node_name in ('inputs', 'outputs'):
node_name = ''
process = self.scene.pipeline.nodes[node_name]
if hasattr(process, 'process'):
process = process.process
# force instantiating a completion engine (since
# AttributedProcessWidget does not force it)
if hasattr(process, 'get_study_config'): # exclude custom nodes
engine = process.get_study_config().engine
from capsul.attributes.completion_engine \
import ProcessCompletionEngine
ce = ProcessCompletionEngine.get_completion_engine(process)
cwidget = AttributedProcessWidget(
process, enable_attr_from_filename=True, enable_load_buttons=True,
userlevel=self.userlevel)
sub_view.setWidget(cwidget)
sub_view.setWidgetResizable(True)
sub_view.setAttribute(QtCore.Qt.WA_DeleteOnClose)
sub_view.setWindowTitle(self.current_node_name)
# try to resize to a width that doesn't need an horizontal scrollbar
sub_view.resize(
cwidget.controller_widget.parent().parent().sizeHint().width(),
sub_view.sizeHint().height())
sub_view.show()
# set self.window() as QObject parent (not QWidget parent) to
# prevent the sub_view to close/delete immediately
QtCore.QObject.setParent(sub_view, self.window())
def emit_export_to_db_scans(self):
self.export_to_db_scans.emit(self.current_node_name)
def emit_open_filter(self):
self.open_filter.emit(self.current_node_name)
def emit_edit_sub_pipeline(self):
node = self.scene.pipeline.nodes[self.current_node_name]
sub_pipeline = node.process
if isinstance(sub_pipeline, weakref.ProxyTypes):
# get the "real" object
sub_pipeline = sub_pipeline.__init__.__self__
self.edit_sub_pipeline.emit(sub_pipeline)
[docs]
def show_optional_outputs(self):
'''
Added to choose to visualize optional outputs.
'''
gnode = self.scene.gnodes[self.current_node_name]
connected_plugs = []
# The show_opt_outputs attribute is not changed yet
if gnode.show_opt_outputs:
# Verifying that the plugs are not connected to another node
for param, pipeline_plug in six.iteritems(gnode.parameters):
output = (not pipeline_plug.output if gnode.name in (
'inputs', 'outputs') else pipeline_plug.output)
if output:
if pipeline_plug.optional and pipeline_plug.links_to and gnode.show_opt_outputs:
connected_plugs.append(param)
if connected_plugs:
if len(connected_plugs) == 1:
text = "Please remove links from this plug:\n"
else:
text = "Please remove links from these plugs:\n"
for plug_name in connected_plugs:
text += plug_name + ", "
text = text[:-2] + '.'
msg = QMessageBox()
msg.setIcon(QMessageBox.Critical)
msg.setText(text)
msg.setWindowTitle("Error while changing the view of the node")
msg.setStandardButtons(QMessageBox.Ok | QMessageBox.Cancel)
msg.exec_()
return
# Changing the show_opt_outputs attribute
gnode.change_output_view()
self.scene.update_pipeline()
[docs]
def enableNode(self, checked):
if self.current_node_name in ['inputs', 'outputs']:
node_name = ''
else:
node_name = self.current_node_name
self.scene.pipeline.nodes[node_name].enabled = checked
[docs]
def enable_step(self, step_name, state):
setattr(self.scene.pipeline.pipeline_steps, step_name, state)
[docs]
def disable_preceding_steps(self, step_name, dummy):
# don't know why we get this additional dummy parameter (False)
steps = self.scene.pipeline.pipeline_steps
for step in steps.user_traits():
if step == step_name:
break
setattr(steps, step, False)
[docs]
def disable_following_steps(self, step_name, dummy):
steps = self.scene.pipeline.pipeline_steps
found = False
for step in steps.user_traits():
if found:
setattr(steps, step, False)
elif step == step_name:
found = True
[docs]
def enable_preceding_steps(self, step_name, dummy):
steps = self.scene.pipeline.pipeline_steps
for step in steps.user_traits():
if step == step_name:
break
setattr(steps, step, True)
[docs]
def enable_following_steps(self, step_name, dummy):
steps = self.scene.pipeline.pipeline_steps
found = False
for step in steps.user_traits():
if found:
setattr(steps, step, True)
elif step == step_name:
found = True
[docs]
def set_switch_value(self, switch, value, dummy):
switch.switch = value
[docs]
def disable_done_steps(self):
pipeline_tools.disable_runtime_steps_with_existing_outputs(
self.scene.pipeline)
[docs]
def enable_all_steps(self):
self.scene.pipeline.enable_all_pipeline_steps()
[docs]
def check_files(self):
overwritten_outputs = pipeline_tools.nodes_with_existing_outputs(
self.scene.pipeline)
missing_inputs = pipeline_tools.nodes_with_missing_inputs(
self.scene.pipeline)
if len(overwritten_outputs) == 0 and len(missing_inputs) == 0:
QtGui.QMessageBox.information(
self, 'Pipeline ready', 'All input files are available. '
'No output file will be overwritten.')
else:
dialog = QtGui.QWidget()
layout = QtGui.QVBoxLayout(dialog)
warn_widget = PipelineFileWarningWidget(
missing_inputs, overwritten_outputs)
layout.addWidget(warn_widget)
hlay = QtGui.QHBoxLayout()
layout.addLayout(hlay)
hlay.addStretch()
ok = QtGui.QPushButton('OK')
self.ok_button = ok
hlay.addWidget(ok)
ok.clicked.connect(dialog.close)
dialog.show()
self._warn_files_widget = dialog
[docs]
def auto_dot_node_positions(self):
'''
Calculate pipeline nodes positions using graphviz/dot, and place the
pipeline view nodes accordingly.
'''
scene = self.scene
scale = 67. # dpi
nodes_sizes = dict([(name,
(gnode.boundingRect().width(),
gnode.boundingRect().height()))
for name, gnode in six.iteritems(scene.gnodes)])
dgraph = pipeline_tools.dot_graph_from_pipeline(
scene.pipeline, nodes_sizes=nodes_sizes)
tfile, tfile_name = tempfile.mkstemp()
os.close(tfile)
pipeline_tools.save_dot_graph(dgraph, tfile_name)
toutfile, toutfile_name = tempfile.mkstemp()
os.close(toutfile)
try:
cmd = ['dot', '-Tplain', '-o', toutfile_name, tfile_name]
try:
soma.subprocess.check_call(cmd)
except FileNotFoundError:
# dot is not installed in the PATH. Give up
return
nodes_pos = self._read_dot_pos(toutfile_name)
rects = dict([(name, node.boundingRect())
for name, node in six.iteritems(scene.gnodes)])
pos = dict([(name, (-rects[name].width() / 2 + pos[0] * scale,
-rects[name].height() / 2 - pos[1] * scale))
for id, name, pos in nodes_pos])
minx = min([x[0] for x in six.itervalues(pos)])
miny = min([x[1] for x in six.itervalues(pos)])
pos = dict([(name, (p[0] - minx, p[1] - miny))
for name, p in six.iteritems(pos)])
# print('pos:')
# print(pos)
scene.pos = pos
for node, position in six.iteritems(pos):
gnode = scene.gnodes[node]
if isinstance(position, Qt.QPointF):
gnode.setPos(position)
else:
gnode.setPos(*position)
finally:
os.unlink(tfile_name)
os.unlink(toutfile_name)
def _read_dot_pos(self, filename):
'''
Read the nodes positions from a file generated by graphviz/dot, in
"plain" text format.
Returns
-------
nodes_pos: dict
keys are nodes IDs (names), and values are 2D positions
'''
fileobj = open(filename)
nodes_pos = []
if sys.version_info[0] >= 3:
file_iter = fileobj.readlines()
else:
file_iter = fileobj
for line in file_iter:
if line.startswith('node'):
line_els0 = line.split()
line_els = []
for el in line_els0:
if el.startswith('"') and el.endswith('"'):
line_els.append(el[1:-1])
else:
line_els.append(el)
id = line_els[1]
pos = tuple([float(x) for x in line_els[2:4]])
name = line_els[6]
nodes_pos.append((id, name, pos))
elif line.startswith('edge'):
break
return nodes_pos
[docs]
def save_dot_image_ui(self):
'''
Ask for a filename using the file dialog, and save a graphviz/dot
representation of the pipeline.
The pipeline representation follows the current visualization mode
("regular" or "logical" with smaller boxes) with one link of a given
type (active, weak) between two given boxes: all parameters are not
represented.
'''
file_dialog = QtGui.QFileDialog(filter='Images (*.png *.xpm *.jpg *.ps *.eps);; All (*)')
file_dialog.setDefaultSuffix('.png')
file_dialog.setFileMode(QtGui.QFileDialog.AnyFile)
file_dialog.setAcceptMode(QtGui.QFileDialog.AcceptSave)
if file_dialog.exec_():
filename = file_dialog.selectedFiles()
'''filename = QtGui.QFileDialog.getSaveFileName(
None, 'Save image of the pipeline', '',
'Images (*.png *.xpm *.jpg *.ps *.eps);; All (*)')'''
if filename:
pipeline_tools.save_dot_image(self.scene.pipeline, filename[0])
[docs]
def reset_initial_nodes_positions(self):
'''
Set each pipeline node to its "saved" position, ie the one which may
be found in the "node_position" variable of the pipeline.
'''
scene = self.scene
if scene.pipeline is None:
return
# ############## add by Irmage OM ###################
# dim = getattr(scene.pipeline, 'node_dimension')
# if dim is not None:
# scene.dim = dim
# print()
# for node, dimension in six.iteritems(dim):
# gnode = scene.gnodes.get(node)
# if gnode is not None:
# if isinstance(dimension, QtCore.QPointF):
# dimension = (dim.x(),dim.y())
# # else:
# # dimension = dim.width(),dim.height()
# gnode.update(0,0,*dimension)
# #####################################################
pos = getattr(scene.pipeline, 'node_position')
if pos is not None:
scene.pos = pos
for node, position in six.iteritems(pos):
gnode = scene.gnodes.get(node)
if gnode is not None:
if isinstance(position, QtCore.QPointF):
position = (position.x(), position.y())
gnode.setPos(*position)
def switch_logical_view(self):
self.set_logical_view(not self.is_logical_view())
def print_node_positions(self):
def conv_pos(p):
if isinstance(p, Qt.QPointF):
return (p.x(), p.y())
return p
posdict = dict([(key, conv_pos(value)) \
for key, value in six.iteritems(self.scene.pos)])
pprint(posdict)
def del_node(self, node_name=None):
pipeline = self.scene.pipeline
if not node_name:
node_name = self.current_node_name
if node_name not in ('inputs', 'outputs'):
node = pipeline.nodes[node_name]
pipeline.remove_node(node_name)
elif node_name in self.scene.gnodes:
# delete all input or output export plugs in the pipeline
if node_name == 'inputs':
plugs = [name
for name, plug in
six.iteritems(pipeline.pipeline_node.plugs)
if not plug.output]
else:
plugs = [name
for name, plug in
six.iteritems(pipeline.pipeline_node.plugs)
if plug.output]
for plug in plugs:
self.scene.pipeline.remove_trait(plug)
self.scene.pipeline.update_nodes_and_plugs_activation()
def export_node_plugs(self, node_name, inputs=True, outputs=True,
optional=False):
pipeline = self.scene.pipeline
node = pipeline.nodes[node_name]
for parameter_name, plug in six.iteritems(node.plugs):
if parameter_name in ("nodes_activation", "selection_changed"):
continue
if (((node_name, parameter_name) not in pipeline.do_not_export and
((outputs and plug.output and not plug.links_to) or
(inputs and not plug.output and not plug.links_from)) and
(optional or not node.get_trait(parameter_name).optional))):
pipeline.export_parameter(node_name, parameter_name)
def export_plugs(self, inputs=True, outputs=True, optional=False):
for node_name in self.scene.pipeline.nodes:
if node_name != "":
self.export_node_plugs(node_name, inputs=inputs,
outputs=outputs, optional=optional)
def export_node_unconnected_mandatory_plugs(self):
self.export_node_plugs(self.current_node_name)
def export_node_all_unconnected_plugs(self):
self.export_node_plugs(self.current_node_name, optional=True)
def export_node_unconnected_mandatory_inputs(self):
self.export_node_plugs(
self.current_node_name, inputs=True, outputs=False)
def export_node_all_unconnected_inputs(self):
self.export_node_plugs(
self.current_node_name, inputs=True, outputs=False, optional=True)
def export_node_unconnected_mandatory_outputs(self):
self.export_node_plugs(
self.current_node_name, inputs=False, outputs=True)
def export_node_all_unconnected_outputs(self):
self.export_node_plugs(
self.current_node_name, inputs=False, outputs=True, optional=True)
def export_unconnected_mandatory_plugs(self):
self.export_plugs()
def export_all_unconnected_plugs(self):
self.export_plugs(optional=True)
def export_unconnected_mandatory_inputs(self):
self.export_plugs(inputs=True, outputs=False)
def export_all_unconnected_inputs(self):
self.export_plugs(inputs=True, outputs=False, optional=True)
def export_unconnected_mandatory_outputs(self):
self.export_plugs(inputs=False, outputs=True)
def export_all_unconnected_outputs(self):
self.export_plugs(inputs=False, outputs=True, optional=True)
def _change_step(self):
node_name = self.current_node_name
node = self.scene.pipeline.nodes[node_name]
steps = getattr(self.scene.pipeline, 'pipeline_steps', None)
steps_defined = True
if steps is None:
steps = Controller()
steps_defined = False
wid = Qt.QDialog()
wid.setModal(True)
lay = Qt.QVBoxLayout()
wid.setLayout(lay)
listw = Qt.QListWidget()
listw.setSelectionMode(listw.MultiSelection)
lay.addWidget(listw)
n = 0
for step in steps.user_traits():
listw.addItem(step)
nodes = steps.trait(step).nodes
if node_name in nodes:
item = listw.item(n)
item.setSelected(True)
n += 1
addlay = Qt.QHBoxLayout()
lay.addLayout(addlay)
addb = Qt.QPushButton('+')
addlay.addWidget(addb)
remb = Qt.QPushButton('-')
addlay.addWidget(remb)
def add_clicked():
d = Qt.QDialog()
d.setModal(True)
la = Qt.QHBoxLayout()
d.setLayout(la)
l = Qt.QLineEdit()
la.addWidget(l)
l.returnPressed.connect(d.accept)
r = d.exec_()
if r:
name = l.text()
if name not in steps.user_traits():
n = listw.count()
listw.addItem(name)
listw.item(n).setSelected(True)
def remove_clicked():
selected = []
for i in range(listw.count()):
item = listw.item(i)
if item.isSelected():
selected.append((i, item.text()))
if len(selected) != 0:
r = Qt.QMessageBox.question(
wid, 'remove steps',
'remove the following steps from the whole pipeline ?\n%s'
% repr([s[1] for s in selected]))
if r == Qt.QMessageBox.Yes:
for s in reversed(selected):
listw.takeItem(s[0])
def up_clicked():
selected = []
for i in range(listw.count()):
item = listw.item(i)
if item.isSelected():
selected.append(i)
if len(selected) != 0 and selected[0] != 0:
for i in selected:
item = listw.takeItem(i)
listw.insertItem(i-1, item)
item.setSelected(True)
def down_clicked():
selected = []
for i in range(listw.count()):
item = listw.item(i)
if item.isSelected():
selected.append(i)
if len(selected) != 0 and selected[-1] != listw.count() - 1:
for i in reversed(selected):
item = listw.takeItem(i)
listw.insertItem(i + 1, item)
item.setSelected(True)
addb.clicked.connect(add_clicked)
remb.clicked.connect(remove_clicked)
up = Qt.QPushButton('^')
addlay.addWidget(up)
down = Qt.QPushButton('v')
addlay.addWidget(down)
up.clicked.connect(up_clicked)
down.clicked.connect(down_clicked)
oklay = Qt.QHBoxLayout()
lay.addLayout(oklay)
ok = Qt.QPushButton('OK')
oklay.addWidget(ok)
cancel = Qt.QPushButton('Cancel')
oklay.addWidget(cancel)
ok.clicked.connect(wid.accept)
cancel.clicked.connect(wid.reject)
res = wid.exec_()
if res:
items = set()
sitems = []
for i in range(listw.count()):
item = listw.item(i)
name = item.text()
sel = item.isSelected()
items.add(name)
sitems.append(name)
trait = steps.trait(name)
if sel:
if trait is None:
self.scene.pipeline.add_pipeline_step(
name, [node_name])
steps = self.scene.pipeline.pipeline_steps
else:
nodes = steps.trait(name).nodes
if node_name not in nodes:
nodes.append(node_name)
elif trait is not None:
if node_name in trait.nodes:
trait.nodes.remove(node_name)
steps = list(steps.user_traits().keys())
for step in steps:
if step not in items:
self.scene.pipeline.remove_pipeline_step(step)
# reorder traits if needed
steps = self.scene.pipeline.pipeline_steps
if list(steps.user_traits().keys()) != sitems:
values = [steps.trait(step).nodes for step in sitems]
for step in sitems:
steps.remove_trait(step)
for step, nodes in zip(sitems, values):
self.scene.pipeline.add_pipeline_step(step, nodes)
self.scene.update_pipeline()
[docs]
def add_process(self):
'''
Insert a process node in the pipeline. Asks for the process
module/name, and the node name before inserting.
'''
proc_name_gui = PipelineDeveloperView.ProcessModuleInput()
proc_name_gui.resize(800, proc_name_gui.sizeHint().height())
res = proc_name_gui.exec_()
if res:
proc_module = six.text_type(proc_name_gui.proc_line.text())
node_name = str(proc_name_gui.name_line.text())
self.add_named_process(proc_module, node_name)
def add_named_process(self, proc_module, node_name=None):
pipeline = self.scene.pipeline
if not node_name:
if isinstance(proc_module, six.string_types):
class_name = proc_module
else:
class_name = proc_module.__name__
i = 1
node_name = '%s_%d' % (class_name.lower(), i)
while node_name in pipeline.nodes and i < 100:
i += 1
node_name = '%s_%d' % (class_name.lower(), i)
engine = pipeline.get_study_config().engine
try:
process = engine.get_process_instance(proc_module)
except Exception:
traceback.print_exc()
return
pipeline.add_process(node_name, process)
node = pipeline.nodes[node_name]
gnode = self.scene.add_node(node_name, node)
gnode.setPos(self.mapToScene(self.mapFromGlobal(self.click_pos)))
return process
[docs]
def add_node(self):
'''
Insert a custom node in the pipeline. Asks for the node
module/name, and the node name before inserting.
'''
def is_pipeline_node(item):
return item is not Node and isinstance(item, Node)
node_name_gui = PipelineDeveloperView.ProcessModuleInput(
display_str='node module/name', class_type_check=is_pipeline_node)
node_name_gui.resize(800, node_name_gui.sizeHint().height())
res = node_name_gui.exec_()
if res:
node_module = six.text_type(node_name_gui.proc_line.text())
node_name = str(node_name_gui.name_line.text())
self.add_named_node(node_name, node_module)
def add_named_node(self, node_name, node_module):
def configure_node(cls):
conf_controller = cls.configure_controller()
w = Qt.QDialog()
w.setWindowTitle('Custom node parameterization')
l = Qt.QVBoxLayout()
w.setLayout(l)
c = ScrollControllerWidget(conf_controller, live=True)
l.addWidget(c)
h = Qt.QHBoxLayout()
l.addLayout(h)
ok = Qt.QPushButton('OK')
h.addWidget(ok)
cancel = Qt.QPushButton('Cancel')
h.addWidget(cancel)
ok.clicked.connect(w.accept)
cancel.clicked.connect(w.reject)
res = w.exec_()
if res:
c.update_controller()
return conf_controller
else:
return None
def get_node_instance(class_str, pipeline):
cls_and_name = process_instance.get_node_class(class_str)
if cls_and_name is None:
return None
name, cls = cls_and_name
if hasattr(cls, 'configure_controller'):
conf_controller = configure_node(cls)
if conf_controller is None:
return None # abort
else:
conf_controller = Controller()
if hasattr(cls, 'build_node'):
node = cls.build_node(pipeline, name, conf_controller)
else:
# probably bound to fail...
node = cls(pipeline, name, [], [])
return node
pipeline = self.scene.pipeline
try:
node = get_node_instance(node_module, pipeline)
except Exception as e:
print(e)
return
if node is None:
return
if not node_name and node:
class_name = node.__class__.__name__
i = 1
node_name = '%s_%d' % (class_name.lower(), i)
while node_name in pipeline.nodes and i < 100:
i += 1
node_name = '%s_%d' % (class_name.lower(), i)
pipeline.nodes[node_name] = node
pipeline._set_subprocess_context_name(node, node_name)
gnode = self.scene.add_node(node_name, node)
gnode.setPos(self.mapToScene(self.mapFromGlobal(self.click_pos)))
[docs]
def add_iterative_process(self):
'''
Insert an iterative process node in the pipeline. Asks for the process
module/name, the node name, and iterative plugs before inserting.
'''
pipeline = self.scene.pipeline
engine = pipeline.get_study_config().engine
proc_name_gui = PipelineDeveloperView.IterativeProcessInput(engine)
proc_name_gui.resize(800, proc_name_gui.sizeHint().height())
res = proc_name_gui.exec_()
if res:
proc_module = six.text_type(proc_name_gui.proc_line.text())
node_name = str(proc_name_gui.name_line.text())
try:
process = engine.get_process_instance(
six.text_type(proc_name_gui.proc_line.text()))
except Exception as e:
print(e)
return
iterative_plugs = proc_name_gui.iterative_plugs()
do_not_export = list(process.user_traits().keys())
pipeline.add_iterative_process(node_name, process, iterative_plugs,
do_not_export=do_not_export)
node = pipeline.nodes[node_name]
gnode = self.scene.add_node(node_name, node)
gnode.setPos(self.mapToScene(self.mapFromGlobal(self.click_pos)))
[docs]
def add_switch(self):
'''
Insert a switch node in the pipeline. Asks for the switch
inputs/outputs, and the node name before inserting.
'''
class SwitchInput(QtGui.QDialog):
def __init__(self):
super(SwitchInput, self).__init__()
self.setWindowTitle('switch parameters/name:')
layout = QtGui.QGridLayout(self)
layout.addWidget(QtGui.QLabel('inputs:'), 0, 0)
self.inputs_line = QtGui.QLineEdit()
layout.addWidget(self.inputs_line, 0, 1)
layout.addWidget(QtGui.QLabel('outputs:'), 1, 0)
self.outputs_line = QtGui.QLineEdit()
layout.addWidget(self.outputs_line, 1, 1)
layout.addWidget(QtGui.QLabel('node name'), 2, 0)
self.name_line = QtGui.QLineEdit()
layout.addWidget(self.name_line, 2, 1)
ok = QtGui.QPushButton('OK')
layout.addWidget(ok, 3, 0)
cancel = QtGui.QPushButton('Cancel')
layout.addWidget(cancel, 3, 1)
ok.clicked.connect(self.accept)
cancel.clicked.connect(self.reject)
switch_name_gui = SwitchInput()
switch_name_gui.resize(600, switch_name_gui.sizeHint().height())
res = switch_name_gui.exec_()
if res:
pipeline = self.scene.pipeline
node_name = str(switch_name_gui.name_line.text()).strip()
inputs = str(switch_name_gui.inputs_line.text()).split()
outputs = str(switch_name_gui.outputs_line.text()).split()
pipeline.add_switch(node_name, inputs, outputs)
# add_switch triggers an update
gnode = self.scene.gnodes[node_name]
gnode.setPos(self.mapToScene(self.mapFromGlobal(self.click_pos)))
[docs]
def add_optional_output_switch(self):
'''
Insert an optional output switch node in the pipeline. Asks for the
switch inputs/outputs, and the node name before inserting.
'''
class SwitchInput(QtGui.QDialog):
def __init__(self):
super(SwitchInput, self).__init__()
self.setWindowTitle('switch parameters/name:')
layout = QtGui.QGridLayout(self)
layout.addWidget(QtGui.QLabel('input:'), 0, 0)
self.inputs_line = QtGui.QLineEdit()
layout.addWidget(self.inputs_line, 0, 1)
layout.addWidget(QtGui.QLabel('output:'), 1, 0)
self.outputs_line = QtGui.QLineEdit()
layout.addWidget(self.outputs_line, 1, 1)
layout.addWidget(QtGui.QLabel('node name'), 2, 0)
self.name_line = QtGui.QLineEdit()
layout.addWidget(self.name_line, 2, 1)
ok = QtGui.QPushButton('OK')
layout.addWidget(ok, 3, 0)
cancel = QtGui.QPushButton('Cancel')
layout.addWidget(cancel, 3, 1)
ok.clicked.connect(self.accept)
cancel.clicked.connect(self.reject)
switch_name_gui = SwitchInput()
switch_name_gui.resize(600, switch_name_gui.sizeHint().height())
res = switch_name_gui.exec_()
if res:
pipeline = self.scene.pipeline
node_name = str(switch_name_gui.name_line.text()).strip()
input = str(switch_name_gui.inputs_line.text()).strip()
output = str(switch_name_gui.outputs_line.text()).strip()
if output == '' and node_name != '':
output = node_name
elif output != '' and node_name == '':
node_name = output
pipeline.add_optional_output_switch(node_name, input, output)
# add_optional_output_switch does *not* trigger an update
self._reset_pipeline()
gnode = self.scene.gnodes[node_name]
gnode.setPos(self.mapToScene(self.mapFromGlobal(self.click_pos)))
def _plug_clicked(self, name):
if self.is_logical_view() or not self.edition_enabled():
# in logival view, links are not editable since they do not reflect
# the details of reality
return
node_name, plug_name = str(name).split(':')
plug_name = str(plug_name)
gnode = self.scene.gnodes[node_name]
plug = gnode.out_plugs.get(plug_name)
typeq = self.scene.typeLink(node_name, plug_name)
try:
# color = self.scene.colorLink(typeq)
color = self.scene.colType.colorLink(typeq)
except Exception:
color = ORANGE_2
if not plug:
return # probably an input plug
plug_pos = plug.mapToScene(plug.mapFromParent(plug.get_plug_point()))
self._grabpos = self.mapFromScene(plug_pos)
self._temp_link = Link(
plug_pos,
self.mapToScene(self.mapFromGlobal(QtGui.QCursor.pos())),
True, False, color)
self._temp_link.pen.setBrush(RED_2)
self.scene.addItem(self._temp_link)
self._grab_link = True
self._grabbed_plug = (node_name, plug_name)
def _move_grab_link(self, event):
pos = self.mapToScene(event.pos())
self._temp_link.update(self.mapToScene(self._grabpos), pos)
def _release_grab_link(self, event, ret=False):
max_square_dist = 100.
self._grab_link = False
# delete the temp link
self.scene.removeItem(self._temp_link)
del self._temp_link
pos = self.mapToScene(event.pos())
item = self.scene.itemAt(pos, Qt.QTransform())
plug = None
if isinstance(item, Link):
# look for its dest plug
plug = None
for source_dest, link in six.iteritems(self.scene.glinks):
if link is item:
plug = source_dest[1]
break
if plug is not None:
# check the plug is not too far from the drop point
gnode = self.scene.gnodes[plug[0]]
gplug = gnode.in_plugs[plug[1]]
plug_pos = gplug.mapToScene(
gplug.mapFromParent(gplug.get_plug_point()))
pdiff = plug_pos - pos
dist2 = pdiff.x() * pdiff.x() + pdiff.y() * pdiff.y()
if dist2 > max_square_dist:
plug = None
elif isinstance(item, Plug):
plug = str(item.name).split(':')
if plug is not None:
if self._grabbed_plug[0] not in ('', 'inputs'):
src = '%s.%s' % self._grabbed_plug
else:
src = self._grabbed_plug[1]
if plug[0] not in ('', 'outputs'):
dst = '%s.%s' % tuple(plug)
else:
dst = plug[1]
# if (src != dst) and ("inputs."+src != dst) and not self.isInputYet(dst) :
if (src != dst) and ("inputs." + src != dst):
self.scene.pipeline.add_link('%s->%s' % (src, dst))
self.scene.update_pipeline()
if ret:
self._grabbed_plug = None
return '%s->%s' % (src, dst)
self._grabbed_plug = None
# def isInputYet(self,dest):##################################################################### add by OM
# for listK in self.scene.glinks.keys():
# if ( eval(str(eval(str(listK))[1]))[0]+"."+ eval(str(eval(str(listK))[1]))[1]==dest or eval(str(eval(str(listK))[1]))[0]+"."+ eval(str(eval(str(listK))[1]))[1]=="outputs."+dest):
# print("input '",dest, "' already used !!")
# return True
# return False
def _node_delete_clicked(self, name_node):
self.current_node_name = name_node
self.del_node()
def _link_delete_clicked(self, src_node, src_plug, dst_node, dst_plug):
src_node = str(src_node)
src_plug = str(src_plug)
dst_node = str(dst_node)
dst_plug = str(dst_plug)
# print(src_node,",",src_plug,",",dst_node,",",dst_plug)
if self.is_logical_view() or not self.edition_enabled():
# in logical view, links are not real links
return
if src_node in ('', 'inputs'):
src = src_plug
snode = self.scene.pipeline.pipeline_node
else:
src = '%s.%s' % (src_node, src_plug)
snode = self.scene.pipeline.nodes[src_node]
if dst_node in ('', 'outputs'):
dst = dst_plug
dnode = self.scene.pipeline.pipeline_node
else:
dst = '%s.%s' % (dst_node, dst_plug)
dnode = self.scene.pipeline.nodes[dst_node]
name = '%s->%s' % (src, dst)
self._current_link = name # (src_node, src_plug, dst_node, dst_plug)
self._del_link()
del self._current_link
def _link_clicked(self, src_node, src_plug, dst_node, dst_plug):
src_node = str(src_node)
src_plug = str(src_plug)
dst_node = str(dst_node)
dst_plug = str(dst_plug)
if self.is_logical_view() or not self.edition_enabled():
# in logical view, links are not real links
return
if src_node in ('', 'inputs'):
src = src_plug
snode = self.scene.pipeline.pipeline_node
else:
src = '%s.%s' % (src_node, src_plug)
snode = self.scene.pipeline.nodes[src_node]
if dst_node in ('', 'outputs'):
dst = dst_plug
dnode = self.scene.pipeline.pipeline_node
else:
dst = '%s.%s' % (dst_node, dst_plug)
dnode = self.scene.pipeline.nodes[dst_node]
name = '%s->%s' % (src, dst)
self._current_link = name # (src_node, src_plug, dst_node, dst_plug)
self._current_link_def = (src_node, src_plug, dst_node, dst_plug)
menu = QtGui.QMenu('Link: %s' % name)
title = menu.addAction('Link: %s' % name)
title.setEnabled(False)
menu.addSeparator()
weak = False
splug = snode.plugs[src_plug]
for link in splug.links_to:
if link[0] == dst_node and link[1] == dst_plug:
weak = link[4]
break
weak_action = menu.addAction('Weak link')
weak_action.setCheckable(True)
weak_action.setChecked(bool(weak))
weak_action.toggled.connect(self._change_weak_link)
menu.addSeparator()
del_link = menu.addAction('Delete link')
del_link.triggered.connect(self._del_link)
menu.exec_(QtGui.QCursor.pos())
del self._current_link
del self._current_link_def
def get_doc_browser(self, create=False):
doc_browser = self.doc_browser
pv = self
proxy = False
while isinstance(doc_browser, PipelineDeveloperView):
# it's a proxy to a parent view
pv = doc_browser
doc_browser = pv.doc_browser
proxy = True
if doc_browser or not create:
return doc_browser
try:
# use the newer Qt5 QtWebEngine
from soma.qt_gui.qt_backend import QtWebEngine
from soma.qt_gui.qt_backend.QtWebEngineWidgets \
import QWebEngineView, QWebEnginePage
use_webengine = True
except ImportError as e:
print('\n{}'.format(e))
try:
from soma.qt_gui.qt_backend import QtWebKit
QWebEngineView = QtWebKit.QWebView
QWebPage = QtWebKit.QWebPage
QWebEnginePage = QWebPage
use_webengine = False
except ImportError as e:
print('\n{}\n\nThe process documentation cannot be '
'displayed ...'.format(e))
return None
self._use_webengine = use_webengine
class DocBrowser(QWebEngineView):
def __init__(self, pview, *args, **kwargs):
super(DocBrowser, self).__init__(*args, **kwargs)
self.setAttribute(Qt.Qt.WA_DeleteOnClose)
self.pview = pview
def closeEvent(self, event):
self.pview.doc_browser = None
event.accept()
super(DocBrowser, self).closeEvent(event)
doc_browser = DocBrowser(pv) # QWebEngineView()
pv.doc_browser = doc_browser
doc_browser.show()
return doc_browser
def _node_clicked(self, name, node):
self.show_node_doc(node)
if isinstance(node, Process):
self.process_clicked.emit(name, node)
else:
self.node_clicked.emit(name, node)
@staticmethod
def get_node_html_doc(node):
doc_path = getattr(node, '_doc_path', None)
if doc_path and os.path.isabs(doc_path):
return doc_path
modname = node.__module__
init_modname = modname
while True:
mod = sys.modules[modname]
mod_doc_path = getattr(mod, '_doc_path', None)
if mod_doc_path:
if doc_path:
return os.path.join(mod_doc_path, doc_path)
node_type = 'process'
if isinstance(node, Pipeline):
node_type = 'pipeline'
path = os.path.join(
mod_doc_path, node_type,
'%s.html' % '.'.join((node.__module__,
node.__class__.__name__)))
if os.path.exists(path) or path.startswith('http://') \
or path.startswith('https://'):
return path
# try using the 1st sub-module
modsplit = init_modname.split('.')
if len(modsplit) >= 3:
path = os.path.join(
mod_doc_path, modsplit[1], node_type,
'%s.html' % '.'.join((node.__module__,
node.__class__.__name__)))
if os.path.exists(path) or path.startswith('http://') \
or path.startswith('https://'):
return path
return None
s = modname.rsplit('.', 1)
if len(s) == 1:
break
modname = s[0]
def show_doc(self, node_name=None):
pipeline = self.scene.pipeline
if not node_name:
node_name = self.current_node_name
if node_name in ('inputs', 'outputs'):
node = pipeline.pipeline_node
else:
node = pipeline.nodes[node_name]
if isinstance(node, ProcessNode):
node = node.process
doc_browser = self.get_doc_browser(create=True)
self.show_node_doc(node)
def show_node_doc(self, node):
doc_browser = self.get_doc_browser()
if doc_browser:
doc_path = self.get_node_html_doc(node)
if doc_path:
if not doc_path.startswith('http://') \
and not doc_path.startswith('https://') \
and not doc_path.startswith('file://'):
doc_path = 'file://%s' % os.path.abspath(doc_path)
doc_browser.setUrl(Qt.QUrl(doc_path))
else:
gethelp = getattr(node, 'get_help')
msg = None
if gethelp:
msg = node.get_help(returnhelp=True)
if not msg:
msg = node.getattr(node, '__doc__', None)
if msg:
doc_browser.setContent(Qt.QByteArray(msg.encode('utf-8')),
'text/plain')
def _node_clicked_ctrl(self, name, process):
for source_dest, glink in six.iteritems(self.scene.glinks):
glink.fonced_viewer(False)
# print("source-dest ",source_dest)
if name not in str(source_dest):
glink.fonced_viewer(True)
# else:
# print(source_dest[0])
for node_name, gnode in six.iteritems(self.scene.gnodes):
# print(" node_name",node_name)
gnode.fonced_viewer(False)
if name not in str(node_name):
gnode.fonced_viewer(True)
def _change_weak_link(self, weak):
# src_node, src_plug, dst_node, dst_plug = self._current_link
link_def = self._current_link
self.scene.pipeline.remove_link(link_def)
self.scene.pipeline.add_link(link_def, weak_link=weak)
self.scene.update_pipeline()
def _del_link(self):
print('\nRemoving the link: ', self._current_link)
src_node, src_plug, dst_node, dst_plug = self._current_link_def
link_def = self._current_link
pipeline = self.scene.pipeline
pipeline.remove_link(link_def)
if (src_node in ('', 'inputs') and
len(pipeline.pipeline_node.plugs[src_plug].links_to) == 0):
# remove orphan pipeline plug
pipeline.remove_trait(src_plug)
elif (dst_node in ('', 'outputs') and
len(pipeline.pipeline_node.plugs[dst_plug].links_from) == 0):
# remove orphan pipeline plug
pipeline.remove_trait(dst_plug)
self.scene.update_pipeline()
def _plug_right_clicked(self, name):
for node_name, gnode in six.iteritems(self.scene.gnodes):
if node_name in 'inputs':
self.inputYet = True
if node_name in 'outputs':
self.outputYet = False
if self.is_logical_view() or not self.edition_enabled():
# in logival view, links are not editable since they do not reflect
# the details of reality
return
node_name, plug_name = str(name).split(':')
plug_name = str(plug_name)
if node_name in ('inputs', 'outputs'):
node = self.scene.pipeline.pipeline_node
else:
node = self.scene.pipeline.nodes[node_name]
plug = node.plugs[plug_name]
output = plug.output
self._temp_node = node
self._temp_plug = plug
self._temp_plug_name = (node_name, plug_name)
menu = QtGui.QMenu('Plug: %s' % name)
title = menu.addAction('Plug: %s' % name)
title.setEnabled(False)
menu.addSeparator()
if node_name not in ('inputs', 'outputs'):
# not a main node: allow export
if output:
links = plug.links_to
else:
links = plug.links_from
existing = False
for link in links:
if link[0] == '':
existing = True
break
export_action = menu.addAction('export plug')
export_action.triggered.connect(self._export_plug)
if existing:
export_action.setEnabled(False)
if isinstance(node, ProcessNode) \
and isinstance(node.process, ProcessIteration):
iter_action = menu.addAction('iterative plug')
iter_action.setCheckable(True)
iter_action.setChecked(
plug_name in node.process.iterative_parameters)
iter_action.toggled[bool].connect(self._change_iterative_plug)
else:
del_plug = menu.addAction('Remove plug')
del_plug.triggered.connect(self._remove_plug)
edit_plug = menu.addAction('Rename / edit plug')
edit_plug.triggered.connect(self._edit_plug)
protect_action = menu.addAction('protected')
protect_action.setCheckable(True)
protect_action.setChecked(node.is_parameter_protected(plug_name))
protect_action.toggled[bool].connect(self._protect_plug)
complete_action = menu.addAction('completion enabled')
complete_action.setCheckable(True)
complete_action.setChecked(
not node.get_trait(plug_name).forbid_completion)
complete_action.toggled[bool].connect(self._enable_plug_completion)
menu.exec_(QtGui.QCursor.pos())
del self._temp_plug
del self._temp_plug_name
del self._temp_node
class _PlugEdit(QtGui.QDialog):
def __init__(self, show_weak=True, parent=None):
super(PipelineDeveloperView._PlugEdit, self).__init__(parent)
layout = QtGui.QVBoxLayout(self)
hlay1 = QtGui.QHBoxLayout()
layout.addLayout(hlay1)
hlay1.addWidget(QtGui.QLabel('Plug name:'))
self.name_line = QtGui.QLineEdit()
hlay1.addWidget(self.name_line)
hlay2 = QtGui.QHBoxLayout()
layout.addLayout(hlay2)
self.optional = QtGui.QCheckBox('Optional')
hlay2.addWidget(self.optional)
if show_weak:
self.weak = QtGui.QCheckBox('Weak link')
hlay2.addWidget(self.weak)
hlay3 = QtGui.QHBoxLayout()
layout.addLayout(hlay3)
ok = QtGui.QPushButton('OK')
hlay3.addWidget(ok)
cancel = QtGui.QPushButton('Cancel')
hlay3.addWidget(cancel)
ok.clicked.connect(self.accept)
cancel.clicked.connect(self.reject)
def _export_plug(self):
dial = self._PlugEdit()
dial.name_line.setText(self._temp_plug_name[1])
dial.optional.setChecked(self._temp_plug.optional)
res = dial.exec_()
if res:
# for node_name, gnode in six.iteritems(self.scene.gnodes):
# print("list Nodes",node_name)
try:
self.scene.pipeline.export_parameter(
self._temp_plug_name[0], self._temp_plug_name[1],
pipeline_parameter=str(dial.name_line.text()),
is_optional=dial.optional.isChecked(),
weak_link=dial.weak.isChecked())
# print(str(dial.name_line.text()))
# self.scene.gnodes.changeHmin(15)
except Exception as e:
print('exception while export plug:', e)
pass
self.scene.update_pipeline()
def _change_iterative_plug(self, checked):
node = self._temp_node
node_name, name = self._temp_plug_name
node.process.change_iterative_plug(name, checked)
self.scene.update_pipeline()
def _protect_plug(self, checked):
node = self._temp_node
node_name, name = self._temp_plug_name
node.protect_parameter(name, checked)
def _enable_plug_completion(self, checked):
node = self._temp_node
node_name, name = self._temp_plug_name
node.get_trait(name).forbid_completion = not checked
def _remove_plug(self):
if self._temp_plug_name[0] in ('inputs', 'outputs'):
# print 'remove plug:', self._temp_plug_name[1]
#print('#' * 50)
#print(self._temp_plug_name)
#print(self._temp_plug)
#for trait_name, trait in self.scene.pipeline.traits().items():
#print(trait_name, trait)
#if trait.handler is None:
#print('HANDLER IS NONE')
#else:
#print('HANDLER:', trait.handler)
#if trait.has_items:
#print("HANDLER HAS ITEMS")
self.scene.pipeline.remove_trait(self._temp_plug_name[1])
self.scene.update_pipeline()
def _edit_plug(self):
dial = self._PlugEdit(show_weak=False)
dial.name_line.setText(self._temp_plug_name[1])
dial.name_line.setEnabled(False) ## FIXME
dial.optional.setChecked(self._temp_plug.optional)
res = dial.exec_()
if res:
plug = self._temp_plug
plug.optional = dial.optional.isChecked()
# print 'TODO.'
self.scene.update_pipeline()
def _prune_plugs(self):
pipeline = self.scene.pipeline
pnode = pipeline.pipeline_node
to_del = []
for plug_name, plug in six.iteritems(pnode.plugs):
if plug.output and len(plug.links_from) == 0:
to_del.append(plug_name)
elif not plug.output and len(plug.links_to) == 0:
to_del.append(plug_name)
for plug_name in to_del:
pipeline.remove_trait(plug_name)
self.scene.update_pipeline()
def confirm_erase_pipeline(self):
if len(self.scene.pipeline.nodes) <= 1:
return True
confirm = Qt.QMessageBox.warning(
self,
'New pipeline',
'The current pipeline will be lost. Continue ?',
Qt.QMessageBox.Ok | Qt.QMessageBox.Cancel,
Qt.QMessageBox.Cancel)
if confirm != Qt.QMessageBox.Ok:
return False
return True
def new_pipeline(self):
if not self.confirm_erase_pipeline():
return
w = Qt.QDialog(self)
w.setModal(True)
w.setWindowTitle('Pipeline name')
l = Qt.QVBoxLayout()
w.setLayout(l)
le = Qt.QLineEdit()
l.addWidget(le)
l2 = Qt.QHBoxLayout()
l.addLayout(l2)
ok = Qt.QPushButton('OK')
l2.addWidget(ok)
cancel = Qt.QPushButton('Cancel')
l2.addWidget(cancel)
ok.clicked.connect(w.accept)
cancel.clicked.connect(w.reject)
le.returnPressed.connect(w.accept)
res = w.exec_()
if res:
class_kwargs = {
'__module__': '__main__',
'do_autoexport_nodes_parameters': False,
'node_position': {},
'node_dimension': {}
}
name = le.text()
if type(name) is not str: # unicode ?
name = name.encode()
pipeline_class = type(name, (Pipeline,), class_kwargs)
pipeline = pipeline_class()
self.set_pipeline(pipeline)
self._pipeline_filename = ''
def load_pipeline(self, filename='', load_pipeline=True):
class LoadProcessUi(Qt.QDialog):
def __init__(self, parent=None, old_filename=''):
super(LoadProcessUi, self).__init__(parent)
self.old_filename = old_filename
lay = Qt.QVBoxLayout()
self.setLayout(lay)
l2 = Qt.QHBoxLayout()
lay.addLayout(l2)
l2.addWidget(Qt.QLabel('Pipeline:'))
self.proc_edit = PipelineDeveloperView.ProcessNameEdit()
l2.addWidget(self.proc_edit)
self.loadbt = Qt.QPushButton('...')
l2.addWidget(self.loadbt)
l3 = Qt.QHBoxLayout()
lay.addLayout(l3)
ok = Qt.QPushButton('OK')
l3.addWidget(ok)
cancel = Qt.QPushButton('Cancel')
l3.addWidget(cancel)
ok.clicked.connect(self.accept)
cancel.clicked.connect(self.reject)
self.loadbt.clicked.connect(self.get_filename)
self.proc_edit.returnPressed.connect(self.accept)
def get_filename(self):
filename = qt_backend.getOpenFileName(
None, 'Load the pipeline', self.old_filename,
'Compatible files (*.xml *.json *.py);; All (*)')
if filename:
self.proc_edit.setText(filename)
if not self.confirm_erase_pipeline():
return
if not filename:
old_filename = getattr(self, '_pipeline_filename', '')
dialog = LoadProcessUi(self, old_filename=old_filename)
dialog.setWindowTitle('Load pipeline')
dialog.setModal(True)
dialog.resize(800, dialog.sizeHint().height())
res = dialog.exec_()
if res:
filename = dialog.proc_edit.text()
if filename:
if not load_pipeline:
return filename
else:
try:
if self.scene.pipeline:
# keep the same engine
engine = self.scene.pipeline.get_study_config().engine
pipeline = engine.get_process_instance(filename)
else:
pipeline = get_process_instance(filename)
except Exception as e:
print(e)
pipeline = None
if pipeline is not None:
self.set_pipeline(pipeline)
self._pipeline_filename = filename
return filename
[docs]
def save_pipeline(self):
'''
Ask for a filename using the file dialog, and save the pipeline as a
XML, JSON or python file.
'''
pipeline = self.scene.pipeline
old_filename = getattr(self, '_pipeline_filename', '')
filename = qt_backend.getSaveFileName(
None, 'Save the pipeline', old_filename,
'Compatible files (*.xml *.json *.py);; All (*)')
if filename:
posdict = {}
for key, value in six.iteritems(self.scene.pos):
if hasattr(value, 'x'):
posdict[key] = (value.x(), value.y())
else:
posdict[key] = (value[0], value[1])
dimdict = {}
for key, value in six.iteritems(self.scene.dim):
if hasattr(value, 'boundingRect'):
dimdict[key] = (value.boundingRect().width(),
value.boundingRect().height())
else:
dimdict[key] = (value[0], value[1])
pipeline.node_dimension = dimdict
old_pos = pipeline.node_position
old_dim = pipeline.node_dimension
pipeline.node_position = posdict
pipeline_tools.save_pipeline(pipeline, filename)
self._pipeline_filename = six.text_type(filename)
pipeline.node_position = old_pos
pipeline.node_dimension = old_dim
#def load_pipeline_parameters(self):
#"""
#Loading and setting pipeline parameters (inputs and outputs) from a Json file.
#"""
#pipeline = self.scene.pipeline
#filename = qt_backend.getOpenFileName(
#None, 'Load pipeline parameters', '',
#'Compatible files (*.json)')
#pipeline_tools.load_pipeline_parameters(filename, pipeline)
#def save_pipeline_parameters(self):
#"""
#Saving pipeline parameters (inputs and outputs) to a Json file.
#"""
#pipeline = self.scene.pipeline
#filename = qt_backend.getSaveFileName(
#None, 'Save pipeline parameters', '',
#'Compatible files (*.json)')
#pipeline_tools.save_pipeline_parameters(filename, pipeline)
[docs]
def load_pipeline_parameters(self, root_path=''):
"""
Loading and setting pipeline parameters (inputs and outputs) from a Json file.
:return:
"""
def hinted_tuple_hook(obj):
if '__tuple__' in obj:
return tuple(obj['items'])
else:
return obj
filename = qt_backend.getOpenFileName(
None, 'Load the pipeline parameters', root_path,
'Compatible files (*.json)')
if filename:
with io.open(filename, 'r', encoding='utf8') as fileJson:
dic = json.load(fileJson)
dic = json.loads(dic, object_hook=hinted_tuple_hook)
if "pipeline_parameters" not in list(dic.keys()):
raise KeyError('No "pipeline_parameters" key found in {0}.'.format(filename))
for trait_name, trait_value in dic["pipeline_parameters"].items():
if trait_name not in list(self.scene.pipeline.user_traits().keys()):
print('No "{0}" parameter in pipeline.'.format(trait_name))
try:
setattr(self.scene.pipeline, trait_name, trait_value)
except traits.TraitError:
print("Error for the plug {0}".format(trait_name))
self.scene.pipeline.update_nodes_and_plugs_activation()
[docs]
def save_pipeline_parameters(self):
"""
Saving pipeline parameters (inputs and outputs) to a Json file.
:return:
"""
class MultiDimensionalArrayEncoder(json.JSONEncoder):
def encode(self, obj):
def hint_tuples(item):
if isinstance(item, tuple):
return {'__tuple__': True,
'items': [hint_tuples(e) for e in item]}
if isinstance(item, list):
return [hint_tuples(e) for e in item]
if isinstance(item, dict):
return dict((key, hint_tuples(value)) for key, value in
item.items())
else:
return item
return super(MultiDimensionalArrayEncoder, self).encode(
hint_tuples(obj))
pipeline = self.scene.pipeline
filename = qt_backend.getSaveFileName(
None, 'Save the pipeline parameters', '',
'Compatible files (*.json)')
if not filename: # save widget was cancelled by the user
return ''
if os.path.splitext(filename)[1] == '': # which means no extension
filename += '.json'
elif os.path.splitext(filename)[1] != '.json':
msg = QMessageBox()
msg.setIcon(QMessageBox.Warning)
msg.setText('The parameters must be saved in the ".json" format, '
'not the "{0}" format'.format(
os.path.splitext(filename)[1]))
msg.setWindowTitle("Warning")
msg.setStandardButtons(QMessageBox.Ok)
msg.buttonClicked.connect(msg.close)
msg.exec_()
self.save_pipeline_parameters()
return ''
if os.path.exists(filename) and self.disable_overwrite:
msg = QMessageBox()
msg.setIcon(QMessageBox.Warning)
msg.setText('This file already exists, you do not have the '
'rights to overwrite it.')
msg.setWindowTitle("Warning")
msg.setStandardButtons(QMessageBox.Ok)
msg.buttonClicked.connect(msg.close)
msg.exec_()
self.save_pipeline_parameters()
return ''
if filename:
from traits.api import Undefined
# Generating the dictionary
param_dic = {}
for trait_name, trait in pipeline.user_traits().items():
if trait_name in ["nodes_activation"]:
continue
value = getattr(pipeline, trait_name)
if value is Undefined:
value = ""
param_dic[trait_name] = value
# In the future, more information may be added to this dictionary
dic = {}
dic["pipeline_parameters"] = param_dic
jsonstring = MultiDimensionalArrayEncoder().encode(dic)
# Saving the dictionary in the Json file
if sys.version_info[0] >= 3:
with open(filename, 'w', encoding='utf8') as file:
json.dump(jsonstring, file)
else:
with open(filename, 'w') as file:
json.dump(jsonstring, file)