summaryrefslogtreecommitdiff
path: root/src/streamml/front/detector_profiles_tab_pushscreens.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/streamml/front/detector_profiles_tab_pushscreens.py')
-rw-r--r--src/streamml/front/detector_profiles_tab_pushscreens.py187
1 files changed, 187 insertions, 0 deletions
diff --git a/src/streamml/front/detector_profiles_tab_pushscreens.py b/src/streamml/front/detector_profiles_tab_pushscreens.py
new file mode 100644
index 0000000..fc5b731
--- /dev/null
+++ b/src/streamml/front/detector_profiles_tab_pushscreens.py
@@ -0,0 +1,187 @@
+from textual import on
+from textual.app import ComposeResult
+from textual.screen import ModalScreen
+from textual.widgets import Button, Label, Pretty, DataTable, Switch
+from textual.containers import Vertical, Horizontal, VerticalScroll, Container
+from textual_plotext import PlotextPlot
+
+from datetime import datetime
+
+from ..back.detector_profiles_manager import DetectorProfilesManager
+from ..back.detector_profile_HST import DetectorProfileHST
+
+class PlotTab(Container):
+ def __init__(self, profile: DetectorProfileHST, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.profile = profile
+ self.classes = "plot-card"
+
+ def compose(self):
+ yield PlotextPlot()
+
+ def on_mount(self):
+ self.set_interval(1, self.update_plot)
+
+ def update_plot(self):
+ plot_widget = self.query_one(PlotextPlot)
+ plt = plot_widget.plt
+
+ y = list(getattr(self.profile, "plot_data", []))
+
+ plt.clear_figure()
+ plt.theme("dark")
+
+ plt.plot(y, marker="dot", color="green")
+ plt.title("Anomaly Score")
+ plt.ylabel("last 30 windows")
+ plt.ylim(0, 1)
+
+ threshold = self.profile.params.get("threshold", 0.7)
+ if threshold is not None:
+ plt.horizontal_line(float(threshold), color="red")
+
+
+ plot_widget.refresh()
+
+
+class ShowProfilePushScreen(ModalScreen[str]):
+ def __init__(self, manager, profile_name: str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+ self.profile = self.manager.get_profile(profile_name)
+
+ def compose(self) -> ComposeResult:
+ with Container(classes="modal-window large-modal"):
+ yield Label(f"Profile: {self.profile_name}", classes="modal-header")
+
+ with Horizontal(classes="modal-split-container"):
+
+ with Vertical(classes="left-panel"):
+ yield PlotTab(self.profile)
+
+ with Vertical(classes="right-panel"):
+ yield Label("Runtime Stats (Live)", classes="section-header")
+ with VerticalScroll(classes="info-box", id="stats-box"):
+ yield Pretty({}, id="runtime-stats-pretty")
+
+ yield Label("Configuration", classes="section-header")
+ with VerticalScroll(classes="info-box"):
+ yield Pretty(self.profile.to_dict())
+
+ with Container(classes="modal-footer"):
+ yield Button("Close", id="cancel-button", variant="primary")
+
+ def on_mount(self):
+ self.set_interval(1.0, self.update_stats)
+ self.update_stats()
+
+ def update_stats(self):
+ if self.profile:
+ stats = self.profile.get_runtime_stats()
+ self.query_one("#runtime-stats-pretty", Pretty).update(stats)
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed):
+ self.dismiss(None)
+
+
+class ShowLogsPushScreen(ModalScreen[str]):
+ def __init__(self, manager: DetectorProfilesManager, profile_name: str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+
+ def compose(self) -> ComposeResult:
+ with Container(classes="modal-window medium-modal"):
+ yield Label(f"Anomaly Logs: {self.profile_name}", classes="modal-header")
+
+ with Container(classes="table-container"):
+ yield DataTable(id="logs_table", zebra_stripes=True, cursor_type="row")
+
+ with Horizontal(classes="modal-footer"):
+ yield Button("Clear History", id="clear-button", variant="warning")
+ yield Button("Close", id="cancel-button", variant="primary")
+
+
+ def on_mount(self):
+ table = self.query_one("#logs_table", DataTable)
+ table.add_columns("Timestamp", "Score", "Packets Rate", "Protocol Info", "Verdict")
+
+ logs = self.manager.get_profile_logs(self.profile_name)
+
+ if not logs:
+ return
+
+ sorted_logs = sorted(logs, key=lambda x: x.get("ts", 0), reverse=True)
+
+ for log in sorted_logs:
+ dt = datetime.fromtimestamp(log.get("ts", 0)).strftime("%Y-%m-%d %H:%M:%S")
+
+ score = f"{log.get('score', 0):.4f}"
+ rate = f"{log.get('pkt_rate', 0):.1f}"
+ proto = str(log.get("proto_info", "-"))
+ verdict = "ANOMALY"
+
+ table.add_row(dt, score, rate, proto, verdict)
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed) -> None:
+ if event.button.id == "clear-button":
+ profile = self.manager.get_profile(self.profile_name)
+ if profile:
+ profile.clear_logs()
+ self.query_one("#logs_table", DataTable).clear()
+ else:
+ self.dismiss(None)
+
+class SetDetectorNotificationPushScreen(ModalScreen[str]):
+ def __init__(self, manager, profile_name: str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+ self.profile = self.manager.get_profile(profile_name)
+
+ def compose(self) -> ComposeResult:
+ current_val = getattr(self.profile, 'notify_enabled', False)
+
+ with Container(classes="modal-window small-modal"):
+ yield Label(f"Notification: {self.profile_name}", classes="modal-header")
+
+ with Vertical(classes="section-card"):
+ yield Label("Enable notification (Discord)")
+ yield Switch(value=current_val, id="switch-anomaly")
+
+ with Horizontal(classes="modal-footer"):
+ yield Button("Close", id="close-button", variant="primary")
+
+ @on(Switch.Changed)
+ def on_switch_changed(self, event: Switch.Changed):
+ if event.switch.id == "switch-anomaly":
+ self.profile.notify_enabled = event.value
+ self.manager.try_save_profiles(notify=False)
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed):
+ self.dismiss(None)
+
+class ConfirmDeletePushScreen(ModalScreen[str]):
+ def __init__(self, manager: DetectorProfilesManager, profile_name:str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+
+ def compose(self) -> ComposeResult:
+ with Container(classes="modal-window small-modal"):
+ yield Label("Are you sure?",classes="modal-header")
+ with Horizontal(classes="modal-footer"):
+ yield Button("Delete", id="confirm-button", variant="error")
+ yield Button("Cancel", id="cancel-button", variant="default")
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed):
+ if event.button.id == "confirm-button":
+ self.manager.delete_profile(self.profile_name)
+ self.dismiss(None)
+ else:
+ self.dismiss(None)