import logging
import os
import sys
from collections import OrderedDict
from datetime import datetime
import pandas as pd
from data_agent.exceptions import GroupAlreadyExists, TargetConnectionError
from PySide6 import QtCore, QtGui, QtWidgets
from PySide6.QtUiTools import QUiLoader
from PySide6.QtWidgets import ( # QToolTip,
QDialog,
QDialogButtonBox,
QFileDialog,
QMessageBox,
QPushButton,
QTableWidgetItem,
QTreeWidgetItem,
)
from qt_data_extractor import __version__
from qt_data_extractor.design.create_connection import CreateConnectionDialog
from qt_data_extractor.design.pandas_model import DataTableDialog
from qt_data_extractor.worker_thread import Worker
# Module-level logger named after this module.
log = logging.getLogger(__name__)
# Base title; used for the auxiliary dialogs and as the prefix of the main window title.
WINDOW_DEFAULT_TITLE = "Imubit Data Extractor"
# First two dot-separated components of the package version (e.g. "1.4").
SHORT_VERSION = f'{__version__.split(".")[0]}.{__version__.split(".")[1]}'
# Default upper bound passed as ``max_results`` when listing tags.
MAX_TAGS_TO_LOAD = 100
# Upper bound passed as ``max_results`` when reading values for the preview table.
MAX_PREVIEW_SAMPLES = 500
# When False, the copy prompt controls are disabled before the dialog is shown.
ENABLE_EDITING_CONFIG_BEFORE_EXTRACTION = False
# Text placed in the tag filter box while the user has not typed anything.
TAGS_FILTER_DEFAULT_PLACEHOLDER = "Search tags by filter..."
# Filter entries containing any of these characters are never reported as missing tags.
WILDCARD_CHARACTERS = ["*", "%", "?"]
# Base directory for the bundled .ui files: ``sys._MEIPASS`` when that attribute
# exists (presumably a frozen/PyInstaller build - confirm), else this module's directory.
bundle_dir = getattr(sys, "_MEIPASS", os.path.abspath(os.path.dirname(__file__)))
[docs]
class NoDelayHintProxyStyle(QtWidgets.QProxyStyle):
    """Proxy style that makes tooltips pop up without the usual delay.

    Every style hint is delegated to the base implementation except
    ``SH_ToolTip_WakeUpDelay``, which is forced to 0.
    """

    def __init__(self):
        super().__init__()

    def styleHint(self, hint, option=None, widget=None, returnData=None):
        """Return the value of a style hint.

        Args:
            hint: The ``QStyle`` hint being queried.
            option: Optional style option, forwarded to the base class.
            widget: Optional widget, forwarded to the base class.
            returnData: Optional return-data object, forwarded to the base class.

        Returns:
            0 for the tooltip wake-up delay, otherwise the base class answer.
        """
        # The optional arguments default to None (not Ellipsis) so that they
        # can be forwarded safely when a caller omits them.
        if hint == QtWidgets.QStyle.SH_ToolTip_WakeUpDelay:
            return 0
        return super().styleHint(hint, option, widget, returnData)
[docs]
class MainWindow(QtCore.QObject):
def __init__(self, api):
    """Load the UI definitions and create the dialogs used by the window.

    Args:
        api: API object used for every connection and tag operation. Here it
            is only asked for the list of supported connectors.
    """
    # The QObject base must be initialized before Qt features of this object
    # (``self.tr``, slots bound to signals) are used.
    super().__init__()
    loader = QUiLoader()
    self._api = api
    self._existing_connections = []
    # Keep only connectors whose category is "historian".
    all_connectors = self._api.list_supported_connectors()
    self._registered_connectors = {
        k: all_connectors[k]
        for k in all_connectors
        if all_connectors[k]["category"] == "historian"
    }
    self._w = loader.load(
        os.path.abspath(os.path.join(bundle_dir, "design/main-window.ui")), None
    )
    self._dialogCopyPrompt = loader.load(
        os.path.abspath(os.path.join(bundle_dir, "design/copy-prompt.ui")),
        parentWidget=self._w,
    )
    self._dialogCopyPrompt.setWindowTitle(WINDOW_DEFAULT_TITLE)
    self._dialogCopyProgress = loader.load(
        os.path.abspath(os.path.join(bundle_dir, "design/copy-progress.ui")),
        parentWidget=self._w,
    )
    self._dialogCopyProgress.setWindowTitle(WINDOW_DEFAULT_TITLE)
    self._dialogCreateConnection = CreateConnectionDialog(
        connectors=self._registered_connectors
    )
    self._dialogCreateConnection.setWindowTitle(WINDOW_DEFAULT_TITLE)
    self._dialogManageConnections = loader.load(
        os.path.abspath(os.path.join(bundle_dir, "design/manage-connections.ui")),
        parentWidget=self._w,
    )
    self._dialogManageConnections.setWindowTitle(WINDOW_DEFAULT_TITLE)
    # Rich-text warning shown on the sample-rate combo box.
    self._w.comboSampleRate.setToolTip(
        """
<h3>READ BEFORE CHANGING...</h3><br />
<b><i>Changing this setting may have undesirable effects:</i></b>
<ul>
<li> When used with Osisoft PI plugin and the data has a lower frequency than selected in this option,
the data will be "filled forward"", effectively creating a fake data that does not necessarily corresponds
to the original signal.</li>
</ul>
"""
    )
    # Show that tooltip immediately instead of after the default delay.
    self._w.comboSampleRate.setStyle(NoDelayHintProxyStyle())
    # Pool on which the extraction worker is started.
    self.threadpool = QtCore.QThreadPool()
def _show_msg_box(self, msg, icon=QMessageBox.Icon.Information):
    """Show a modal message box titled like the main window.

    Args:
        msg: Text displayed in the box.
        icon: ``QMessageBox.Icon`` shown next to the text.

    Returns:
        Whatever the box's ``exec_()`` returns.
    """
    mb = QMessageBox(self._w)
    # Use the icon requested by the caller rather than always Information.
    mb.setIcon(icon)
    mb.setWindowTitle(self._w.windowTitle())
    mb.setText(msg)
    return mb.exec_()
@staticmethod
def _connection_title(conn_name, conn_type):
    """Build the label used for a connection: ``"<name> (<type>)"``."""
    return "{} ({})".format(conn_name, conn_type)
def _get_selected_tags(self):
    """Return the ``UserRole`` data of column 0 for every top-level selected-tags item."""
    tree = self._w.treeSelectedTags
    collected = []
    for position in range(tree.topLevelItemCount()):
        entry = tree.topLevelItem(position)
        collected.append(entry.data(0, QtCore.Qt.UserRole))
    return collected
@property
def _current_connection(self):
    """Data attached to the entry currently chosen in the connection combo box."""
    combo = self._w.comboLeftConnection
    return combo.currentData()
# SLOTS
[docs]
@QtCore.Slot(str)
def on_connection_change(self):
    """React to a change of the selected source connection.

    Depending on the combo box entry this clears the view (nothing
    selected), opens the "create connection" dialog (the "add_new" entry),
    reports a disabled connection, or connects if necessary and refreshes
    the view with the connection info.
    """
    current_conn = self._current_connection
    if not current_conn:
        self._refresh_current_connection_view(current_conn=None)
        return

    if current_conn == "add_new":
        # Deselect the pseudo-entry before opening the dialog.
        self._w.comboLeftConnection.setCurrentIndex(-1)
        self.on_create_new_connection()
        return

    conn_name = current_conn["name"]
    if not current_conn["enabled"]:
        self._refresh_current_connection_view(current_conn)
        self._show_msg_box(
            f"Connection '{conn_name}' disabled!",
            icon=QMessageBox.Icon.Warning,
        )
        return

    if not self._api.is_connected(conn_name):
        try:
            self._api.enable_connection(conn_name)
        except TargetConnectionError as e:
            self._refresh_current_connection_view(current_conn)
            # QMessageBox.Icon has no ``Error`` member; ``Critical`` is the
            # error-level icon.
            self._show_msg_box(
                f"Error connecting '{conn_name}': {str(e)}",
                icon=QMessageBox.Icon.Critical,
            )
            return

    # Fetch the info on both paths so it is always bound before the refresh,
    # including right after a successful enable_connection().
    conn_info = self._api.connection_info(conn_name)
    self._refresh_current_connection_view(current_conn, conn_info)
[docs]
@QtCore.Slot()
def on_create_new_connection(self):
    """Run the "create connection" dialog and register the new connection.

    On acceptance the dialog values are passed to the API with ``enabled``
    set and ``ignore_existing`` cleared, the connection list is reloaded and
    the new connection becomes the current one. Any failure is shown in a
    critical message box.
    """
    accepted = self._dialogCreateConnection.exec_() == QDialog.Accepted
    if not accepted:
        return

    conn_args = self._dialogCreateConnection.values
    conn_args["enabled"] = True
    conn_args["ignore_existing"] = False

    try:
        self._api.create_connection(**conn_args)
        self._refresh_connections()
        combo = self._w.comboLeftConnection
        title = self._connection_title(
            conn_args["conn_name"], conn_args["conn_type"]
        )
        combo.setCurrentText(title)
    except Exception as err:
        QMessageBox.critical(self._w, self._w.windowTitle(), str(err))
[docs]
@QtCore.Slot()
def on_manage_connections(self):
    """Show the "manage connections" dialog.

    The table is filled with name, type and an enabled check box for every
    known connection. While at least one connection exists, a "Delete"
    button is added to the dialog's button box for the duration of the
    dialog; it deletes the connection of the current row after confirmation.
    """
    dialog = self._dialogManageConnections
    table = dialog.tableConnections

    table.setRowCount(len(self._existing_connections))
    for row, conn in enumerate(self._existing_connections):
        table.setItem(row, 0, QTableWidgetItem(conn["name"]))

        item_type = QTableWidgetItem(conn["type"])
        item_type.setFlags(item_type.flags() & ~QtCore.Qt.ItemIsEditable)
        table.setItem(row, 1, item_type)

        # Read-only cell whose check state mirrors the "enabled" flag.
        item_enabled = QTableWidgetItem(conn["enabled"])
        item_enabled.setFlags(item_enabled.flags() & ~QtCore.Qt.ItemIsEditable)
        item_enabled.setCheckState(
            QtCore.Qt.Checked if conn["enabled"] else QtCore.Qt.Unchecked
        )
        table.setItem(row, 2, item_enabled)

    # Remember whether the button was added: the connection list may become
    # empty while the dialog is open, so its length cannot be used afterwards
    # to decide whether the button has to be removed.
    delete_button = None
    if len(self._existing_connections) > 0:
        delete_button = QPushButton(self.tr("&Delete"))

        @QtCore.Slot()
        def on_delete_connection():
            item = table.item(table.currentRow(), 0)
            if not item:
                return
            conn_name = item.text()
            answer = QMessageBox.question(
                self._w,
                self._w.windowTitle(),
                f'Delete "{conn_name}" connection? (this operation cannot be undone)',
                QMessageBox.Yes | QMessageBox.No,
            )
            if answer != QMessageBox.StandardButton.Yes:
                return
            self._api.delete_connection(conn_name)
            table.removeRow(table.currentRow())
            self._refresh_connections()

        delete_button.clicked.connect(on_delete_connection)
        dialog.buttonBox.addButton(delete_button, QDialogButtonBox.ResetRole)

    try:
        dialog.exec_()
    finally:
        # Always take the temporary button out again so that the next
        # invocation does not end up with a duplicate.
        if delete_button is not None:
            dialog.buttonBox.removeButton(delete_button)
[docs]
@QtCore.Slot()
def on_view_tags(self, left=True):
    """Preview values of the highlighted tags in a table dialog.

    Args:
        left: When true the highlighted items are taken from the source tag
            tree, otherwise from the selected-tags tree.
    """
    source_conn = self._w.comboLeftConnection.currentData()
    if left:
        highlighted = self._w.treeLeftTagHierarchy.selectedItems()
    else:
        highlighted = self._w.treeSelectedTags.selectedItems()

    # Map tag identifier (column 0 data) -> tag attributes (column 1 data).
    source_tags = {}
    for tree_item in highlighted:
        tag_id = tree_item.data(0, QtCore.Qt.UserRole)
        source_tags[tag_id] = tree_item.data(1, QtCore.Qt.UserRole)

    if not source_tags:
        self._show_msg_box("No tags selected!")
        return

    try:
        request = dict(
            conn_name=source_conn["name"],
            tags=list(source_tags),
            first_timestamp=self._w.dateTimeLeftFrom.dateTime().toPython(),
            last_timestamp=self._w.dateTimeLeftTo.dateTime().toPython(),
            time_frequency=self._w.comboSampleRate.currentText(),
            max_results=MAX_PREVIEW_SAMPLES,
        )
        df = self._api.read_tag_values_period(**request)
        if len(df) == 0:
            self._show_msg_box("No data available in the selected period!")
            return
        preview = DataTableDialog(df, parent=self._w)
        preview.show()
    except Exception as err:
        QMessageBox.critical(self._w, self._w.windowTitle(), str(err))
[docs]
@QtCore.Slot()
def on_selected_tags_change(self):
    """Update the right panel status label with highlighted/total tag counts."""
    tree = self._w.treeSelectedTags
    highlighted = len(tree.selectedItems())
    total = tree.topLevelItemCount()
    if highlighted > 0:
        status = f"{highlighted} / {total} tags"
    else:
        status = f"{total} tags"
    self._w.labelRightPanelStatus.setText(status)
[docs]
@QtCore.Slot()
def on_add_selected_tags(self):
    """Add the tags highlighted in the source tree to the selected-tags tree.

    Tags that are already present are skipped. Afterwards the source tree
    markings and the right panel counter are refreshed.
    """
    # Map tag identifier (column 0 data) -> tag attributes (column 1 data).
    source_tags = {
        item.data(0, QtCore.Qt.UserRole): item.data(1, QtCore.Qt.UserRole)
        for item in self._w.treeLeftTagHierarchy.selectedItems()
    }
    if not source_tags:
        return

    # Read the current selection once instead of re-scanning the whole tree
    # for every candidate; the candidates are dict keys and thus unique, so
    # nothing added inside the loop can collide with a later candidate.
    already_selected = set(self._get_selected_tags())
    for tag_name, tag_attributes in source_tags.items():
        if tag_name in already_selected:
            continue
        item = QTreeWidgetItem([tag_attributes["Name"]])
        item.setData(0, QtCore.Qt.UserRole, tag_name)
        item.setData(1, QtCore.Qt.UserRole, tag_attributes)
        self._w.treeSelectedTags.addTopLevelItem(item)

    self._mark_selected_tags()
    self.on_selected_tags_change()
[docs]
@QtCore.Slot()
def on_remove_selected_tags(self, all):
    """Remove tags from the selected-tags tree.

    Args:
        all: When true the whole tree is cleared, otherwise only the
            highlighted items are removed.
    """
    tree = self._w.treeSelectedTags
    if all:
        tree.clear()
    else:
        root = tree.invisibleRootItem()
        for item in tree.selectedItems():
            # Top-level items have no parent; remove them via the root item.
            (item.parent() or root).removeChild(item)
    self._mark_selected_tags()
    # Keep the right panel counter in sync, as on_add_selected_tags() does.
    self.on_selected_tags_change()
[docs]
@QtCore.Slot()
def on_copy_tags(self):
    """Extract the selected tags of the current connection into a zip archive.

    Checks that tags and an archive directory are chosen, shows the copy
    prompt pre-filled from the main window and, once it is accepted, starts
    a ``Worker`` on ``self.threadpool``. The worker creates a "zip"
    connection, copies the tag attributes and (unless "attributes only" is
    checked) the data of the chosen period. Progress and the outcome are
    shown in the copy-progress dialog. When the worker finishes, the zip
    connection is deleted and the extraction log is written to
    ``<file_path>.log``.
    """
    source_conn = self._w.comboLeftConnection.currentData()
    source_tags = self._get_selected_tags()
    if not source_tags:
        self._show_msg_box("No tags selected!")
        return
    if not self._w.comboArchiveDirectory.currentText():
        self._show_msg_box("Archive directory not selected!")
        return
    now = datetime.now()
    # NOTE(review): read before the archive directory is added to the combo
    # below - confirm this is the intended destination connection name.
    dest_conn_name = self._dialogCopyPrompt.comboCopyTarget.currentText()
    # Output base name (no extension): version plus a timestamp.
    filename = (
        f'extractor-output-v{SHORT_VERSION}-{now.strftime("%Y-%m-%dT%H-%M-%S")}'
    )
    file_path = os.path.join(
        self._w.comboArchiveDirectory.currentText(),
        filename,
    )
    # Pre-fill the prompt dialog from the main window settings.
    self._dialogCopyPrompt.labelCopyDescription.setText(
        f"Extract {len(source_tags)} tags to"
    )
    # NOTE(review): an item is added on every call and never removed here.
    self._dialogCopyPrompt.comboCopyTarget.addItem(
        self._w.comboArchiveDirectory.currentText()
    )
    self._dialogCopyPrompt.dateTimeFrom.setDateTime(
        self._w.dateTimeLeftFrom.dateTime()
    )
    self._dialogCopyPrompt.dateTimeTo.setDateTime(self._w.dateTimeLeftTo.dateTime())
    self._dialogCopyPrompt.comboSampleRate.setCurrentIndex(
        self._w.comboSampleRate.currentIndex()
    )
    # Data settings are only enabled while the check box state is 0
    # (presumably "unchecked" - confirm).
    # NOTE(review): a new handler is connected on every call.
    self._dialogCopyPrompt.checkboxAttributesOnly.stateChanged.connect(
        lambda state: self._dialogCopyPrompt.groupboxDataSettings.setEnabled(
            state == 0
        )
    )
    self._dialogCopyPrompt.checkboxAttributesOnly.setCheckState(
        self._w.checkboxExtractAttributesOnly.checkState()
    )
    if not ENABLE_EDITING_CONFIG_BEFORE_EXTRACTION:
        # Put everything in readonly mode
        self._dialogCopyPrompt.comboCopyTarget.setEnabled(False)
        self._dialogCopyPrompt.groupboxDataSettings.setEnabled(False)
        self._dialogCopyPrompt.checkboxAttributesOnly.setEnabled(False)
    do_copy_prompt = self._dialogCopyPrompt.exec_()
    # Anything other than 1 (presumably QDialog.Accepted - confirm) aborts.
    if do_copy_prompt != 1:
        return
    try:
        dest_group = ""
        copy_from_timestamp = (
            self._dialogCopyPrompt.dateTimeFrom.dateTime().toPython()
        )
        copy_to_timestamp = self._dialogCopyPrompt.dateTimeTo.dateTime().toPython()
        attributes_only = self._dialogCopyPrompt.checkboxAttributesOnly.isChecked()
        # Reset the progress dialog; its button stays disabled until the
        # worker has finished (see worker_complete below).
        self._dialogCopyProgress.buttonBox.button(QDialogButtonBox.Cancel).setText(
            "Cancel"
        )
        self._dialogCopyProgress.buttonBox.button(
            QDialogButtonBox.Cancel
        ).setEnabled(False)
        self._dialogCopyProgress.textExtractionLog.clear()
        self._dialogCopyProgress.labelCopy.setText("Extraction in progress...")
        self._dialogCopyProgress.labelFrom.setText(f'From: [{source_conn["name"]}]')
        self._dialogCopyProgress.labelTo.setText(f"To: [{filename}.zip]")
        self._dialogCopyProgress.labelTotalCopied.setText(f"0 / {len(source_tags)}")
        self._dialogCopyProgress.progressBar.setRange(0, len(source_tags))
        self._dialogCopyProgress.progressBar.setValue(0)
        self._dialogCopyProgress.show()

        # Handler for the worker's ``progress`` signal.
        def update_progress(tag, counter):
            log.info(f"Extracting tag ({counter}) - {tag}")
            self._dialogCopyProgress.textExtractionLog.append(
                f"Extracting tag {counter}: {tag}"
            )
            # The bar shows completed tags, hence counter - 1 while the
            # tag numbered ``counter`` is being extracted.
            self._dialogCopyProgress.progressBar.setValue(counter - 1)
            self._dialogCopyProgress.labelFrom.setText(
                f'From: [{source_conn["name"]}] {tag} ...'
            )
            self._dialogCopyProgress.labelTotalCopied.setText(
                f"{counter} / {len(source_tags)}"
            )

        # Payload handed to the Worker.
        # NOTE(review): this touches widgets and may open a QMessageBox; if
        # the Worker runs it on a pool thread, confirm that this is safe.
        def copy_process_run(progress_callback):
            self._dialogCopyProgress.textExtractionLog.append(
                f'Initialiizing data extraction from [{source_conn["name"]}]...'
            )
            self._dialogCopyProgress.textExtractionLog.append(
                f"Opening {file_path}.zip archive..."
            )
            dest_conn = self._api.create_connection(
                conn_name=dest_conn_name,
                conn_type="zip",
                enabled=True,
                ignore_existing=True,
                zipfile_path=f"{file_path}.zip",
            )
            self._api.copy_attributes(
                src_conn=source_conn["name"],
                tags=source_tags,
                dest_conn=dest_conn["name"],
                dest_group=dest_group,
            )
            if not attributes_only:
                try:
                    # First attempt: on_conflict="ask"; a GroupAlreadyExists
                    # raised here is handled below.
                    self._api.copy_period(
                        src_conn=source_conn["name"],
                        tags=source_tags,
                        dest_conn=dest_conn["name"],
                        dest_group=dest_group,
                        first_timestamp=copy_from_timestamp,
                        last_timestamp=copy_to_timestamp,
                        time_frequency=self._dialogCopyPrompt.comboSampleRate.currentText(),
                        on_conflict="ask",
                        # Forward progress through the worker's signal.
                        progress_callback=lambda tag, counter: progress_callback.emit(
                            tag, counter
                        ),
                    )
                except GroupAlreadyExists as e:
                    # Ask whether to append; on "Yes" repeat the copy with
                    # on_conflict="append", otherwise do nothing further.
                    if (
                        QMessageBox.question(
                            self._w,
                            self._w.windowTitle(),
                            f"{e} \n Would you like to proceed and append to existing data?",
                            QMessageBox.Yes | QMessageBox.No,
                        )
                        == QMessageBox.StandardButton.Yes
                    ):
                        self._api.copy_period(
                            src_conn=source_conn["name"],
                            tags=source_tags,
                            dest_conn=dest_conn["name"],
                            dest_group=dest_group,
                            first_timestamp=copy_from_timestamp,
                            last_timestamp=copy_to_timestamp,
                            time_frequency=self._dialogCopyPrompt.comboSampleRate.currentText(),
                            on_conflict="append",
                            # Forward progress through the worker's signal.
                            progress_callback=lambda tag, counter: progress_callback.emit(
                                tag, counter
                            ),
                        )

        # Handler for the worker's ``result`` signal.
        def complete_success(result):
            self._dialogCopyProgress.progressBar.setValue(len(source_tags))
            self._dialogCopyProgress.labelCopy.setText("Extraction Completed!")
            self._dialogCopyProgress.textExtractionLog.append(
                "Extraction finished successfuly!"
            )
            # The extracted tags are removed from the selection.
            self.on_remove_selected_tags(all=True)

        # Handler for the worker's ``error`` signal; ``result`` is indexed
        # 0..2 (presumably exception type, value and traceback - confirm).
        def complete_error(result):
            self._dialogCopyProgress.labelCopy.setText("Extraction Failed!")
            self._dialogCopyProgress.textExtractionLog.append(
                "Extraction Failed!\n\r"
            )
            self._dialogCopyProgress.textExtractionLog.append(str(result[0]))
            self._dialogCopyProgress.textExtractionLog.append(str(result[1]))
            self._dialogCopyProgress.textExtractionLog.append(str(result[2]))

        # Handler for the worker's ``finished`` signal: clean up, persist the
        # log and turn the disabled "Cancel" button into an enabled "Close".
        def worker_complete():
            self._api.delete_connection(dest_conn_name)
            with open(f"{file_path}.log", "w") as writer:
                writer.write(
                    self._dialogCopyProgress.textExtractionLog.toPlainText()
                )
            self._dialogCopyProgress.labelFrom.setText("")
            self._dialogCopyProgress.labelTo.setText("")
            self._dialogCopyProgress.buttonBox.button(
                QDialogButtonBox.Cancel
            ).setText("Close")
            self._dialogCopyProgress.buttonBox.button(
                QDialogButtonBox.Cancel
            ).setEnabled(True)

        worker = Worker(copy_process_run)
        worker.signals.progress.connect(update_progress)
        worker.signals.result.connect(complete_success)
        worker.signals.error.connect(complete_error)
        worker.signals.finished.connect(worker_complete)
        self.threadpool.start(worker)
    except Exception as e:
        # Covers failures while preparing and starting the worker; errors
        # inside the worker are reported through complete_error instead.
        QMessageBox.critical(self._w, self._w.windowTitle(), str(e))
def _mark_selected_tags(self):
    """Show already-selected tags in bold in the source tag tree.

    Only the top-level items of the source tree are visited; every column
    of an item gets the bold font when the item's tag is part of the
    selected-tags tree and the regular font otherwise.
    """
    # A set gives constant-time membership tests for every row.
    selected_tags = set(self._get_selected_tags())

    font_deselected = QtGui.QFont()
    font_deselected.setBold(False)
    font_selected = QtGui.QFont()
    font_selected.setBold(True)

    tree = self._w.treeLeftTagHierarchy
    for row in range(tree.topLevelItemCount()):
        item = tree.topLevelItem(row)
        tag_name = item.data(0, QtCore.Qt.UserRole)
        # Decide once per item; the choice is the same for all its columns.
        font = font_selected if tag_name in selected_tags else font_deselected
        for column in range(item.columnCount()):
            item.setFont(column, font)
# Dynamic tree expansion
[docs]
@QtCore.Slot(QTreeWidgetItem)
def on_tree_expanded(self, clicked_item):
    """Reload the children of a tree item when it gets expanded.

    Args:
        clicked_item: The expanded item; its column 1 ``UserRole`` data holds
            the tag attributes, including ``HasChildren`` and ``Name``.
    """
    conn_name = self._current_connection["name"]
    display_attributes = OrderedDict(self._current_connection["default_attributes"])
    tag = clicked_item.data(1, QtCore.Qt.UserRole)
    if not tag["HasChildren"]:
        return

    # Drop the previously loaded children, last one first.
    position = clicked_item.childCount() - 1
    while position >= 0:
        clicked_item.removeChild(clicked_item.child(position))
        position -= 1

    children = self._api.list_tags(
        conn_name,
        filter=tag["Name"],
        include_attributes=True,
        max_results=MAX_TAGS_TO_LOAD,
    )
    for child_name in children:
        # One cell per displayed attribute; missing attributes stay blank.
        cells = []
        for key in display_attributes:
            if key in children[child_name]:
                cells.append(str(children[child_name][key]))
            else:
                cells.append("")
        child_item = QTreeWidgetItem(cells)
        child_item.setData(0, QtCore.Qt.UserRole, child_name)
        child_item.setData(1, QtCore.Qt.UserRole, children[child_name])
        clicked_item.addChild(child_item)
[docs]
@QtCore.Slot()
def on_tree_selection_changed(self):
    """Update the left panel status label with highlighted/total tag counts.

    A hint to narrow the search is appended once the number of top-level
    items reaches ``MAX_TAGS_TO_LOAD``.
    """
    tree = self._w.treeLeftTagHierarchy
    total_items = tree.topLevelItemCount()
    selected_items = len(tree.selectedItems())

    too_many_tags_msg = ""
    if total_items >= MAX_TAGS_TO_LOAD:
        too_many_tags_msg = " (Too Many Tags to Display - Narrow Your Search)"

    if selected_items > 0:
        status = f"{selected_items} / {total_items} tags {too_many_tags_msg}"
    else:
        status = f"{total_items} tags {too_many_tags_msg}"
    self._w.labelLeftPanelStatus.setText(status)
[docs]
@QtCore.Slot()
def on_refresh_tags_tree(self, filter, max_results=MAX_TAGS_TO_LOAD):
    """Reload the source tag tree for the current connection.

    Args:
        filter: Filter forwarded to the API's ``list_tags``. When it is a
            list, entries without wildcard characters that are not returned
            (compared case-insensitively) are reported as missing.
        max_results: Upper bound forwarded to ``list_tags``.
    """
    try:
        conn_name = self._current_connection["name"]
        display_attributes = OrderedDict(
            self._current_connection["default_attributes"]
        )

        # Reset the tree and set one column per displayed attribute.
        tree = self._w.treeLeftTagHierarchy
        tree.clear()
        tree.setColumnCount(len(display_attributes))
        headers = [attribute["Name"] for attribute in display_attributes.values()]
        tree.setHeaderLabels(headers)

        tags = self._api.list_tags(
            conn_name,
            filter=filter,
            include_attributes=list(display_attributes.keys()),
            max_results=max_results,
        )

        # One top-level row per returned tag.
        for tag_name in tags:
            cells = []
            for key in display_attributes:
                if key in tags[tag_name]:
                    cells.append(str(tags[tag_name][key]))
                else:
                    cells.append("")
            item = QTreeWidgetItem(cells)
            item.setData(0, QtCore.Qt.UserRole, tag_name)
            item.setData(1, QtCore.Qt.UserRole, tags[tag_name])
            if tags[tag_name]["HasChildren"]:
                item.setChildIndicatorPolicy(QTreeWidgetItem.ShowIndicator)
            tree.addTopLevelItem(item)
        self._mark_selected_tags()

        # For an explicit list of tags, tell the user which ones were not found.
        if isinstance(filter, list):
            found_lowercase = [name.lower() for name in tags.keys()]
            missing_tags = []
            for wanted in filter:
                if any(ch in wanted for ch in WILDCARD_CHARACTERS):
                    continue
                if wanted.lower() in found_lowercase:
                    continue
                missing_tags.append(wanted)
            if missing_tags:
                report = f"Cannot find {len(missing_tags)} tag(s): {', '.join(missing_tags)}"
                # Keep the message box text bounded.
                self._show_msg_box(report[:2000], icon=QMessageBox.Icon.Warning)

        self.on_tree_selection_changed()
    except Exception as err:
        box = QMessageBox(self._w)
        box.setWindowTitle(self._w.windowTitle())
        box.setText(f"Error retrieving tags: {str(err)}")
        box.exec_()
[docs]
@QtCore.Slot(str)
def on_tags_file_select(self):
    """Load a list of tags from the first column of an ``.xlsx`` file.

    The user picks the file; the non-empty cells of its first column are
    used as the tag filter for the source tree (with ``max_results=0``) and
    the free-text filter box is cleared. Failures are shown in a message box.
    """
    filename, _selected_filter = QFileDialog.getOpenFileName(
        parent=self._w,
        caption="Select Tags File",
        dir=".",
        filter="*.xlsx",
    )
    if not filename:
        return
    try:
        df = pd.read_excel(filename, header=None)
        # Only the first column is used, so only its empty cells are dropped;
        # a blank cell in another column must not discard the tag of that row.
        tags_to_find = df.iloc[:, 0].dropna().tolist()
        self.on_refresh_tags_tree(filter=tags_to_find, max_results=0)
        self._w.comboLeftTagFilter.clear()
    except Exception as e:
        mb = QMessageBox(self._w)
        mb.setWindowTitle(self._w.windowTitle())
        mb.setText(f"Error reading excel file: {str(e)}")
        mb.exec_()
def _refresh_connections(self):
    """Reload the connections from the API and repopulate the connection combo box.

    Only connections of the "historian" category are listed, followed by the
    "add new connection" pseudo-entry.
    """
    combo = self._w.comboLeftConnection
    combo.clear()
    self._existing_connections = self._api.list_connections()

    historians = [
        candidate
        for candidate in self._existing_connections
        if candidate["category"] == "historian"
    ]
    for historian in historians:
        title = self._connection_title(historian["name"], historian["type"])
        combo.addItem(title, historian)
    combo.addItem("-- Add New Connection... --", "add_new")

    # With the pseudo-entry alone in the list, leave nothing selected.
    if combo.count() == 1:
        combo.setCurrentIndex(-1)
def _enable_current_connection(self):
    """Enable the connection chosen in the combo box and refresh the view.

    On success the connection data stored in the combo box is updated with
    ``enabled=True``. On failure the view is refreshed without connection
    info and the error is shown in a message box.
    """
    self._w.buttonLeftConnect.hide()
    combo = self._w.comboLeftConnection
    current_conn = combo.currentData()
    try:
        conn_name = current_conn["name"]
        print(f"Enabling connection '{conn_name}'...")
        self._api.enable_connection(conn_name)
        conn_info = self._api.connection_info(conn_name)
        # Store the updated flag back on the combo box entry.
        current_conn["enabled"] = True
        combo.setItemData(combo.currentIndex(), current_conn)
        self._refresh_current_connection_view(current_conn, conn_info)
    except Exception as err:
        self._refresh_current_connection_view(current_conn)
        box = QMessageBox(self._w)
        box.setIcon(QMessageBox.Icon.Information)
        box.setWindowTitle(self._w.windowTitle())
        box.setText(f"Error connecting to '{current_conn['name']}' - {str(err)}")
        box.exec_()
[docs]
class FilterWidgetEventInspector(QtCore.QObject):
    """Event filter that emulates placeholder text in the tag filter box."""

    def eventFilter(self, obj, event):
        """Swap the placeholder text in and out on focus changes.

        When focus leaves a blank box the placeholder is inserted; when
        focus enters a box that shows the placeholder it is emptied. The
        event is then passed on to the base implementation.
        """
        kind = event.type()
        if kind == QtCore.QEvent.FocusOut:
            if obj.currentText().strip() == "":
                obj.setCurrentText(TAGS_FILTER_DEFAULT_PLACEHOLDER)
        elif kind == QtCore.QEvent.FocusIn:
            if obj.currentText().strip() == TAGS_FILTER_DEFAULT_PLACEHOLDER:
                obj.setCurrentText("")
        return super().eventFilter(obj, event)


# Single instance that setup() installs on the tag filter combo box.
_filterWidgetEventInspector = FilterWidgetEventInspector()
def _refresh_current_connection_view(self, current_conn, conn_info=None):
    """Reset the left panel and adapt it to the given connection.

    Args:
        current_conn: Connection data, or a false value when nothing is
            selected (the panel is then only reset).
        conn_info: Connection info holding the ``OneLiner`` description;
            without it only the reset and the disabled hint are applied.
    """
    window = self._w

    # Start from a clean panel.
    window.buttonLeftConnect.hide()
    window.labelLeftConnectionDetails.setText("")
    window.treeLeftTagHierarchy.clear()
    window.labelLeftPanelStatus.clear()

    if not current_conn:
        return
    if not current_conn["enabled"]:
        window.labelLeftConnectionDetails.setText("Connection disabled.")
        window.buttonLeftConnect.show()
        return
    if conn_info is None:
        return

    window.labelLeftConnectionDetails.setText(conn_info["OneLiner"])

    # Show only the filter controls this connection supports.
    window.comboLeftTagFilter.setCurrentText(TAGS_FILTER_DEFAULT_PLACEHOLDER)
    supported = current_conn["supported_filters"]
    filter_widgets = (
        ("name", window.comboLeftTagFilter),
        ("tags_file", window.buttonLeftTagsFileSelect),
    )
    for filter_name, widget in filter_widgets:
        if filter_name in supported:
            widget.show()
        else:
            widget.hide()

    if "time" not in supported:
        window.widgetLeftTimeFilter.hide()
        return

    # Default period: the 365 days ending at the current UTC time.
    end_time = QtCore.QDateTime.currentDateTimeUtc()
    start_time = end_time.addDays(-365)
    window.dateTimeLeftFrom.setDateTime(start_time)
    window.dateTimeLeftTo.setDateTime(end_time)
    window.widgetLeftTimeFilter.show()
[docs]
def setup(self):
    """Populate the connection list and wire the widgets to their handlers.

    Sets the window title, loads the connections, connects the signals of
    the combo boxes, trees, buttons and menu actions, registers the F5
    (copy) and Alt+F4 (quit) shortcuts and fills the archive combo box with
    the names of the non-historian connections.
    """
    self._w.setWindowTitle(f"{WINDOW_DEFAULT_TITLE} - v{__version__}")
    # Connection list
    self._refresh_connections()
    self._w.comboLeftConnection.currentTextChanged.connect(
        self.on_connection_change
    )
    self._w.buttonLeftConnect.clicked.connect(self._enable_current_connection)
    # Bring the view in line with whatever entry is selected initially.
    self.on_connection_change()
    # Menu Bar
    self._w.actionAddNewConnection.triggered.connect(self.on_create_new_connection)
    self._w.actionManageConnections.triggered.connect(self.on_manage_connections)
    # Display
    self._w.comboLeftTagFilter.textActivated.connect(self.on_refresh_tags_tree)
    # Placeholder handling for the filter box (see FilterWidgetEventInspector).
    self._w.comboLeftTagFilter.installEventFilter(self._filterWidgetEventInspector)
    self._w.treeLeftTagHierarchy.itemExpanded.connect(self.on_tree_expanded)
    self._w.treeLeftTagHierarchy.itemSelectionChanged.connect(
        self.on_tree_selection_changed
    )
    self._w.treeSelectedTags.itemSelectionChanged.connect(
        self.on_selected_tags_change
    )
    # Controls
    # Lambdas pin the argument so the signal's own payload is not passed on.
    self._w.buttonLeftView.clicked.connect(lambda: self.on_view_tags(left=True))
    # shortcut_view = QtGui.QShortcut(QtGui.QKeySequence("F3"), self._w)
    # shortcut_view.activated.connect(self.on_view_tags)
    self._w.buttonRightView.clicked.connect(lambda: self.on_view_tags(left=False))
    self._w.buttonLeftTagsFileSelect.clicked.connect(self.on_tags_file_select)
    self._w.buttonAddSelectedTags.clicked.connect(self.on_add_selected_tags)
    self._w.buttonRemoveAllSelected.clicked.connect(
        lambda: self.on_remove_selected_tags(all=True)
    )
    self._w.buttonRemoveSelected.clicked.connect(
        lambda: self.on_remove_selected_tags(all=False)
    )
    self._w.buttonCopy.clicked.connect(self.on_copy_tags)
    shortcut_copy = QtGui.QShortcut(QtGui.QKeySequence("F5"), self._w)
    shortcut_copy.activated.connect(self.on_copy_tags)

    # Lets the user pick the archive folder; a folder that is not yet in the
    # combo box is inserted at the top, and the picked folder becomes current.
    def on_directory_select():
        selected_dir = QFileDialog.getExistingDirectory(
            self._w, "Select Archive Folder"
        )
        if not selected_dir:
            return
        ind = self._w.comboArchiveDirectory.findText(selected_dir)
        if ind == -1:
            self._w.comboArchiveDirectory.insertItem(0, selected_dir)
            ind = 0
        self._w.comboArchiveDirectory.setCurrentIndex(ind)

    # NOTE(review): the block structure was reconstructed; this loop is assumed
    # to belong to setup() and not to on_directory_select() - confirm.
    # Offer the names of the non-historian connections as archive targets.
    for conn in [
        conn
        for conn in self._existing_connections
        if conn["category"] != "historian"
    ]:
        self._w.comboArchiveDirectory.addItem(conn["name"])
    self._w.buttonSelectArchiveFile.clicked.connect(on_directory_select)
    # Refresh
    # shortcutRefresh = QtGui.QShortcut(QtGui.QKeySequence('Ctrl+r'), self._w)
    # shortcutRefresh.activated.connect(QtWidgets.QApplication.instance().quit)
    # Exit
    self._w.buttonExit.clicked.connect(QtWidgets.QApplication.instance().quit)
    shortcut_quit = QtGui.QShortcut(QtGui.QKeySequence("Alt+F4"), self._w)
    shortcut_quit.activated.connect(QtWidgets.QApplication.instance().quit)
[docs]
def show(self):
    """Make the main window visible."""
    window = self._w
    window.show()