"""Module that handles pipeline iteration.
:Contains:
:Class:
- IterationTable
"""
##########################################################################
# Populse_mia - Copyright (C) IRMaGe/CEA, 2018
# Distributed under the terms of the CeCILL license, as published by
# the CEA-CNRS-INRIA. Refer to the LICENSE file or to
# http://www.cecill.info/licences/Licence_CeCILL_V2.1-en.html
# for details.
##########################################################################
import os
# PyQt5 imports
from PyQt5.QtCore import pyqtSignal
from PyQt5.QtGui import QPixmap
from PyQt5.QtWidgets import (
QCheckBox,
QComboBox,
QHBoxLayout,
QLabel,
QPushButton,
QTableWidget,
QTableWidgetItem,
QVBoxLayout,
QWidget,
)
from populse_mia.data_manager import COLLECTION_CURRENT, TAG_FILENAME
from populse_mia.software_properties import Config
# MIA imports
from populse_mia.user_interface.pipeline_manager.process_mia import ProcessMIA
from populse_mia.user_interface.pop_ups import (
ClickableLabel,
PopUpSelectIteration,
PopUpSelectTagCountTable,
)
[docs]
class IterationTable(QWidget):
    """
    Widget that handles pipeline iteration.

    This widget allows users to select tags for iteration and visualization,
    filter values, and manage the iteration process for pipeline execution.

    .. Methods:
        - _create_tag_button: Create a new tag button
        - add_tag: adds a tag to visualize in the iteration table
        - emit_iteration_table_updated: emits a signal when the iteration
          scans have been updated
        - fill_values: fill values_list depending on the visualized tags
        - filter_values: select the tag values used for the iteration
        - refresh_layout: updates the layout of the widget
        - remove_tag: removes a tag to visualize in the iteration table
        - select_iteration_tag: opens a pop-up to let the user select on which
          tag to iterate
        - select_visualized_tag: opens a pop-up to let the user select which
          tag to visualize in the iteration table
        - update_iterated_tag: updates the widget when the iterated tag is
          modified
        - update_table: updates the iteration table
        - update_selected_tag: updates the selected tag for current pipeline
          manager tab

    .. Signals:
        - iteration_table_updated: Emitted when iteration scans have been
          updated. Carries two lists: the scans to use, and a list of scan
          lists.
    """

    # Payload is (scans, list of scan lists): update_table sends the scans
    # matching the value selected in the combo box plus one scan list per
    # iterated tag value; the fallback sends (scan_list, [scan_list]).
    iteration_table_updated = pyqtSignal(list, list)
[docs]
def __init__(self, project, scan_list=None, main_window=None):
    """
    Build the iteration widget together with all of its child controls.

    :param project: Current project in the software.
    :param scan_list: List of the selected database files. When it is None
        or empty, the document names of the current collection are used.
    :param main_window: Software's main window reference.
    """
    super().__init__()

    self.project = project
    self.main_window = main_window
    # Necessary for using Mia bricks
    ProcessMIA.project = project

    # Scan list: the caller's selection, or every document of the
    # current collection when no selection is given
    with self.project.database.data() as database_data:
        if scan_list:
            self.scan_list = scan_list
        else:
            self.scan_list = database_data.get_document_names(
                COLLECTION_CURRENT
            )

    # values_list holds the distinct values of each visualized tag
    # (two slots, one per default tag button created below)
    self.all_tag_values = []
    self.values_list = [[], []]

    # Iteration controls
    iterate_box = QCheckBox("Iterate pipeline")
    iterate_box.stateChanged.connect(self.emit_iteration_table_updated)
    self.check_box_iterate = iterate_box

    self.label_iterate = QLabel("Iterate over:")
    self.iterated_tag_label = QLabel("Select a tag")

    select_button = QPushButton("Select")
    select_button.clicked.connect(self.select_iteration_tag)
    self.iterated_tag_push_button = select_button

    # Value selection and filtering
    value_selector = QComboBox()
    value_selector.currentIndexChanged.connect(self.update_table)
    self.combo_box = value_selector

    filter_button = QPushButton("Filter")
    filter_button.clicked.connect(self.filter_values)
    self.filter_button = filter_button

    # Table showing the scans of the selected tag value
    self.iteration_table = QTableWidget()

    # Visualized tags: a label followed by the default tag buttons
    self.label_tags = QLabel("Tags to visualize:")
    self.push_buttons = []
    for position, default_tag in enumerate(
        ("SequenceName", "AcquisitionDate")
    ):
        self._create_tag_button(default_tag, position)

    # Clickable "+" / "-" icons used to add or remove a tag button
    sources_images_dir = Config().getSourceImageDir()

    def scaled_icon(file_name, height):
        """Load an icon from the image directory, scaled to height."""
        icon_path = os.path.join(sources_images_dir, file_name)
        return QPixmap(os.path.relpath(icon_path)).scaledToHeight(height)

    self.add_tag_label = ClickableLabel()
    self.add_tag_label.setObjectName("plus")
    self.add_tag_label.setPixmap(scaled_icon("green_plus.png", 15))
    self.add_tag_label.clicked.connect(self.add_tag)

    self.remove_tag_label = ClickableLabel()
    self.remove_tag_label.setPixmap(scaled_icon("red_minus.png", 20))
    self.remove_tag_label.clicked.connect(self.remove_tag)

    # Main layout; refresh_layout populates it
    self.v_layout = QVBoxLayout()
    self.setLayout(self.v_layout)
    self.refresh_layout()
def _create_tag_button(self, text, index):
    """Build a tag push button and register it in push_buttons.

    :param text (str): Text to display on the button.
    :param index (int): Position at which the button is inserted in the
        push_buttons list; it is also the index handed to
        select_visualized_tag when the button is clicked.
    :returns: The newly created QPushButton.
    """

    def open_tag_selector():
        # index is bound once per call, so each button keeps its own value
        self.select_visualized_tag(index)

    tag_button = QPushButton(text)
    tag_button.clicked.connect(open_tag_selector)
    self.push_buttons.insert(index, tag_button)
    return tag_button
[docs]
def add_tag(self):
    """
    Append a placeholder tag button to the visualized tags.

    The new button is labelled with its 1-based position and the layout
    is rebuilt afterwards. Used only for tests.
    """
    position = len(self.push_buttons)
    self._create_tag_button(f"Tag n°{position + 1}", position)
    self.refresh_layout()
[docs]
def emit_iteration_table_updated(self):
    """Emit a signal when the iteration scans have been updated.

    When the "iterate" check box is checked and update_table has already
    computed iteration data, the signal carries
    (iteration_scans, all_iterations_scans). In every other case it
    carries the whole selection as a single iteration:
    (scan_list, [scan_list]).
    """
    # The former guard tested hasattr(self, "scans"), an attribute that is
    # never assigned in this class, so the iteration data was never sent.
    # Test the attributes that are actually read instead.
    # NOTE(review): both attributes are only refreshed by update_table,
    # which returns early when no tag is iterated - confirm that stale
    # values cannot be emitted after the iterated tag has been cleared.
    iteration_scans = getattr(self, "iteration_scans", None)
    all_iterations_scans = getattr(self, "all_iterations_scans", None)

    iterate = self.check_box_iterate.checkState()
    has_iteration_data = (
        iteration_scans is not None and all_iterations_scans is not None
    )

    if iterate and has_iteration_data:
        self.iteration_table_updated.emit(
            iteration_scans, all_iterations_scans
        )
    else:
        self.iteration_table_updated.emit(self.scan_list, [self.scan_list])
[docs]
def fill_values(self, idx):
    """
    Fill values_list with unique tag values for the specified tag.

    The tag name is the text of the push button at position idx. Every
    document of the current collection is read; None values are skipped
    and the remaining values are stored once each, in first-seen order,
    in values_list[idx]. values_list is extended with empty slots when it
    is shorter than idx + 1.

    :param idx (int): Index of the tag in push_buttons list.
    """
    tag_name = self.push_buttons[idx].text()

    # Collect and de-duplicate in a single pass. A list membership test is
    # kept (rather than a set) so that values need not be hashable.
    unique_values = []
    with self.project.database.data() as database_data:
        for scan in database_data.get_document_names(COLLECTION_CURRENT):
            current_value = database_data.get_value(
                collection_name=COLLECTION_CURRENT,
                primary_key=scan,
                field=tag_name,
            )
            if current_value is None or current_value in unique_values:
                continue
            unique_values.append(current_value)

    # Ensure values_list has enough slots
    while len(self.values_list) <= idx:
        self.values_list.append([])

    # Always replace the slot: the previous "is not None" guard left a
    # None slot in place, which then failed on the membership test.
    self.values_list[idx] = unique_values
[docs]
def filter_values(self):
    """Let the user pick which values of the iterated tag are used.

    Opens a PopUpSelectIteration dialog for the current editor's iterated
    tag. When the dialog is accepted, the chosen values become the
    editor's tag_values_list, the combo box is refilled with them and the
    table is updated.
    """
    editor_tabs = self.main_window.pipeline_manager.pipelineEditorTabs
    current_editor = editor_tabs.get_current_editor()

    ui_iteration = PopUpSelectIteration(
        current_editor.iterated_tag, current_editor.all_tag_values_list
    )
    if not ui_iteration.exec_():
        # Dialog not accepted: keep the current selection
        return

    # Remove ampersands from tag values (used for shortcuts in Qt)
    selected_values = []
    for raw_value in ui_iteration.final_values:
        selected_values.append(raw_value.replace("&", ""))
    current_editor.tag_values_list = selected_values

    # Show the filtered values in the combo box
    self.combo_box.clear()
    self.combo_box.addItems(selected_values)
    self.update_table()
[docs]
def refresh_layout(self):
    """Rebuild the layout of the widget from its current controls.

    Called in widget's initialization and when a tag push button
    is added or removed.
    """
    # Detach every item held by the main layout, from last to first
    position = self.v_layout.count() - 1
    while position >= 0:
        stale_item = self.v_layout.itemAt(position)
        if stale_item:
            self.v_layout.removeItem(stale_item)
        position -= 1

    # Top row, column 1: the "iterate" check box
    iterate_column = QVBoxLayout()
    iterate_column.addWidget(self.check_box_iterate)

    # Top row, column 2: the two labels describing the iterated tag
    label_column = QVBoxLayout()
    for label in (self.label_iterate, self.iterated_tag_label):
        label_column.addWidget(label)

    # Top row, column 3: tag selection button above combo box + filter
    filter_row = QHBoxLayout()
    filter_row.addWidget(self.combo_box)
    filter_row.addWidget(self.filter_button)

    selection_column = QVBoxLayout()
    selection_column.addWidget(self.iterated_tag_push_button)
    selection_column.addLayout(filter_row)

    top_row = QHBoxLayout()
    for column in (iterate_column, label_column, selection_column):
        top_row.addLayout(column)

    # Main layout: top row, then the table
    self.v_layout.addLayout(top_row)
    self.v_layout.addWidget(self.iteration_table)

    # Bottom row: label, one button per visualized tag, "+" and "-" icons
    self.h_box = QHBoxLayout()
    self.h_box.setSpacing(10)
    bottom_widgets = [self.label_tags]
    bottom_widgets.extend(self.push_buttons)
    bottom_widgets.append(self.add_tag_label)
    bottom_widgets.append(self.remove_tag_label)
    for widget in bottom_widgets:
        self.h_box.addWidget(widget)
    self.h_box.addStretch(1)
    self.v_layout.addLayout(self.h_box)
[docs]
def remove_tag(self):
    """Remove the last tag button from the visualization list.

    Does nothing when there is no tag button. Otherwise the last button is
    closed and scheduled for deletion, values_list is trimmed so that it
    never holds more slots than there are buttons, and the layout and the
    table are refreshed.
    """
    if not self.push_buttons:
        return

    # Remove the last button
    button = self.push_buttons.pop()
    button.close()
    button.deleteLater()

    # values_list can be shorter than push_buttons: add_tag creates a
    # button without a values slot (slots only appear in fill_values).
    # Popping unconditionally would then discard the values of a button
    # that is still displayed, so only drop the slots that no longer have
    # a matching button.
    del self.values_list[len(self.push_buttons):]

    # Update UI
    self.refresh_layout()
    self.update_table()
[docs]
def select_iteration_tag(self):
    """Ask the user which tag the pipeline should iterate over.

    Opens a PopUpSelectTagCountTable listing the fields of the current
    collection. When the dialog is accepted, the chosen tag is stored on
    the current editor and the widget is updated, unless no tag was set
    before and none was chosen.
    """
    editor_tabs = self.main_window.pipeline_manager.pipelineEditorTabs
    current_editor = editor_tabs.get_current_editor()

    with self.project.database.data() as database_data:
        field_names = database_data.get_field_names(COLLECTION_CURRENT)

    ui_select = PopUpSelectTagCountTable(
        self.project, field_names, current_editor.iterated_tag
    )
    if not ui_select.exec_():
        return

    # Nothing changes when there was no tag before and none is chosen now
    if (
        current_editor.iterated_tag is not None
        or ui_select.selected_tag is not None
    ):
        current_editor.iterated_tag = ui_select.selected_tag
        self.update_selected_tag(ui_select.selected_tag)
[docs]
def select_visualized_tag(self, idx):
    """
    Ask the user which tag the button at position idx should display.

    Opens a PopUpSelectTagCountTable preset with the button's current
    text. When the dialog is accepted with a tag selected, the button
    text is replaced, the tag values are refilled and the table updated.

    :param idx (int): Index of the clicked push button.
    """
    with self.project.database.data() as database_data:
        field_names = database_data.get_field_names(COLLECTION_CURRENT)

    tag_dialog = PopUpSelectTagCountTable(
        self.project, field_names, self.push_buttons[idx].text()
    )

    accepted = tag_dialog.exec_()
    if not accepted or tag_dialog.selected_tag is None:
        return

    self.push_buttons[idx].setText(tag_dialog.selected_tag)
    self.fill_values(idx)
    self.update_table()
[docs]
def update_iterated_tag(self, tag_name=None):
    """
    Refresh the widget after the iterated tag has changed.

    The scan list is taken from the pipeline manager, or from the current
    collection when the manager's list is empty. With no tag, the tag
    controls and the table are reset; otherwise the controls show the tag
    name, the combo box is filled with the current editor's tag values
    and the table is updated.

    :param tag_name (str): Name of the iterated tag.
    """
    # Scan list: the pipeline manager's selection, else all documents
    manager_scans = self.main_window.pipeline_manager.scan_list
    if manager_scans:
        self.scan_list = manager_scans
    else:
        with self.project.database.data() as database_data:
            self.scan_list = database_data.get_document_names(
                COLLECTION_CURRENT
            )

    self.combo_box.clear()

    if tag_name is not None:
        # Show the tag on the selection controls
        self.iterated_tag_push_button.setText(tag_name)
        self.iterated_tag_label.setText(f"{tag_name}:")

        # Tag values come from the current editor
        editor_tabs = self.main_window.pipeline_manager.pipelineEditorTabs
        current_editor = editor_tabs.get_current_editor()
        self.all_tag_values = list(current_editor.all_tag_values_list)
        self.combo_box.addItems(current_editor.tag_values_list)

        self.update_table()
        return

    # No tag: put the selection controls and the table back to default
    self.iterated_tag_push_button.setText("Select")
    self.iterated_tag_label.setText("Select a tag")
    self.iteration_table.clear()
    self.iteration_table.setColumnCount(len(self.push_buttons))
[docs]
def update_table(self):
    """
    Update the iteration table with current data.

    The table is cleared and given one column per tag button. If the
    current editor has no iterated tag, nothing else happens. Otherwise:

    - each column header is set to the button's tag name; if a tag is not
      a field of the current collection, a message is printed and the
      update stops there;
    - iteration_scans is set to the scans of scan_list whose iterated tag
      equals the value selected in the combo box, and the table shows one
      row per such scan;
    - all_iterations_scans is set to one such scan list per value of the
      editor's tag_values_list;
    - iteration_table_updated is emitted with both results.

    Scan lists follow the order returned by the database filter, so the
    rows and the iterations are ordered the same way on every run.
    """
    with self.project.database.data() as database_data:
        # Update scan list if empty
        if not self.scan_list:
            self.scan_list = database_data.get_document_names(
                COLLECTION_CURRENT
            )

        # Clear and prepare table
        self.iteration_table.clear()
        self.iteration_table.setColumnCount(len(self.push_buttons))

        editor_tabs = self.main_window.pipeline_manager.pipelineEditorTabs
        current_editor = editor_tabs.get_current_editor()
        iterated_tag = current_editor.iterated_tag
        if iterated_tag is None:
            return

        # FIXME should not use GUI text values !!
        tag_names = [
            button.text().replace("&", "") for button in self.push_buttons
        ]

        # Set up table headers; the field list is queried once, not once
        # per column
        known_fields = database_data.get_field_names(COLLECTION_CURRENT)
        for column, tag_name in enumerate(tag_names):
            # Stop if tag doesn't exist in project
            if tag_name not in known_fields:
                print(f"{tag_name} not in the project's tags")
                return
            self.iteration_table.setHorizontalHeaderItem(
                column, QTableWidgetItem(tag_name)
            )

        selected_scans = set(self.scan_list)

        def matching_scans(tag_value):
            """Return the selected scans whose iterated tag is tag_value."""
            # NOTE(review): tag_value is inserted unescaped; a value that
            # contains a double quote would presumably break the filter -
            # confirm how the database expects such values to be quoted.
            filter_query = f'({{{iterated_tag}}} == "{tag_value}")'
            documents = database_data.filter_documents(
                COLLECTION_CURRENT, filter_query
            )
            # dict.fromkeys drops duplicates like a set would, but keeps
            # the order in which the database returned the documents
            file_names = dict.fromkeys(
                document[TAG_FILENAME] for document in documents
            )
            return [name for name in file_names if name in selected_scans]

        # Scans for the value currently selected in the combo box
        current_filter = self.combo_box.currentText().replace("&", "")
        self.iteration_scans = matching_scans(current_filter)
        self.iteration_table.setRowCount(len(self.iteration_scans))

        # Fill table cells
        for row, scan_name in enumerate(self.iteration_scans):
            for column, tag_name in enumerate(tag_names):
                value = database_data.get_value(
                    collection_name=COLLECTION_CURRENT,
                    primary_key=scan_name,
                    field=tag_name,
                )
                self.iteration_table.setItem(
                    row, column, QTableWidgetItem(str(value))
                )

        # One scan list per iterated tag value
        self.all_iterations_scans = [
            matching_scans(tag_value)
            for tag_value in current_editor.tag_values_list
        ]

        # Emit signal to update pipeline manager
        self.iteration_table_updated.emit(
            self.iteration_scans, self.all_iterations_scans
        )
[docs]
def update_selected_tag(self, selected_tag):
    """
    Update the lists of values corresponding to the selected tag.

    Retrieves all unique values of the selected tag from scans in the
    current collection that also exist in the scan list (the scan list
    itself is set to the whole collection when it is empty). The values
    are converted to strings, sorted, and stored on the current pipeline
    editor as tag_values_list and all_tag_values_list; the widget is then
    updated for the selected tag.

    :param selected_tag (str): The tag whose values should be
        retrieved and updated. None yields empty value lists.
    """
    tag_values = set()

    with self.project.database.data() as database_data:
        # Get intersection of available scans and loaded scan list
        available_scans = database_data.get_document_names(
            COLLECTION_CURRENT
        )
        if not self.scan_list:
            self.scan_list = available_scans

        # A cleared selection (None) has no field to read, so the
        # per-scan lookups are skipped and the value lists stay empty
        if selected_tag is not None:
            scans_to_process = set(available_scans).intersection(
                self.scan_list
            )
            for scan_name in scans_to_process:
                tag_value = database_data.get_value(
                    collection_name=COLLECTION_CURRENT,
                    primary_key=scan_name,
                    field=selected_tag,
                )
                # Skip None values
                if tag_value is not None:
                    tag_values.add(str(tag_value))

    tag_values_list = sorted(tag_values)

    # Get current editor and update its tag value lists
    editor_tabs = self.main_window.pipeline_manager.pipelineEditorTabs
    current_editor = editor_tabs.get_current_editor()
    current_editor.tag_values_list = tag_values_list
    # Separate list object: an in-place change to the filtered values
    # must not alter the full set of values (and vice versa)
    current_editor.all_tag_values_list = list(tag_values_list)

    # Update iterated tag
    self.update_iterated_tag(selected_tag)