From 6d7410e286ce0fde31f89185c095fe90e85597f3 Mon Sep 17 00:00:00 2001 From: EnricoGuccii Date: Sat, 10 Jan 2026 22:43:36 +0100 Subject: bloat removed --- .../detector_profiles_tab.cpython-312.pyc | Bin 5924 -> 0 bytes ...tector_profiles_tab_pushscreens.cpython-312.pyc | Bin 13721 -> 0 bytes .../front/__pycache__/detector_tab.cpython-312.pyc | Bin 7672 -> 0 bytes .../detector_tab_pushscreens.cpython-312.pyc | Bin 2798 -> 0 bytes .../front/__pycache__/options_tab.cpython-312.pyc | Bin 3442 -> 0 bytes .../scanner_profiles_tab.cpython-312.pyc | Bin 6552 -> 0 bytes ...canner_profiles_tab_pushscreens.cpython-312.pyc | Bin 14133 -> 0 bytes .../front/__pycache__/scanner_tab.cpython-312.pyc | Bin 14995 -> 0 bytes .../scanner_tab_pushscreens.cpython-312.pyc | Bin 3075 -> 0 bytes src/netmonitor/front/detector_profiles_tab.py | 77 ------ .../front/detector_profiles_tab_pushscreens.py | 187 -------------- src/netmonitor/front/detector_tab.py | 135 ---------- src/netmonitor/front/detector_tab_pushscreens.py | 31 --- src/netmonitor/front/options_tab.py | 44 ---- src/netmonitor/front/scanner_profiles_tab.py | 84 ------- .../front/scanner_profiles_tab_pushscreens.py | 180 ------------- src/netmonitor/front/scanner_tab.py | 277 --------------------- src/netmonitor/front/scanner_tab_pushscreens.py | 35 --- 18 files changed, 1050 deletions(-) delete mode 100644 src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc delete mode 100644 src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc delete mode 100644 src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc delete mode 100644 src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc delete mode 100644 src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc delete mode 100644 src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc delete mode 100644 src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc delete mode 100644 src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc delete mode 100644 src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc delete mode 100644 src/netmonitor/front/detector_profiles_tab.py delete mode 100644 src/netmonitor/front/detector_profiles_tab_pushscreens.py delete mode 100644 src/netmonitor/front/detector_tab.py delete mode 100644 src/netmonitor/front/detector_tab_pushscreens.py delete mode 100644 src/netmonitor/front/options_tab.py delete mode 100644 src/netmonitor/front/scanner_profiles_tab.py delete mode 100644 src/netmonitor/front/scanner_profiles_tab_pushscreens.py delete mode 100755 src/netmonitor/front/scanner_tab.py delete mode 100644 src/netmonitor/front/scanner_tab_pushscreens.py (limited to 'src/netmonitor/front') diff --git a/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc deleted file mode 100644 index 6a32fac..0000000 Binary files a/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc and /dev/null differ diff --git a/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc deleted file mode 100644 index a74ee0c..0000000 Binary files a/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc and /dev/null differ diff --git a/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc deleted file mode 100644 index 0257359..0000000 Binary files a/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc and /dev/null differ diff --git a/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc deleted file mode 100644 index 8672061..0000000 Binary files a/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc and /dev/null differ diff --git a/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc deleted file mode 100644 index 227bfa8..0000000 Binary files a/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc and /dev/null differ diff --git a/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc deleted file mode 100644 index d6fcdfb..0000000 Binary files a/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc and /dev/null differ diff --git a/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc deleted file mode 100644 index a4db183..0000000 Binary files a/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc and /dev/null differ diff --git a/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc deleted file mode 100644 index 8a8d03a..0000000 Binary files a/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc and /dev/null differ diff --git a/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc deleted file mode 100644 index 28cc805..0000000 Binary files a/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc and /dev/null differ diff --git a/src/netmonitor/front/detector_profiles_tab.py b/src/netmonitor/front/detector_profiles_tab.py deleted file mode 100644 index 7860e16..0000000 --- a/src/netmonitor/front/detector_profiles_tab.py +++ /dev/null @@ -1,77 +0,0 @@ -from textual import on -from textual.app import ComposeResult -from textual.widgets import Button, Label, Switch -from textual.containers import Vertical, Horizontal, VerticalScroll - -from ..back.detector_profiles_manager import DetectorProfilesManager -from .detector_profiles_tab_pushscreens import ConfirmDeletePushScreen, ShowLogsPushScreen, ShowProfilePushScreen, SetDetectorNotificationPushScreen - -class DetectorProfilesTab(Vertical): - def __init__(self, manager: DetectorProfilesManager, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.manager.on_refresh = self.refresh_profiles - self.manager.on_message = self.on_manager_message - - def compose(self) -> ComposeResult: - yield VerticalScroll(id="profiles-list") - - def on_mount(self) -> None: - self.refresh_profiles() - - def refresh_profiles(self) -> None: - profiles_list = self.query_one("#profiles-list", VerticalScroll) - profiles_list.remove_children() - - if not self.manager.profiles: - return - - for profile in self.manager.profiles: - row = Horizontal( - Switch(id=f"activate-profile-switch-{profile.profile_name}", value=profile.is_active), - Label(f"{profile.profile_name}", classes="profile-profile_name"), - Button("Show logs", id=f"show-logs-button-{profile.profile_name}", classes="profile-action", variant="primary"), - Button("Show Profile", id=f"show-profile-button-{profile.profile_name}", classes="profile-action", variant="default"), - Button("Notifications", id=f"set-notifications-button-{profile.profile_name}", classes="profile-action", variant="default"), - Button("Delete", id=f"delete-{profile.profile_name}", classes="profile-action button-delete", variant="error"), - classes="profile-row" - ) - profiles_list.mount(row) - - @on(Switch.Changed) - def any_switch_changed(self, event: Switch.Changed) -> None: - switch_id = event.switch.id - if not switch_id: - return - elif switch_id.startswith("activate-profile-switch"): - profile_name = switch_id.removeprefix("activate-profile-switch-") - - if event.switch.value: - is_turned_on = self.manager.turn_on_profile(profile_name) - if not is_turned_on: - event.switch.value = False - else: - is_turned_off = self.manager.turn_off_profile(profile_name) - if not is_turned_off: - event.switch.value = False - - @on(Button.Pressed) - def on_any_button_pressed(self, event: Button.Pressed) -> None: - button_id = event.button.id - if not button_id: - return - elif button_id.startswith("show-profile-button-"): - profile_name = button_id.removeprefix("show-profile-button-") - self.app.push_screen(ShowProfilePushScreen(self.manager, profile_name)) - elif button_id.startswith("set-notifications-button-"): - profile_name = button_id.removeprefix("set-notifications-button-") - self.app.push_screen(SetDetectorNotificationPushScreen(self.manager, profile_name)) - elif button_id.startswith("show-logs-button-"): - profile_name = button_id.removeprefix("show-logs-button-") - self.app.push_screen(ShowLogsPushScreen(self.manager, profile_name)) - elif button_id.startswith("delete-"): - profile_name = button_id.removeprefix("delete-") - self.app.push_screen(ConfirmDeletePushScreen(self.manager, profile_name)) - - def on_manager_message(self, msg: str, title: str, severity): - self.app.notify(message=msg, title=title, severity=severity) diff --git a/src/netmonitor/front/detector_profiles_tab_pushscreens.py b/src/netmonitor/front/detector_profiles_tab_pushscreens.py deleted file mode 100644 index fc5b731..0000000 --- a/src/netmonitor/front/detector_profiles_tab_pushscreens.py +++ /dev/null @@ -1,187 +0,0 @@ -from textual import on -from textual.app import ComposeResult -from textual.screen import ModalScreen -from textual.widgets import Button, Label, Pretty, DataTable, Switch -from textual.containers import Vertical, Horizontal, VerticalScroll, Container -from textual_plotext import PlotextPlot - -from datetime import datetime - -from ..back.detector_profiles_manager import DetectorProfilesManager -from ..back.detector_profile_HST import DetectorProfileHST - -class PlotTab(Container): - def __init__(self, profile: DetectorProfileHST, *args, **kwargs): - super().__init__(*args, **kwargs) - self.profile = profile - self.classes = "plot-card" - - def compose(self): - yield PlotextPlot() - - def on_mount(self): - self.set_interval(1, self.update_plot) - - def update_plot(self): - plot_widget = self.query_one(PlotextPlot) - plt = plot_widget.plt - - y = list(getattr(self.profile, "plot_data", [])) - - plt.clear_figure() - plt.theme("dark") - - plt.plot(y, marker="dot", color="green") - plt.title("Anomaly Score") - plt.ylabel("last 30 windows") - plt.ylim(0, 1) - - threshold = self.profile.params.get("threshold", 0.7) - if threshold is not None: - plt.horizontal_line(float(threshold), color="red") - - - plot_widget.refresh() - - -class ShowProfilePushScreen(ModalScreen[str]): - def __init__(self, manager, profile_name: str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - self.profile = self.manager.get_profile(profile_name) - - def compose(self) -> ComposeResult: - with Container(classes="modal-window large-modal"): - yield Label(f"Profile: {self.profile_name}", classes="modal-header") - - with Horizontal(classes="modal-split-container"): - - with Vertical(classes="left-panel"): - yield PlotTab(self.profile) - - with Vertical(classes="right-panel"): - yield Label("Runtime Stats (Live)", classes="section-header") - with VerticalScroll(classes="info-box", id="stats-box"): - yield Pretty({}, id="runtime-stats-pretty") - - yield Label("Configuration", classes="section-header") - with VerticalScroll(classes="info-box"): - yield Pretty(self.profile.to_dict()) - - with Container(classes="modal-footer"): - yield Button("Close", id="cancel-button", variant="primary") - - def on_mount(self): - self.set_interval(1.0, self.update_stats) - self.update_stats() - - def update_stats(self): - if self.profile: - stats = self.profile.get_runtime_stats() - self.query_one("#runtime-stats-pretty", Pretty).update(stats) - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed): - self.dismiss(None) - - -class ShowLogsPushScreen(ModalScreen[str]): - def __init__(self, manager: DetectorProfilesManager, profile_name: str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - - def compose(self) -> ComposeResult: - with Container(classes="modal-window medium-modal"): - yield Label(f"Anomaly Logs: {self.profile_name}", classes="modal-header") - - with Container(classes="table-container"): - yield DataTable(id="logs_table", zebra_stripes=True, cursor_type="row") - - with Horizontal(classes="modal-footer"): - yield Button("Clear History", id="clear-button", variant="warning") - yield Button("Close", id="cancel-button", variant="primary") - - - def on_mount(self): - table = self.query_one("#logs_table", DataTable) - table.add_columns("Timestamp", "Score", "Packets Rate", "Protocol Info", "Verdict") - - logs = self.manager.get_profile_logs(self.profile_name) - - if not logs: - return - - sorted_logs = sorted(logs, key=lambda x: x.get("ts", 0), reverse=True) - - for log in sorted_logs: - dt = datetime.fromtimestamp(log.get("ts", 0)).strftime("%Y-%m-%d %H:%M:%S") - - score = f"{log.get('score', 0):.4f}" - rate = f"{log.get('pkt_rate', 0):.1f}" - proto = str(log.get("proto_info", "-")) - verdict = "ANOMALY" - - table.add_row(dt, score, rate, proto, verdict) - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed) -> None: - if event.button.id == "clear-button": - profile = self.manager.get_profile(self.profile_name) - if profile: - profile.clear_logs() - self.query_one("#logs_table", DataTable).clear() - else: - self.dismiss(None) - -class SetDetectorNotificationPushScreen(ModalScreen[str]): - def __init__(self, manager, profile_name: str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - self.profile = self.manager.get_profile(profile_name) - - def compose(self) -> ComposeResult: - current_val = getattr(self.profile, 'notify_enabled', False) - - with Container(classes="modal-window small-modal"): - yield Label(f"Notification: {self.profile_name}", classes="modal-header") - - with Vertical(classes="section-card"): - yield Label("Enable notification (Discord)") - yield Switch(value=current_val, id="switch-anomaly") - - with Horizontal(classes="modal-footer"): - yield Button("Close", id="close-button", variant="primary") - - @on(Switch.Changed) - def on_switch_changed(self, event: Switch.Changed): - if event.switch.id == "switch-anomaly": - self.profile.notify_enabled = event.value - self.manager.try_save_profiles(notify=False) - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed): - self.dismiss(None) - -class ConfirmDeletePushScreen(ModalScreen[str]): - def __init__(self, manager: DetectorProfilesManager, profile_name:str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - - def compose(self) -> ComposeResult: - with Container(classes="modal-window small-modal"): - yield Label("Are you sure?",classes="modal-header") - with Horizontal(classes="modal-footer"): - yield Button("Delete", id="confirm-button", variant="error") - yield Button("Cancel", id="cancel-button", variant="default") - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed): - if event.button.id == "confirm-button": - self.manager.delete_profile(self.profile_name) - self.dismiss(None) - else: - self.dismiss(None) diff --git a/src/netmonitor/front/detector_tab.py b/src/netmonitor/front/detector_tab.py deleted file mode 100644 index 53dc908..0000000 --- a/src/netmonitor/front/detector_tab.py +++ /dev/null @@ -1,135 +0,0 @@ -from textual import on -from textual.app import ComposeResult -from textual.widgets import Input, Select, Button, Label, Checkbox -from textual.containers import Container, Horizontal, Vertical, VerticalScroll - -import psutil - -from ..back.detector_profiles_manager import DetectorProfilesManager -from ..back.window import FEATURE_LIST -from ..front.detector_tab_pushscreens import SaveProfilePushScreen - -class DetectorTab(Container): - def __init__(self, manager: DetectorProfilesManager, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.manager.on_message = self.on_manager_message - - def compose(self) -> ComposeResult: - with Horizontal(id="top-config-container"): - - model_section = Container(id="model-section", classes="section-card") - model_section.border_title = "Algorithm: HalfSpaceTrees" - with model_section: - with VerticalScroll(classes="detector-scroll"): - yield Label("Select interface:", classes="label") - try: - ifaces = list(psutil.net_if_addrs().keys()) - except: - ifaces = [] - - yield Select.from_values( - ifaces, - id="interface-select", - allow_blank=False, - classes="input" - ) - - yield Label("Model params:", classes="label") - yield Input(placeholder="Trees number (int, def: 10)", id="param-trees", classes="input") - yield Input(placeholder="Height (int, def: 8)", id="param-height", classes="input") - yield Input(placeholder="Window size (int, def: 250)", id="param-window", classes="input") - yield Input(placeholder="Seed (int, def: 42)", id="param-seed", classes="input") - yield Input(placeholder="Window duration (def: 10 sec )", id="param-window_duration", classes="input") - yield Input(placeholder="Threshold (0.0 - 1.0, def: 0.7)", id="param-threshold", classes="input") - yield Input(placeholder="Queue size (int, def: 10000)", id="param-queue_size", classes="input") - - features_section = Container(id="features-section", classes="section-card") - features_section.border_title = "Flow-based Features" - with features_section: - with VerticalScroll(classes="detector-scroll"): - yield Label("Select features to include in model:", classes="label") - self.feature_checkboxes = {} - for feat in FEATURE_LIST: - cb = Checkbox(feat, value=True, classes="input") - self.feature_checkboxes[feat] = cb - yield cb - - bpf_section = Container(id="bpf-section", classes="section-card") - bpf_section.border_title = "BPF Filter (Optional)" - with bpf_section: - yield Input( - placeholder="BPF Filter (e.g. 'tcp port 80 or udp')", - id="param-bpf_filter", - classes="input full" - ) - with Container(classes="save-button-container"): - yield Button("Save Profile", id="save-button", variant="success") - - def get_inputs(self): - features = [f for f, cb in self.feature_checkboxes.items() if cb.value] - - if not features: - raise ValueError("Select at least one feature.") - - params = {} - - try: - interface = self.query_one("#interface-select", Select).value - if not interface: - raise ValueError("Select interface.") - params["interface"] = interface - except Exception: - raise ValueError("Interface selection error.") - - bpf_input = self.query_one("#param-bpf_filter", Input) - if bpf_input.value.strip(): - params["bpf_filter"] = bpf_input.value.strip() - - defaults = { - "trees": 10, - "height": 8, - "window": 250, - "seed": 42, - "threshold": 0.7, - "window_duration": 10.0, - "queue_size": 10000, - "bpf_filter": "" - } - - model_section = self.query_one("#model-section") - for inp in model_section.query("Input"): - if inp.id and inp.id.startswith("param-"): - key = inp.id.removeprefix("param-") - if key == "bpf_filter": continue - - val_str = inp.value.strip() - - if not val_str: - if key in defaults and defaults[key] is not None: - params[key] = defaults[key] - continue - - try: - if key in ["trees", "height", "window", "seed","queue_size"]: - params[key] = int(val_str) - elif key in ["threshold", "window_duration"]: - params[key] = float(val_str) - except ValueError: - raise ValueError(f"Param '{key}' must be a number.") - - return { - "features": features, - "params": params, - } - - @on(Button.Pressed, "#save-button") - async def handle_save_button(self, event: Button.Pressed): - try: - input_data = self.get_inputs() - self.app.push_screen(SaveProfilePushScreen(self.manager, input_data=input_data)) - except ValueError as e: - self.app.notify(str(e), title="Validation error", severity="error") - - def on_manager_message(self, msg: str, title: str, severity): - self.app.notify(message=msg, title=title, severity=severity) diff --git a/src/netmonitor/front/detector_tab_pushscreens.py b/src/netmonitor/front/detector_tab_pushscreens.py deleted file mode 100644 index 625921b..0000000 --- a/src/netmonitor/front/detector_tab_pushscreens.py +++ /dev/null @@ -1,31 +0,0 @@ -from textual.app import ComposeResult -from textual.widgets import Input, Label, Button -from textual.containers import Vertical, Horizontal -from textual.screen import ModalScreen -from textual import on - -from ..back.detector_profiles_manager import DetectorProfilesManager - -class SaveProfilePushScreen(ModalScreen[str]): - def __init__(self, manager: DetectorProfilesManager, input_data , *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.input_data = input_data - - - def compose(self) -> ComposeResult: - with Vertical(classes="modal-window small-modal"): - yield Label("Save scan profile", classes="modal-header") - yield Input(placeholder="Profile name:", id="profile-name", classes="input") - with Horizontal(id="buttons-row", classes="modal-buttons modal-footer"): - yield Button("Confirm", id="confirm-button", variant="success", classes="button") - yield Button("Cancel", id="cancel-button", variant="error", classes="button") - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed): - if event.button.id == "confirm-button": - profile_name = self.query_one("#profile-name", Input).value.strip() - self.manager.add_profile(profile_name,self.input_data) - self.dismiss(None) - else: - self.dismiss(None) diff --git a/src/netmonitor/front/options_tab.py b/src/netmonitor/front/options_tab.py deleted file mode 100644 index 6cbd448..0000000 --- a/src/netmonitor/front/options_tab.py +++ /dev/null @@ -1,44 +0,0 @@ -from textual.app import ComposeResult -from textual.containers import Container, Horizontal -from textual.widgets import Button, Input, Label -from textual import on - -from ..back.notification_service import notification_service - -class OptionsTab(Container): - def __init__(self, scanner_manager, detector_manager, *args, **kwargs): - super().__init__(*args, **kwargs) - self.scanner_manager = scanner_manager - self.detector_manager = detector_manager - - def compose(self) -> ComposeResult: - notify_section = Container(id="notify-section", classes="section-card") - notify_section.border_title = "Notification config" - with notify_section: - yield Label("Discord Webhook URL:") - yield Input(placeholder="https://discord.com/api/webhooks/...", id="input-webhook-url") - - with Horizontal(classes="modal-footer"): - yield Button("Save config", id="save-config", variant="success") - yield Button("notification test", id="test-notif", variant="primary") - - def on_mount(self): - self.query_one("#input-webhook-url", Input).value = notification_service.webhook_url - - @on(Button.Pressed, "#save-config") - def save_configuration(self): - url = self.query_one("#input-webhook-url", Input).value.strip() - - if notification_service.save_config(url): - self.app.notify("Config saved", severity="information") - else: - self.app.notify("Error during saving", severity="error") - - @on(Button.Pressed, "#test-notif") - def test_notification(self): - success = notification_service.send_message("**Test NetMonitor**\n") - - if success: - self.app.notify("good", severity="information") - else: - self.app.notify("bad", severity="error") diff --git a/src/netmonitor/front/scanner_profiles_tab.py b/src/netmonitor/front/scanner_profiles_tab.py deleted file mode 100644 index 8e8610c..0000000 --- a/src/netmonitor/front/scanner_profiles_tab.py +++ /dev/null @@ -1,84 +0,0 @@ -from textual.widgets import Button, Label, Switch -from textual import on -from textual.app import ComposeResult -from textual.containers import Vertical, Horizontal, VerticalScroll - -from ..back.scanner_profiles_manager import ScannerProfilesManager -from .scanner_profiles_tab_pushscreens import ScanNowPushScreen, SetSchedulerPushScreen, ShowLogsPushScreen, SetNotifacationOptionsPushScreen, ShowProfilePushScreen, ConfirmDeletePushScreen - -class ScannerProfilesTab(Vertical): - def __init__(self, manager: ScannerProfilesManager, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.manager.on_refresh = self.refresh_profiles - self.manager.on_message = self.on_manager_message - - def compose(self) -> ComposeResult: - yield VerticalScroll(id="profiles-list") - - def on_mount(self) -> None: - self.refresh_profiles() - - def refresh_profiles(self) -> None: - profiles_list = self.query_one("#profiles-list", VerticalScroll) - profiles_list.remove_children() - - if not self.manager.profiles: - return - - for profile in self.manager.profiles: - row = Horizontal( - Switch(id=f"switch-{profile.profile_name}", value=profile.is_active), - Button("Scan Now", id=f"scan-now-button-{profile.profile_name}", classes="profile-action", variant="success"), - Label(f"{profile.profile_name}", classes="profile-profile_name"), - Button("Show Profile", id=f"show-profile-button-{profile.profile_name}", classes="profile-action", variant="primary"), - Button("Show logs", id=f"show-logs-button-{profile.profile_name}", classes="profile-action", variant="default"), - Button("Set scheduler", id=f"set-scheduler-button-{profile.profile_name}", classes="profile-action", variant="default"), - Button("Notifications", id=f"set-notifications-button-{profile.profile_name}", classes="profile-action", variant="default"), - Button("Delete", id=f"delete-{profile.profile_name}", classes="profile-action", variant="error"), - classes="profile-row" - ) - profiles_list.mount(row) - - @on(Switch.Changed) - def switch_changed(self, event: Switch.Changed) -> None: - switch_id = event.switch.id - if not switch_id: - return - profile_name = switch_id.removeprefix("switch-") - - if event.switch.value: - is_turned_on = self.manager.turn_on_profile(profile_name) - if not is_turned_on: - event.switch.value = False - else: - is_turned_off = self.manager.turn_off_profile(profile_name) - if not is_turned_off: - event.switch.value = False - - @on(Button.Pressed) - def on_any_button_pressed(self, event: Button.Pressed) -> None: - button_id = event.button.id - if not button_id: - return - if button_id.startswith("scan-now-button-"): - profile_name = button_id.removeprefix("scan-now-button-") - self.app.push_screen(ScanNowPushScreen(self.manager, profile_name)) - elif button_id.startswith("show-profile-button-"): - profile_name = button_id.removeprefix("show-profile-button-") - self.app.push_screen(ShowProfilePushScreen(self.manager, profile_name)) - elif button_id.startswith("set-scheduler-button-"): - profile_name = button_id.removeprefix("set-scheduler-button-") - self.app.push_screen(SetSchedulerPushScreen(self.manager, profile_name)) - elif button_id.startswith("set-notifications-button-"): - profile_name = button_id.removeprefix("set-notifications-button-") - self.app.push_screen(SetNotifacationOptionsPushScreen(self.manager, profile_name)) - elif button_id.startswith("delete-"): - profile_name = button_id.removeprefix("delete-") - self.app.push_screen(ConfirmDeletePushScreen(self.manager, profile_name)) - elif button_id.startswith("show-logs-button-"): - profile_name = button_id.removeprefix("show-logs-button-") - self.app.push_screen(ShowLogsPushScreen(self.manager, profile_name)) - - def on_manager_message(self, msg: str, title: str, severity): - self.app.notify(message=msg, title=title, severity=severity) diff --git a/src/netmonitor/front/scanner_profiles_tab_pushscreens.py b/src/netmonitor/front/scanner_profiles_tab_pushscreens.py deleted file mode 100644 index 5be9941..0000000 --- a/src/netmonitor/front/scanner_profiles_tab_pushscreens.py +++ /dev/null @@ -1,180 +0,0 @@ -from textual.app import ComposeResult -from textual import on -from textual.screen import ModalScreen -from textual.widgets import Input, Button, Label, Pretty, Select, Switch -from textual.containers import Vertical, Horizontal, VerticalScroll, Container - -from ..back.scanner_profiles_manager import ScannerProfilesManager - -class ScanNowPushScreen(ModalScreen[str]): - def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - - def compose(self) -> ComposeResult: - with Container(classes="modal-window large-modal"): - yield Label(f"Scanning: {self.profile_name}", classes="modal-header") - with VerticalScroll(classes="info-box"): - yield Pretty(self.manager.get_profile(self.profile_name).scan()) - with Horizontal(classes="modal-footer"): - yield Button("Close", variant="primary") - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed) -> None: - self.dismiss(None) - -class ShowProfilePushScreen(ModalScreen[str]): - def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - - def compose(self) -> ComposeResult: - with Container(classes="modal-window medium-modal"): - yield Label(f"Profile: {self.profile_name}", classes="modal-header") - with VerticalScroll(classes="info-box"): - yield Pretty(self.manager.get_profile(self.profile_name).to_dict()) - with Horizontal(classes="modal-footer"): - yield Button("Close", id="cancel-button", variant="primary") - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed) -> None: - self.dismiss(None) - - -class SetSchedulerPushScreen(ModalScreen[str]): - def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - - def compose(self) -> ComposeResult: - with Container(classes="modal-window small-modal"): - yield Label("Set CRON Scheduler", classes="modal-header") - yield Label("Format: min hour day month day_of_week", classes="label") - yield Input(placeholder="* * * * * ", id="cron-input", classes="input") - - with Horizontal(classes="modal-footer"): - yield Button("Confirm", id="confirm-button", variant="success") - yield Button("Cancel", id="cancel-button", variant="error") - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed): - if event.button.id == "confirm-button": - cron_input = self.query_one("#cron-input", Input).value.strip() - if cron_input: - self.manager.set_validated_scheduler(self.profile_name, cron_input) - self.dismiss(None) - else: - self.dismiss(None) - -class SetNotifacationOptionsPushScreen(ModalScreen[str]): - def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - self.profile = self.manager.get_profile(profile_name) - - def compose(self) -> ComposeResult: - current_enabled = getattr(self.profile, 'notify_enabled', False) - current_cve_only = getattr(self.profile, 'notify_only_cve', False) - - with Container(classes="modal-window small-modal"): - yield Label(f"Notifications: {self.profile_name}", classes="modal-header") - - with Vertical(classes="section-card"): - yield Label("Enable notifications (Discord):") - yield Switch(value=current_enabled, id="switch-enable") - - yield Label("Notify only when scanner finds CVE") - yield Switch(value=current_cve_only, id="switch-cve-only") - - with Horizontal(classes="modal-footer"): - yield Button("Close", id="close-button", variant="primary") - - @on(Switch.Changed) - def on_switch_changed(self, event: Switch.Changed): - if event.switch.id == "switch-enable": - self.profile.notify_enabled = event.value - elif event.switch.id == "switch-cve-only": - self.profile.notify_only_cve = event.value - - self.manager.try_save_profiles(notify=False) - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed): - self.dismiss(None) - -class ShowLogsPushScreen(ModalScreen[str]): - def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - self.logs = [] - - def compose(self) -> ComposeResult: - with Container(classes="modal-window large-modal"): - yield Label(f"Logs: {self.profile_name}", classes="modal-header") - - yield Select([], prompt="Select date", id="scan-date-select") - - with VerticalScroll(classes="info-box"): - yield Pretty({}, id="log-content") - - with Horizontal(classes="modal-footer"): - yield Button("Close", id="cancel-button", variant="primary") - - def on_mount(self): - self.logs = self.manager.get_profile_logs(self.profile_name) - select = self.query_one("#scan-date-select", Select) - options = [] - - if self.logs: - for index, log in enumerate(reversed(self.logs)): - real_index = len(self.logs) - 1 - index - label = log.get('_timestamp', f"Scan #{real_index + 1} (no date)") - - options.append((str(label), real_index)) - - select.set_options(options) - - if options: - select.value = options[0][1] - - @on(Select.Changed, "#scan-date-select") - def on_date_selected(self, event: Select.Changed): - if event.value is not None: - index = event.value - if index == Select.BLANK: - self.query_one("#log-content", Pretty).update({}) - return - if 0 <= index < len(self.logs): - log_entry = self.logs[index] - self.query_one("#log-content", Pretty).update(log_entry) - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed) -> None: - self.dismiss(None) - - -class ConfirmDeletePushScreen(ModalScreen[str]): - def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - - def compose(self) -> ComposeResult: - with Container(classes="modal-window small-modal"): - yield Label("Are you sure?",classes="modal-header") - with Horizontal(classes="modal-footer"): - yield Button("Delete", id="confirm-button", variant="error") - yield Button("Cancel", id="cancel-button", variant="default") - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed): - if event.button.id == "confirm-button": - self.manager.delete_profile(self.profile_name) - self.dismiss(None) - else: - self.dismiss(None) diff --git a/src/netmonitor/front/scanner_tab.py b/src/netmonitor/front/scanner_tab.py deleted file mode 100755 index 868fa74..0000000 --- a/src/netmonitor/front/scanner_tab.py +++ /dev/null @@ -1,277 +0,0 @@ -from textual import on -from textual.app import ComposeResult -from textual.widgets import Checkbox, Input, Select, Button, Label, SelectionList, Pretty -from textual.widgets.selection_list import Selection -from textual.containers import Container, Vertical, Horizontal, VerticalScroll -from textual.events import Mount - -import psutil, ipaddress, shlex -from typing import List, Dict, Optional - -from ..back.scanner_profiles_manager import ScannerProfilesManager -from .scanner_tab_pushscreens import SaveProfilePushScreen - -class ScannerTab(Container): - - def __init__(self, manager: ScannerProfilesManager): - super().__init__() - self.manager = manager - self.manager.on_message = self.on_manager_message - - def compose(self) -> ComposeResult: - friendly_section = Container(id="friendly-command-section", classes="section card") - friendly_section.border_title = "Scanner Configuration" - - with friendly_section: - yield Label("Interface:", classes="label") - try: - interfaces = list(psutil.net_if_addrs().keys()) - except Exception: - interfaces = [] - - yield Select.from_values( - interfaces, - id="interface-select", - allow_blank=False, - classes="input" - ) - - yield Label("IP address:", classes="label") - yield Input( - value="127.0.0.1/32", - placeholder="192.168.0.0/24", - name="ip-input", - id="ip", - classes="input" - ) - - yield Label("Port range (or single port)", classes="label") - with Horizontal(id="port-range", classes="input-row"): - yield Input(placeholder="Start / Single", id="low-port-range", classes="input half") - yield Input(placeholder="End (Optional)", id="high-port-range", classes="input half") - - yield Checkbox("Check for CVE's", id="cve-checkbox", classes="checkbox") - - yield Label("Scan options:", classes="label") - with Horizontal(classes="selection-row"): - yield SelectionList[str]( - Selection("Service version (-sV)", "-sV", False), - Selection("OS detection (-O)", "-O", False), - Selection("SYN scan (-sS)", "-sS", False), - Selection("UDP scan (-sU)", "-sU", False), - Selection("Fast scan (-F)", "-F", False), - Selection("Aggressive scan (-A)", "-A", False), - Selection("Ping skip (-Pn)", "-Pn", False), - Selection("Timing T4 (-T4)", "-T4", True), - classes="selection-list" - ) - - user_section = Container(id="user-command-section", classes="section card") - user_section.border_title = "Manual Command" - - with user_section: - yield Input( - placeholder="sudo nmap -sS 192.168.1.1", - name="user-nmap-input", - id="usercommand", - classes="input full" - ) - - - final_section = Container(id="final-section", classes="section card") - final_section.border_title = "Execution" - - with final_section: - with Vertical(id="final-command", classes="command-output"): - self.final_command = Pretty([], id="final-command-output") - yield self.final_command - with Vertical(id="buttons", classes="buttons-column"): - yield Button("Scan now", id="scan-button", variant="success") - yield Button("Save profile", id="save-button", variant="primary") - - results_section = Container(id="results-section", classes="section card") - results_section.border_title = "Scan Output" - - with results_section: - with VerticalScroll(id="results-scroll"): - self.results = Pretty([], id="results", classes="results") - yield self.results - - def validate_inputs(self, nmap_input: Dict[str, str]) -> List[str]: - errors: List[str] = [] - - if self.query_one("#cve-checkbox", Checkbox).value: - args = nmap_input.get("arguments", "") - if "-sV" not in args.split(): - errors.append("To check for CVE's, '-sV' option is required") - return errors - - targets = nmap_input.get("targets", "").strip() - if not targets: - errors.append("IP field cannot be empty") - return errors - - if not self.query_one("#usercommand", Input).value.strip(): - try: - ipaddress.ip_network(targets, strict=False) - except ValueError: - errors.append(f"Invalid IP address/CIDR: {targets}") - return errors - - if "ports" in nmap_input: - port_str = nmap_input["ports"] - if "-" in port_str: - try: - low, high = map(int, port_str.split("-")) - if not (1 <= low <= 65535 and 1 <= high <= 65535): - errors.append("Ports must be in range 1-65535") - if low > high: - errors.append("Start port cannot be greater than end port") - except ValueError: - errors.append("Ports (range) must be numbers") - else: - try: - port = int(port_str) - if not (1 <= port <= 65535): - errors.append("Port must be in range 1-65535") - except ValueError: - errors.append("Port must be a number") - - dry_run_err = self.dry_run_nmap_command(nmap_input) - if dry_run_err: - errors.append(dry_run_err) - return errors - - def dry_run_nmap_command(self, nmap_input: Dict[str, str]) -> Optional[str]: - try: - cmd = ["nmap"] - if "arguments" in nmap_input: - cmd += shlex.split(nmap_input["arguments"]) - if "ports" in nmap_input: - cmd += ["-p", nmap_input["ports"]] - cmd.append(nmap_input["targets"]) - return None - except Exception as e: - return f"Błąd weryfikacji komendy: {e}" - - def get_nmap_input(self, return_dict: bool = False) -> list[str] | dict: - user_command = self.query_one("#usercommand", Input).value.strip() - low_port = self.query_one("#low-port-range", Input).value.strip() - high_port = self.query_one("#high-port-range", Input).value.strip() - interface = self.query_one("#interface-select", Select).value - targets = self.query_one("#ip", Input).value.strip() - options = self.query_one(SelectionList).selected - - if return_dict: - if user_command: - parts = shlex.split(user_command) - - if parts and parts[0] == "sudo": parts.pop(0) - if parts and parts[0] == "nmap": parts.pop(0) - - if not parts: - return {} - - target = parts[-1] - args = parts[:-1] - - return { - "targets": target, - "arguments": " ".join(args) - } - else: - nmap_dict = {"targets": targets} - - if low_port: - if high_port: - nmap_dict["ports"] = f"{low_port}-{high_port}" - else: - nmap_dict["ports"] = f"{low_port}" - - args: list[str] = [] - if options: - args.extend(options) - if interface: - args.extend(["-e", str(interface)]) - if args: - nmap_dict["arguments"] = " ".join(args) - - return {k: v for k, v in nmap_dict.items() if v is not None} - - if user_command: - return shlex.split(user_command) - - parts = ["nmap"] - if interface: - parts.extend(["-e", str(interface)]) - if options: - parts.extend(options) - - if low_port: - if high_port: - parts.extend(["-p", f"{low_port}-{high_port}"]) - else: - parts.extend(["-p", f"{low_port}"]) - - parts.append(targets) - return parts - - @on(Mount) - @on(SelectionList.SelectedChanged) - @on(Input.Changed) - @on(Select.Changed) - def update_selected_view(self, event=None) -> None: - self.final_command.update(" ".join(self.get_nmap_input())) - - @on(Button.Pressed, "#scan-button") - async def handle_scan_button(self, event: Button.Pressed) -> None: - nmap_input = self.get_nmap_input(return_dict=True) - if not isinstance(nmap_input, dict): - self.notify("Błąd: dane wejściowe nie są słownikiem", severity="error") - return - errors = self.validate_inputs(nmap_input) - if errors: - self.notify("\n".join(errors), title="Bad Input", severity="error") - else: - try: - temp_prof_name = "temp_scan_profile" - self.notify("Start scanning", title="Notification", severity="information") - - if not self.manager.add_profile(temp_prof_name, notify=False): - self.notify("Błąd: Nie udało się utworzyć profilu (problem z zapisem pliku?)", severity="error") - return - - if not self.manager.update_profile(temp_prof_name, "nmap_input", nmap_input, notify=False): - self.notify("Błąd: Nie udało się zaktualizować parametrów skanowania", severity="error") - self.manager.delete_profile(temp_prof_name, notify=False) - return - - profile = self.manager.get_profile(temp_prof_name) - if profile: - scan_now_results = profile.scan() - self.results.update(scan_now_results) - self.manager.delete_profile(temp_prof_name, notify=False) - self.notify("Scanning completed", title="Notification", severity="information") - else: - self.notify("Krytyczny błąd: Profil zniknął po utworzeniu", severity="error") - - except Exception as e: - self.notify(f"Scanner error: {e}", title="Error", severity="error") - - @on(Button.Pressed, "#save-button") - async def handle_save_button(self, event: Button.Pressed): - nmap_input = self.get_nmap_input(return_dict=True) - if not isinstance(nmap_input, dict): - self.notify("Błąd: dane wejściowe nie są słownikiem", severity="error") - return - - errors = self.validate_inputs(nmap_input) - if errors: - self.notify("\n".join(errors), title="Bad Input", severity="error") - return - - cve_check = self.query_one("#cve-checkbox", Checkbox).value - self.app.push_screen(SaveProfilePushScreen(self.manager, nmap_input, cve_check)) - - def on_manager_message(self, msg: str, title: str, severity): - self.app.notify(message=msg, title=title, severity=severity) diff --git a/src/netmonitor/front/scanner_tab_pushscreens.py b/src/netmonitor/front/scanner_tab_pushscreens.py deleted file mode 100644 index 4e4a564..0000000 --- a/src/netmonitor/front/scanner_tab_pushscreens.py +++ /dev/null @@ -1,35 +0,0 @@ -from textual import on -from textual.app import ComposeResult -from textual.widgets import Input, Button, Label -from textual.containers import Vertical, Horizontal -from textual.screen import ModalScreen - -from ..back.scanner_profiles_manager import ScannerProfilesManager - -class SaveProfilePushScreen(ModalScreen[str]): - def __init__(self, manager: ScannerProfilesManager, nmap_input, cve_check, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.nmap_input = nmap_input - self.cve_check = cve_check - - - def compose(self) -> ComposeResult: - with Vertical(classes="modal-window small-modal"): - yield Label("Save scan profile", classes="modal-header") - yield Input(placeholder="Profile name:", id="profile-name", classes="input") - with Horizontal(id="buttons-row", classes="modal-buttons modal-footer"): - yield Button("Confirm", id="confirm-button", variant="success", classes="button") - yield Button("Cancel", id="cancel-button", variant="error", classes="button") - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed): - if event.button.id == "confirm-button": - profile_name = self.query_one("#profile-name", Input).value.strip() - self.manager.add_profile(profile_name) - self.manager.update_profile(profile_name,"nmap_input",self.nmap_input,notify=False) - self.manager.update_profile(profile_name,"cve",self.cve_check, notify=False) - self.dismiss(None) - else: - self.dismiss(None) - -- cgit v1.2.3