summaryrefslogtreecommitdiff
path: root/src/netmonitor/front
diff options
context:
space:
mode:
Diffstat (limited to 'src/netmonitor/front')
-rw-r--r--src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pycbin5924 -> 0 bytes
-rw-r--r--src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pycbin13721 -> 0 bytes
-rw-r--r--src/netmonitor/front/__pycache__/detector_tab.cpython-312.pycbin7672 -> 0 bytes
-rw-r--r--src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pycbin2798 -> 0 bytes
-rw-r--r--src/netmonitor/front/__pycache__/options_tab.cpython-312.pycbin3442 -> 0 bytes
-rw-r--r--src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pycbin6552 -> 0 bytes
-rw-r--r--src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pycbin14133 -> 0 bytes
-rw-r--r--src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pycbin14995 -> 0 bytes
-rw-r--r--src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pycbin3075 -> 0 bytes
-rw-r--r--src/netmonitor/front/detector_profiles_tab.py77
-rw-r--r--src/netmonitor/front/detector_profiles_tab_pushscreens.py187
-rw-r--r--src/netmonitor/front/detector_tab.py135
-rw-r--r--src/netmonitor/front/detector_tab_pushscreens.py31
-rw-r--r--src/netmonitor/front/options_tab.py44
-rw-r--r--src/netmonitor/front/scanner_profiles_tab.py84
-rw-r--r--src/netmonitor/front/scanner_profiles_tab_pushscreens.py180
-rwxr-xr-xsrc/netmonitor/front/scanner_tab.py277
-rw-r--r--src/netmonitor/front/scanner_tab_pushscreens.py35
18 files changed, 0 insertions, 1050 deletions
diff --git a/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc
deleted file mode 100644
index 6a32fac..0000000
--- a/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc
deleted file mode 100644
index a74ee0c..0000000
--- a/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc
deleted file mode 100644
index 0257359..0000000
--- a/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc
deleted file mode 100644
index 8672061..0000000
--- a/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc
deleted file mode 100644
index 227bfa8..0000000
--- a/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc
deleted file mode 100644
index d6fcdfb..0000000
--- a/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc
deleted file mode 100644
index a4db183..0000000
--- a/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc
deleted file mode 100644
index 8a8d03a..0000000
--- a/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc
deleted file mode 100644
index 28cc805..0000000
--- a/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/detector_profiles_tab.py b/src/netmonitor/front/detector_profiles_tab.py
deleted file mode 100644
index 7860e16..0000000
--- a/src/netmonitor/front/detector_profiles_tab.py
+++ /dev/null
@@ -1,77 +0,0 @@
-from textual import on
-from textual.app import ComposeResult
-from textual.widgets import Button, Label, Switch
-from textual.containers import Vertical, Horizontal, VerticalScroll
-
-from ..back.detector_profiles_manager import DetectorProfilesManager
-from .detector_profiles_tab_pushscreens import ConfirmDeletePushScreen, ShowLogsPushScreen, ShowProfilePushScreen, SetDetectorNotificationPushScreen
-
-class DetectorProfilesTab(Vertical):
- def __init__(self, manager: DetectorProfilesManager, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.manager.on_refresh = self.refresh_profiles
- self.manager.on_message = self.on_manager_message
-
- def compose(self) -> ComposeResult:
- yield VerticalScroll(id="profiles-list")
-
- def on_mount(self) -> None:
- self.refresh_profiles()
-
- def refresh_profiles(self) -> None:
- profiles_list = self.query_one("#profiles-list", VerticalScroll)
- profiles_list.remove_children()
-
- if not self.manager.profiles:
- return
-
- for profile in self.manager.profiles:
- row = Horizontal(
- Switch(id=f"activate-profile-switch-{profile.profile_name}", value=profile.is_active),
- Label(f"{profile.profile_name}", classes="profile-profile_name"),
- Button("Show logs", id=f"show-logs-button-{profile.profile_name}", classes="profile-action", variant="primary"),
- Button("Show Profile", id=f"show-profile-button-{profile.profile_name}", classes="profile-action", variant="default"),
- Button("Notifications", id=f"set-notifications-button-{profile.profile_name}", classes="profile-action", variant="default"),
- Button("Delete", id=f"delete-{profile.profile_name}", classes="profile-action button-delete", variant="error"),
- classes="profile-row"
- )
- profiles_list.mount(row)
-
- @on(Switch.Changed)
- def any_switch_changed(self, event: Switch.Changed) -> None:
- switch_id = event.switch.id
- if not switch_id:
- return
- elif switch_id.startswith("activate-profile-switch"):
- profile_name = switch_id.removeprefix("activate-profile-switch-")
-
- if event.switch.value:
- is_turned_on = self.manager.turn_on_profile(profile_name)
- if not is_turned_on:
- event.switch.value = False
- else:
- is_turned_off = self.manager.turn_off_profile(profile_name)
- if not is_turned_off:
- event.switch.value = False
-
- @on(Button.Pressed)
- def on_any_button_pressed(self, event: Button.Pressed) -> None:
- button_id = event.button.id
- if not button_id:
- return
- elif button_id.startswith("show-profile-button-"):
- profile_name = button_id.removeprefix("show-profile-button-")
- self.app.push_screen(ShowProfilePushScreen(self.manager, profile_name))
- elif button_id.startswith("set-notifications-button-"):
- profile_name = button_id.removeprefix("set-notifications-button-")
- self.app.push_screen(SetDetectorNotificationPushScreen(self.manager, profile_name))
- elif button_id.startswith("show-logs-button-"):
- profile_name = button_id.removeprefix("show-logs-button-")
- self.app.push_screen(ShowLogsPushScreen(self.manager, profile_name))
- elif button_id.startswith("delete-"):
- profile_name = button_id.removeprefix("delete-")
- self.app.push_screen(ConfirmDeletePushScreen(self.manager, profile_name))
-
- def on_manager_message(self, msg: str, title: str, severity):
- self.app.notify(message=msg, title=title, severity=severity)
diff --git a/src/netmonitor/front/detector_profiles_tab_pushscreens.py b/src/netmonitor/front/detector_profiles_tab_pushscreens.py
deleted file mode 100644
index fc5b731..0000000
--- a/src/netmonitor/front/detector_profiles_tab_pushscreens.py
+++ /dev/null
@@ -1,187 +0,0 @@
-from textual import on
-from textual.app import ComposeResult
-from textual.screen import ModalScreen
-from textual.widgets import Button, Label, Pretty, DataTable, Switch
-from textual.containers import Vertical, Horizontal, VerticalScroll, Container
-from textual_plotext import PlotextPlot
-
-from datetime import datetime
-
-from ..back.detector_profiles_manager import DetectorProfilesManager
-from ..back.detector_profile_HST import DetectorProfileHST
-
-class PlotTab(Container):
- def __init__(self, profile: DetectorProfileHST, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.profile = profile
- self.classes = "plot-card"
-
- def compose(self):
- yield PlotextPlot()
-
- def on_mount(self):
- self.set_interval(1, self.update_plot)
-
- def update_plot(self):
- plot_widget = self.query_one(PlotextPlot)
- plt = plot_widget.plt
-
- y = list(getattr(self.profile, "plot_data", []))
-
- plt.clear_figure()
- plt.theme("dark")
-
- plt.plot(y, marker="dot", color="green")
- plt.title("Anomaly Score")
- plt.ylabel("last 30 windows")
- plt.ylim(0, 1)
-
- threshold = self.profile.params.get("threshold", 0.7)
- if threshold is not None:
- plt.horizontal_line(float(threshold), color="red")
-
-
- plot_widget.refresh()
-
-
-class ShowProfilePushScreen(ModalScreen[str]):
- def __init__(self, manager, profile_name: str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
- self.profile = self.manager.get_profile(profile_name)
-
- def compose(self) -> ComposeResult:
- with Container(classes="modal-window large-modal"):
- yield Label(f"Profile: {self.profile_name}", classes="modal-header")
-
- with Horizontal(classes="modal-split-container"):
-
- with Vertical(classes="left-panel"):
- yield PlotTab(self.profile)
-
- with Vertical(classes="right-panel"):
- yield Label("Runtime Stats (Live)", classes="section-header")
- with VerticalScroll(classes="info-box", id="stats-box"):
- yield Pretty({}, id="runtime-stats-pretty")
-
- yield Label("Configuration", classes="section-header")
- with VerticalScroll(classes="info-box"):
- yield Pretty(self.profile.to_dict())
-
- with Container(classes="modal-footer"):
- yield Button("Close", id="cancel-button", variant="primary")
-
- def on_mount(self):
- self.set_interval(1.0, self.update_stats)
- self.update_stats()
-
- def update_stats(self):
- if self.profile:
- stats = self.profile.get_runtime_stats()
- self.query_one("#runtime-stats-pretty", Pretty).update(stats)
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed):
- self.dismiss(None)
-
-
-class ShowLogsPushScreen(ModalScreen[str]):
- def __init__(self, manager: DetectorProfilesManager, profile_name: str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
-
- def compose(self) -> ComposeResult:
- with Container(classes="modal-window medium-modal"):
- yield Label(f"Anomaly Logs: {self.profile_name}", classes="modal-header")
-
- with Container(classes="table-container"):
- yield DataTable(id="logs_table", zebra_stripes=True, cursor_type="row")
-
- with Horizontal(classes="modal-footer"):
- yield Button("Clear History", id="clear-button", variant="warning")
- yield Button("Close", id="cancel-button", variant="primary")
-
-
- def on_mount(self):
- table = self.query_one("#logs_table", DataTable)
- table.add_columns("Timestamp", "Score", "Packets Rate", "Protocol Info", "Verdict")
-
- logs = self.manager.get_profile_logs(self.profile_name)
-
- if not logs:
- return
-
- sorted_logs = sorted(logs, key=lambda x: x.get("ts", 0), reverse=True)
-
- for log in sorted_logs:
- dt = datetime.fromtimestamp(log.get("ts", 0)).strftime("%Y-%m-%d %H:%M:%S")
-
- score = f"{log.get('score', 0):.4f}"
- rate = f"{log.get('pkt_rate', 0):.1f}"
- proto = str(log.get("proto_info", "-"))
- verdict = "ANOMALY"
-
- table.add_row(dt, score, rate, proto, verdict)
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed) -> None:
- if event.button.id == "clear-button":
- profile = self.manager.get_profile(self.profile_name)
- if profile:
- profile.clear_logs()
- self.query_one("#logs_table", DataTable).clear()
- else:
- self.dismiss(None)
-
-class SetDetectorNotificationPushScreen(ModalScreen[str]):
- def __init__(self, manager, profile_name: str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
- self.profile = self.manager.get_profile(profile_name)
-
- def compose(self) -> ComposeResult:
- current_val = getattr(self.profile, 'notify_enabled', False)
-
- with Container(classes="modal-window small-modal"):
- yield Label(f"Notification: {self.profile_name}", classes="modal-header")
-
- with Vertical(classes="section-card"):
- yield Label("Enable notification (Discord)")
- yield Switch(value=current_val, id="switch-anomaly")
-
- with Horizontal(classes="modal-footer"):
- yield Button("Close", id="close-button", variant="primary")
-
- @on(Switch.Changed)
- def on_switch_changed(self, event: Switch.Changed):
- if event.switch.id == "switch-anomaly":
- self.profile.notify_enabled = event.value
- self.manager.try_save_profiles(notify=False)
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed):
- self.dismiss(None)
-
-class ConfirmDeletePushScreen(ModalScreen[str]):
- def __init__(self, manager: DetectorProfilesManager, profile_name:str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
-
- def compose(self) -> ComposeResult:
- with Container(classes="modal-window small-modal"):
- yield Label("Are you sure?",classes="modal-header")
- with Horizontal(classes="modal-footer"):
- yield Button("Delete", id="confirm-button", variant="error")
- yield Button("Cancel", id="cancel-button", variant="default")
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed):
- if event.button.id == "confirm-button":
- self.manager.delete_profile(self.profile_name)
- self.dismiss(None)
- else:
- self.dismiss(None)
diff --git a/src/netmonitor/front/detector_tab.py b/src/netmonitor/front/detector_tab.py
deleted file mode 100644
index 53dc908..0000000
--- a/src/netmonitor/front/detector_tab.py
+++ /dev/null
@@ -1,135 +0,0 @@
-from textual import on
-from textual.app import ComposeResult
-from textual.widgets import Input, Select, Button, Label, Checkbox
-from textual.containers import Container, Horizontal, Vertical, VerticalScroll
-
-import psutil
-
-from ..back.detector_profiles_manager import DetectorProfilesManager
-from ..back.window import FEATURE_LIST
-from ..front.detector_tab_pushscreens import SaveProfilePushScreen
-
-class DetectorTab(Container):
- def __init__(self, manager: DetectorProfilesManager, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.manager.on_message = self.on_manager_message
-
- def compose(self) -> ComposeResult:
- with Horizontal(id="top-config-container"):
-
- model_section = Container(id="model-section", classes="section-card")
- model_section.border_title = "Algorithm: HalfSpaceTrees"
- with model_section:
- with VerticalScroll(classes="detector-scroll"):
- yield Label("Select interface:", classes="label")
- try:
- ifaces = list(psutil.net_if_addrs().keys())
- except:
- ifaces = []
-
- yield Select.from_values(
- ifaces,
- id="interface-select",
- allow_blank=False,
- classes="input"
- )
-
- yield Label("Model params:", classes="label")
- yield Input(placeholder="Trees number (int, def: 10)", id="param-trees", classes="input")
- yield Input(placeholder="Height (int, def: 8)", id="param-height", classes="input")
- yield Input(placeholder="Window size (int, def: 250)", id="param-window", classes="input")
- yield Input(placeholder="Seed (int, def: 42)", id="param-seed", classes="input")
- yield Input(placeholder="Window duration (def: 10 sec )", id="param-window_duration", classes="input")
- yield Input(placeholder="Threshold (0.0 - 1.0, def: 0.7)", id="param-threshold", classes="input")
- yield Input(placeholder="Queue size (int, def: 10000)", id="param-queue_size", classes="input")
-
- features_section = Container(id="features-section", classes="section-card")
- features_section.border_title = "Flow-based Features"
- with features_section:
- with VerticalScroll(classes="detector-scroll"):
- yield Label("Select features to include in model:", classes="label")
- self.feature_checkboxes = {}
- for feat in FEATURE_LIST:
- cb = Checkbox(feat, value=True, classes="input")
- self.feature_checkboxes[feat] = cb
- yield cb
-
- bpf_section = Container(id="bpf-section", classes="section-card")
- bpf_section.border_title = "BPF Filter (Optional)"
- with bpf_section:
- yield Input(
- placeholder="BPF Filter (e.g. 'tcp port 80 or udp')",
- id="param-bpf_filter",
- classes="input full"
- )
- with Container(classes="save-button-container"):
- yield Button("Save Profile", id="save-button", variant="success")
-
- def get_inputs(self):
- features = [f for f, cb in self.feature_checkboxes.items() if cb.value]
-
- if not features:
- raise ValueError("Select at least one feature.")
-
- params = {}
-
- try:
- interface = self.query_one("#interface-select", Select).value
- if not interface:
- raise ValueError("Select interface.")
- params["interface"] = interface
- except Exception:
- raise ValueError("Interface selection error.")
-
- bpf_input = self.query_one("#param-bpf_filter", Input)
- if bpf_input.value.strip():
- params["bpf_filter"] = bpf_input.value.strip()
-
- defaults = {
- "trees": 10,
- "height": 8,
- "window": 250,
- "seed": 42,
- "threshold": 0.7,
- "window_duration": 10.0,
- "queue_size": 10000,
- "bpf_filter": ""
- }
-
- model_section = self.query_one("#model-section")
- for inp in model_section.query("Input"):
- if inp.id and inp.id.startswith("param-"):
- key = inp.id.removeprefix("param-")
- if key == "bpf_filter": continue
-
- val_str = inp.value.strip()
-
- if not val_str:
- if key in defaults and defaults[key] is not None:
- params[key] = defaults[key]
- continue
-
- try:
- if key in ["trees", "height", "window", "seed","queue_size"]:
- params[key] = int(val_str)
- elif key in ["threshold", "window_duration"]:
- params[key] = float(val_str)
- except ValueError:
- raise ValueError(f"Param '{key}' must be a number.")
-
- return {
- "features": features,
- "params": params,
- }
-
- @on(Button.Pressed, "#save-button")
- async def handle_save_button(self, event: Button.Pressed):
- try:
- input_data = self.get_inputs()
- self.app.push_screen(SaveProfilePushScreen(self.manager, input_data=input_data))
- except ValueError as e:
- self.app.notify(str(e), title="Validation error", severity="error")
-
- def on_manager_message(self, msg: str, title: str, severity):
- self.app.notify(message=msg, title=title, severity=severity)
diff --git a/src/netmonitor/front/detector_tab_pushscreens.py b/src/netmonitor/front/detector_tab_pushscreens.py
deleted file mode 100644
index 625921b..0000000
--- a/src/netmonitor/front/detector_tab_pushscreens.py
+++ /dev/null
@@ -1,31 +0,0 @@
-from textual.app import ComposeResult
-from textual.widgets import Input, Label, Button
-from textual.containers import Vertical, Horizontal
-from textual.screen import ModalScreen
-from textual import on
-
-from ..back.detector_profiles_manager import DetectorProfilesManager
-
-class SaveProfilePushScreen(ModalScreen[str]):
- def __init__(self, manager: DetectorProfilesManager, input_data , *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.input_data = input_data
-
-
- def compose(self) -> ComposeResult:
- with Vertical(classes="modal-window small-modal"):
- yield Label("Save scan profile", classes="modal-header")
- yield Input(placeholder="Profile name:", id="profile-name", classes="input")
- with Horizontal(id="buttons-row", classes="modal-buttons modal-footer"):
- yield Button("Confirm", id="confirm-button", variant="success", classes="button")
- yield Button("Cancel", id="cancel-button", variant="error", classes="button")
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed):
- if event.button.id == "confirm-button":
- profile_name = self.query_one("#profile-name", Input).value.strip()
- self.manager.add_profile(profile_name,self.input_data)
- self.dismiss(None)
- else:
- self.dismiss(None)
diff --git a/src/netmonitor/front/options_tab.py b/src/netmonitor/front/options_tab.py
deleted file mode 100644
index 6cbd448..0000000
--- a/src/netmonitor/front/options_tab.py
+++ /dev/null
@@ -1,44 +0,0 @@
-from textual.app import ComposeResult
-from textual.containers import Container, Horizontal
-from textual.widgets import Button, Input, Label
-from textual import on
-
-from ..back.notification_service import notification_service
-
-class OptionsTab(Container):
- def __init__(self, scanner_manager, detector_manager, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.scanner_manager = scanner_manager
- self.detector_manager = detector_manager
-
- def compose(self) -> ComposeResult:
- notify_section = Container(id="notify-section", classes="section-card")
- notify_section.border_title = "Notification config"
- with notify_section:
- yield Label("Discord Webhook URL:")
- yield Input(placeholder="https://discord.com/api/webhooks/...", id="input-webhook-url")
-
- with Horizontal(classes="modal-footer"):
- yield Button("Save config", id="save-config", variant="success")
- yield Button("notification test", id="test-notif", variant="primary")
-
- def on_mount(self):
- self.query_one("#input-webhook-url", Input).value = notification_service.webhook_url
-
- @on(Button.Pressed, "#save-config")
- def save_configuration(self):
- url = self.query_one("#input-webhook-url", Input).value.strip()
-
- if notification_service.save_config(url):
- self.app.notify("Config saved", severity="information")
- else:
- self.app.notify("Error during saving", severity="error")
-
- @on(Button.Pressed, "#test-notif")
- def test_notification(self):
- success = notification_service.send_message("**Test NetMonitor**\n")
-
- if success:
- self.app.notify("good", severity="information")
- else:
- self.app.notify("bad", severity="error")
diff --git a/src/netmonitor/front/scanner_profiles_tab.py b/src/netmonitor/front/scanner_profiles_tab.py
deleted file mode 100644
index 8e8610c..0000000
--- a/src/netmonitor/front/scanner_profiles_tab.py
+++ /dev/null
@@ -1,84 +0,0 @@
-from textual.widgets import Button, Label, Switch
-from textual import on
-from textual.app import ComposeResult
-from textual.containers import Vertical, Horizontal, VerticalScroll
-
-from ..back.scanner_profiles_manager import ScannerProfilesManager
-from .scanner_profiles_tab_pushscreens import ScanNowPushScreen, SetSchedulerPushScreen, ShowLogsPushScreen, SetNotifacationOptionsPushScreen, ShowProfilePushScreen, ConfirmDeletePushScreen
-
-class ScannerProfilesTab(Vertical):
- def __init__(self, manager: ScannerProfilesManager, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.manager.on_refresh = self.refresh_profiles
- self.manager.on_message = self.on_manager_message
-
- def compose(self) -> ComposeResult:
- yield VerticalScroll(id="profiles-list")
-
- def on_mount(self) -> None:
- self.refresh_profiles()
-
- def refresh_profiles(self) -> None:
- profiles_list = self.query_one("#profiles-list", VerticalScroll)
- profiles_list.remove_children()
-
- if not self.manager.profiles:
- return
-
- for profile in self.manager.profiles:
- row = Horizontal(
- Switch(id=f"switch-{profile.profile_name}", value=profile.is_active),
- Button("Scan Now", id=f"scan-now-button-{profile.profile_name}", classes="profile-action", variant="success"),
- Label(f"{profile.profile_name}", classes="profile-profile_name"),
- Button("Show Profile", id=f"show-profile-button-{profile.profile_name}", classes="profile-action", variant="primary"),
- Button("Show logs", id=f"show-logs-button-{profile.profile_name}", classes="profile-action", variant="default"),
- Button("Set scheduler", id=f"set-scheduler-button-{profile.profile_name}", classes="profile-action", variant="default"),
- Button("Notifications", id=f"set-notifications-button-{profile.profile_name}", classes="profile-action", variant="default"),
- Button("Delete", id=f"delete-{profile.profile_name}", classes="profile-action", variant="error"),
- classes="profile-row"
- )
- profiles_list.mount(row)
-
- @on(Switch.Changed)
- def switch_changed(self, event: Switch.Changed) -> None:
- switch_id = event.switch.id
- if not switch_id:
- return
- profile_name = switch_id.removeprefix("switch-")
-
- if event.switch.value:
- is_turned_on = self.manager.turn_on_profile(profile_name)
- if not is_turned_on:
- event.switch.value = False
- else:
- is_turned_off = self.manager.turn_off_profile(profile_name)
- if not is_turned_off:
- event.switch.value = False
-
- @on(Button.Pressed)
- def on_any_button_pressed(self, event: Button.Pressed) -> None:
- button_id = event.button.id
- if not button_id:
- return
- if button_id.startswith("scan-now-button-"):
- profile_name = button_id.removeprefix("scan-now-button-")
- self.app.push_screen(ScanNowPushScreen(self.manager, profile_name))
- elif button_id.startswith("show-profile-button-"):
- profile_name = button_id.removeprefix("show-profile-button-")
- self.app.push_screen(ShowProfilePushScreen(self.manager, profile_name))
- elif button_id.startswith("set-scheduler-button-"):
- profile_name = button_id.removeprefix("set-scheduler-button-")
- self.app.push_screen(SetSchedulerPushScreen(self.manager, profile_name))
- elif button_id.startswith("set-notifications-button-"):
- profile_name = button_id.removeprefix("set-notifications-button-")
- self.app.push_screen(SetNotifacationOptionsPushScreen(self.manager, profile_name))
- elif button_id.startswith("delete-"):
- profile_name = button_id.removeprefix("delete-")
- self.app.push_screen(ConfirmDeletePushScreen(self.manager, profile_name))
- elif button_id.startswith("show-logs-button-"):
- profile_name = button_id.removeprefix("show-logs-button-")
- self.app.push_screen(ShowLogsPushScreen(self.manager, profile_name))
-
- def on_manager_message(self, msg: str, title: str, severity):
- self.app.notify(message=msg, title=title, severity=severity)
diff --git a/src/netmonitor/front/scanner_profiles_tab_pushscreens.py b/src/netmonitor/front/scanner_profiles_tab_pushscreens.py
deleted file mode 100644
index 5be9941..0000000
--- a/src/netmonitor/front/scanner_profiles_tab_pushscreens.py
+++ /dev/null
@@ -1,180 +0,0 @@
-from textual.app import ComposeResult
-from textual import on
-from textual.screen import ModalScreen
-from textual.widgets import Input, Button, Label, Pretty, Select, Switch
-from textual.containers import Vertical, Horizontal, VerticalScroll, Container
-
-from ..back.scanner_profiles_manager import ScannerProfilesManager
-
-class ScanNowPushScreen(ModalScreen[str]):
- def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
-
- def compose(self) -> ComposeResult:
- with Container(classes="modal-window large-modal"):
- yield Label(f"Scanning: {self.profile_name}", classes="modal-header")
- with VerticalScroll(classes="info-box"):
- yield Pretty(self.manager.get_profile(self.profile_name).scan())
- with Horizontal(classes="modal-footer"):
- yield Button("Close", variant="primary")
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed) -> None:
- self.dismiss(None)
-
-class ShowProfilePushScreen(ModalScreen[str]):
- def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
-
- def compose(self) -> ComposeResult:
- with Container(classes="modal-window medium-modal"):
- yield Label(f"Profile: {self.profile_name}", classes="modal-header")
- with VerticalScroll(classes="info-box"):
- yield Pretty(self.manager.get_profile(self.profile_name).to_dict())
- with Horizontal(classes="modal-footer"):
- yield Button("Close", id="cancel-button", variant="primary")
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed) -> None:
- self.dismiss(None)
-
-
-class SetSchedulerPushScreen(ModalScreen[str]):
- def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
-
- def compose(self) -> ComposeResult:
- with Container(classes="modal-window small-modal"):
- yield Label("Set CRON Scheduler", classes="modal-header")
- yield Label("Format: min hour day month day_of_week", classes="label")
- yield Input(placeholder="* * * * * ", id="cron-input", classes="input")
-
- with Horizontal(classes="modal-footer"):
- yield Button("Confirm", id="confirm-button", variant="success")
- yield Button("Cancel", id="cancel-button", variant="error")
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed):
- if event.button.id == "confirm-button":
- cron_input = self.query_one("#cron-input", Input).value.strip()
- if cron_input:
- self.manager.set_validated_scheduler(self.profile_name, cron_input)
- self.dismiss(None)
- else:
- self.dismiss(None)
-
-class SetNotifacationOptionsPushScreen(ModalScreen[str]):
- def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
- self.profile = self.manager.get_profile(profile_name)
-
- def compose(self) -> ComposeResult:
- current_enabled = getattr(self.profile, 'notify_enabled', False)
- current_cve_only = getattr(self.profile, 'notify_only_cve', False)
-
- with Container(classes="modal-window small-modal"):
- yield Label(f"Notifications: {self.profile_name}", classes="modal-header")
-
- with Vertical(classes="section-card"):
- yield Label("Enable notifications (Discord):")
- yield Switch(value=current_enabled, id="switch-enable")
-
- yield Label("Notify only when scanner finds CVE")
- yield Switch(value=current_cve_only, id="switch-cve-only")
-
- with Horizontal(classes="modal-footer"):
- yield Button("Close", id="close-button", variant="primary")
-
- @on(Switch.Changed)
- def on_switch_changed(self, event: Switch.Changed):
- if event.switch.id == "switch-enable":
- self.profile.notify_enabled = event.value
- elif event.switch.id == "switch-cve-only":
- self.profile.notify_only_cve = event.value
-
- self.manager.try_save_profiles(notify=False)
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed):
- self.dismiss(None)
-
-class ShowLogsPushScreen(ModalScreen[str]):
- def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
- self.logs = []
-
- def compose(self) -> ComposeResult:
- with Container(classes="modal-window large-modal"):
- yield Label(f"Logs: {self.profile_name}", classes="modal-header")
-
- yield Select([], prompt="Select date", id="scan-date-select")
-
- with VerticalScroll(classes="info-box"):
- yield Pretty({}, id="log-content")
-
- with Horizontal(classes="modal-footer"):
- yield Button("Close", id="cancel-button", variant="primary")
-
- def on_mount(self):
- self.logs = self.manager.get_profile_logs(self.profile_name)
- select = self.query_one("#scan-date-select", Select)
- options = []
-
- if self.logs:
- for index, log in enumerate(reversed(self.logs)):
- real_index = len(self.logs) - 1 - index
- label = log.get('_timestamp', f"Scan #{real_index + 1} (no date)")
-
- options.append((str(label), real_index))
-
- select.set_options(options)
-
- if options:
- select.value = options[0][1]
-
- @on(Select.Changed, "#scan-date-select")
- def on_date_selected(self, event: Select.Changed):
- if event.value is not None:
- index = event.value
- if index == Select.BLANK:
- self.query_one("#log-content", Pretty).update({})
- return
- if 0 <= index < len(self.logs):
- log_entry = self.logs[index]
- self.query_one("#log-content", Pretty).update(log_entry)
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed) -> None:
- self.dismiss(None)
-
-
-class ConfirmDeletePushScreen(ModalScreen[str]):
- def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
-
- def compose(self) -> ComposeResult:
- with Container(classes="modal-window small-modal"):
- yield Label("Are you sure?",classes="modal-header")
- with Horizontal(classes="modal-footer"):
- yield Button("Delete", id="confirm-button", variant="error")
- yield Button("Cancel", id="cancel-button", variant="default")
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed):
- if event.button.id == "confirm-button":
- self.manager.delete_profile(self.profile_name)
- self.dismiss(None)
- else:
- self.dismiss(None)
diff --git a/src/netmonitor/front/scanner_tab.py b/src/netmonitor/front/scanner_tab.py
deleted file mode 100755
index 868fa74..0000000
--- a/src/netmonitor/front/scanner_tab.py
+++ /dev/null
@@ -1,277 +0,0 @@
-from textual import on
-from textual.app import ComposeResult
-from textual.widgets import Checkbox, Input, Select, Button, Label, SelectionList, Pretty
-from textual.widgets.selection_list import Selection
-from textual.containers import Container, Vertical, Horizontal, VerticalScroll
-from textual.events import Mount
-
-import psutil, ipaddress, shlex
-from typing import List, Dict, Optional
-
-from ..back.scanner_profiles_manager import ScannerProfilesManager
-from .scanner_tab_pushscreens import SaveProfilePushScreen
-
-class ScannerTab(Container):
-
- def __init__(self, manager: ScannerProfilesManager):
- super().__init__()
- self.manager = manager
- self.manager.on_message = self.on_manager_message
-
- def compose(self) -> ComposeResult:
- friendly_section = Container(id="friendly-command-section", classes="section card")
- friendly_section.border_title = "Scanner Configuration"
-
- with friendly_section:
- yield Label("Interface:", classes="label")
- try:
- interfaces = list(psutil.net_if_addrs().keys())
- except Exception:
- interfaces = []
-
- yield Select.from_values(
- interfaces,
- id="interface-select",
- allow_blank=False,
- classes="input"
- )
-
- yield Label("IP address:", classes="label")
- yield Input(
- value="127.0.0.1/32",
- placeholder="192.168.0.0/24",
- name="ip-input",
- id="ip",
- classes="input"
- )
-
- yield Label("Port range (or single port)", classes="label")
- with Horizontal(id="port-range", classes="input-row"):
- yield Input(placeholder="Start / Single", id="low-port-range", classes="input half")
- yield Input(placeholder="End (Optional)", id="high-port-range", classes="input half")
-
- yield Checkbox("Check for CVE's", id="cve-checkbox", classes="checkbox")
-
- yield Label("Scan options:", classes="label")
- with Horizontal(classes="selection-row"):
- yield SelectionList[str](
- Selection("Service version (-sV)", "-sV", False),
- Selection("OS detection (-O)", "-O", False),
- Selection("SYN scan (-sS)", "-sS", False),
- Selection("UDP scan (-sU)", "-sU", False),
- Selection("Fast scan (-F)", "-F", False),
- Selection("Aggressive scan (-A)", "-A", False),
- Selection("Ping skip (-Pn)", "-Pn", False),
- Selection("Timing T4 (-T4)", "-T4", True),
- classes="selection-list"
- )
-
- user_section = Container(id="user-command-section", classes="section card")
- user_section.border_title = "Manual Command"
-
- with user_section:
- yield Input(
- placeholder="sudo nmap -sS 192.168.1.1",
- name="user-nmap-input",
- id="usercommand",
- classes="input full"
- )
-
-
- final_section = Container(id="final-section", classes="section card")
- final_section.border_title = "Execution"
-
- with final_section:
- with Vertical(id="final-command", classes="command-output"):
- self.final_command = Pretty([], id="final-command-output")
- yield self.final_command
- with Vertical(id="buttons", classes="buttons-column"):
- yield Button("Scan now", id="scan-button", variant="success")
- yield Button("Save profile", id="save-button", variant="primary")
-
- results_section = Container(id="results-section", classes="section card")
- results_section.border_title = "Scan Output"
-
- with results_section:
- with VerticalScroll(id="results-scroll"):
- self.results = Pretty([], id="results", classes="results")
- yield self.results
-
- def validate_inputs(self, nmap_input: Dict[str, str]) -> List[str]:
- errors: List[str] = []
-
- if self.query_one("#cve-checkbox", Checkbox).value:
- args = nmap_input.get("arguments", "")
- if "-sV" not in args.split():
- errors.append("To check for CVE's, '-sV' option is required")
- return errors
-
- targets = nmap_input.get("targets", "").strip()
- if not targets:
- errors.append("IP field cannot be empty")
- return errors
-
- if not self.query_one("#usercommand", Input).value.strip():
- try:
- ipaddress.ip_network(targets, strict=False)
- except ValueError:
- errors.append(f"Invalid IP address/CIDR: {targets}")
- return errors
-
- if "ports" in nmap_input:
- port_str = nmap_input["ports"]
- if "-" in port_str:
- try:
- low, high = map(int, port_str.split("-"))
- if not (1 <= low <= 65535 and 1 <= high <= 65535):
- errors.append("Ports must be in range 1-65535")
- if low > high:
- errors.append("Start port cannot be greater than end port")
- except ValueError:
- errors.append("Ports (range) must be numbers")
- else:
- try:
- port = int(port_str)
- if not (1 <= port <= 65535):
- errors.append("Port must be in range 1-65535")
- except ValueError:
- errors.append("Port must be a number")
-
- dry_run_err = self.dry_run_nmap_command(nmap_input)
- if dry_run_err:
- errors.append(dry_run_err)
- return errors
-
- def dry_run_nmap_command(self, nmap_input: Dict[str, str]) -> Optional[str]:
- try:
- cmd = ["nmap"]
- if "arguments" in nmap_input:
- cmd += shlex.split(nmap_input["arguments"])
- if "ports" in nmap_input:
- cmd += ["-p", nmap_input["ports"]]
- cmd.append(nmap_input["targets"])
- return None
- except Exception as e:
- return f"Błąd weryfikacji komendy: {e}"
-
- def get_nmap_input(self, return_dict: bool = False) -> list[str] | dict:
- user_command = self.query_one("#usercommand", Input).value.strip()
- low_port = self.query_one("#low-port-range", Input).value.strip()
- high_port = self.query_one("#high-port-range", Input).value.strip()
- interface = self.query_one("#interface-select", Select).value
- targets = self.query_one("#ip", Input).value.strip()
- options = self.query_one(SelectionList).selected
-
- if return_dict:
- if user_command:
- parts = shlex.split(user_command)
-
- if parts and parts[0] == "sudo": parts.pop(0)
- if parts and parts[0] == "nmap": parts.pop(0)
-
- if not parts:
- return {}
-
- target = parts[-1]
- args = parts[:-1]
-
- return {
- "targets": target,
- "arguments": " ".join(args)
- }
- else:
- nmap_dict = {"targets": targets}
-
- if low_port:
- if high_port:
- nmap_dict["ports"] = f"{low_port}-{high_port}"
- else:
- nmap_dict["ports"] = f"{low_port}"
-
- args: list[str] = []
- if options:
- args.extend(options)
- if interface:
- args.extend(["-e", str(interface)])
- if args:
- nmap_dict["arguments"] = " ".join(args)
-
- return {k: v for k, v in nmap_dict.items() if v is not None}
-
- if user_command:
- return shlex.split(user_command)
-
- parts = ["nmap"]
- if interface:
- parts.extend(["-e", str(interface)])
- if options:
- parts.extend(options)
-
- if low_port:
- if high_port:
- parts.extend(["-p", f"{low_port}-{high_port}"])
- else:
- parts.extend(["-p", f"{low_port}"])
-
- parts.append(targets)
- return parts
-
- @on(Mount)
- @on(SelectionList.SelectedChanged)
- @on(Input.Changed)
- @on(Select.Changed)
- def update_selected_view(self, event=None) -> None:
- self.final_command.update(" ".join(self.get_nmap_input()))
-
- @on(Button.Pressed, "#scan-button")
- async def handle_scan_button(self, event: Button.Pressed) -> None:
- nmap_input = self.get_nmap_input(return_dict=True)
- if not isinstance(nmap_input, dict):
- self.notify("Błąd: dane wejściowe nie są słownikiem", severity="error")
- return
- errors = self.validate_inputs(nmap_input)
- if errors:
- self.notify("\n".join(errors), title="Bad Input", severity="error")
- else:
- try:
- temp_prof_name = "temp_scan_profile"
- self.notify("Start scanning", title="Notification", severity="information")
-
- if not self.manager.add_profile(temp_prof_name, notify=False):
- self.notify("Błąd: Nie udało się utworzyć profilu (problem z zapisem pliku?)", severity="error")
- return
-
- if not self.manager.update_profile(temp_prof_name, "nmap_input", nmap_input, notify=False):
- self.notify("Błąd: Nie udało się zaktualizować parametrów skanowania", severity="error")
- self.manager.delete_profile(temp_prof_name, notify=False)
- return
-
- profile = self.manager.get_profile(temp_prof_name)
- if profile:
- scan_now_results = profile.scan()
- self.results.update(scan_now_results)
- self.manager.delete_profile(temp_prof_name, notify=False)
- self.notify("Scanning completed", title="Notification", severity="information")
- else:
- self.notify("Krytyczny błąd: Profil zniknął po utworzeniu", severity="error")
-
- except Exception as e:
- self.notify(f"Scanner error: {e}", title="Error", severity="error")
-
- @on(Button.Pressed, "#save-button")
- async def handle_save_button(self, event: Button.Pressed):
- nmap_input = self.get_nmap_input(return_dict=True)
- if not isinstance(nmap_input, dict):
- self.notify("Błąd: dane wejściowe nie są słownikiem", severity="error")
- return
-
- errors = self.validate_inputs(nmap_input)
- if errors:
- self.notify("\n".join(errors), title="Bad Input", severity="error")
- return
-
- cve_check = self.query_one("#cve-checkbox", Checkbox).value
- self.app.push_screen(SaveProfilePushScreen(self.manager, nmap_input, cve_check))
-
- def on_manager_message(self, msg: str, title: str, severity):
- self.app.notify(message=msg, title=title, severity=severity)
diff --git a/src/netmonitor/front/scanner_tab_pushscreens.py b/src/netmonitor/front/scanner_tab_pushscreens.py
deleted file mode 100644
index 4e4a564..0000000
--- a/src/netmonitor/front/scanner_tab_pushscreens.py
+++ /dev/null
@@ -1,35 +0,0 @@
-from textual import on
-from textual.app import ComposeResult
-from textual.widgets import Input, Button, Label
-from textual.containers import Vertical, Horizontal
-from textual.screen import ModalScreen
-
-from ..back.scanner_profiles_manager import ScannerProfilesManager
-
-class SaveProfilePushScreen(ModalScreen[str]):
- def __init__(self, manager: ScannerProfilesManager, nmap_input, cve_check, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.nmap_input = nmap_input
- self.cve_check = cve_check
-
-
- def compose(self) -> ComposeResult:
- with Vertical(classes="modal-window small-modal"):
- yield Label("Save scan profile", classes="modal-header")
- yield Input(placeholder="Profile name:", id="profile-name", classes="input")
- with Horizontal(id="buttons-row", classes="modal-buttons modal-footer"):
- yield Button("Confirm", id="confirm-button", variant="success", classes="button")
- yield Button("Cancel", id="cancel-button", variant="error", classes="button")
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed):
- if event.button.id == "confirm-button":
- profile_name = self.query_one("#profile-name", Input).value.strip()
- self.manager.add_profile(profile_name)
- self.manager.update_profile(profile_name,"nmap_input",self.nmap_input,notify=False)
- self.manager.update_profile(profile_name,"cve",self.cve_check, notify=False)
- self.dismiss(None)
- else:
- self.dismiss(None)
-