# -*- coding: utf-8 -*-
""" Module to handle the node of a pipeline and its plugs.
:Contains:
:Class:
- PlugFilter (must be declared before AttributesFilter)
- AttributesFilter
- CapsulNodeController
- FilterWidget
- NodeController
"""
##########################################################################
# Populse_mia - Copyright (C) IRMaGe/CEA, 2018
# Distributed under the terms of the CeCILL license, as published by
# the CEA-CNRS-INRIA. Refer to the LICENSE file or to
# http://www.cecill.info/licences/Licence_CeCILL_V2.1-en.html
# for details.
##########################################################################
import os
import sys
from functools import partial
import sip
# capsul imports
from capsul.attributes.completion_engine import ProcessCompletionEngine
from capsul.pipeline.pipeline_nodes import PipelineNode
from capsul.pipeline.process_iteration import ProcessIteration
from capsul.qt_gui.widgets.attributed_process_widget import (
AttributedProcessWidget,
)
from matplotlib.backends.qt_compat import QtWidgets
# PyQt5 imports
from PyQt5 import Qt
from PyQt5.QtCore import pyqtSignal
from PyQt5.QtGui import QIcon
from PyQt5.QtWidgets import (
QApplication,
QDialog,
QDialogButtonBox,
QGroupBox,
QHBoxLayout,
QLabel,
QLineEdit,
QMessageBox,
QPushButton,
QToolButton,
QVBoxLayout,
QWidget,
)
# soma-base imports
from soma.controller import trait_ids
from traits.api import TraitError, Undefined
# Populse_MIA imports
from populse_mia.data_manager.filter import Filter
from populse_mia.data_manager.project import (
COLLECTION_BRICK,
COLLECTION_CURRENT,
TAG_FILENAME,
)
from populse_mia.software_properties import Config
from populse_mia.user_interface.data_browser.advanced_search import (
AdvancedSearch,
)
from populse_mia.user_interface.data_browser.data_browser import (
TableDataBrowser,
not_defined_value,
)
from populse_mia.user_interface.data_browser.rapid_search import RapidSearch
from populse_mia.user_interface.pipeline_manager.process_mia import ProcessMIA
from populse_mia.user_interface.pop_ups import (
PopUpSelectTagCountTable,
PopUpVisualizedTags,
)
from . import type_editors
if sys.version_info[0] >= 3:
    # Python 3 has no separate ``unicode`` type: alias the name to ``str``
    # so that code written against the Python 2 name keeps working.
    unicode = str

    def values(d):
        """Return the values of the mapping ``d`` as a list.

        :param d: a dictionary (or any object exposing ``values()``)
        :return: a new list holding the values of ``d``
        """
        return list(d.values())

else:

    def values(d):
        """Return the values of the mapping ``d``.

        :param d: a dictionary (or any object exposing ``values()``)
        :return: whatever ``d.values()`` returns
        """
        return d.values()
[docs]
class PlugFilter(QWidget):
"""Filter widget used on a node plug.

The widget displays a browser with the selected files of the database,
a rapid search and an advanced search to filter these files. Once the
filtering is done, the result (a list of values, one per retained file)
is emitted so that it can be set to the plug.

.. Methods:
- ok_clicked: set the new value to the node plug and close the widget
- reset_search_bar: reset the search bar of the rapid search
- search_str: update the files to display in the browser
- set_plug_value: emit a signal to set the file names to the node plug
- update_tag_to_filter: update the tag to filter
- update_tags: update the list of visualized tags
"""
# Signal emitted by set_plug_value() with the list of resulting values.
plug_value_changed = pyqtSignal(list)
[docs]
def __init__(
    self,
    project,
    scans_list,
    process,
    node_name,
    plug_name,
    node_controller,
    main_window,
):
    """Build the filter window for one plug of a node.

    :param project: current project in the software
    :param scans_list: list of database files to filter; when empty, all
        the documents of the current collection are used
    :param process: process instance of the selected node
    :param node_name: name of the current node (used in the window title)
    :param plug_name: name of the selected node plug
    :param node_controller: parent node controller (provides the list of
        visualized tags)
    :param main_window: parent main window
    """
    super(PlugFilter, self).__init__(None)

    self.project = project
    self.node_controller = node_controller
    self.main_window = main_window
    self.process = process
    self.plug_name = plug_name

    # Outputs recorded for the bricks of the pipeline manager, expressed
    # relative to the project folder. A set is used because it is only
    # queried for membership below.
    brick_outputs = set()
    for brick in self.main_window.pipeline_manager.brick_list:
        doc = self.project.session.get_document(COLLECTION_BRICK, brick)
        if doc is None:
            continue
        outputs = doc["Output(s)"]
        for key in outputs:
            output = outputs[key]
            if isinstance(output, str) and output != "":
                brick_outputs.add(
                    os.path.relpath(output, self.project.folder)
                )

    if scans_list:
        retained_scans = []
        for scan in scans_list:
            # Scan names must be relative to the project folder and must
            # not start with a path separator.
            relative_scan = scan.replace(self.project.folder, "")
            # startswith() rather than indexing [0]: a scan that reduces
            # to an empty string must not raise an IndexError.
            if relative_scan.startswith(("\\", "/")):
                relative_scan = relative_scan[1:]
            if not relative_scan:
                # Nothing left once the project folder is removed: this
                # is not a usable scan name.
                continue
            if relative_scan not in brick_outputs:
                retained_scans.append(relative_scan)
        self.scans_list = retained_scans
    else:
        # If there is no element in scans_list, all the scans of the
        # database need to be taken into account.
        self.scans_list = self.project.session.get_documents_names(
            COLLECTION_CURRENT
        )

    self.setWindowTitle("Filter - " + node_name + " - " + plug_name)

    # Graphical components
    self.table_data = TableDataBrowser(
        self.project,
        self,
        self.node_controller.visibles_tags,
        False,
        True,
        link_viewer=False,
    )

    # Reducing the list of scans to the selection
    all_scans = self.table_data.scans_to_visualize
    self.table_data.scans_to_visualize = self.scans_list
    self.table_data.scans_to_search = self.scans_list
    self.table_data.update_visualized_rows(all_scans)

    # Rapid search bar with its reset button
    search_bar_layout = QHBoxLayout()
    self.rapid_search = RapidSearch(self)
    self.rapid_search.textChanged.connect(partial(self.search_str))

    sources_images_dir = Config().getSourceImageDir()
    self.button_cross = QToolButton()
    self.button_cross.setStyleSheet("background-color:rgb(255, 255, 255);")
    self.button_cross.setIcon(
        QIcon(os.path.join(sources_images_dir, "gray_cross.png"))
    )
    self.button_cross.clicked.connect(self.reset_search_bar)

    search_bar_layout.addWidget(self.rapid_search)
    search_bar_layout.addWidget(self.button_cross)

    self.advanced_search = AdvancedSearch(
        self.project,
        self,
        self.scans_list,
        self.node_controller.visibles_tags,
        from_pipeline=True,
    )
    self.advanced_search.show_search()

    push_button_tags = QPushButton("Visualized tags")
    push_button_tags.clicked.connect(self.update_tags)

    # The label of this button is the name of the tag whose values are
    # sent to the plug (see set_plug_value).
    self.push_button_tag_filter = QPushButton(TAG_FILENAME)
    self.push_button_tag_filter.clicked.connect(self.update_tag_to_filter)

    push_button_ok = QPushButton("OK")
    push_button_ok.clicked.connect(self.ok_clicked)

    push_button_cancel = QPushButton("Cancel")
    push_button_cancel.clicked.connect(self.close)

    # Layout
    buttons_layout = QHBoxLayout()
    buttons_layout.addWidget(push_button_tags)
    buttons_layout.addWidget(self.push_button_tag_filter)
    buttons_layout.addStretch(1)
    buttons_layout.addWidget(push_button_ok)
    buttons_layout.addWidget(push_button_cancel)

    main_layout = QVBoxLayout()
    main_layout.addLayout(search_bar_layout)
    main_layout.addWidget(self.advanced_search)
    main_layout.addWidget(self.table_data)
    main_layout.addLayout(buttons_layout)
    self.setLayout(main_layout)

    # The window takes at least 60% of the screen width and 80% of its
    # height.
    screen_resolution = QApplication.instance().desktop().screenGeometry()
    width, height = screen_resolution.width(), screen_resolution.height()
    self.setMinimumWidth(round(0.6 * width))
    self.setMinimumHeight(round(0.8 * height))
[docs]
def ok_clicked(self):
    """Validate the dialog: send the result to the plug, then close.

    The values are emitted through set_plug_value() before the window is
    closed.
    """
    # Filters are not stored on the plugs themselves, so nothing is saved
    # on the process here: only the resulting values are transmitted.
    self.set_plug_value()
    self.close()
[docs]
def reset_search_bar(self):
    """Empty the rapid and advanced searches and show every scan again."""
    self.rapid_search.setText("")
    self.advanced_search.rows = []
    self.advanced_search.show_search()

    # Put back all the rows: the table needs the list that was displayed
    # until now in order to refresh itself.
    table = self.table_data
    previously_shown = table.scans_to_visualize
    table.scans_to_visualize = self.scans_list
    table.scans_to_search = self.scans_list
    table.update_visualized_rows(previously_shown)
[docs]
def search_str(self, str_search):
    """Refresh the browser according to the rapid search content.

    :param str_search: string typed in the rapid search
    """
    table = self.table_data
    previously_shown = table.scans_to_visualize

    if str_search == "":
        # Empty search: every searchable scan is displayed
        matching_scans = table.scans_to_search
    else:
        if str_search == not_defined_value:
            # Scans with at least one not defined value
            # NOTE(review): prepare_not_defined_filter is not defined in
            # the visible part of this class - confirm where it comes from.
            search_filter = self.prepare_not_defined_filter(
                self.project.session.get_shown_tags()
            )
        else:
            # Scans matching the search
            search_filter = self.rapid_search.prepare_filter(
                str_search,
                self.project.session.get_shown_tags(),
                table.scans_to_search,
            )
        documents = self.project.session.filter_documents(
            COLLECTION_CURRENT, search_filter
        )
        matching_scans = [
            getattr(document, TAG_FILENAME) for document in documents
        ]

    table.scans_to_visualize = matching_scans
    self.advanced_search.scans_list = matching_scans

    # Rows updated
    table.update_visualized_rows(previously_shown)
[docs]
def set_plug_value(self):
    """Emit a signal to set the values of the chosen tag to the node plug.

    The scans taken into account are the selected ones or, when nothing
    is selected, the ones returned by the current filter of the table.
    For each of them the value of the tag displayed on the tag button is
    read from the database; file names are converted to absolute paths
    inside the project folder. The resulting list is emitted through
    ``plug_value_changed``.
    """
    # The button label may start with "&" (presumably a keyboard mnemonic
    # marker added to the text): it is not part of the tag name. This is
    # done once, for both branches below.
    tag_name = self.push_button_tag_filter.text()
    if tag_name.startswith("&"):
        tag_name = tag_name[1:]

    selected_indexes = self.table_data.selectedIndexes()
    if selected_indexes:
        # The file name of a scan is held by the first column of its row
        scan_names = [
            self.table_data.item(index.row(), 0).text()
            for index in selected_indexes
        ]
    else:
        scan_names = self.table_data.get_current_filter()

    result_names = []
    for scan_name in scan_names:
        value = self.project.session.get_value(
            COLLECTION_CURRENT, scan_name, tag_name
        )
        if tag_name == TAG_FILENAME:
            value = os.path.abspath(
                os.path.join(self.project.folder, value)
            )
        result_names.append(value)

    self.plug_value_changed.emit(result_names)
[docs]
def update_tag_to_filter(self):
    """Let the user choose the tag whose values are sent to the plug.

    A tag selection pop-up is opened with the current tag preselected;
    when it is accepted, the tag button label becomes the selected tag.
    """
    tag_dialog = PopUpSelectTagCountTable(
        self.project,
        self.node_controller.visibles_tags,
        self.push_button_tag_filter.text(),
    )
    if not tag_dialog.exec_():
        # Pop-up rejected: keep the current tag
        return
    self.push_button_tag_filter.setText(tag_dialog.selected_tag)
[docs]
class AttributesFilter(PlugFilter):
"""Filter widget used on an attributes set for completion.

The widget displays a browser with the selected files of the database,
a rapid search and an advanced search to filter these files. Once the
filtering is done, the tag values of the retained files are emitted as a
dictionary through the ``attributes_selected`` signal.

.. Methods:
- ok_clicked: close the widget and emit the selected attributes
"""
# Signal emitted by ok_clicked() with a {tag name: list of values} dict.
attributes_selected = pyqtSignal(dict)
[docs]
def ok_clicked(self):
    """Close the widget and emit the tag values of the retained scans.

    The scans taken into account are the selected ones or, when nothing
    is selected, the ones returned by the current filter of the table.
    The emitted dictionary maps every field name of the current
    collection to the list of its values, one entry per scan.
    """
    self.close()

    selected_indexes = self.table_data.selectedIndexes()
    if selected_indexes:
        # The file name of a scan is held by the first column of its row
        scan_names = [
            self.table_data.item(index.row(), 0).text()
            for index in selected_indexes
        ]
    else:
        scan_names = self.table_data.get_current_filter()

    attributes = {}
    if scan_names:
        # The field names do not depend on the scan: query them once
        # instead of once per row.
        tag_names = list(
            self.project.session.get_fields_names(COLLECTION_CURRENT)
        )
        for scan_name in scan_names:
            for tag_name in tag_names:
                value = self.project.session.get_value(
                    COLLECTION_CURRENT, scan_name, tag_name
                )
                attributes.setdefault(tag_name, []).append(value)

    self.attributes_selected.emit(attributes)
# Node controller V2 style
[docs]
class CapsulNodeController(QWidget):
"""
Implementation of NodeController using Capsul AttributedProcessWidget
widget.

.. Methods:
- display_parameters: display the parameters of the selected node
- static_release: remove notification
- release_process: remove notification from process
- update_parameters: update the parameters values
- parameters_changed: emit the value_changed signal
- update_node_name: change the name of the selected node and updates
the pipeline
- rename_subprocesses: change the name of a node
- filter_attributes: display a filter widget
- update_attributes_from_filter: update attributes from filter widget
"""
# Signal emitted with a list describing a change: a plug value change
# (see parameters_changed) or a node renaming (see update_node_name).
value_changed = pyqtSignal(list)
[docs]
def __init__(self, project, scan_list, pipeline_manager_tab, main_window):
    """Initialize the controller with an empty "Node name" row.

    :param project: current project in the software
    :param scan_list: list of the selected database files
    :param pipeline_manager_tab: pipeline manager tab, used to get the
        current pipeline
    :param main_window: main window of the software
    """
    super().__init__()

    self.project = project
    self.scan_list = scan_list
    self.main_window = main_window
    self.node_name = ""
    self.visibles_tags = []
    editor_tabs = pipeline_manager_tab.pipelineEditorTabs
    self.pipeline = editor_tabs.get_current_pipeline()

    # Main vertical layout; the process widget is added to it by
    # display_parameters().
    main_layout = QVBoxLayout()
    self.setLayout(main_layout)
    self.process_widget = None

    # "Node name" row: a label followed by the editable name
    name_label = QLabel()
    name_label.setText("Node name:")
    self.line_edit_node_name = QLineEdit()

    name_row = QHBoxLayout()
    name_row.addWidget(name_label)
    name_row.addWidget(self.line_edit_node_name)
    main_layout.addLayout(name_row)
[docs]
def display_parameters(self, node_name, process, pipeline):
    """Display the parameters of the selected node.

    The node name row is updated, the previous process widget (if any) is
    released and deleted, and a new AttributedProcessWidget is built for
    ``process`` and added to the layout.

    :param node_name: name of the node
    :param process: process of the node
    :param pipeline: current pipeline
    """
    self.node_name = node_name
    self.pipeline = pipeline

    # The line edit is created once in __init__ while this method runs
    # for every displayed node: drop the connection made by a previous
    # call so that the slot is never connected several times, nor left
    # active for the read-only inputs/outputs case.
    try:
        self.line_edit_node_name.returnPressed.disconnect(
            self.update_node_name
        )
    except TypeError:
        pass  # it was not connected: OK

    # The pipeline global inputs and outputs node name cannot be modified
    if self.node_name not in ("inputs", "outputs"):
        self.line_edit_node_name.setText(self.node_name)
        self.line_edit_node_name.setReadOnly(False)
        self.line_edit_node_name.returnPressed.connect(
            self.update_node_name
        )
    else:
        self.line_edit_node_name.setText("Pipeline inputs/outputs")
        self.line_edit_node_name.setReadOnly(True)

    if self.process_widget:
        # Stop listening to the previously displayed process before
        # getting rid of its widget.
        self.static_release(
            self.process_widget.attributed_process, self.parameters_changed
        )
        self.process_widget.deleteLater()
        self.process_widget = None

    # Get the list of inputs connected from outputs of upstream nodes
    # in order to disable their "filter" file button
    node = pipeline.nodes.get(node_name)
    connected_inputs = set()
    if node is not None:
        for plug_name, plug in node.plugs.items():
            if not plug.output and plug.links_from:
                connected_inputs.add(plug_name)

    userlevel = Config().get_user_level()
    self.process = process

    # Force initializing the completion engine
    ProcessCompletionEngine.get_completion_engine(process)

    self.process_widget = AttributedProcessWidget(
        process,
        override_control_types={
            "File": type_editors.PopulseFileControlWidget,
            "Directory": type_editors.PopulseDirectoryControlWidget,
            "List_File": (
                type_editors.PopulseOffscreenListFileControlWidget
            ),
            "Undefined": type_editors.PopulseUndefinedControlWidget,
        },
        separate_outputs=True,
        user_data={
            "project": self.project,
            "scan_list": self.scan_list,
            "main_window": self.main_window,
            "node_controller": self,
            "connected_inputs": connected_inputs,
        },
        scroll=False,
        userlevel=userlevel,
    )

    if hasattr(process, "completion_engine"):
        compl = process.completion_engine
        atts = compl.get_attribute_values()
        if atts.user_traits():
            # The process has completion attributes: offer to fill them
            # from the database through a filter window.
            btn = QPushButton("Filter")
            btn.setSizePolicy(Qt.QSizePolicy.Fixed, Qt.QSizePolicy.Fixed)
            btn.clicked.connect(self.filter_attributes)
            self.process_widget.attrib_widget.layout().insertWidget(0, btn)

    self.layout().addWidget(self.process_widget)
    self.process.on_trait_change(self.parameters_changed, dispatch="ui")

    # The notification cannot be removed in __del__ since the C++ part
    # will be already destroyed by then, and connecting the removal to
    # the "destroyed" signal of the process widget did not work (the
    # signal seems never to be emitted). So release_process() has to be
    # called manually from the pipeline manager.
[docs]
@staticmethod
def static_release(process, param_changed):
    """Stop notifying ``param_changed`` of the trait changes of ``process``.

    :param process: process on which the handler was registered
    :param param_changed: handler to remove
    """
    process.on_trait_change(param_changed, remove=True)
[docs]
def release_process(self):
    """Remove the notification from the process and disconnect the signal.

    Nothing is removed when no process has been displayed yet. The
    ``value_changed`` signal is disconnected only if the underlying
    object has not been deleted.
    """
    if hasattr(self, "process"):
        self.static_release(self.process, self.parameters_changed)

    try:
        if sip.isdeleted(self):
            return
        self.value_changed.disconnect()
    except TypeError:
        # The signal was not connected: nothing to do
        pass
[docs]
def update_parameters(self, process=None):
    """Update the parameters values (no-op).

    Kept for interface compatibility: the controller widget already
    reacts to the changes of the process parameters by itself.

    :param process: process of the node (unused)
    """
    return None
[docs]
def parameters_changed(self, _, plug_name, old_value, new_value):
    """Emit the value_changed signal for a modified plug.

    :param _: first argument given by the notification (unused)
    :param plug_name: name of the modified plug
    :param old_value: value of the plug before the change
    :param new_value: value of the plug after the change
    """
    change = [
        "plug_value",
        self.node_name,
        old_value,
        plug_name,
        type(new_value),
        new_value,
    ]
    self.value_changed.emit(change)
[docs]
def update_node_name(
    self,
    new_node_name=None,
    old_node_name=None,
    from_undo=False,
    from_redo=False,
):
    """Change the name of the selected node and update the pipeline.

    The node is renamed in the pipeline, the context names of its
    process (and sub-processes) are updated, the change is emitted
    through ``value_changed`` and recorded in the editor history.
    Nothing is done when the name is unchanged; a message is printed
    when another node already uses the new name.

    :param new_node_name: new node name (is None except when this method
        is called from an undo/redo); read from the line edit when not
        given
    :param old_node_name: old node name (is None except when this method
        is called from an undo/redo); the current node name when not
        given
    :param from_undo: boolean, True if the action has been made using an
        undo
    :param from_redo: boolean, True if the action has been made using a
        redo
    """
    if not new_node_name:
        new_node_name = self.line_edit_node_name.text()

    if not old_node_name:
        old_node_name = self.node_name

    # The name of an iteration node always starts with "iterated_"
    if isinstance(
        self.process, ProcessIteration
    ) and not new_node_name.startswith("iterated_"):
        new_node_name = "iterated_" + new_node_name
        self.line_edit_node_name.setText(new_node_name)

    if new_node_name == old_node_name:
        # Nothing to rename: do not report a name clash of the node with
        # itself.
        return

    if new_node_name in self.pipeline.nodes:
        print(
            "\nUpdate of the node name from '{0}' to '{1}': impossible, "
            "the node '{1}' already exists !".format(
                old_node_name, new_node_name
            )
        )
        return

    self.pipeline.rename_node(old_node_name, new_node_name)
    renamed_node = self.pipeline.nodes[new_node_name]
    self.rename_subprocesses(renamed_node, new_node_name)

    # Updating the node_name attribute
    self.node_name = new_node_name
    self.pipeline.update_nodes_and_plugs_activation()

    # To undo/redo
    self.value_changed.emit(
        ["node_name", renamed_node, new_node_name, old_node_name]
    )

    # For history: when the renaming comes from an undo, only the action
    # name is recorded.
    # NOTE(review): the original undo branch was a no-op expression
    # ("history_maker.append" without a call); the resulting content is
    # kept as is - confirm it is the intended one.
    history_maker = ["update_node_name"]
    if not from_undo:
        history_maker.extend([renamed_node, new_node_name, old_node_name])

    editor_tabs = self.main_window.pipeline_manager.pipelineEditorTabs
    editor_tabs.get_current_editor().update_history(
        history_maker, from_undo, from_redo
    )

    self.main_window.statusBar().showMessage(
        'Node name "{0}" has been changed to "{1}".'.format(
            old_node_name, new_node_name
        )
    )
[docs]
def rename_subprocesses(self, node, parent_node_name):
    """Update the context name of a node process and of its sub-nodes.

    The context name (or the process name when there is no context name
    yet) is split on ".":

    - "Pipeline.<name>.<rest>" becomes "Pipeline.<parent_node_name>.<rest>"
    - "Pipeline" or "Pipeline.<name>" becomes "Pipeline.<parent_node_name>"
    - anything else becomes "<parent_node_name>"

    For a pipeline node, the same is done recursively on every sub-node
    but the one named "".

    :param node: node whose process is renamed
    :param parent_node_name: new name of the top-level node
    """
    process = node.process
    parts = getattr(process, "context_name", process.name).split(".")

    if parts[0] != "Pipeline":
        process.context_name = parent_node_name
    elif len(parts) >= 3:
        process.context_name = ".".join(
            ["Pipeline", parent_node_name, ".".join(parts[2:])]
        )
    else:
        process.context_name = "Pipeline." + parent_node_name

    if isinstance(node, PipelineNode):
        for name, subnode in process.nodes.items():
            if name != "":
                self.rename_subprocesses(subnode, parent_node_name)
[docs]
def filter_attributes(self):
    """Open a filter window used to choose the completion attributes.

    The attributes selected in the window are handled by
    update_attributes_from_filter().
    """
    filter_window = AttributesFilter(
        self.project,
        self.scan_list,
        self.process,
        self.node_name,
        "attributes",
        self,
        self.main_window,
    )
    # Keep a reference on the window as long as it is displayed
    self.pop_up = filter_window
    filter_window.show()
    filter_window.attributes_selected.connect(
        self.update_attributes_from_filter
    )
[docs]
def update_attributes_from_filter(self, attributes):
    """Set the completion attributes from the result of a filter widget.

    Only the names matching a trait of the attribute values of the
    completion engine are used: list attributes receive the whole list of
    values, the other ones its first element. When no name matches (and
    ``attributes`` is not empty), an information box is displayed and
    closed automatically after 2 seconds.

    :param attributes: dictionary {tag name: list of values}
    """
    engine = self.process.completion_engine
    attribute_values = engine.get_attribute_values()

    matched = 0
    for name, tag_values in attributes.items():
        if not attribute_values.trait(name):
            continue
        if isinstance(getattr(attribute_values, name), list):
            new_value = tag_values
        else:
            new_value = tag_values[0]
        setattr(attribute_values, name, new_value)
        matched += 1

    if matched != 0 or len(attributes) == 0:
        return

    mbox = QMessageBox(
        QMessageBox.Information,
        "Unmatching tags / attributes",
        "The selected data tags do not match the expected "
        "attributes set for process parameters completion",
    )
    Qt.QTimer.singleShot(2000, mbox.accept)
    mbox.exec()
# Node controller V1 style
[docs]
class NodeController(QWidget):
"""
Allow to change the input and output values of a pipeline node

.. Methods:
- clearLayout: clear the layouts of the widget
- display_filter: display a filter widget
- display_parameters: display the parameters of the selected node
- get_index_from_plug_name: return the index of the plug label.
- update_node_name: update the name of the selected node
- rename_subprocesses: change the name of a node
- update_parameters: update the parameters values
- update_plug_value: update the value of a node plug
- update_plug_value_from_filter: update the plug value from a filter
result
- release_process: remove notification from process (not implemented)
"""
# Signal emitted with a list describing a change: a plug value change
# (see update_plug_value) or a node renaming (see update_node_name).
value_changed = pyqtSignal(list)
[docs]
def __init__(self, project, scan_list, pipeline_manager_tab, main_window):
    """Initialize the node controller.

    :param project: current project in the software
    :param scan_list: list of the selected database files
    :param pipeline_manager_tab: parent widget
    :param main_window: main window of the software
    """
    super(NodeController, self).__init__(pipeline_manager_tab)

    self.project = project
    self.scan_list = scan_list
    self.main_window = main_window
    self.node_name = ""

    editor_tabs = pipeline_manager_tab.pipelineEditorTabs
    self.pipeline = editor_tabs.get_current_pipeline()

    # Layouts: they are rebuilt by display_parameters()
    self.v_box_final = QVBoxLayout()
    self.h_box_node_name = QHBoxLayout()
[docs]
def clearLayout(self, layout):
"""Clear the layouts of the widget.

The items of ``layout.layout()`` are visited from the last index to the
first one: the widget of a widget item is detached from its parent, a
nested QHBoxLayout/QVBoxLayout is scheduled for deletion and the widgets
of its items are detached as well. The layout object itself is finally
destroyed with ``sip.delete`` (presumably so that a new layout can be
set on the widget afterwards).

:param layout: widget with a layout
"""
# NOTE(review): the loop bound is the number of child objects of the
# widget, not the number of items of its layout - confirm both agree.
for i in reversed(range(len(layout.children()))):
# Exact type comparisons are used below (not isinstance): only these
# precise item classes are handled.
if type(layout.layout().itemAt(i)) == QtWidgets.QWidgetItem:
# Detach the widget held by the item from its parent
layout.layout().itemAt(i).widget().setParent(None)
if (
type(layout.layout().itemAt(i)) == QtWidgets.QHBoxLayout
or type(layout.layout().itemAt(i)) == QtWidgets.QVBoxLayout
):
# Nested layout: schedule its deletion, then detach its widgets
layout.layout().itemAt(i).deleteLater()
for j in reversed(range(len(layout.layout().itemAt(i)))):
layout.layout().itemAt(i).itemAt(j).widget().setParent(
None
)
# Destroy the layout object itself, if the widget still has one
if layout.layout() is not None:
sip.delete(layout.layout())
[docs]
def display_filter(self, node_name, plug_name, parameters, process):
    """Open a filter window for a plug.

    The values chosen in the window are handled by
    update_plug_value_from_filter().

    :param node_name: name of the node
    :param plug_name: name of the plug
    :param parameters: tuple containing the index of the plug, the current
        pipeline instance and the type of the plug value
    :param process: process of the node
    """
    filter_window = PlugFilter(
        self.project,
        self.scan_list,
        process,
        node_name,
        plug_name,
        self,
        self.main_window,
    )
    # Keep a reference on the window as long as it is displayed
    self.pop_up = filter_window
    filter_window.show()

    on_value_chosen = partial(
        self.update_plug_value_from_filter, plug_name, parameters
    )
    filter_window.plug_value_changed.connect(on_value_chosen)
# def display_parameters(self, node_name: object,
# process: object, pipeline: object) -> object:
[docs]
def display_parameters(self, node_name, process, pipeline):
    """Display the parameters of the selected node.

    The node parameters are read and a label, a line edit and (for some
    pipeline inputs) a "Filter" push button are created for each of them.
    This method consists mainly in widget and layout organization. The
    displayed texts are also stored in the ``node_parameters_tmp``
    dictionary of the current editor.

    Traits named "nodes_activation" and traits whose user level is
    greater than 0 are not displayed.

    :param node_name: name of the node
    :param process: process of the node
    :param pipeline: current pipeline
    """
    self.node_name = node_name
    self.current_process = process
    self.line_edit_input = []
    self.line_edit_output = []
    self.labels_input = []
    self.labels_output = []

    # Refreshing the layouts
    if len(self.children()) > 0:
        self.clearLayout(self)

    self.v_box_final = QVBoxLayout()

    # Node name
    label_node_name = QLabel()
    label_node_name.setText("Node name:")

    self.line_edit_node_name = QLineEdit()
    self.pipeline = pipeline

    # The pipeline global inputs and outputs node name cannot be modified
    if self.node_name not in ("inputs", "outputs"):
        self.line_edit_node_name.setText(self.node_name)
        self.line_edit_node_name.returnPressed.connect(
            self.update_node_name
        )
    else:
        self.line_edit_node_name.setText("Pipeline inputs/outputs")
        self.line_edit_node_name.setReadOnly(True)

    self.h_box_node_name = QHBoxLayout()
    self.h_box_node_name.addWidget(label_node_name)
    self.h_box_node_name.addWidget(self.line_edit_node_name)

    # Inputs
    self.button_group_inputs = QGroupBox("Inputs")
    self.v_box_inputs = QVBoxLayout()

    for name, trait in process.user_traits().items():
        if name == "nodes_activation":
            continue
        if trait.userlevel is not None and trait.userlevel > 0:
            continue
        if trait.output:
            continue

        label_input = QLabel()
        label_input.setText(str(name))

        try:
            value = getattr(process, name)
        except TraitError:
            value = Undefined
        trait_type = trait_ids(process.trait(name))

        line_edit = QLineEdit()
        line_edit.setText(str(value))
        line_edit.returnPressed.connect(
            partial(
                self.update_plug_value,
                "in",
                name,
                pipeline,
                type(value),
            )
        )

        # Index of this plug in the input label / line edit lists
        idx = len(self.line_edit_input)
        self.labels_input.append(label_input)
        self.line_edit_input.append(line_edit)

        h_box = QHBoxLayout()
        h_box.addWidget(label_input)
        h_box.addWidget(line_edit)

        # Adding the possibility to filter pipeline global
        # inputs except if the input is "database_scans"
        # which means that the scans will be filtered with InputFilter
        if self.node_name == "inputs" and name != "database_scans":
            if (
                "File" in trait_type
                or "List_File" in trait_type
                or "Any" in trait_type
            ):
                parameters = (idx, pipeline, type(value))
                push_button = QPushButton("Filter")
                push_button.clicked.connect(
                    partial(
                        self.display_filter,
                        self.node_name,
                        name,
                        parameters,
                        process,
                    )
                )
                h_box.addWidget(push_button)

        self.v_box_inputs.addLayout(h_box)

    self.button_group_inputs.setLayout(self.v_box_inputs)

    # Outputs
    self.button_group_outputs = QGroupBox("Outputs")
    self.v_box_outputs = QVBoxLayout()

    for name, trait in process.traits(output=True).items():
        if trait.userlevel is not None and trait.userlevel > 0:
            continue

        label_output = QLabel()
        label_output.setText(str(name))

        # Same guard as for the inputs: a value that cannot be read is
        # displayed as Undefined instead of aborting the display.
        try:
            value = getattr(process, name)
        except TraitError:
            value = Undefined

        line_edit = QLineEdit()
        line_edit.setText(str(value))
        line_edit.returnPressed.connect(
            partial(
                self.update_plug_value, "out", name, pipeline, type(value)
            )
        )

        self.labels_output.append(label_output)
        self.line_edit_output.append(line_edit)

        h_box = QHBoxLayout()
        h_box.addWidget(label_output)
        h_box.addWidget(line_edit)
        self.v_box_outputs.addLayout(h_box)

    self.button_group_outputs.setLayout(self.v_box_outputs)

    self.v_box_final.addLayout(self.h_box_node_name)
    self.v_box_final.addWidget(self.button_group_inputs)
    self.v_box_final.addWidget(self.button_group_outputs)

    # Store the displayed texts in the current editor
    editor_tabs = self.main_window.pipeline_manager.pipelineEditorTabs
    parameters_tmp = editor_tabs.get_current_editor().node_parameters_tmp
    parameters_tmp[node_name] = {}
    parameters_tmp[node_name]["inputs"] = [
        x.text() for x in self.line_edit_input
    ]
    parameters_tmp[node_name]["outputs"] = [
        x.text() for x in self.line_edit_output
    ]

    # NOTE(review): the entry of a node named "outputs" is always
    # removed - reason not visible here, kept as is.
    if "outputs" in parameters_tmp:
        del parameters_tmp["outputs"]

    # Since January, 4th 2020 the run action is always enabled here (it
    # used to depend on the initialization state of the pipeline).
    self.main_window.pipeline_manager.run_pipeline_action.setDisabled(
        False
    )

    self.setLayout(self.v_box_final)
[docs]
def get_index_from_plug_name(self, plug_name, in_or_out):
    """Return the index of the label of a plug.

    :param plug_name: name of the plug
    :param in_or_out: "in" if the plug is an input plug, "out" else
    :return: the index of the first label whose text is ``plug_name``,
        None when there is no such label
    """
    labels = self.labels_input if in_or_out == "in" else self.labels_output
    return next(
        (
            position
            for position, label in enumerate(labels)
            if label.text() == plug_name
        ),
        None,
    )
[docs]
def update_node_name(self, new_node_name=None):
    """Change the name of the selected node and update the pipeline.

    The node is renamed in the pipeline, the context names of its
    process (and sub-processes) are updated and the change is emitted
    through ``value_changed``. A message is printed, and nothing is
    renamed, when the new name is already used in the pipeline.

    :param new_node_name: new node name (is None except when this method
        is called from an undo/redo); read from the line edit when not
        given
    """
    previous_name = self.node_name

    if not new_node_name:
        new_node_name = self.line_edit_node_name.text()

    # The name of a node is prefixed with "iterated_" when the first
    # process of the pipeline is an iteration.
    first_process = self.pipeline.list_process_in_pipeline[0]
    if isinstance(
        first_process, ProcessIteration
    ) and not new_node_name.startswith("iterated_"):
        new_node_name = "iterated_" + new_node_name
        self.line_edit_node_name.setText(new_node_name)

    if new_node_name in self.pipeline.nodes.keys():
        print("Node name already in pipeline")
        return

    self.pipeline.rename_node(previous_name, new_node_name)
    self.rename_subprocesses(
        self.pipeline.nodes[new_node_name], new_node_name
    )

    # Updating the node_name attribute
    self.node_name = new_node_name

    # To undo/redo
    renaming = [
        "node_name",
        self.pipeline.nodes[new_node_name],
        new_node_name,
        previous_name,
    ]
    self.value_changed.emit(renaming)

    status_message = 'Node name "{0}" has been changed to "{1}".'.format(
        previous_name, new_node_name
    )
    self.main_window.statusBar().showMessage(status_message)
[docs]
def rename_subprocesses(self, node, parent_node_name):
    """Update the context name of a node process and of its sub-nodes.

    The context name (or the process name when there is no context name
    yet) is split on "." and rebuilt with ``parent_node_name``:

    - "Pipeline.<name>.<rest>" becomes "Pipeline.<parent_node_name>.<rest>"
    - "Pipeline" or "Pipeline.<name>" becomes "Pipeline.<parent_node_name>"
    - anything else becomes "<parent_node_name>"

    For a pipeline node, the same is done recursively on every sub-node
    but the one named "".

    :param node: node whose process is renamed
    :param parent_node_name: new name of the top-level node
    """
    process = node.process
    current_name = getattr(process, "context_name", process.name)
    components = current_name.split(".")

    if components[0] == "Pipeline":
        new_components = ["Pipeline", parent_node_name]
        if len(components) >= 3:
            # Keep everything located after the name of the parent node
            new_components.append(".".join(components[2:]))
        process.context_name = ".".join(new_components)
    else:
        process.context_name = parent_node_name

    if not isinstance(node, PipelineNode):
        return

    for name, subnode in process.nodes.items():
        if name == "":
            continue
        self.rename_subprocesses(subnode, parent_node_name)
[docs]
def update_parameters(self, process=None):
    """Update the parameters values shown in the line edits.

    The traits are walked in the same order, and with the same filtering,
    as in display_parameters() so that each value is written in the line
    edit of its own plug: traits named "nodes_activation" and traits
    whose user level is greater than 0 have no line edit and are skipped.

    :param process: process of the node; the last displayed process is
        used when None
    """
    if process is None:
        process = getattr(self, "current_process", None)
        if process is None:
            # If no node has been clicked, no need to update the widget
            return

    idx = 0
    for name, trait in process.user_traits().items():
        if name == "nodes_activation" or trait.output:
            continue
        if trait.userlevel is not None and trait.userlevel > 0:
            # Not displayed: must not shift the index of the line edits
            continue
        try:
            value = getattr(process, name)
        except TraitError:
            value = Undefined
        if idx < len(self.line_edit_input):
            self.line_edit_input[idx].setText(str(value))
        idx += 1

    idx = 0
    for name, trait in process.traits(output=True).items():
        if trait.userlevel is not None and trait.userlevel > 0:
            # Not displayed: must not shift the index of the line edits
            continue
        try:
            value = getattr(process, name)
        except TraitError:
            value = Undefined
        if idx < len(self.line_edit_output):
            self.line_edit_output[idx].setText(str(value))
        idx += 1
[docs]
def update_plug_value(
    self, in_or_out, plug_name, pipeline, value_type, new_value=None
):
    """Update the value of a node plug.

    When ``new_value`` is None, the value is read from the line edit of
    the plug and evaluated as a Python expression (the raw text is kept
    when it cannot be evaluated). The value is then set on the plug; if
    it is refused, a warning box is shown and the line edit is reset to
    the previous value. On success the pipeline activation is updated,
    the change is emitted through ``value_changed`` and a message is
    shown in the status bar.

    :param in_or_out: "in" if the plug is an input plug, "out" else
    :param plug_name: name of the plug
    :param pipeline: current pipeline
    :param value_type: type of the plug value
    :param new_value: new value for the plug (is None except when this
        method is called with an explicit value, e.g. from an undo/redo
        or from a filter result)
    """
    index = self.get_index_from_plug_name(plug_name, in_or_out)

    # Reading the value from the plug's line edit, only when no value is
    # given. The test is "is None" and not a truthiness test: an explicit
    # empty value ([], "", 0, False) must be set as is instead of being
    # silently replaced by the text of the line edit.
    if new_value is None:
        if in_or_out == "in":
            new_value = self.line_edit_input[index].text()
        elif in_or_out == "out":
            new_value = self.line_edit_output[index].text()
        else:
            new_value = None

        # SECURITY NOTE(review): eval() executes whatever has been typed
        # in the line edit. It is kept because the accepted inputs are
        # not restricted to literals here; consider a safer parser.
        try:
            new_value = eval(new_value)
        # We try to handle the undefined value with the eval() function
        # See FIXME below.
        except SyntaxError:
            new_value = new_value.replace("<undefined>", "'<undefined>'")
            try:
                new_value = eval(new_value)
            except Exception as err:
                print("{0}: {1}.".format(err.__class__, err))
        except NameError:
            # Not a Python expression: the raw text is the value
            pass
        except Exception as err:
            print("{0}: {1}.".format(err.__class__, err))

    if value_type not in [float, int, str, list]:
        value_type = str

    # The plugs of the pipeline global inputs/outputs belong to the node
    # named "" of the pipeline.
    if self.node_name in ["inputs", "outputs"]:
        node_name = ""
    else:
        node_name = self.node_name

    old_value = pipeline.nodes[node_name].get_plug_value(plug_name)

    try:
        # FIXME: Since we replace, above, "<undefined>" with
        # "'<undefined>'" in order to handle syntax error with eval() we
        # should handle all cases here (big job).
        # For the moment we manage only the dictionary
        if isinstance(new_value, dict):
            new_value = {
                k: Undefined if v == "<undefined>" else v
                for k, v in new_value.items()
            }
        pipeline.nodes[node_name].set_plug_value(plug_name, new_value)
    except (TraitError, OSError) as err:
        msg = QMessageBox()
        msg.setText("{}".format(err))
        msg.setIcon(QMessageBox.Warning)
        msg.setWindowTitle(err.__class__.__name__)
        msg.exec_()
        # The value has been refused: display the previous one again
        if in_or_out == "in":
            self.line_edit_input[index].setText(str(old_value))
        elif in_or_out == "out":
            self.line_edit_output[index].setText(str(old_value))
        return

    # Update pipeline to "propagate" the node value
    pipeline.update_nodes_and_plugs_activation()

    if in_or_out == "in":
        self.line_edit_input[index].setText(str(new_value))
    elif in_or_out == "out":
        self.line_edit_output[index].setText(str(new_value))

    # To undo/redo
    self.value_changed.emit(
        [
            "plug_value",
            self.node_name,
            old_value,
            plug_name,
            value_type,
            new_value,
        ]
    )

    self.main_window.statusBar().showMessage(
        'Plug "{0}" of node "{1}" has been changed to "{2}".'.format(
            plug_name, node_name, new_value
        )
    )
[docs]
def update_plug_value_from_filter(
    self, plug_name, parameters, filter_res_list
):
    """Set the value of an input plug from the result of a filter.

    A result holding a single element is set as this element itself
    rather than as a list of one element; an empty result gives an empty
    list.

    :param plug_name: name of the plug
    :param parameters: tuple containing the index of the plug, the current
        pipeline instance and the type of the plug value
    :param filter_res_list: list of the filtered files
    """
    pipeline, value_type = parameters[1], parameters[2]

    result_count = len(filter_res_list)
    if result_count == 1:
        plug_value = filter_res_list[0]
    elif result_count > 1:
        plug_value = filter_res_list
    else:
        plug_value = []

    self.update_plug_value(
        "in", plug_name, pipeline, value_type, plug_value
    )
[docs]
def release_process(self):
    """Remove notification from process (no-op).

    Present for interface compatibility: the removal is only implemented
    in CapsulNodeController.
    """
    return None