diff options
| author | EnricoGuccii <partyka.003@proton.me> | 2026-01-10 22:43:36 +0100 |
|---|---|---|
| committer | EnricoGuccii <partyka.003@proton.me> | 2026-01-10 22:43:36 +0100 |
| commit | 6d7410e286ce0fde31f89185c095fe90e85597f3 (patch) | |
| tree | 5092c99d353382a71f00d8fe18d53b8073cf3f58 /src/netmonitor/front | |
| parent | c2f5fbe7fb93ce420caf23c5c0e06144cf953bb8 (diff) | |
bloat removed
Diffstat (limited to 'src/netmonitor/front')
18 files changed, 0 insertions, 1050 deletions
diff --git a/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc Binary files differdeleted file mode 100644 index 6a32fac..0000000 --- a/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc +++ /dev/null diff --git a/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc Binary files differdeleted file mode 100644 index a74ee0c..0000000 --- a/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc +++ /dev/null diff --git a/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc Binary files differdeleted file mode 100644 index 0257359..0000000 --- a/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc +++ /dev/null diff --git a/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc Binary files differdeleted file mode 100644 index 8672061..0000000 --- a/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc +++ /dev/null diff --git a/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc Binary files differdeleted file mode 100644 index 227bfa8..0000000 --- a/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc +++ /dev/null diff --git a/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc Binary files differdeleted file mode 100644 index d6fcdfb..0000000 --- a/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc +++ /dev/null diff --git a/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc Binary files differdeleted file mode 100644 index a4db183..0000000 --- a/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc +++ /dev/null diff --git a/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc Binary files differdeleted file mode 100644 index 8a8d03a..0000000 --- a/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc +++ /dev/null diff --git a/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc Binary files differdeleted file mode 100644 index 28cc805..0000000 --- a/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc +++ /dev/null diff --git a/src/netmonitor/front/detector_profiles_tab.py b/src/netmonitor/front/detector_profiles_tab.py deleted file mode 100644 index 7860e16..0000000 --- a/src/netmonitor/front/detector_profiles_tab.py +++ /dev/null @@ -1,77 +0,0 @@ -from textual import on -from textual.app import ComposeResult -from textual.widgets import Button, Label, Switch -from textual.containers import Vertical, Horizontal, VerticalScroll - -from ..back.detector_profiles_manager import DetectorProfilesManager -from .detector_profiles_tab_pushscreens import ConfirmDeletePushScreen, ShowLogsPushScreen, ShowProfilePushScreen, SetDetectorNotificationPushScreen - -class DetectorProfilesTab(Vertical): - def __init__(self, manager: DetectorProfilesManager, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.manager.on_refresh = self.refresh_profiles - self.manager.on_message = self.on_manager_message - - def compose(self) -> ComposeResult: - yield VerticalScroll(id="profiles-list") - - def on_mount(self) -> None: - self.refresh_profiles() - - def refresh_profiles(self) -> None: - profiles_list = self.query_one("#profiles-list", VerticalScroll) - profiles_list.remove_children() - - if not self.manager.profiles: - return - - for profile in self.manager.profiles: - row = Horizontal( - Switch(id=f"activate-profile-switch-{profile.profile_name}", value=profile.is_active), - Label(f"{profile.profile_name}", classes="profile-profile_name"), - Button("Show logs", id=f"show-logs-button-{profile.profile_name}", classes="profile-action", variant="primary"), - Button("Show Profile", id=f"show-profile-button-{profile.profile_name}", classes="profile-action", variant="default"), - Button("Notifications", id=f"set-notifications-button-{profile.profile_name}", classes="profile-action", variant="default"), - Button("Delete", id=f"delete-{profile.profile_name}", classes="profile-action button-delete", variant="error"), - classes="profile-row" - ) - profiles_list.mount(row) - - @on(Switch.Changed) - def any_switch_changed(self, event: Switch.Changed) -> None: - switch_id = event.switch.id - if not switch_id: - return - elif switch_id.startswith("activate-profile-switch"): - profile_name = switch_id.removeprefix("activate-profile-switch-") - - if event.switch.value: - is_turned_on = self.manager.turn_on_profile(profile_name) - if not is_turned_on: - event.switch.value = False - else: - is_turned_off = self.manager.turn_off_profile(profile_name) - if not is_turned_off: - event.switch.value = False - - @on(Button.Pressed) - def on_any_button_pressed(self, event: Button.Pressed) -> None: - button_id = event.button.id - if not button_id: - return - elif button_id.startswith("show-profile-button-"): - profile_name = button_id.removeprefix("show-profile-button-") - self.app.push_screen(ShowProfilePushScreen(self.manager, profile_name)) - elif button_id.startswith("set-notifications-button-"): - profile_name = button_id.removeprefix("set-notifications-button-") - self.app.push_screen(SetDetectorNotificationPushScreen(self.manager, profile_name)) - elif button_id.startswith("show-logs-button-"): - profile_name = button_id.removeprefix("show-logs-button-") - self.app.push_screen(ShowLogsPushScreen(self.manager, profile_name)) - elif button_id.startswith("delete-"): - profile_name = button_id.removeprefix("delete-") - self.app.push_screen(ConfirmDeletePushScreen(self.manager, profile_name)) - - def on_manager_message(self, msg: str, title: str, severity): - self.app.notify(message=msg, title=title, severity=severity) diff --git a/src/netmonitor/front/detector_profiles_tab_pushscreens.py b/src/netmonitor/front/detector_profiles_tab_pushscreens.py deleted file mode 100644 index fc5b731..0000000 --- a/src/netmonitor/front/detector_profiles_tab_pushscreens.py +++ /dev/null @@ -1,187 +0,0 @@ -from textual import on -from textual.app import ComposeResult -from textual.screen import ModalScreen -from textual.widgets import Button, Label, Pretty, DataTable, Switch -from textual.containers import Vertical, Horizontal, VerticalScroll, Container -from textual_plotext import PlotextPlot - -from datetime import datetime - -from ..back.detector_profiles_manager import DetectorProfilesManager -from ..back.detector_profile_HST import DetectorProfileHST - -class PlotTab(Container): - def __init__(self, profile: DetectorProfileHST, *args, **kwargs): - super().__init__(*args, **kwargs) - self.profile = profile - self.classes = "plot-card" - - def compose(self): - yield PlotextPlot() - - def on_mount(self): - self.set_interval(1, self.update_plot) - - def update_plot(self): - plot_widget = self.query_one(PlotextPlot) - plt = plot_widget.plt - - y = list(getattr(self.profile, "plot_data", [])) - - plt.clear_figure() - plt.theme("dark") - - plt.plot(y, marker="dot", color="green") - plt.title("Anomaly Score") - plt.ylabel("last 30 windows") - plt.ylim(0, 1) - - threshold = self.profile.params.get("threshold", 0.7) - if threshold is not None: - plt.horizontal_line(float(threshold), color="red") - - - plot_widget.refresh() - - -class ShowProfilePushScreen(ModalScreen[str]): - def __init__(self, manager, profile_name: str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - self.profile = self.manager.get_profile(profile_name) - - def compose(self) -> ComposeResult: - with Container(classes="modal-window large-modal"): - yield Label(f"Profile: {self.profile_name}", classes="modal-header") - - with Horizontal(classes="modal-split-container"): - - with Vertical(classes="left-panel"): - yield PlotTab(self.profile) - - with Vertical(classes="right-panel"): - yield Label("Runtime Stats (Live)", classes="section-header") - with VerticalScroll(classes="info-box", id="stats-box"): - yield Pretty({}, id="runtime-stats-pretty") - - yield Label("Configuration", classes="section-header") - with VerticalScroll(classes="info-box"): - yield Pretty(self.profile.to_dict()) - - with Container(classes="modal-footer"): - yield Button("Close", id="cancel-button", variant="primary") - - def on_mount(self): - self.set_interval(1.0, self.update_stats) - self.update_stats() - - def update_stats(self): - if self.profile: - stats = self.profile.get_runtime_stats() - self.query_one("#runtime-stats-pretty", Pretty).update(stats) - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed): - self.dismiss(None) - - -class ShowLogsPushScreen(ModalScreen[str]): - def __init__(self, manager: DetectorProfilesManager, profile_name: str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - - def compose(self) -> ComposeResult: - with Container(classes="modal-window medium-modal"): - yield Label(f"Anomaly Logs: {self.profile_name}", classes="modal-header") - - with Container(classes="table-container"): - yield DataTable(id="logs_table", zebra_stripes=True, cursor_type="row") - - with Horizontal(classes="modal-footer"): - yield Button("Clear History", id="clear-button", variant="warning") - yield Button("Close", id="cancel-button", variant="primary") - - - def on_mount(self): - table = self.query_one("#logs_table", DataTable) - table.add_columns("Timestamp", "Score", "Packets Rate", "Protocol Info", "Verdict") - - logs = self.manager.get_profile_logs(self.profile_name) - - if not logs: - return - - sorted_logs = sorted(logs, key=lambda x: x.get("ts", 0), reverse=True) - - for log in sorted_logs: - dt = datetime.fromtimestamp(log.get("ts", 0)).strftime("%Y-%m-%d %H:%M:%S") - - score = f"{log.get('score', 0):.4f}" - rate = f"{log.get('pkt_rate', 0):.1f}" - proto = str(log.get("proto_info", "-")) - verdict = "ANOMALY" - - table.add_row(dt, score, rate, proto, verdict) - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed) -> None: - if event.button.id == "clear-button": - profile = self.manager.get_profile(self.profile_name) - if profile: - profile.clear_logs() - self.query_one("#logs_table", DataTable).clear() - else: - self.dismiss(None) - -class SetDetectorNotificationPushScreen(ModalScreen[str]): - def __init__(self, manager, profile_name: str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - self.profile = self.manager.get_profile(profile_name) - - def compose(self) -> ComposeResult: - current_val = getattr(self.profile, 'notify_enabled', False) - - with Container(classes="modal-window small-modal"): - yield Label(f"Notification: {self.profile_name}", classes="modal-header") - - with Vertical(classes="section-card"): - yield Label("Enable notification (Discord)") - yield Switch(value=current_val, id="switch-anomaly") - - with Horizontal(classes="modal-footer"): - yield Button("Close", id="close-button", variant="primary") - - @on(Switch.Changed) - def on_switch_changed(self, event: Switch.Changed): - if event.switch.id == "switch-anomaly": - self.profile.notify_enabled = event.value - self.manager.try_save_profiles(notify=False) - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed): - self.dismiss(None) - -class ConfirmDeletePushScreen(ModalScreen[str]): - def __init__(self, manager: DetectorProfilesManager, profile_name:str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - - def compose(self) -> ComposeResult: - with Container(classes="modal-window small-modal"): - yield Label("Are you sure?",classes="modal-header") - with Horizontal(classes="modal-footer"): - yield Button("Delete", id="confirm-button", variant="error") - yield Button("Cancel", id="cancel-button", variant="default") - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed): - if event.button.id == "confirm-button": - self.manager.delete_profile(self.profile_name) - self.dismiss(None) - else: - self.dismiss(None) diff --git a/src/netmonitor/front/detector_tab.py b/src/netmonitor/front/detector_tab.py deleted file mode 100644 index 53dc908..0000000 --- a/src/netmonitor/front/detector_tab.py +++ /dev/null @@ -1,135 +0,0 @@ -from textual import on -from textual.app import ComposeResult -from textual.widgets import Input, Select, Button, Label, Checkbox -from textual.containers import Container, Horizontal, Vertical, VerticalScroll - -import psutil - -from ..back.detector_profiles_manager import DetectorProfilesManager -from ..back.window import FEATURE_LIST -from ..front.detector_tab_pushscreens import SaveProfilePushScreen - -class DetectorTab(Container): - def __init__(self, manager: DetectorProfilesManager, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.manager.on_message = self.on_manager_message - - def compose(self) -> ComposeResult: - with Horizontal(id="top-config-container"): - - model_section = Container(id="model-section", classes="section-card") - model_section.border_title = "Algorithm: HalfSpaceTrees" - with model_section: - with VerticalScroll(classes="detector-scroll"): - yield Label("Select interface:", classes="label") - try: - ifaces = list(psutil.net_if_addrs().keys()) - except: - ifaces = [] - - yield Select.from_values( - ifaces, - id="interface-select", - allow_blank=False, - classes="input" - ) - - yield Label("Model params:", classes="label") - yield Input(placeholder="Trees number (int, def: 10)", id="param-trees", classes="input") - yield Input(placeholder="Height (int, def: 8)", id="param-height", classes="input") - yield Input(placeholder="Window size (int, def: 250)", id="param-window", classes="input") - yield Input(placeholder="Seed (int, def: 42)", id="param-seed", classes="input") - yield Input(placeholder="Window duration (def: 10 sec )", id="param-window_duration", classes="input") - yield Input(placeholder="Threshold (0.0 - 1.0, def: 0.7)", id="param-threshold", classes="input") - yield Input(placeholder="Queue size (int, def: 10000)", id="param-queue_size", classes="input") - - features_section = Container(id="features-section", classes="section-card") - features_section.border_title = "Flow-based Features" - with features_section: - with VerticalScroll(classes="detector-scroll"): - yield Label("Select features to include in model:", classes="label") - self.feature_checkboxes = {} - for feat in FEATURE_LIST: - cb = Checkbox(feat, value=True, classes="input") - self.feature_checkboxes[feat] = cb - yield cb - - bpf_section = Container(id="bpf-section", classes="section-card") - bpf_section.border_title = "BPF Filter (Optional)" - with bpf_section: - yield Input( - placeholder="BPF Filter (e.g. 'tcp port 80 or udp')", - id="param-bpf_filter", - classes="input full" - ) - with Container(classes="save-button-container"): - yield Button("Save Profile", id="save-button", variant="success") - - def get_inputs(self): - features = [f for f, cb in self.feature_checkboxes.items() if cb.value] - - if not features: - raise ValueError("Select at least one feature.") - - params = {} - - try: - interface = self.query_one("#interface-select", Select).value - if not interface: - raise ValueError("Select interface.") - params["interface"] = interface - except Exception: - raise ValueError("Interface selection error.") - - bpf_input = self.query_one("#param-bpf_filter", Input) - if bpf_input.value.strip(): - params["bpf_filter"] = bpf_input.value.strip() - - defaults = { - "trees": 10, - "height": 8, - "window": 250, - "seed": 42, - "threshold": 0.7, - "window_duration": 10.0, - "queue_size": 10000, - "bpf_filter": "" - } - - model_section = self.query_one("#model-section") - for inp in model_section.query("Input"): - if inp.id and inp.id.startswith("param-"): - key = inp.id.removeprefix("param-") - if key == "bpf_filter": continue - - val_str = inp.value.strip() - - if not val_str: - if key in defaults and defaults[key] is not None: - params[key] = defaults[key] - continue - - try: - if key in ["trees", "height", "window", "seed","queue_size"]: - params[key] = int(val_str) - elif key in ["threshold", "window_duration"]: - params[key] = float(val_str) - except ValueError: - raise ValueError(f"Param '{key}' must be a number.") - - return { - "features": features, - "params": params, - } - - @on(Button.Pressed, "#save-button") - async def handle_save_button(self, event: Button.Pressed): - try: - input_data = self.get_inputs() - self.app.push_screen(SaveProfilePushScreen(self.manager, input_data=input_data)) - except ValueError as e: - self.app.notify(str(e), title="Validation error", severity="error") - - def on_manager_message(self, msg: str, title: str, severity): - self.app.notify(message=msg, title=title, severity=severity) diff --git a/src/netmonitor/front/detector_tab_pushscreens.py b/src/netmonitor/front/detector_tab_pushscreens.py deleted file mode 100644 index 625921b..0000000 --- a/src/netmonitor/front/detector_tab_pushscreens.py +++ /dev/null @@ -1,31 +0,0 @@ -from textual.app import ComposeResult -from textual.widgets import Input, Label, Button -from textual.containers import Vertical, Horizontal -from textual.screen import ModalScreen -from textual import on - -from ..back.detector_profiles_manager import DetectorProfilesManager - -class SaveProfilePushScreen(ModalScreen[str]): - def __init__(self, manager: DetectorProfilesManager, input_data , *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.input_data = input_data - - - def compose(self) -> ComposeResult: - with Vertical(classes="modal-window small-modal"): - yield Label("Save scan profile", classes="modal-header") - yield Input(placeholder="Profile name:", id="profile-name", classes="input") - with Horizontal(id="buttons-row", classes="modal-buttons modal-footer"): - yield Button("Confirm", id="confirm-button", variant="success", classes="button") - yield Button("Cancel", id="cancel-button", variant="error", classes="button") - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed): - if event.button.id == "confirm-button": - profile_name = self.query_one("#profile-name", Input).value.strip() - self.manager.add_profile(profile_name,self.input_data) - self.dismiss(None) - else: - self.dismiss(None) diff --git a/src/netmonitor/front/options_tab.py b/src/netmonitor/front/options_tab.py deleted file mode 100644 index 6cbd448..0000000 --- a/src/netmonitor/front/options_tab.py +++ /dev/null @@ -1,44 +0,0 @@ -from textual.app import ComposeResult -from textual.containers import Container, Horizontal -from textual.widgets import Button, Input, Label -from textual import on - -from ..back.notification_service import notification_service - -class OptionsTab(Container): - def __init__(self, scanner_manager, detector_manager, *args, **kwargs): - super().__init__(*args, **kwargs) - self.scanner_manager = scanner_manager - self.detector_manager = detector_manager - - def compose(self) -> ComposeResult: - notify_section = Container(id="notify-section", classes="section-card") - notify_section.border_title = "Notification config" - with notify_section: - yield Label("Discord Webhook URL:") - yield Input(placeholder="https://discord.com/api/webhooks/...", id="input-webhook-url") - - with Horizontal(classes="modal-footer"): - yield Button("Save config", id="save-config", variant="success") - yield Button("notification test", id="test-notif", variant="primary") - - def on_mount(self): - self.query_one("#input-webhook-url", Input).value = notification_service.webhook_url - - @on(Button.Pressed, "#save-config") - def save_configuration(self): - url = self.query_one("#input-webhook-url", Input).value.strip() - - if notification_service.save_config(url): - self.app.notify("Config saved", severity="information") - else: - self.app.notify("Error during saving", severity="error") - - @on(Button.Pressed, "#test-notif") - def test_notification(self): - success = notification_service.send_message("**Test NetMonitor**\n") - - if success: - self.app.notify("good", severity="information") - else: - self.app.notify("bad", severity="error") diff --git a/src/netmonitor/front/scanner_profiles_tab.py b/src/netmonitor/front/scanner_profiles_tab.py deleted file mode 100644 index 8e8610c..0000000 --- a/src/netmonitor/front/scanner_profiles_tab.py +++ /dev/null @@ -1,84 +0,0 @@ -from textual.widgets import Button, Label, Switch -from textual import on -from textual.app import ComposeResult -from textual.containers import Vertical, Horizontal, VerticalScroll - -from ..back.scanner_profiles_manager import ScannerProfilesManager -from .scanner_profiles_tab_pushscreens import ScanNowPushScreen, SetSchedulerPushScreen, ShowLogsPushScreen, SetNotifacationOptionsPushScreen, ShowProfilePushScreen, ConfirmDeletePushScreen - -class ScannerProfilesTab(Vertical): - def __init__(self, manager: ScannerProfilesManager, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.manager.on_refresh = self.refresh_profiles - self.manager.on_message = self.on_manager_message - - def compose(self) -> ComposeResult: - yield VerticalScroll(id="profiles-list") - - def on_mount(self) -> None: - self.refresh_profiles() - - def refresh_profiles(self) -> None: - profiles_list = self.query_one("#profiles-list", VerticalScroll) - profiles_list.remove_children() - - if not self.manager.profiles: - return - - for profile in self.manager.profiles: - row = Horizontal( - Switch(id=f"switch-{profile.profile_name}", value=profile.is_active), - Button("Scan Now", id=f"scan-now-button-{profile.profile_name}", classes="profile-action", variant="success"), - Label(f"{profile.profile_name}", classes="profile-profile_name"), - Button("Show Profile", id=f"show-profile-button-{profile.profile_name}", classes="profile-action", variant="primary"), - Button("Show logs", id=f"show-logs-button-{profile.profile_name}", classes="profile-action", variant="default"), - Button("Set scheduler", id=f"set-scheduler-button-{profile.profile_name}", classes="profile-action", variant="default"), - Button("Notifications", id=f"set-notifications-button-{profile.profile_name}", classes="profile-action", variant="default"), - Button("Delete", id=f"delete-{profile.profile_name}", classes="profile-action", variant="error"), - classes="profile-row" - ) - profiles_list.mount(row) - - @on(Switch.Changed) - def switch_changed(self, event: Switch.Changed) -> None: - switch_id = event.switch.id - if not switch_id: - return - profile_name = switch_id.removeprefix("switch-") - - if event.switch.value: - is_turned_on = self.manager.turn_on_profile(profile_name) - if not is_turned_on: - event.switch.value = False - else: - is_turned_off = self.manager.turn_off_profile(profile_name) - if not is_turned_off: - event.switch.value = False - - @on(Button.Pressed) - def on_any_button_pressed(self, event: Button.Pressed) -> None: - button_id = event.button.id - if not button_id: - return - if button_id.startswith("scan-now-button-"): - profile_name = button_id.removeprefix("scan-now-button-") - self.app.push_screen(ScanNowPushScreen(self.manager, profile_name)) - elif button_id.startswith("show-profile-button-"): - profile_name = button_id.removeprefix("show-profile-button-") - self.app.push_screen(ShowProfilePushScreen(self.manager, profile_name)) - elif button_id.startswith("set-scheduler-button-"): - profile_name = button_id.removeprefix("set-scheduler-button-") - self.app.push_screen(SetSchedulerPushScreen(self.manager, profile_name)) - elif button_id.startswith("set-notifications-button-"): - profile_name = button_id.removeprefix("set-notifications-button-") - self.app.push_screen(SetNotifacationOptionsPushScreen(self.manager, profile_name)) - elif button_id.startswith("delete-"): - profile_name = button_id.removeprefix("delete-") - self.app.push_screen(ConfirmDeletePushScreen(self.manager, profile_name)) - elif button_id.startswith("show-logs-button-"): - profile_name = button_id.removeprefix("show-logs-button-") - self.app.push_screen(ShowLogsPushScreen(self.manager, profile_name)) - - def on_manager_message(self, msg: str, title: str, severity): - self.app.notify(message=msg, title=title, severity=severity) diff --git a/src/netmonitor/front/scanner_profiles_tab_pushscreens.py b/src/netmonitor/front/scanner_profiles_tab_pushscreens.py deleted file mode 100644 index 5be9941..0000000 --- a/src/netmonitor/front/scanner_profiles_tab_pushscreens.py +++ /dev/null @@ -1,180 +0,0 @@ -from textual.app import ComposeResult -from textual import on -from textual.screen import ModalScreen -from textual.widgets import Input, Button, Label, Pretty, Select, Switch -from textual.containers import Vertical, Horizontal, VerticalScroll, Container - -from ..back.scanner_profiles_manager import ScannerProfilesManager - -class ScanNowPushScreen(ModalScreen[str]): - def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - - def compose(self) -> ComposeResult: - with Container(classes="modal-window large-modal"): - yield Label(f"Scanning: {self.profile_name}", classes="modal-header") - with VerticalScroll(classes="info-box"): - yield Pretty(self.manager.get_profile(self.profile_name).scan()) - with Horizontal(classes="modal-footer"): - yield Button("Close", variant="primary") - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed) -> None: - self.dismiss(None) - -class ShowProfilePushScreen(ModalScreen[str]): - def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - - def compose(self) -> ComposeResult: - with Container(classes="modal-window medium-modal"): - yield Label(f"Profile: {self.profile_name}", classes="modal-header") - with VerticalScroll(classes="info-box"): - yield Pretty(self.manager.get_profile(self.profile_name).to_dict()) - with Horizontal(classes="modal-footer"): - yield Button("Close", id="cancel-button", variant="primary") - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed) -> None: - self.dismiss(None) - - -class SetSchedulerPushScreen(ModalScreen[str]): - def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - - def compose(self) -> ComposeResult: - with Container(classes="modal-window small-modal"): - yield Label("Set CRON Scheduler", classes="modal-header") - yield Label("Format: min hour day month day_of_week", classes="label") - yield Input(placeholder="* * * * * ", id="cron-input", classes="input") - - with Horizontal(classes="modal-footer"): - yield Button("Confirm", id="confirm-button", variant="success") - yield Button("Cancel", id="cancel-button", variant="error") - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed): - if event.button.id == "confirm-button": - cron_input = self.query_one("#cron-input", Input).value.strip() - if cron_input: - self.manager.set_validated_scheduler(self.profile_name, cron_input) - self.dismiss(None) - else: - self.dismiss(None) - -class SetNotifacationOptionsPushScreen(ModalScreen[str]): - def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - self.profile = self.manager.get_profile(profile_name) - - def compose(self) -> ComposeResult: - current_enabled = getattr(self.profile, 'notify_enabled', False) - current_cve_only = getattr(self.profile, 'notify_only_cve', False) - - with Container(classes="modal-window small-modal"): - yield Label(f"Notifications: {self.profile_name}", classes="modal-header") - - with Vertical(classes="section-card"): - yield Label("Enable notifications (Discord):") - yield Switch(value=current_enabled, id="switch-enable") - - yield Label("Notify only when scanner finds CVE") - yield Switch(value=current_cve_only, id="switch-cve-only") - - with Horizontal(classes="modal-footer"): - yield Button("Close", id="close-button", variant="primary") - - @on(Switch.Changed) - def on_switch_changed(self, event: Switch.Changed): - if event.switch.id == "switch-enable": - self.profile.notify_enabled = event.value - elif event.switch.id == "switch-cve-only": - self.profile.notify_only_cve = event.value - - self.manager.try_save_profiles(notify=False) - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed): - self.dismiss(None) - -class ShowLogsPushScreen(ModalScreen[str]): - def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - self.logs = [] - - def compose(self) -> ComposeResult: - with Container(classes="modal-window large-modal"): - yield Label(f"Logs: {self.profile_name}", classes="modal-header") - - yield Select([], prompt="Select date", id="scan-date-select") - - with VerticalScroll(classes="info-box"): - yield Pretty({}, id="log-content") - - with Horizontal(classes="modal-footer"): - yield Button("Close", id="cancel-button", variant="primary") - - def on_mount(self): - self.logs = self.manager.get_profile_logs(self.profile_name) - select = self.query_one("#scan-date-select", Select) - options = [] - - if self.logs: - for index, log in enumerate(reversed(self.logs)): - real_index = len(self.logs) - 1 - index - label = log.get('_timestamp', f"Scan #{real_index + 1} (no date)") - - options.append((str(label), real_index)) - - select.set_options(options) - - if options: - select.value = options[0][1] - - @on(Select.Changed, "#scan-date-select") - def on_date_selected(self, event: Select.Changed): - if event.value is not None: - index = event.value - if index == Select.BLANK: - self.query_one("#log-content", Pretty).update({}) - return - if 0 <= index < len(self.logs): - log_entry = self.logs[index] - self.query_one("#log-content", Pretty).update(log_entry) - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed) -> None: - self.dismiss(None) - - -class ConfirmDeletePushScreen(ModalScreen[str]): - def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.profile_name = profile_name - - def compose(self) -> ComposeResult: - with Container(classes="modal-window small-modal"): - yield Label("Are you sure?",classes="modal-header") - with Horizontal(classes="modal-footer"): - yield Button("Delete", id="confirm-button", variant="error") - yield Button("Cancel", id="cancel-button", variant="default") - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed): - if event.button.id == "confirm-button": - self.manager.delete_profile(self.profile_name) - self.dismiss(None) - else: - self.dismiss(None) diff --git a/src/netmonitor/front/scanner_tab.py b/src/netmonitor/front/scanner_tab.py deleted file mode 100755 index 868fa74..0000000 --- a/src/netmonitor/front/scanner_tab.py +++ /dev/null @@ -1,277 +0,0 @@ -from textual import on
-from textual.app import ComposeResult
-from textual.widgets import Checkbox, Input, Select, Button, Label, SelectionList, Pretty
-from textual.widgets.selection_list import Selection
-from textual.containers import Container, Vertical, Horizontal, VerticalScroll
-from textual.events import Mount
-
-import psutil, ipaddress, shlex
-from typing import List, Dict, Optional
-
-from ..back.scanner_profiles_manager import ScannerProfilesManager
-from .scanner_tab_pushscreens import SaveProfilePushScreen
-
-class ScannerTab(Container):
-
- def __init__(self, manager: ScannerProfilesManager):
- super().__init__()
- self.manager = manager
- self.manager.on_message = self.on_manager_message
-
- def compose(self) -> ComposeResult:
- friendly_section = Container(id="friendly-command-section", classes="section card")
- friendly_section.border_title = "Scanner Configuration"
-
- with friendly_section:
- yield Label("Interface:", classes="label")
- try:
- interfaces = list(psutil.net_if_addrs().keys())
- except Exception:
- interfaces = []
-
- yield Select.from_values(
- interfaces,
- id="interface-select",
- allow_blank=False,
- classes="input"
- )
-
- yield Label("IP address:", classes="label")
- yield Input(
- value="127.0.0.1/32",
- placeholder="192.168.0.0/24",
- name="ip-input",
- id="ip",
- classes="input"
- )
-
- yield Label("Port range (or single port)", classes="label")
- with Horizontal(id="port-range", classes="input-row"):
- yield Input(placeholder="Start / Single", id="low-port-range", classes="input half")
- yield Input(placeholder="End (Optional)", id="high-port-range", classes="input half")
-
- yield Checkbox("Check for CVE's", id="cve-checkbox", classes="checkbox")
-
- yield Label("Scan options:", classes="label")
- with Horizontal(classes="selection-row"):
- yield SelectionList[str](
- Selection("Service version (-sV)", "-sV", False),
- Selection("OS detection (-O)", "-O", False),
- Selection("SYN scan (-sS)", "-sS", False),
- Selection("UDP scan (-sU)", "-sU", False),
- Selection("Fast scan (-F)", "-F", False),
- Selection("Aggressive scan (-A)", "-A", False),
- Selection("Ping skip (-Pn)", "-Pn", False),
- Selection("Timing T4 (-T4)", "-T4", True),
- classes="selection-list"
- )
-
- user_section = Container(id="user-command-section", classes="section card")
- user_section.border_title = "Manual Command"
-
- with user_section:
- yield Input(
- placeholder="sudo nmap -sS 192.168.1.1",
- name="user-nmap-input",
- id="usercommand",
- classes="input full"
- )
-
-
- final_section = Container(id="final-section", classes="section card")
- final_section.border_title = "Execution"
-
- with final_section:
- with Vertical(id="final-command", classes="command-output"):
- self.final_command = Pretty([], id="final-command-output")
- yield self.final_command
- with Vertical(id="buttons", classes="buttons-column"):
- yield Button("Scan now", id="scan-button", variant="success")
- yield Button("Save profile", id="save-button", variant="primary")
-
- results_section = Container(id="results-section", classes="section card")
- results_section.border_title = "Scan Output"
-
- with results_section:
- with VerticalScroll(id="results-scroll"):
- self.results = Pretty([], id="results", classes="results")
- yield self.results
-
- def validate_inputs(self, nmap_input: Dict[str, str]) -> List[str]:
- errors: List[str] = []
-
- if self.query_one("#cve-checkbox", Checkbox).value:
- args = nmap_input.get("arguments", "")
- if "-sV" not in args.split():
- errors.append("To check for CVE's, '-sV' option is required")
- return errors
-
- targets = nmap_input.get("targets", "").strip()
- if not targets:
- errors.append("IP field cannot be empty")
- return errors
-
- if not self.query_one("#usercommand", Input).value.strip():
- try:
- ipaddress.ip_network(targets, strict=False)
- except ValueError:
- errors.append(f"Invalid IP address/CIDR: {targets}")
- return errors
-
- if "ports" in nmap_input:
- port_str = nmap_input["ports"]
- if "-" in port_str:
- try:
- low, high = map(int, port_str.split("-"))
- if not (1 <= low <= 65535 and 1 <= high <= 65535):
- errors.append("Ports must be in range 1-65535")
- if low > high:
- errors.append("Start port cannot be greater than end port")
- except ValueError:
- errors.append("Ports (range) must be numbers")
- else:
- try:
- port = int(port_str)
- if not (1 <= port <= 65535):
- errors.append("Port must be in range 1-65535")
- except ValueError:
- errors.append("Port must be a number")
-
- dry_run_err = self.dry_run_nmap_command(nmap_input)
- if dry_run_err:
- errors.append(dry_run_err)
- return errors
-
- def dry_run_nmap_command(self, nmap_input: Dict[str, str]) -> Optional[str]:
- try:
- cmd = ["nmap"]
- if "arguments" in nmap_input:
- cmd += shlex.split(nmap_input["arguments"])
- if "ports" in nmap_input:
- cmd += ["-p", nmap_input["ports"]]
- cmd.append(nmap_input["targets"])
- return None
- except Exception as e:
- return f"Błąd weryfikacji komendy: {e}"
-
- def get_nmap_input(self, return_dict: bool = False) -> list[str] | dict:
- user_command = self.query_one("#usercommand", Input).value.strip()
- low_port = self.query_one("#low-port-range", Input).value.strip()
- high_port = self.query_one("#high-port-range", Input).value.strip()
- interface = self.query_one("#interface-select", Select).value
- targets = self.query_one("#ip", Input).value.strip()
- options = self.query_one(SelectionList).selected
-
- if return_dict:
- if user_command:
- parts = shlex.split(user_command)
-
- if parts and parts[0] == "sudo": parts.pop(0)
- if parts and parts[0] == "nmap": parts.pop(0)
-
- if not parts:
- return {}
-
- target = parts[-1]
- args = parts[:-1]
-
- return {
- "targets": target,
- "arguments": " ".join(args)
- }
- else:
- nmap_dict = {"targets": targets}
-
- if low_port:
- if high_port:
- nmap_dict["ports"] = f"{low_port}-{high_port}"
- else:
- nmap_dict["ports"] = f"{low_port}"
-
- args: list[str] = []
- if options:
- args.extend(options)
- if interface:
- args.extend(["-e", str(interface)])
- if args:
- nmap_dict["arguments"] = " ".join(args)
-
- return {k: v for k, v in nmap_dict.items() if v is not None}
-
- if user_command:
- return shlex.split(user_command)
-
- parts = ["nmap"]
- if interface:
- parts.extend(["-e", str(interface)])
- if options:
- parts.extend(options)
-
- if low_port:
- if high_port:
- parts.extend(["-p", f"{low_port}-{high_port}"])
- else:
- parts.extend(["-p", f"{low_port}"])
-
- parts.append(targets)
- return parts
-
- @on(Mount)
- @on(SelectionList.SelectedChanged)
- @on(Input.Changed)
- @on(Select.Changed)
- def update_selected_view(self, event=None) -> None:
- self.final_command.update(" ".join(self.get_nmap_input()))
-
- @on(Button.Pressed, "#scan-button")
- async def handle_scan_button(self, event: Button.Pressed) -> None:
- nmap_input = self.get_nmap_input(return_dict=True)
- if not isinstance(nmap_input, dict):
- self.notify("Błąd: dane wejściowe nie są słownikiem", severity="error")
- return
- errors = self.validate_inputs(nmap_input)
- if errors:
- self.notify("\n".join(errors), title="Bad Input", severity="error")
- else:
- try:
- temp_prof_name = "temp_scan_profile"
- self.notify("Start scanning", title="Notification", severity="information")
-
- if not self.manager.add_profile(temp_prof_name, notify=False):
- self.notify("Błąd: Nie udało się utworzyć profilu (problem z zapisem pliku?)", severity="error")
- return
-
- if not self.manager.update_profile(temp_prof_name, "nmap_input", nmap_input, notify=False):
- self.notify("Błąd: Nie udało się zaktualizować parametrów skanowania", severity="error")
- self.manager.delete_profile(temp_prof_name, notify=False)
- return
-
- profile = self.manager.get_profile(temp_prof_name)
- if profile:
- scan_now_results = profile.scan()
- self.results.update(scan_now_results)
- self.manager.delete_profile(temp_prof_name, notify=False)
- self.notify("Scanning completed", title="Notification", severity="information")
- else:
- self.notify("Krytyczny błąd: Profil zniknął po utworzeniu", severity="error")
-
- except Exception as e:
- self.notify(f"Scanner error: {e}", title="Error", severity="error")
-
- @on(Button.Pressed, "#save-button")
- async def handle_save_button(self, event: Button.Pressed):
- nmap_input = self.get_nmap_input(return_dict=True)
- if not isinstance(nmap_input, dict):
- self.notify("Błąd: dane wejściowe nie są słownikiem", severity="error")
- return
-
- errors = self.validate_inputs(nmap_input)
- if errors:
- self.notify("\n".join(errors), title="Bad Input", severity="error")
- return
-
- cve_check = self.query_one("#cve-checkbox", Checkbox).value
- self.app.push_screen(SaveProfilePushScreen(self.manager, nmap_input, cve_check))
-
- def on_manager_message(self, msg: str, title: str, severity):
- self.app.notify(message=msg, title=title, severity=severity)
diff --git a/src/netmonitor/front/scanner_tab_pushscreens.py b/src/netmonitor/front/scanner_tab_pushscreens.py deleted file mode 100644 index 4e4a564..0000000 --- a/src/netmonitor/front/scanner_tab_pushscreens.py +++ /dev/null @@ -1,35 +0,0 @@ -from textual import on -from textual.app import ComposeResult -from textual.widgets import Input, Button, Label -from textual.containers import Vertical, Horizontal -from textual.screen import ModalScreen - -from ..back.scanner_profiles_manager import ScannerProfilesManager - -class SaveProfilePushScreen(ModalScreen[str]): - def __init__(self, manager: ScannerProfilesManager, nmap_input, cve_check, *args, **kwargs): - super().__init__(*args, **kwargs) - self.manager = manager - self.nmap_input = nmap_input - self.cve_check = cve_check - - - def compose(self) -> ComposeResult: - with Vertical(classes="modal-window small-modal"): - yield Label("Save scan profile", classes="modal-header") - yield Input(placeholder="Profile name:", id="profile-name", classes="input") - with Horizontal(id="buttons-row", classes="modal-buttons modal-footer"): - yield Button("Confirm", id="confirm-button", variant="success", classes="button") - yield Button("Cancel", id="cancel-button", variant="error", classes="button") - - @on(Button.Pressed) - async def on_button_pressed(self, event: Button.Pressed): - if event.button.id == "confirm-button": - profile_name = self.query_one("#profile-name", Input).value.strip() - self.manager.add_profile(profile_name) - self.manager.update_profile(profile_name,"nmap_input",self.nmap_input,notify=False) - self.manager.update_profile(profile_name,"cve",self.cve_check, notify=False) - self.dismiss(None) - else: - self.dismiss(None) - |