summaryrefslogtreecommitdiff
path: root/src/streamml/front
diff options
context:
space:
mode:
authorEnricoGuccii <partyka.003@proton.me>2026-01-10 22:43:36 +0100
committerEnricoGuccii <partyka.003@proton.me>2026-01-10 22:43:36 +0100
commit6d7410e286ce0fde31f89185c095fe90e85597f3 (patch)
tree5092c99d353382a71f00d8fe18d53b8073cf3f58 /src/streamml/front
parentc2f5fbe7fb93ce420caf23c5c0e06144cf953bb8 (diff)
bloat removed
Diffstat (limited to 'src/streamml/front')
-rw-r--r--src/streamml/front/__pycache__/detector_profiles_tab.cpython-312.pycbin0 -> 5920 bytes
-rw-r--r--src/streamml/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pycbin0 -> 13717 bytes
-rw-r--r--src/streamml/front/__pycache__/detector_tab.cpython-312.pycbin0 -> 7649 bytes
-rw-r--r--src/streamml/front/__pycache__/detector_tab_pushscreens.cpython-312.pycbin0 -> 2794 bytes
-rw-r--r--src/streamml/front/__pycache__/options_tab.cpython-312.pycbin0 -> 3393 bytes
-rw-r--r--src/streamml/front/detector_profiles_tab.py77
-rw-r--r--src/streamml/front/detector_profiles_tab_pushscreens.py187
-rw-r--r--src/streamml/front/detector_tab.py135
-rw-r--r--src/streamml/front/detector_tab_pushscreens.py31
-rw-r--r--src/streamml/front/options_tab.py43
10 files changed, 473 insertions, 0 deletions
diff --git a/src/streamml/front/__pycache__/detector_profiles_tab.cpython-312.pyc b/src/streamml/front/__pycache__/detector_profiles_tab.cpython-312.pyc
new file mode 100644
index 0000000..3e3957b
--- /dev/null
+++ b/src/streamml/front/__pycache__/detector_profiles_tab.cpython-312.pyc
Binary files differ
diff --git a/src/streamml/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc b/src/streamml/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc
new file mode 100644
index 0000000..d812190
--- /dev/null
+++ b/src/streamml/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc
Binary files differ
diff --git a/src/streamml/front/__pycache__/detector_tab.cpython-312.pyc b/src/streamml/front/__pycache__/detector_tab.cpython-312.pyc
new file mode 100644
index 0000000..ea78d4d
--- /dev/null
+++ b/src/streamml/front/__pycache__/detector_tab.cpython-312.pyc
Binary files differ
diff --git a/src/streamml/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc b/src/streamml/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc
new file mode 100644
index 0000000..798edff
--- /dev/null
+++ b/src/streamml/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc
Binary files differ
diff --git a/src/streamml/front/__pycache__/options_tab.cpython-312.pyc b/src/streamml/front/__pycache__/options_tab.cpython-312.pyc
new file mode 100644
index 0000000..a8a75aa
--- /dev/null
+++ b/src/streamml/front/__pycache__/options_tab.cpython-312.pyc
Binary files differ
diff --git a/src/streamml/front/detector_profiles_tab.py b/src/streamml/front/detector_profiles_tab.py
new file mode 100644
index 0000000..7860e16
--- /dev/null
+++ b/src/streamml/front/detector_profiles_tab.py
@@ -0,0 +1,77 @@
+from textual import on
+from textual.app import ComposeResult
+from textual.widgets import Button, Label, Switch
+from textual.containers import Vertical, Horizontal, VerticalScroll
+
+from ..back.detector_profiles_manager import DetectorProfilesManager
+from .detector_profiles_tab_pushscreens import ConfirmDeletePushScreen, ShowLogsPushScreen, ShowProfilePushScreen, SetDetectorNotificationPushScreen
+
+class DetectorProfilesTab(Vertical):
+ def __init__(self, manager: DetectorProfilesManager, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.manager.on_refresh = self.refresh_profiles
+ self.manager.on_message = self.on_manager_message
+
+ def compose(self) -> ComposeResult:
+ yield VerticalScroll(id="profiles-list")
+
+ def on_mount(self) -> None:
+ self.refresh_profiles()
+
+ def refresh_profiles(self) -> None:
+ profiles_list = self.query_one("#profiles-list", VerticalScroll)
+ profiles_list.remove_children()
+
+ if not self.manager.profiles:
+ return
+
+ for profile in self.manager.profiles:
+ row = Horizontal(
+ Switch(id=f"activate-profile-switch-{profile.profile_name}", value=profile.is_active),
+ Label(f"{profile.profile_name}", classes="profile-profile_name"),
+ Button("Show logs", id=f"show-logs-button-{profile.profile_name}", classes="profile-action", variant="primary"),
+ Button("Show Profile", id=f"show-profile-button-{profile.profile_name}", classes="profile-action", variant="default"),
+ Button("Notifications", id=f"set-notifications-button-{profile.profile_name}", classes="profile-action", variant="default"),
+ Button("Delete", id=f"delete-{profile.profile_name}", classes="profile-action button-delete", variant="error"),
+ classes="profile-row"
+ )
+ profiles_list.mount(row)
+
+ @on(Switch.Changed)
+ def any_switch_changed(self, event: Switch.Changed) -> None:
+ switch_id = event.switch.id
+ if not switch_id:
+ return
+ elif switch_id.startswith("activate-profile-switch"):
+ profile_name = switch_id.removeprefix("activate-profile-switch-")
+
+ if event.switch.value:
+ is_turned_on = self.manager.turn_on_profile(profile_name)
+ if not is_turned_on:
+ event.switch.value = False
+ else:
+ is_turned_off = self.manager.turn_off_profile(profile_name)
+ if not is_turned_off:
+ event.switch.value = False
+
+ @on(Button.Pressed)
+ def on_any_button_pressed(self, event: Button.Pressed) -> None:
+ button_id = event.button.id
+ if not button_id:
+ return
+ elif button_id.startswith("show-profile-button-"):
+ profile_name = button_id.removeprefix("show-profile-button-")
+ self.app.push_screen(ShowProfilePushScreen(self.manager, profile_name))
+ elif button_id.startswith("set-notifications-button-"):
+ profile_name = button_id.removeprefix("set-notifications-button-")
+ self.app.push_screen(SetDetectorNotificationPushScreen(self.manager, profile_name))
+ elif button_id.startswith("show-logs-button-"):
+ profile_name = button_id.removeprefix("show-logs-button-")
+ self.app.push_screen(ShowLogsPushScreen(self.manager, profile_name))
+ elif button_id.startswith("delete-"):
+ profile_name = button_id.removeprefix("delete-")
+ self.app.push_screen(ConfirmDeletePushScreen(self.manager, profile_name))
+
+ def on_manager_message(self, msg: str, title: str, severity):
+ self.app.notify(message=msg, title=title, severity=severity)
diff --git a/src/streamml/front/detector_profiles_tab_pushscreens.py b/src/streamml/front/detector_profiles_tab_pushscreens.py
new file mode 100644
index 0000000..fc5b731
--- /dev/null
+++ b/src/streamml/front/detector_profiles_tab_pushscreens.py
@@ -0,0 +1,187 @@
+from textual import on
+from textual.app import ComposeResult
+from textual.screen import ModalScreen
+from textual.widgets import Button, Label, Pretty, DataTable, Switch
+from textual.containers import Vertical, Horizontal, VerticalScroll, Container
+from textual_plotext import PlotextPlot
+
+from datetime import datetime
+
+from ..back.detector_profiles_manager import DetectorProfilesManager
+from ..back.detector_profile_HST import DetectorProfileHST
+
+class PlotTab(Container):
+ def __init__(self, profile: DetectorProfileHST, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.profile = profile
+ self.classes = "plot-card"
+
+ def compose(self):
+ yield PlotextPlot()
+
+ def on_mount(self):
+ self.set_interval(1, self.update_plot)
+
+ def update_plot(self):
+ plot_widget = self.query_one(PlotextPlot)
+ plt = plot_widget.plt
+
+ y = list(getattr(self.profile, "plot_data", []))
+
+ plt.clear_figure()
+ plt.theme("dark")
+
+ plt.plot(y, marker="dot", color="green")
+ plt.title("Anomaly Score")
+ plt.ylabel("last 30 windows")
+ plt.ylim(0, 1)
+
+ threshold = self.profile.params.get("threshold", 0.7)
+ if threshold is not None:
+ plt.horizontal_line(float(threshold), color="red")
+
+
+ plot_widget.refresh()
+
+
+class ShowProfilePushScreen(ModalScreen[str]):
+ def __init__(self, manager, profile_name: str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+ self.profile = self.manager.get_profile(profile_name)
+
+ def compose(self) -> ComposeResult:
+ with Container(classes="modal-window large-modal"):
+ yield Label(f"Profile: {self.profile_name}", classes="modal-header")
+
+ with Horizontal(classes="modal-split-container"):
+
+ with Vertical(classes="left-panel"):
+ yield PlotTab(self.profile)
+
+ with Vertical(classes="right-panel"):
+ yield Label("Runtime Stats (Live)", classes="section-header")
+ with VerticalScroll(classes="info-box", id="stats-box"):
+ yield Pretty({}, id="runtime-stats-pretty")
+
+ yield Label("Configuration", classes="section-header")
+ with VerticalScroll(classes="info-box"):
+ yield Pretty(self.profile.to_dict())
+
+ with Container(classes="modal-footer"):
+ yield Button("Close", id="cancel-button", variant="primary")
+
+ def on_mount(self):
+ self.set_interval(1.0, self.update_stats)
+ self.update_stats()
+
+ def update_stats(self):
+ if self.profile:
+ stats = self.profile.get_runtime_stats()
+ self.query_one("#runtime-stats-pretty", Pretty).update(stats)
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed):
+ self.dismiss(None)
+
+
+class ShowLogsPushScreen(ModalScreen[str]):
+ def __init__(self, manager: DetectorProfilesManager, profile_name: str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+
+ def compose(self) -> ComposeResult:
+ with Container(classes="modal-window medium-modal"):
+ yield Label(f"Anomaly Logs: {self.profile_name}", classes="modal-header")
+
+ with Container(classes="table-container"):
+ yield DataTable(id="logs_table", zebra_stripes=True, cursor_type="row")
+
+ with Horizontal(classes="modal-footer"):
+ yield Button("Clear History", id="clear-button", variant="warning")
+ yield Button("Close", id="cancel-button", variant="primary")
+
+
+ def on_mount(self):
+ table = self.query_one("#logs_table", DataTable)
+ table.add_columns("Timestamp", "Score", "Packets Rate", "Protocol Info", "Verdict")
+
+ logs = self.manager.get_profile_logs(self.profile_name)
+
+ if not logs:
+ return
+
+ sorted_logs = sorted(logs, key=lambda x: x.get("ts", 0), reverse=True)
+
+ for log in sorted_logs:
+ dt = datetime.fromtimestamp(log.get("ts", 0)).strftime("%Y-%m-%d %H:%M:%S")
+
+ score = f"{log.get('score', 0):.4f}"
+ rate = f"{log.get('pkt_rate', 0):.1f}"
+ proto = str(log.get("proto_info", "-"))
+ verdict = "ANOMALY"
+
+ table.add_row(dt, score, rate, proto, verdict)
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed) -> None:
+ if event.button.id == "clear-button":
+ profile = self.manager.get_profile(self.profile_name)
+ if profile:
+ profile.clear_logs()
+ self.query_one("#logs_table", DataTable).clear()
+ else:
+ self.dismiss(None)
+
+class SetDetectorNotificationPushScreen(ModalScreen[str]):
+ def __init__(self, manager, profile_name: str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+ self.profile = self.manager.get_profile(profile_name)
+
+ def compose(self) -> ComposeResult:
+ current_val = getattr(self.profile, 'notify_enabled', False)
+
+ with Container(classes="modal-window small-modal"):
+ yield Label(f"Notification: {self.profile_name}", classes="modal-header")
+
+ with Vertical(classes="section-card"):
+ yield Label("Enable notification (Discord)")
+ yield Switch(value=current_val, id="switch-anomaly")
+
+ with Horizontal(classes="modal-footer"):
+ yield Button("Close", id="close-button", variant="primary")
+
+ @on(Switch.Changed)
+ def on_switch_changed(self, event: Switch.Changed):
+ if event.switch.id == "switch-anomaly":
+ self.profile.notify_enabled = event.value
+ self.manager.try_save_profiles(notify=False)
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed):
+ self.dismiss(None)
+
+class ConfirmDeletePushScreen(ModalScreen[str]):
+ def __init__(self, manager: DetectorProfilesManager, profile_name:str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+
+ def compose(self) -> ComposeResult:
+ with Container(classes="modal-window small-modal"):
+ yield Label("Are you sure?",classes="modal-header")
+ with Horizontal(classes="modal-footer"):
+ yield Button("Delete", id="confirm-button", variant="error")
+ yield Button("Cancel", id="cancel-button", variant="default")
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed):
+ if event.button.id == "confirm-button":
+ self.manager.delete_profile(self.profile_name)
+ self.dismiss(None)
+ else:
+ self.dismiss(None)
diff --git a/src/streamml/front/detector_tab.py b/src/streamml/front/detector_tab.py
new file mode 100644
index 0000000..f93a8cb
--- /dev/null
+++ b/src/streamml/front/detector_tab.py
@@ -0,0 +1,135 @@
+from textual import on
+from textual.app import ComposeResult
+from textual.widgets import Input, Select, Button, Label, Checkbox
+from textual.containers import Container, Horizontal, VerticalScroll
+
+import psutil
+
+from ..back.detector_profiles_manager import DetectorProfilesManager
+from ..back.window import FEATURE_LIST
+from ..front.detector_tab_pushscreens import SaveProfilePushScreen
+
+class DetectorTab(Container):
+ def __init__(self, manager: DetectorProfilesManager, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.manager.on_message = self.on_manager_message
+
+ def compose(self) -> ComposeResult:
+ with Horizontal(id="top-config-container"):
+
+ model_section = Container(id="model-section", classes="section-card")
+ model_section.border_title = "Algorithm: HalfSpaceTrees"
+ with model_section:
+ with VerticalScroll(classes="detector-scroll"):
+ yield Label("Select interface:", classes="label")
+ try:
+ ifaces = list(psutil.net_if_addrs().keys())
+ except:
+ ifaces = []
+
+ yield Select.from_values(
+ ifaces,
+ id="interface-select",
+ allow_blank=False,
+ classes="input"
+ )
+
+ yield Label("Model params:", classes="label")
+ yield Input(placeholder="Trees number (int, def: 10)", id="param-trees", classes="input")
+ yield Input(placeholder="Height (int, def: 8)", id="param-height", classes="input")
+ yield Input(placeholder="Window size (int, def: 250)", id="param-window", classes="input")
+ yield Input(placeholder="Seed (int, def: 42)", id="param-seed", classes="input")
+ yield Input(placeholder="Window duration (def: 10 sec )", id="param-window_duration", classes="input")
+ yield Input(placeholder="Threshold (0.0 - 1.0, def: 0.7)", id="param-threshold", classes="input")
+ yield Input(placeholder="Queue size (int, def: 10000)", id="param-queue_size", classes="input")
+
+ features_section = Container(id="features-section", classes="section-card")
+ features_section.border_title = "Flow-based Features"
+ with features_section:
+ with VerticalScroll(classes="detector-scroll"):
+ yield Label("Select features to include in model:", classes="label")
+ self.feature_checkboxes = {}
+ for feat in FEATURE_LIST:
+ cb = Checkbox(feat, value=True, classes="input")
+ self.feature_checkboxes[feat] = cb
+ yield cb
+
+ bpf_section = Container(id="bpf-section", classes="section-card")
+ bpf_section.border_title = "BPF Filter (Optional)"
+ with bpf_section:
+ yield Input(
+ placeholder="BPF Filter (e.g. 'tcp port 80 or udp')",
+ id="param-bpf_filter",
+ classes="input full"
+ )
+ with Container(classes="save-button-container"):
+ yield Button("Save Profile", id="save-button", variant="success")
+
+ def get_inputs(self):
+ features = [f for f, cb in self.feature_checkboxes.items() if cb.value]
+
+ if not features:
+ raise ValueError("Select at least one feature.")
+
+ params = {}
+
+ try:
+ interface = self.query_one("#interface-select", Select).value
+ if not interface:
+ raise ValueError("Select interface.")
+ params["interface"] = interface
+ except Exception:
+ raise ValueError("Interface selection error.")
+
+ bpf_input = self.query_one("#param-bpf_filter", Input)
+ if bpf_input.value.strip():
+ params["bpf_filter"] = bpf_input.value.strip()
+
+ defaults = {
+ "trees": 10,
+ "height": 8,
+ "window": 250,
+ "seed": 42,
+ "threshold": 0.7,
+ "window_duration": 10.0,
+ "queue_size": 10000,
+ "bpf_filter": ""
+ }
+
+ model_section = self.query_one("#model-section")
+ for inp in model_section.query("Input"):
+ if inp.id and inp.id.startswith("param-"):
+ key = inp.id.removeprefix("param-")
+ if key == "bpf_filter": continue
+
+ val_str = inp.value.strip()
+
+ if not val_str:
+ if key in defaults and defaults[key] is not None:
+ params[key] = defaults[key]
+ continue
+
+ try:
+ if key in ["trees", "height", "window", "seed","queue_size"]:
+ params[key] = int(val_str)
+ elif key in ["threshold", "window_duration"]:
+ params[key] = float(val_str)
+ except ValueError:
+ raise ValueError(f"Param '{key}' must be a number.")
+
+ return {
+ "features": features,
+ "params": params,
+ }
+
+ @on(Button.Pressed, "#save-button")
+ async def handle_save_button(self, event: Button.Pressed):
+ try:
+ input_data = self.get_inputs()
+ self.app.push_screen(SaveProfilePushScreen(self.manager, input_data=input_data))
+ except ValueError as e:
+ self.app.notify(str(e), title="Validation error", severity="error")
+
+ def on_manager_message(self, msg: str, title: str, severity):
+ self.app.notify(message=msg, title=title, severity=severity)
diff --git a/src/streamml/front/detector_tab_pushscreens.py b/src/streamml/front/detector_tab_pushscreens.py
new file mode 100644
index 0000000..625921b
--- /dev/null
+++ b/src/streamml/front/detector_tab_pushscreens.py
@@ -0,0 +1,31 @@
+from textual.app import ComposeResult
+from textual.widgets import Input, Label, Button
+from textual.containers import Vertical, Horizontal
+from textual.screen import ModalScreen
+from textual import on
+
+from ..back.detector_profiles_manager import DetectorProfilesManager
+
+class SaveProfilePushScreen(ModalScreen[str]):
+ def __init__(self, manager: DetectorProfilesManager, input_data , *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.input_data = input_data
+
+
+ def compose(self) -> ComposeResult:
+ with Vertical(classes="modal-window small-modal"):
+ yield Label("Save scan profile", classes="modal-header")
+ yield Input(placeholder="Profile name:", id="profile-name", classes="input")
+ with Horizontal(id="buttons-row", classes="modal-buttons modal-footer"):
+ yield Button("Confirm", id="confirm-button", variant="success", classes="button")
+ yield Button("Cancel", id="cancel-button", variant="error", classes="button")
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed):
+ if event.button.id == "confirm-button":
+ profile_name = self.query_one("#profile-name", Input).value.strip()
+ self.manager.add_profile(profile_name,self.input_data)
+ self.dismiss(None)
+ else:
+ self.dismiss(None)
diff --git a/src/streamml/front/options_tab.py b/src/streamml/front/options_tab.py
new file mode 100644
index 0000000..22bd43a
--- /dev/null
+++ b/src/streamml/front/options_tab.py
@@ -0,0 +1,43 @@
+from textual.app import ComposeResult
+from textual.containers import Container, Horizontal
+from textual.widgets import Button, Input, Label
+from textual import on
+
+from ..back.notification_service import notification_service
+
+class OptionsTab(Container):
+ def __init__(self, detector_manager, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.detector_manager = detector_manager
+
+ def compose(self) -> ComposeResult:
+ notify_section = Container(id="notify-section", classes="section-card")
+ notify_section.border_title = "Notification config"
+ with notify_section:
+ yield Label("Discord Webhook URL:")
+ yield Input(placeholder="https://discord.com/api/webhooks/...", id="input-webhook-url")
+
+ with Horizontal(classes="modal-footer"):
+ yield Button("Save config", id="save-config", variant="success")
+ yield Button("notification test", id="test-notif", variant="primary")
+
+ def on_mount(self):
+ self.query_one("#input-webhook-url", Input).value = notification_service.webhook_url
+
+ @on(Button.Pressed, "#save-config")
+ def save_configuration(self):
+ url = self.query_one("#input-webhook-url", Input).value.strip()
+
+ if notification_service.save_config(url):
+ self.app.notify("Config saved", severity="information")
+ else:
+ self.app.notify("Error during saving", severity="error")
+
+ @on(Button.Pressed, "#test-notif")
+ def test_notification(self):
+ success = notification_service.send_message("**Test NetMonitor**\n")
+
+ if success:
+ self.app.notify("good", severity="information")
+ else:
+ self.app.notify("bad", severity="error")