From c2f5fbe7fb93ce420caf23c5c0e06144cf953bb8 Mon Sep 17 00:00:00 2001 From: EnricoGuccii Date: Sat, 10 Jan 2026 22:12:49 +0100 Subject: yy --- .../detector_profiles_tab.cpython-312.pyc | Bin 0 -> 5924 bytes ...tector_profiles_tab_pushscreens.cpython-312.pyc | Bin 0 -> 13721 bytes .../front/__pycache__/detector_tab.cpython-312.pyc | Bin 0 -> 7672 bytes .../detector_tab_pushscreens.cpython-312.pyc | Bin 0 -> 2798 bytes .../front/__pycache__/options_tab.cpython-312.pyc | Bin 0 -> 3442 bytes .../scanner_profiles_tab.cpython-312.pyc | Bin 0 -> 6552 bytes ...canner_profiles_tab_pushscreens.cpython-312.pyc | Bin 0 -> 14133 bytes .../front/__pycache__/scanner_tab.cpython-312.pyc | Bin 0 -> 14995 bytes .../scanner_tab_pushscreens.cpython-312.pyc | Bin 0 -> 3075 bytes src/netmonitor/front/detector_profiles_tab.py | 77 ++++++ .../front/detector_profiles_tab_pushscreens.py | 187 ++++++++++++++ src/netmonitor/front/detector_tab.py | 135 ++++++++++ src/netmonitor/front/detector_tab_pushscreens.py | 31 +++ src/netmonitor/front/options_tab.py | 44 ++++ src/netmonitor/front/scanner_profiles_tab.py | 84 +++++++ .../front/scanner_profiles_tab_pushscreens.py | 180 +++++++++++++ src/netmonitor/front/scanner_tab.py | 277 +++++++++++++++++++++ src/netmonitor/front/scanner_tab_pushscreens.py | 35 +++ 18 files changed, 1050 insertions(+) create mode 100644 src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc create mode 100644 src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc create mode 100644 src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc create mode 100644 src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc create mode 100644 src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc create mode 100644 src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc create mode 100644 src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc create mode 100644 src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc create mode 100644 src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc create mode 100644 src/netmonitor/front/detector_profiles_tab.py create mode 100644 src/netmonitor/front/detector_profiles_tab_pushscreens.py create mode 100644 src/netmonitor/front/detector_tab.py create mode 100644 src/netmonitor/front/detector_tab_pushscreens.py create mode 100644 src/netmonitor/front/options_tab.py create mode 100644 src/netmonitor/front/scanner_profiles_tab.py create mode 100644 src/netmonitor/front/scanner_profiles_tab_pushscreens.py create mode 100755 src/netmonitor/front/scanner_tab.py create mode 100644 src/netmonitor/front/scanner_tab_pushscreens.py (limited to 'src/netmonitor/front') diff --git a/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc new file mode 100644 index 0000000..6a32fac Binary files /dev/null and b/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc differ diff --git a/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc new file mode 100644 index 0000000..a74ee0c Binary files /dev/null and b/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc differ diff --git a/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc new file mode 100644 index 0000000..0257359 Binary files /dev/null and b/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc differ diff --git a/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc new file mode 100644 index 0000000..8672061 Binary files /dev/null and b/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc differ diff --git a/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc new file mode 100644 index 0000000..227bfa8 Binary files /dev/null and b/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc differ diff --git a/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc new file mode 100644 index 0000000..d6fcdfb Binary files /dev/null and b/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc differ diff --git a/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc new file mode 100644 index 0000000..a4db183 Binary files /dev/null and b/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc differ diff --git a/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc new file mode 100644 index 0000000..8a8d03a Binary files /dev/null and b/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc differ diff --git a/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc new file mode 100644 index 0000000..28cc805 Binary files /dev/null and b/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc differ diff --git a/src/netmonitor/front/detector_profiles_tab.py b/src/netmonitor/front/detector_profiles_tab.py new file mode 100644 index 0000000..7860e16 --- /dev/null +++ b/src/netmonitor/front/detector_profiles_tab.py @@ -0,0 +1,77 @@ +from textual import on +from textual.app import ComposeResult +from textual.widgets import Button, Label, Switch +from textual.containers import Vertical, Horizontal, VerticalScroll + +from ..back.detector_profiles_manager import DetectorProfilesManager +from .detector_profiles_tab_pushscreens import ConfirmDeletePushScreen, ShowLogsPushScreen, ShowProfilePushScreen, SetDetectorNotificationPushScreen + +class DetectorProfilesTab(Vertical): + def __init__(self, manager: DetectorProfilesManager, *args, **kwargs): + super().__init__(*args, **kwargs) + self.manager = manager + self.manager.on_refresh = self.refresh_profiles + self.manager.on_message = self.on_manager_message + + def compose(self) -> ComposeResult: + yield VerticalScroll(id="profiles-list") + + def on_mount(self) -> None: + self.refresh_profiles() + + def refresh_profiles(self) -> None: + profiles_list = self.query_one("#profiles-list", VerticalScroll) + profiles_list.remove_children() + + if not self.manager.profiles: + return + + for profile in self.manager.profiles: + row = Horizontal( + Switch(id=f"activate-profile-switch-{profile.profile_name}", value=profile.is_active), + Label(f"{profile.profile_name}", classes="profile-profile_name"), + Button("Show logs", id=f"show-logs-button-{profile.profile_name}", classes="profile-action", variant="primary"), + Button("Show Profile", id=f"show-profile-button-{profile.profile_name}", classes="profile-action", variant="default"), + Button("Notifications", id=f"set-notifications-button-{profile.profile_name}", classes="profile-action", variant="default"), + Button("Delete", id=f"delete-{profile.profile_name}", classes="profile-action button-delete", variant="error"), + classes="profile-row" + ) + profiles_list.mount(row) + + @on(Switch.Changed) + def any_switch_changed(self, event: Switch.Changed) -> None: + switch_id = event.switch.id + if not switch_id: + return + elif switch_id.startswith("activate-profile-switch"): + profile_name = switch_id.removeprefix("activate-profile-switch-") + + if event.switch.value: + is_turned_on = self.manager.turn_on_profile(profile_name) + if not is_turned_on: + event.switch.value = False + else: + is_turned_off = self.manager.turn_off_profile(profile_name) + if not is_turned_off: + event.switch.value = False + + @on(Button.Pressed) + def on_any_button_pressed(self, event: Button.Pressed) -> None: + button_id = event.button.id + if not button_id: + return + elif button_id.startswith("show-profile-button-"): + profile_name = button_id.removeprefix("show-profile-button-") + self.app.push_screen(ShowProfilePushScreen(self.manager, profile_name)) + elif button_id.startswith("set-notifications-button-"): + profile_name = button_id.removeprefix("set-notifications-button-") + self.app.push_screen(SetDetectorNotificationPushScreen(self.manager, profile_name)) + elif button_id.startswith("show-logs-button-"): + profile_name = button_id.removeprefix("show-logs-button-") + self.app.push_screen(ShowLogsPushScreen(self.manager, profile_name)) + elif button_id.startswith("delete-"): + profile_name = button_id.removeprefix("delete-") + self.app.push_screen(ConfirmDeletePushScreen(self.manager, profile_name)) + + def on_manager_message(self, msg: str, title: str, severity): + self.app.notify(message=msg, title=title, severity=severity) diff --git a/src/netmonitor/front/detector_profiles_tab_pushscreens.py b/src/netmonitor/front/detector_profiles_tab_pushscreens.py new file mode 100644 index 0000000..fc5b731 --- /dev/null +++ b/src/netmonitor/front/detector_profiles_tab_pushscreens.py @@ -0,0 +1,187 @@ +from textual import on +from textual.app import ComposeResult +from textual.screen import ModalScreen +from textual.widgets import Button, Label, Pretty, DataTable, Switch +from textual.containers import Vertical, Horizontal, VerticalScroll, Container +from textual_plotext import PlotextPlot + +from datetime import datetime + +from ..back.detector_profiles_manager import DetectorProfilesManager +from ..back.detector_profile_HST import DetectorProfileHST + +class PlotTab(Container): + def __init__(self, profile: DetectorProfileHST, *args, **kwargs): + super().__init__(*args, **kwargs) + self.profile = profile + self.classes = "plot-card" + + def compose(self): + yield PlotextPlot() + + def on_mount(self): + self.set_interval(1, self.update_plot) + + def update_plot(self): + plot_widget = self.query_one(PlotextPlot) + plt = plot_widget.plt + + y = list(getattr(self.profile, "plot_data", [])) + + plt.clear_figure() + plt.theme("dark") + + plt.plot(y, marker="dot", color="green") + plt.title("Anomaly Score") + plt.ylabel("last 30 windows") + plt.ylim(0, 1) + + threshold = self.profile.params.get("threshold", 0.7) + if threshold is not None: + plt.horizontal_line(float(threshold), color="red") + + + plot_widget.refresh() + + +class ShowProfilePushScreen(ModalScreen[str]): + def __init__(self, manager, profile_name: str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.manager = manager + self.profile_name = profile_name + self.profile = self.manager.get_profile(profile_name) + + def compose(self) -> ComposeResult: + with Container(classes="modal-window large-modal"): + yield Label(f"Profile: {self.profile_name}", classes="modal-header") + + with Horizontal(classes="modal-split-container"): + + with Vertical(classes="left-panel"): + yield PlotTab(self.profile) + + with Vertical(classes="right-panel"): + yield Label("Runtime Stats (Live)", classes="section-header") + with VerticalScroll(classes="info-box", id="stats-box"): + yield Pretty({}, id="runtime-stats-pretty") + + yield Label("Configuration", classes="section-header") + with VerticalScroll(classes="info-box"): + yield Pretty(self.profile.to_dict()) + + with Container(classes="modal-footer"): + yield Button("Close", id="cancel-button", variant="primary") + + def on_mount(self): + self.set_interval(1.0, self.update_stats) + self.update_stats() + + def update_stats(self): + if self.profile: + stats = self.profile.get_runtime_stats() + self.query_one("#runtime-stats-pretty", Pretty).update(stats) + + @on(Button.Pressed) + async def on_button_pressed(self, event: Button.Pressed): + self.dismiss(None) + + +class ShowLogsPushScreen(ModalScreen[str]): + def __init__(self, manager: DetectorProfilesManager, profile_name: str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.manager = manager + self.profile_name = profile_name + + def compose(self) -> ComposeResult: + with Container(classes="modal-window medium-modal"): + yield Label(f"Anomaly Logs: {self.profile_name}", classes="modal-header") + + with Container(classes="table-container"): + yield DataTable(id="logs_table", zebra_stripes=True, cursor_type="row") + + with Horizontal(classes="modal-footer"): + yield Button("Clear History", id="clear-button", variant="warning") + yield Button("Close", id="cancel-button", variant="primary") + + + def on_mount(self): + table = self.query_one("#logs_table", DataTable) + table.add_columns("Timestamp", "Score", "Packets Rate", "Protocol Info", "Verdict") + + logs = self.manager.get_profile_logs(self.profile_name) + + if not logs: + return + + sorted_logs = sorted(logs, key=lambda x: x.get("ts", 0), reverse=True) + + for log in sorted_logs: + dt = datetime.fromtimestamp(log.get("ts", 0)).strftime("%Y-%m-%d %H:%M:%S") + + score = f"{log.get('score', 0):.4f}" + rate = f"{log.get('pkt_rate', 0):.1f}" + proto = str(log.get("proto_info", "-")) + verdict = "ANOMALY" + + table.add_row(dt, score, rate, proto, verdict) + + @on(Button.Pressed) + async def on_button_pressed(self, event: Button.Pressed) -> None: + if event.button.id == "clear-button": + profile = self.manager.get_profile(self.profile_name) + if profile: + profile.clear_logs() + self.query_one("#logs_table", DataTable).clear() + else: + self.dismiss(None) + +class SetDetectorNotificationPushScreen(ModalScreen[str]): + def __init__(self, manager, profile_name: str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.manager = manager + self.profile_name = profile_name + self.profile = self.manager.get_profile(profile_name) + + def compose(self) -> ComposeResult: + current_val = getattr(self.profile, 'notify_enabled', False) + + with Container(classes="modal-window small-modal"): + yield Label(f"Notification: {self.profile_name}", classes="modal-header") + + with Vertical(classes="section-card"): + yield Label("Enable notification (Discord)") + yield Switch(value=current_val, id="switch-anomaly") + + with Horizontal(classes="modal-footer"): + yield Button("Close", id="close-button", variant="primary") + + @on(Switch.Changed) + def on_switch_changed(self, event: Switch.Changed): + if event.switch.id == "switch-anomaly": + self.profile.notify_enabled = event.value + self.manager.try_save_profiles(notify=False) + + @on(Button.Pressed) + async def on_button_pressed(self, event: Button.Pressed): + self.dismiss(None) + +class ConfirmDeletePushScreen(ModalScreen[str]): + def __init__(self, manager: DetectorProfilesManager, profile_name:str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.manager = manager + self.profile_name = profile_name + + def compose(self) -> ComposeResult: + with Container(classes="modal-window small-modal"): + yield Label("Are you sure?",classes="modal-header") + with Horizontal(classes="modal-footer"): + yield Button("Delete", id="confirm-button", variant="error") + yield Button("Cancel", id="cancel-button", variant="default") + + @on(Button.Pressed) + async def on_button_pressed(self, event: Button.Pressed): + if event.button.id == "confirm-button": + self.manager.delete_profile(self.profile_name) + self.dismiss(None) + else: + self.dismiss(None) diff --git a/src/netmonitor/front/detector_tab.py b/src/netmonitor/front/detector_tab.py new file mode 100644 index 0000000..53dc908 --- /dev/null +++ b/src/netmonitor/front/detector_tab.py @@ -0,0 +1,135 @@ +from textual import on +from textual.app import ComposeResult +from textual.widgets import Input, Select, Button, Label, Checkbox +from textual.containers import Container, Horizontal, Vertical, VerticalScroll + +import psutil + +from ..back.detector_profiles_manager import DetectorProfilesManager +from ..back.window import FEATURE_LIST +from ..front.detector_tab_pushscreens import SaveProfilePushScreen + +class DetectorTab(Container): + def __init__(self, manager: DetectorProfilesManager, *args, **kwargs): + super().__init__(*args, **kwargs) + self.manager = manager + self.manager.on_message = self.on_manager_message + + def compose(self) -> ComposeResult: + with Horizontal(id="top-config-container"): + + model_section = Container(id="model-section", classes="section-card") + model_section.border_title = "Algorithm: HalfSpaceTrees" + with model_section: + with VerticalScroll(classes="detector-scroll"): + yield Label("Select interface:", classes="label") + try: + ifaces = list(psutil.net_if_addrs().keys()) + except: + ifaces = [] + + yield Select.from_values( + ifaces, + id="interface-select", + allow_blank=False, + classes="input" + ) + + yield Label("Model params:", classes="label") + yield Input(placeholder="Trees number (int, def: 10)", id="param-trees", classes="input") + yield Input(placeholder="Height (int, def: 8)", id="param-height", classes="input") + yield Input(placeholder="Window size (int, def: 250)", id="param-window", classes="input") + yield Input(placeholder="Seed (int, def: 42)", id="param-seed", classes="input") + yield Input(placeholder="Window duration (def: 10 sec )", id="param-window_duration", classes="input") + yield Input(placeholder="Threshold (0.0 - 1.0, def: 0.7)", id="param-threshold", classes="input") + yield Input(placeholder="Queue size (int, def: 10000)", id="param-queue_size", classes="input") + + features_section = Container(id="features-section", classes="section-card") + features_section.border_title = "Flow-based Features" + with features_section: + with VerticalScroll(classes="detector-scroll"): + yield Label("Select features to include in model:", classes="label") + self.feature_checkboxes = {} + for feat in FEATURE_LIST: + cb = Checkbox(feat, value=True, classes="input") + self.feature_checkboxes[feat] = cb + yield cb + + bpf_section = Container(id="bpf-section", classes="section-card") + bpf_section.border_title = "BPF Filter (Optional)" + with bpf_section: + yield Input( + placeholder="BPF Filter (e.g. 'tcp port 80 or udp')", + id="param-bpf_filter", + classes="input full" + ) + with Container(classes="save-button-container"): + yield Button("Save Profile", id="save-button", variant="success") + + def get_inputs(self): + features = [f for f, cb in self.feature_checkboxes.items() if cb.value] + + if not features: + raise ValueError("Select at least one feature.") + + params = {} + + try: + interface = self.query_one("#interface-select", Select).value + if not interface: + raise ValueError("Select interface.") + params["interface"] = interface + except Exception: + raise ValueError("Interface selection error.") + + bpf_input = self.query_one("#param-bpf_filter", Input) + if bpf_input.value.strip(): + params["bpf_filter"] = bpf_input.value.strip() + + defaults = { + "trees": 10, + "height": 8, + "window": 250, + "seed": 42, + "threshold": 0.7, + "window_duration": 10.0, + "queue_size": 10000, + "bpf_filter": "" + } + + model_section = self.query_one("#model-section") + for inp in model_section.query("Input"): + if inp.id and inp.id.startswith("param-"): + key = inp.id.removeprefix("param-") + if key == "bpf_filter": continue + + val_str = inp.value.strip() + + if not val_str: + if key in defaults and defaults[key] is not None: + params[key] = defaults[key] + continue + + try: + if key in ["trees", "height", "window", "seed","queue_size"]: + params[key] = int(val_str) + elif key in ["threshold", "window_duration"]: + params[key] = float(val_str) + except ValueError: + raise ValueError(f"Param '{key}' must be a number.") + + return { + "features": features, + "params": params, + } + + @on(Button.Pressed, "#save-button") + async def handle_save_button(self, event: Button.Pressed): + try: + input_data = self.get_inputs() + self.app.push_screen(SaveProfilePushScreen(self.manager, input_data=input_data)) + except ValueError as e: + self.app.notify(str(e), title="Validation error", severity="error") + + def on_manager_message(self, msg: str, title: str, severity): + self.app.notify(message=msg, title=title, severity=severity) diff --git a/src/netmonitor/front/detector_tab_pushscreens.py b/src/netmonitor/front/detector_tab_pushscreens.py new file mode 100644 index 0000000..625921b --- /dev/null +++ b/src/netmonitor/front/detector_tab_pushscreens.py @@ -0,0 +1,31 @@ +from textual.app import ComposeResult +from textual.widgets import Input, Label, Button +from textual.containers import Vertical, Horizontal +from textual.screen import ModalScreen +from textual import on + +from ..back.detector_profiles_manager import DetectorProfilesManager + +class SaveProfilePushScreen(ModalScreen[str]): + def __init__(self, manager: DetectorProfilesManager, input_data , *args, **kwargs): + super().__init__(*args, **kwargs) + self.manager = manager + self.input_data = input_data + + + def compose(self) -> ComposeResult: + with Vertical(classes="modal-window small-modal"): + yield Label("Save scan profile", classes="modal-header") + yield Input(placeholder="Profile name:", id="profile-name", classes="input") + with Horizontal(id="buttons-row", classes="modal-buttons modal-footer"): + yield Button("Confirm", id="confirm-button", variant="success", classes="button") + yield Button("Cancel", id="cancel-button", variant="error", classes="button") + + @on(Button.Pressed) + async def on_button_pressed(self, event: Button.Pressed): + if event.button.id == "confirm-button": + profile_name = self.query_one("#profile-name", Input).value.strip() + self.manager.add_profile(profile_name,self.input_data) + self.dismiss(None) + else: + self.dismiss(None) diff --git a/src/netmonitor/front/options_tab.py b/src/netmonitor/front/options_tab.py new file mode 100644 index 0000000..6cbd448 --- /dev/null +++ b/src/netmonitor/front/options_tab.py @@ -0,0 +1,44 @@ +from textual.app import ComposeResult +from textual.containers import Container, Horizontal +from textual.widgets import Button, Input, Label +from textual import on + +from ..back.notification_service import notification_service + +class OptionsTab(Container): + def __init__(self, scanner_manager, detector_manager, *args, **kwargs): + super().__init__(*args, **kwargs) + self.scanner_manager = scanner_manager + self.detector_manager = detector_manager + + def compose(self) -> ComposeResult: + notify_section = Container(id="notify-section", classes="section-card") + notify_section.border_title = "Notification config" + with notify_section: + yield Label("Discord Webhook URL:") + yield Input(placeholder="https://discord.com/api/webhooks/...", id="input-webhook-url") + + with Horizontal(classes="modal-footer"): + yield Button("Save config", id="save-config", variant="success") + yield Button("notification test", id="test-notif", variant="primary") + + def on_mount(self): + self.query_one("#input-webhook-url", Input).value = notification_service.webhook_url + + @on(Button.Pressed, "#save-config") + def save_configuration(self): + url = self.query_one("#input-webhook-url", Input).value.strip() + + if notification_service.save_config(url): + self.app.notify("Config saved", severity="information") + else: + self.app.notify("Error during saving", severity="error") + + @on(Button.Pressed, "#test-notif") + def test_notification(self): + success = notification_service.send_message("**Test NetMonitor**\n") + + if success: + self.app.notify("good", severity="information") + else: + self.app.notify("bad", severity="error") diff --git a/src/netmonitor/front/scanner_profiles_tab.py b/src/netmonitor/front/scanner_profiles_tab.py new file mode 100644 index 0000000..8e8610c --- /dev/null +++ b/src/netmonitor/front/scanner_profiles_tab.py @@ -0,0 +1,84 @@ +from textual.widgets import Button, Label, Switch +from textual import on +from textual.app import ComposeResult +from textual.containers import Vertical, Horizontal, VerticalScroll + +from ..back.scanner_profiles_manager import ScannerProfilesManager +from .scanner_profiles_tab_pushscreens import ScanNowPushScreen, SetSchedulerPushScreen, ShowLogsPushScreen, SetNotifacationOptionsPushScreen, ShowProfilePushScreen, ConfirmDeletePushScreen + +class ScannerProfilesTab(Vertical): + def __init__(self, manager: ScannerProfilesManager, *args, **kwargs): + super().__init__(*args, **kwargs) + self.manager = manager + self.manager.on_refresh = self.refresh_profiles + self.manager.on_message = self.on_manager_message + + def compose(self) -> ComposeResult: + yield VerticalScroll(id="profiles-list") + + def on_mount(self) -> None: + self.refresh_profiles() + + def refresh_profiles(self) -> None: + profiles_list = self.query_one("#profiles-list", VerticalScroll) + profiles_list.remove_children() + + if not self.manager.profiles: + return + + for profile in self.manager.profiles: + row = Horizontal( + Switch(id=f"switch-{profile.profile_name}", value=profile.is_active), + Button("Scan Now", id=f"scan-now-button-{profile.profile_name}", classes="profile-action", variant="success"), + Label(f"{profile.profile_name}", classes="profile-profile_name"), + Button("Show Profile", id=f"show-profile-button-{profile.profile_name}", classes="profile-action", variant="primary"), + Button("Show logs", id=f"show-logs-button-{profile.profile_name}", classes="profile-action", variant="default"), + Button("Set scheduler", id=f"set-scheduler-button-{profile.profile_name}", classes="profile-action", variant="default"), + Button("Notifications", id=f"set-notifications-button-{profile.profile_name}", classes="profile-action", variant="default"), + Button("Delete", id=f"delete-{profile.profile_name}", classes="profile-action", variant="error"), + classes="profile-row" + ) + profiles_list.mount(row) + + @on(Switch.Changed) + def switch_changed(self, event: Switch.Changed) -> None: + switch_id = event.switch.id + if not switch_id: + return + profile_name = switch_id.removeprefix("switch-") + + if event.switch.value: + is_turned_on = self.manager.turn_on_profile(profile_name) + if not is_turned_on: + event.switch.value = False + else: + is_turned_off = self.manager.turn_off_profile(profile_name) + if not is_turned_off: + event.switch.value = False + + @on(Button.Pressed) + def on_any_button_pressed(self, event: Button.Pressed) -> None: + button_id = event.button.id + if not button_id: + return + if button_id.startswith("scan-now-button-"): + profile_name = button_id.removeprefix("scan-now-button-") + self.app.push_screen(ScanNowPushScreen(self.manager, profile_name)) + elif button_id.startswith("show-profile-button-"): + profile_name = button_id.removeprefix("show-profile-button-") + self.app.push_screen(ShowProfilePushScreen(self.manager, profile_name)) + elif button_id.startswith("set-scheduler-button-"): + profile_name = button_id.removeprefix("set-scheduler-button-") + self.app.push_screen(SetSchedulerPushScreen(self.manager, profile_name)) + elif button_id.startswith("set-notifications-button-"): + profile_name = button_id.removeprefix("set-notifications-button-") + self.app.push_screen(SetNotifacationOptionsPushScreen(self.manager, profile_name)) + elif button_id.startswith("delete-"): + profile_name = button_id.removeprefix("delete-") + self.app.push_screen(ConfirmDeletePushScreen(self.manager, profile_name)) + elif button_id.startswith("show-logs-button-"): + profile_name = button_id.removeprefix("show-logs-button-") + self.app.push_screen(ShowLogsPushScreen(self.manager, profile_name)) + + def on_manager_message(self, msg: str, title: str, severity): + self.app.notify(message=msg, title=title, severity=severity) diff --git a/src/netmonitor/front/scanner_profiles_tab_pushscreens.py b/src/netmonitor/front/scanner_profiles_tab_pushscreens.py new file mode 100644 index 0000000..5be9941 --- /dev/null +++ b/src/netmonitor/front/scanner_profiles_tab_pushscreens.py @@ -0,0 +1,180 @@ +from textual.app import ComposeResult +from textual import on +from textual.screen import ModalScreen +from textual.widgets import Input, Button, Label, Pretty, Select, Switch +from textual.containers import Vertical, Horizontal, VerticalScroll, Container + +from ..back.scanner_profiles_manager import ScannerProfilesManager + +class ScanNowPushScreen(ModalScreen[str]): + def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.manager = manager + self.profile_name = profile_name + + def compose(self) -> ComposeResult: + with Container(classes="modal-window large-modal"): + yield Label(f"Scanning: {self.profile_name}", classes="modal-header") + with VerticalScroll(classes="info-box"): + yield Pretty(self.manager.get_profile(self.profile_name).scan()) + with Horizontal(classes="modal-footer"): + yield Button("Close", variant="primary") + + @on(Button.Pressed) + async def on_button_pressed(self, event: Button.Pressed) -> None: + self.dismiss(None) + +class ShowProfilePushScreen(ModalScreen[str]): + def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.manager = manager + self.profile_name = profile_name + + def compose(self) -> ComposeResult: + with Container(classes="modal-window medium-modal"): + yield Label(f"Profile: {self.profile_name}", classes="modal-header") + with VerticalScroll(classes="info-box"): + yield Pretty(self.manager.get_profile(self.profile_name).to_dict()) + with Horizontal(classes="modal-footer"): + yield Button("Close", id="cancel-button", variant="primary") + + @on(Button.Pressed) + async def on_button_pressed(self, event: Button.Pressed) -> None: + self.dismiss(None) + + +class SetSchedulerPushScreen(ModalScreen[str]): + def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.manager = manager + self.profile_name = profile_name + + def compose(self) -> ComposeResult: + with Container(classes="modal-window small-modal"): + yield Label("Set CRON Scheduler", classes="modal-header") + yield Label("Format: min hour day month day_of_week", classes="label") + yield Input(placeholder="* * * * * ", id="cron-input", classes="input") + + with Horizontal(classes="modal-footer"): + yield Button("Confirm", id="confirm-button", variant="success") + yield Button("Cancel", id="cancel-button", variant="error") + + @on(Button.Pressed) + async def on_button_pressed(self, event: Button.Pressed): + if event.button.id == "confirm-button": + cron_input = self.query_one("#cron-input", Input).value.strip() + if cron_input: + self.manager.set_validated_scheduler(self.profile_name, cron_input) + self.dismiss(None) + else: + self.dismiss(None) + +class SetNotifacationOptionsPushScreen(ModalScreen[str]): + def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.manager = manager + self.profile_name = profile_name + self.profile = self.manager.get_profile(profile_name) + + def compose(self) -> ComposeResult: + current_enabled = getattr(self.profile, 'notify_enabled', False) + current_cve_only = getattr(self.profile, 'notify_only_cve', False) + + with Container(classes="modal-window small-modal"): + yield Label(f"Notifications: {self.profile_name}", classes="modal-header") + + with Vertical(classes="section-card"): + yield Label("Enable notifications (Discord):") + yield Switch(value=current_enabled, id="switch-enable") + + yield Label("Notify only when scanner finds CVE") + yield Switch(value=current_cve_only, id="switch-cve-only") + + with Horizontal(classes="modal-footer"): + yield Button("Close", id="close-button", variant="primary") + + @on(Switch.Changed) + def on_switch_changed(self, event: Switch.Changed): + if event.switch.id == "switch-enable": + self.profile.notify_enabled = event.value + elif event.switch.id == "switch-cve-only": + self.profile.notify_only_cve = event.value + + self.manager.try_save_profiles(notify=False) + + @on(Button.Pressed) + async def on_button_pressed(self, event: Button.Pressed): + self.dismiss(None) + +class ShowLogsPushScreen(ModalScreen[str]): + def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.manager = manager + self.profile_name = profile_name + self.logs = [] + + def compose(self) -> ComposeResult: + with Container(classes="modal-window large-modal"): + yield Label(f"Logs: {self.profile_name}", classes="modal-header") + + yield Select([], prompt="Select date", id="scan-date-select") + + with VerticalScroll(classes="info-box"): + yield Pretty({}, id="log-content") + + with Horizontal(classes="modal-footer"): + yield Button("Close", id="cancel-button", variant="primary") + + def on_mount(self): + self.logs = self.manager.get_profile_logs(self.profile_name) + select = self.query_one("#scan-date-select", Select) + options = [] + + if self.logs: + for index, log in enumerate(reversed(self.logs)): + real_index = len(self.logs) - 1 - index + label = log.get('_timestamp', f"Scan #{real_index + 1} (no date)") + + options.append((str(label), real_index)) + + select.set_options(options) + + if options: + select.value = options[0][1] + + @on(Select.Changed, "#scan-date-select") + def on_date_selected(self, event: Select.Changed): + if event.value is not None: + index = event.value + if index == Select.BLANK: + self.query_one("#log-content", Pretty).update({}) + return + if 0 <= index < len(self.logs): + log_entry = self.logs[index] + self.query_one("#log-content", Pretty).update(log_entry) + + @on(Button.Pressed) + async def on_button_pressed(self, event: Button.Pressed) -> None: + self.dismiss(None) + + +class ConfirmDeletePushScreen(ModalScreen[str]): + def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs): + super().__init__(*args, **kwargs) + self.manager = manager + self.profile_name = profile_name + + def compose(self) -> ComposeResult: + with Container(classes="modal-window small-modal"): + yield Label("Are you sure?",classes="modal-header") + with Horizontal(classes="modal-footer"): + yield Button("Delete", id="confirm-button", variant="error") + yield Button("Cancel", id="cancel-button", variant="default") + + @on(Button.Pressed) + async def on_button_pressed(self, event: Button.Pressed): + if event.button.id == "confirm-button": + self.manager.delete_profile(self.profile_name) + self.dismiss(None) + else: + self.dismiss(None) diff --git a/src/netmonitor/front/scanner_tab.py b/src/netmonitor/front/scanner_tab.py new file mode 100755 index 0000000..868fa74 --- /dev/null +++ b/src/netmonitor/front/scanner_tab.py @@ -0,0 +1,277 @@ +from textual import on +from textual.app import ComposeResult +from textual.widgets import Checkbox, Input, Select, Button, Label, SelectionList, Pretty +from textual.widgets.selection_list import Selection +from textual.containers import Container, Vertical, Horizontal, VerticalScroll +from textual.events import Mount + +import psutil, ipaddress, shlex +from typing import List, Dict, Optional + +from ..back.scanner_profiles_manager import ScannerProfilesManager +from .scanner_tab_pushscreens import SaveProfilePushScreen + +class ScannerTab(Container): + + def __init__(self, manager: ScannerProfilesManager): + super().__init__() + self.manager = manager + self.manager.on_message = self.on_manager_message + + def compose(self) -> ComposeResult: + friendly_section = Container(id="friendly-command-section", classes="section card") + friendly_section.border_title = "Scanner Configuration" + + with friendly_section: + yield Label("Interface:", classes="label") + try: + interfaces = list(psutil.net_if_addrs().keys()) + except Exception: + interfaces = [] + + yield Select.from_values( + interfaces, + id="interface-select", + allow_blank=False, + classes="input" + ) + + yield Label("IP address:", classes="label") + yield Input( + value="127.0.0.1/32", + placeholder="192.168.0.0/24", + name="ip-input", + id="ip", + classes="input" + ) + + yield Label("Port range (or single port)", classes="label") + with Horizontal(id="port-range", classes="input-row"): + yield Input(placeholder="Start / Single", id="low-port-range", classes="input half") + yield Input(placeholder="End (Optional)", id="high-port-range", classes="input half") + + yield Checkbox("Check for CVE's", id="cve-checkbox", classes="checkbox") + + yield Label("Scan options:", classes="label") + with Horizontal(classes="selection-row"): + yield SelectionList[str]( + Selection("Service version (-sV)", "-sV", False), + Selection("OS detection (-O)", "-O", False), + Selection("SYN scan (-sS)", "-sS", False), + Selection("UDP scan (-sU)", "-sU", False), + Selection("Fast scan (-F)", "-F", False), + Selection("Aggressive scan (-A)", "-A", False), + Selection("Ping skip (-Pn)", "-Pn", False), + Selection("Timing T4 (-T4)", "-T4", True), + classes="selection-list" + ) + + user_section = Container(id="user-command-section", classes="section card") + user_section.border_title = "Manual Command" + + with user_section: + yield Input( + placeholder="sudo nmap -sS 192.168.1.1", + name="user-nmap-input", + id="usercommand", + classes="input full" + ) + + + final_section = Container(id="final-section", classes="section card") + final_section.border_title = "Execution" + + with final_section: + with Vertical(id="final-command", classes="command-output"): + self.final_command = Pretty([], id="final-command-output") + yield self.final_command + with Vertical(id="buttons", classes="buttons-column"): + yield Button("Scan now", id="scan-button", variant="success") + yield Button("Save profile", id="save-button", variant="primary") + + results_section = Container(id="results-section", classes="section card") + results_section.border_title = "Scan Output" + + with results_section: + with VerticalScroll(id="results-scroll"): + self.results = Pretty([], id="results", classes="results") + yield self.results + + def validate_inputs(self, nmap_input: Dict[str, str]) -> List[str]: + errors: List[str] = [] + + if self.query_one("#cve-checkbox", Checkbox).value: + args = nmap_input.get("arguments", "") + if "-sV" not in args.split(): + errors.append("To check for CVE's, '-sV' option is required") + return errors + + targets = nmap_input.get("targets", "").strip() + if not targets: + errors.append("IP field cannot be empty") + return errors + + if not self.query_one("#usercommand", Input).value.strip(): + try: + ipaddress.ip_network(targets, strict=False) + except ValueError: + errors.append(f"Invalid IP address/CIDR: {targets}") + return errors + + if "ports" in nmap_input: + port_str = nmap_input["ports"] + if "-" in port_str: + try: + low, high = map(int, port_str.split("-")) + if not (1 <= low <= 65535 and 1 <= high <= 65535): + errors.append("Ports must be in range 1-65535") + if low > high: + errors.append("Start port cannot be greater than end port") + except ValueError: + errors.append("Ports (range) must be numbers") + else: + try: + port = int(port_str) + if not (1 <= port <= 65535): + errors.append("Port must be in range 1-65535") + except ValueError: + errors.append("Port must be a number") + + dry_run_err = self.dry_run_nmap_command(nmap_input) + if dry_run_err: + errors.append(dry_run_err) + return errors + + def dry_run_nmap_command(self, nmap_input: Dict[str, str]) -> Optional[str]: + try: + cmd = ["nmap"] + if "arguments" in nmap_input: + cmd += shlex.split(nmap_input["arguments"]) + if "ports" in nmap_input: + cmd += ["-p", nmap_input["ports"]] + cmd.append(nmap_input["targets"]) + return None + except Exception as e: + return f"Błąd weryfikacji komendy: {e}" + + def get_nmap_input(self, return_dict: bool = False) -> list[str] | dict: + user_command = self.query_one("#usercommand", Input).value.strip() + low_port = self.query_one("#low-port-range", Input).value.strip() + high_port = self.query_one("#high-port-range", Input).value.strip() + interface = self.query_one("#interface-select", Select).value + targets = self.query_one("#ip", Input).value.strip() + options = self.query_one(SelectionList).selected + + if return_dict: + if user_command: + parts = shlex.split(user_command) + + if parts and parts[0] == "sudo": parts.pop(0) + if parts and parts[0] == "nmap": parts.pop(0) + + if not parts: + return {} + + target = parts[-1] + args = parts[:-1] + + return { + "targets": target, + "arguments": " ".join(args) + } + else: + nmap_dict = {"targets": targets} + + if low_port: + if high_port: + nmap_dict["ports"] = f"{low_port}-{high_port}" + else: + nmap_dict["ports"] = f"{low_port}" + + args: list[str] = [] + if options: + args.extend(options) + if interface: + args.extend(["-e", str(interface)]) + if args: + nmap_dict["arguments"] = " ".join(args) + + return {k: v for k, v in nmap_dict.items() if v is not None} + + if user_command: + return shlex.split(user_command) + + parts = ["nmap"] + if interface: + parts.extend(["-e", str(interface)]) + if options: + parts.extend(options) + + if low_port: + if high_port: + parts.extend(["-p", f"{low_port}-{high_port}"]) + else: + parts.extend(["-p", f"{low_port}"]) + + parts.append(targets) + return parts + + @on(Mount) + @on(SelectionList.SelectedChanged) + @on(Input.Changed) + @on(Select.Changed) + def update_selected_view(self, event=None) -> None: + self.final_command.update(" ".join(self.get_nmap_input())) + + @on(Button.Pressed, "#scan-button") + async def handle_scan_button(self, event: Button.Pressed) -> None: + nmap_input = self.get_nmap_input(return_dict=True) + if not isinstance(nmap_input, dict): + self.notify("Błąd: dane wejściowe nie są słownikiem", severity="error") + return + errors = self.validate_inputs(nmap_input) + if errors: + self.notify("\n".join(errors), title="Bad Input", severity="error") + else: + try: + temp_prof_name = "temp_scan_profile" + self.notify("Start scanning", title="Notification", severity="information") + + if not self.manager.add_profile(temp_prof_name, notify=False): + self.notify("Błąd: Nie udało się utworzyć profilu (problem z zapisem pliku?)", severity="error") + return + + if not self.manager.update_profile(temp_prof_name, "nmap_input", nmap_input, notify=False): + self.notify("Błąd: Nie udało się zaktualizować parametrów skanowania", severity="error") + self.manager.delete_profile(temp_prof_name, notify=False) + return + + profile = self.manager.get_profile(temp_prof_name) + if profile: + scan_now_results = profile.scan() + self.results.update(scan_now_results) + self.manager.delete_profile(temp_prof_name, notify=False) + self.notify("Scanning completed", title="Notification", severity="information") + else: + self.notify("Krytyczny błąd: Profil zniknął po utworzeniu", severity="error") + + except Exception as e: + self.notify(f"Scanner error: {e}", title="Error", severity="error") + + @on(Button.Pressed, "#save-button") + async def handle_save_button(self, event: Button.Pressed): + nmap_input = self.get_nmap_input(return_dict=True) + if not isinstance(nmap_input, dict): + self.notify("Błąd: dane wejściowe nie są słownikiem", severity="error") + return + + errors = self.validate_inputs(nmap_input) + if errors: + self.notify("\n".join(errors), title="Bad Input", severity="error") + return + + cve_check = self.query_one("#cve-checkbox", Checkbox).value + self.app.push_screen(SaveProfilePushScreen(self.manager, nmap_input, cve_check)) + + def on_manager_message(self, msg: str, title: str, severity): + self.app.notify(message=msg, title=title, severity=severity) diff --git a/src/netmonitor/front/scanner_tab_pushscreens.py b/src/netmonitor/front/scanner_tab_pushscreens.py new file mode 100644 index 0000000..4e4a564 --- /dev/null +++ b/src/netmonitor/front/scanner_tab_pushscreens.py @@ -0,0 +1,35 @@ +from textual import on +from textual.app import ComposeResult +from textual.widgets import Input, Button, Label +from textual.containers import Vertical, Horizontal +from textual.screen import ModalScreen + +from ..back.scanner_profiles_manager import ScannerProfilesManager + +class SaveProfilePushScreen(ModalScreen[str]): + def __init__(self, manager: ScannerProfilesManager, nmap_input, cve_check, *args, **kwargs): + super().__init__(*args, **kwargs) + self.manager = manager + self.nmap_input = nmap_input + self.cve_check = cve_check + + + def compose(self) -> ComposeResult: + with Vertical(classes="modal-window small-modal"): + yield Label("Save scan profile", classes="modal-header") + yield Input(placeholder="Profile name:", id="profile-name", classes="input") + with Horizontal(id="buttons-row", classes="modal-buttons modal-footer"): + yield Button("Confirm", id="confirm-button", variant="success", classes="button") + yield Button("Cancel", id="cancel-button", variant="error", classes="button") + + @on(Button.Pressed) + async def on_button_pressed(self, event: Button.Pressed): + if event.button.id == "confirm-button": + profile_name = self.query_one("#profile-name", Input).value.strip() + self.manager.add_profile(profile_name) + self.manager.update_profile(profile_name,"nmap_input",self.nmap_input,notify=False) + self.manager.update_profile(profile_name,"cve",self.cve_check, notify=False) + self.dismiss(None) + else: + self.dismiss(None) + -- cgit v1.2.3