Source code for populse_mia.user_interface.data_browser.advanced_search

"""
Module to define the advanced search.

Contains:
    Class:
        - AdvancedSearch
"""

##########################################################################
# Populse_mia - Copyright (C) IRMaGe/CEA, 2018
# Distributed under the terms of the CeCILL license, as published by
# the CEA-CNRS-INRIA. Refer to the LICENSE file or to
# http://www.cecill.info/licences/Licence_CeCILL_V2.1-en.html
# for details.
##########################################################################

import logging
import os
from typing import get_origin

# PyQt5 imports
from PyQt5.QtCore import QObjectCleanupHandler
from PyQt5.QtGui import QPixmap
from PyQt5.QtWidgets import (
    QComboBox,
    QGridLayout,
    QHBoxLayout,
    QLineEdit,
    QMessageBox,
    QPushButton,
    QVBoxLayout,
    QWidget,
)

from populse_mia.data_manager import (
    ALL_TYPES,
    COLLECTION_CURRENT,
    FIELD_TYPE_BOOLEAN,
    FIELD_TYPE_STRING,
    TAG_FILENAME,
)

# Populse_MIA imports
from populse_mia.software_properties import Config
from populse_mia.user_interface.pop_ups import ClickableLabel

logger = logging.getLogger(__name__)


[docs] class AdvancedSearch(QWidget): """Class that manages the widget of the advanced search The advanced search creates a complex query to the database and is a combination of several "query lines" which are linked with AND or OR and all composed of: - A negation or not - A tag name or all visible tags - A condition (==, !=, >, <, >=, <=, CONTAINS, IN, BETWEEN) - A value .. Methods: - add_search_bar: create and define the advanced research bar - apply_filter: apply an opened filter to update the table. - clearLayout: called to clear a layout (not used currently in order to fix issue #72) - displayConditionRules: set the list of condition choices, depending on the tag type - displayValueRules: update the placeholder text when the condition choice is changed - get_filters: get the filters in list form - launch_search: start the search and update the table - prepare_filters: prepare the str representation of the filter - refresh_search: refresh the widget - remove_row: remove a row - rows_borders_added: add the links and the added row to the good rows - rows_borders_removed: link and adds row removed from every row - rowsContainsWidget: check if the widget is still used (not used currently) - show_search: reset the rows when the Advanced Search button is clicked """
[docs] def __init__( self, project, data_browser, scans_list=None, tags_list=None, from_pipeline=False, ): """Initialization of the AdvancedSearch class. :param project: current project in the software :param data_browser: parent data browser widget :param scans_list: current list of the documents :param tags_list: list of the visualized tags :param from_pipeline: True if the widget is called from the pipeline manager """ super().__init__() if scans_list is None: scans_list = [] if tags_list is None: tags_list = [] self.project = project self.dataBrowser = data_browser self.rows = [] self.scans_list = scans_list self.tags_list = tags_list self.from_pipeline = from_pipeline self.search = QPushButton("Search") self.search.setFixedWidth(100)
[docs] def apply_filter(self, filter): """ Applies a filter to update the table data by refining the displayed scans based on the filter criteria. The filter is used to query documents from the database, and the results are reflected in the data browser. This function handles the following tasks: - Retrieves filter parameters (e.g., conditions, values, links) from the provided filter object. - Updates the table rows based on the filter values, including conditions and fields. - Prepares and applies the filter query to the database to fetch relevant results. - If the filter is successfully applied, updates the table data with the filtered results. - If an error occurs during the filtering process, displays a warning message and reverts to the full set of scans. :param filter: A filter object that contains the criteria (e.g., conditions, values, and fields) to apply when querying the database. The filter is applied to update the visible scans in the table. """ # Data nots = filter.nots values = filter.values conditions = filter.conditions links = filter.links fields = filter.fields with self.project.database.data() as database_data: for i in range(0, len(nots)): if i >= len(self.rows): self.add_search_bar() row = self.rows[i] if i > 0: row[0].setCurrentText(links[i - 1]) row[1].setCurrentText(nots[i]) row[2].setCurrentText(fields[i][0]) # Replacing all visualized tags by the current list of # visible tags if fields[i][0] == "All visualized tags": fields[i] = database_data.get_shown_tags() row[3].setCurrentText(conditions[i]) row[4].setText(str(values[i])) old_rows = self.dataBrowser.table_data.scans_to_visualize # Filter applied only if at least one row if len(nots) > 0: # Result gotten try: filter_query = self.prepare_filters( links, fields, conditions, values, nots, self.scans_list, ) result = database_data.filter_documents( COLLECTION_CURRENT, filter_query ) # data_browser updated with the new selection result_names = [ document[TAG_FILENAME] for document in result ] except Exception: logger.warning( "Exception occurred in populse_mia.user_interface." "data_browser.advanced_search.AdvancedSearch." "apply_filter()", exc_info=True, ) # Error message if the search can't be done, # and visualization of all scans in the data_browser msg = QMessageBox() msg.setIcon(QMessageBox.Warning) msg.setText("Error in the search") msg.setInformativeText( "An issue occurred during the search. Please " "correct it and try again..." ) msg.setWindowTitle("Warning") msg.setStandardButtons(QMessageBox.Ok) msg.buttonClicked.connect(msg.close) msg.exec() result_names = self.scans_list # data_browser updated with the new selection self.dataBrowser.table_data.scans_to_visualize = result_names # Otherwise, all the scans are reput else: # data_browser updated with every scan if self.scans_list: self.dataBrowser.table_data.scans_to_visualize = ( self.scans_list ) else: self.dataBrowser.table_data.scans_to_visualize = ( database_data.get_document_names(COLLECTION_CURRENT) ) self.dataBrowser.table_data.update_visualized_rows(old_rows)
# def clearLayout(self, layout): # """ # Called to clear a layout # # :param layout: layout to clear # """ # if layout is not None: # while layout.count(): # item = layout.takeAt(0) # widget = item.widget() # # We clear the widget only if the row does not exist anymore # if widget is not None and not self.rowsContainsWidget( # widget): # pass # widget.deleteLater() # else: # self.clearLayout(item.layout())
[docs] def displayConditionRules(self, field, condition): """ Updates the available condition choices based on the field's tag type. This function adjusts the list of conditions (e.g., `<`, `>`, `BETWEEN`, `IN`) available in the `condition` widget, depending on the type of the tag selected in the `field` widget. Certain conditions are removed or added based on the tag's field type, and the condition choices are sorted afterward. The rules for updating the condition choices are as follows: - For tags with a field type of list, string, or boolean, or if the tag name is "All visualized tags", certain operators like `<`, `>`, `<=`, `>=`, and `BETWEEN` are removed. - If the tag's field type is compatible with numeric comparisons (i.e., not list, string, or boolean), operators like `<`, `>`, `<=`, `>=`, and `BETWEEN` are added. - If the tag is a list, the "IN" condition is removed. - Otherwise, the "IN" condition is added. :param field: The field widget representing the selected tag. Used to determine the tag type and adjust the condition choices accordingly. :param condition: The condition widget where the available conditions are updated based on the tag type. """ tag_name = field.currentText() with self.project.database.data() as database_data: tag_attrib = database_data.get_field_attributes( COLLECTION_CURRENT, tag_name ) no_operators_tags = [t for t in ALL_TYPES if get_origin(t) is list] no_operators_tags.append(FIELD_TYPE_STRING) no_operators_tags.append(FIELD_TYPE_BOOLEAN) no_operators_tags.append(None) if ( tag_attrib is not None and tag_attrib["field_type"] in no_operators_tags ) or tag_name == "All visualized tags": condition.removeItem(condition.findText("<")) condition.removeItem(condition.findText(">")) condition.removeItem(condition.findText("<=")) condition.removeItem(condition.findText(">=")) condition.removeItem(condition.findText("BETWEEN")) elif ( tag_attrib is None or tag_attrib["field_type"] not in no_operators_tags ): operators_to_reput = ["<", ">", "<=", ">=", "BETWEEN"] for operator in operators_to_reput: is_op_existing = condition.findText(operator) != -1 if not is_op_existing: condition.addItem(operator) if (tag_attrib is not None) and ( get_origin(tag_attrib["field_type"]) is list ): condition.removeItem(condition.findText("IN")) elif (tag_attrib is None) or ( not get_origin(tag_attrib["field_type"]) is list ): operators_to_reput = ["IN"] for operator in operators_to_reput: is_op_existing = condition.findText(operator) != -1 if not is_op_existing: condition.addItem(operator) condition.model().sort(0)
[docs] def displayValueRules(self, choice, value): """ Update the placeholder text and the enabled/disabled state of the value input based on the selected condition choice. This function adjusts the `value` widget's state (enabled/disabled) and its placeholder text depending on the condition selected in the `choice` widget. The rules are as follows: - "BETWEEN": Enables the `value` input and sets a placeholder text asking the user to separate the two inclusive borders with a semicolon and a space. - "IN": Enables the `value` input and sets a placeholder text asking the user to separate each list item with a semicolon and a space. - "HAS VALUE" or "HAS NO VALUE": Disables the `value` input and clears any placeholder text or value. - For all other conditions: Enables the `value` input and clears the placeholder text. :param choice: The choice widget, which determines the selected condition. :param value: The value widget, which represents the input field whose state will be updated. """ if choice.currentText() == "BETWEEN": value.setDisabled(False) value.setPlaceholderText( "Please separate the two inclusive borders of the range by a " "semicolon and a space" ) elif choice.currentText() == "IN": value.setDisabled(False) value.setPlaceholderText( "Please separate each list item by a semicolon and a space" ) elif ( choice.currentText() == "HAS VALUE" or choice.currentText() == "HAS NO VALUE" ): value.setDisabled(True) value.setPlaceholderText("") value.setText("") else: value.setDisabled(False) value.setPlaceholderText("")
[docs] def get_filters(self, replace_all_by_fields): """Get the filters in a list. :param replace_all_by_fields: to replace All visualized tags by the list of visible fields :return: Lists of filters (fields, conditions, values, links, nots) """ # Lists to get all the data of the search fields = [] conditions = [] values = [] links = [] nots = [] with self.project.database.data() as database_data: for row in self.rows: for widget in row: if widget is not None: child = widget child_name = child.objectName() if child_name == "link": links.append(child.currentText()) elif child_name == "condition": conditions.append(child.currentText()) elif child_name == "field": if child.currentText() != "All visualized tags": fields.append([child.currentText()]) else: if replace_all_by_fields: fields.append( database_data.get_shown_tags() ) else: fields.append([child.currentText()]) elif child_name == "value": values.append(child.displayText()) elif child_name == "not": nots.append(child.currentText()) operators = ["<", ">", "<=", ">=", "BETWEEN"] no_operators_tags = [t for t in ALL_TYPES if get_origin(t) is list] no_operators_tags.append(FIELD_TYPE_STRING) no_operators_tags.append(FIELD_TYPE_BOOLEAN) # Converting BETWEEN and IN values into lists for i in range(0, len(conditions)): if conditions[i] == "BETWEEN" or conditions[i] == "IN": values[i] = values[i].split("; ") if conditions[i] == "IN": for tag in fields[i].copy(): tag_attrib = database_data.get_field_attributes( COLLECTION_CURRENT, tag ) if get_origin(tag_attrib["field_type"]) is list: fields[i].remove(tag) elif conditions[i] in operators: for tag in fields[i].copy(): tag_attrib = database_data.get_field_attributes( COLLECTION_CURRENT, tag ) if tag_attrib["field_type"] in no_operators_tags: fields[i].remove(tag) return fields, conditions, values, links, nots
[docs] @staticmethod def prepare_filters(links, fields, conditions, values, nots, scans): """Prepare the str representation of the filter. :param links: list of links (AND/OR) :param fields: list of fields :param conditions: list of conditions (==, !=, <, >, <=, >=, IN, BETWEEN, CONTAINS, HAS VALUE, HAS NO VALUE) :param values: list of values :param nots: list of negations ("" or NOT) :param scans: list of scans to search in :return: str representation of the filter """ row_queries = [] for row_fields, row_condition, row_value, row_not in zip( fields, conditions, values, nots ): row_query_parts = [] for row_field in row_fields: if row_condition == "IN": escaped_row_value = str(row_value).replace("'", '"') row_field_query = ( f"({{{row_field}}} IN {escaped_row_value})" ) elif row_condition == "BETWEEN": row_field_query = ( f'(({{{row_field}}} >= "{row_value[0]}") AND ' f'({{{row_field}}} <= "{row_value[1]}"))' ) elif row_condition == "HAS VALUE": row_field_query = f"({{{row_field}}} != null)" elif row_condition == "HAS NO VALUE": row_field_query = f"({{{row_field}}} == null)" elif row_condition == "CONTAINS": row_field_query = f'({{{row_field}}} LIKE "%{row_value}%")' else: row_field_query = ( f'({{{row_field}}} {row_condition} "{row_value}")' ) row_query_parts.append(row_field_query) # Combine all parts with "OR" row_query = " OR ".join(row_query_parts) # Apply negation if necessary if row_not == "NOT": row_query = f"(NOT {row_query})" row_queries.append(f"({row_query})") # Combine all row queries with specified links final_query = row_queries[0] for link, next_query in zip(links, row_queries[1:]): final_query = f"{final_query} {link} {next_query}" # Add the scans condition formatted_scans = str(scans).replace("'", '"') final_query = ( f"{final_query} AND " f"({{{TAG_FILENAME}}} IN {formatted_scans})" ) # Enclose the entire query in parentheses return f"({final_query})"
[docs] def remove_row(self, row_layout): """Remove a row. :param row_layout: Row to remove """ # We remove the row only if there is at least 2 rows, because we # always must keep at least one if len(self.rows) > 1: index = self.rows.index(row_layout) for i in range(0, len(self.rows[-1])): if self.rows[index][i] is not None: self.rows[index][i].setParent(None) self.rows[index][i].deleteLater() self.rows[index][i] = None del self.rows[index] # We refresh the view self.refresh_search()
[docs] def rows_borders_added(self, links): """Add the links and the added row to the good rows. :param links: Old links to reput """ # Plus added to the last row sources_images_dir = Config().getSourceImageDir() add_search_bar_label = ClickableLabel() add_search_bar_label.setObjectName("plus") add_search_bar_picture = QPixmap( os.path.relpath(os.path.join(sources_images_dir, "green_plus.png")) ) add_search_bar_picture = add_search_bar_picture.scaledToHeight(20) add_search_bar_label.setPixmap(add_search_bar_picture) add_search_bar_label.clicked.connect(self.add_search_bar) self.rows[len(self.rows) - 1][6] = add_search_bar_label # Link added to every row, except the first one for i in range(1, len(self.rows)): row = self.rows[i] link_choice = QComboBox() link_choice.setObjectName("link") link_choice.addItem("AND") link_choice.addItem("OR") if len(links) >= i: link_choice.setCurrentText(links[i - 1]) row[0] = link_choice
[docs] def rows_borders_removed(self): """Link and add row removed from every row.""" # We remove all the links and the add rows for i in range(0, len(self.rows)): # Plus removed from every row if self.rows[i][6] is not None: self.rows[i][6].setParent(None) self.rows[i][6].deleteLater() self.rows[i][6] = None # Link removed from every row if self.rows[i][0] is not None: self.rows[i][0].setParent(None) self.rows[i][0].deleteLater() self.rows[i][0] = None
# def rowsContainsWidget(self, widget): # """Check if the widget is still used # # :param widget: widget to check # :return: True or False # """ # for row in self.rows: # if widget in row: # return True # return False