summaryrefslogtreecommitdiff
path: root/src/netmonitor
diff options
context:
space:
mode:
authorEnricoGuccii <partyka.003@proton.me>2026-01-10 22:43:36 +0100
committerEnricoGuccii <partyka.003@proton.me>2026-01-10 22:43:36 +0100
commit6d7410e286ce0fde31f89185c095fe90e85597f3 (patch)
tree5092c99d353382a71f00d8fe18d53b8073cf3f58 /src/netmonitor
parentc2f5fbe7fb93ce420caf23c5c0e06144cf953bb8 (diff)
bloat removed
Diffstat (limited to 'src/netmonitor')
-rw-r--r--src/netmonitor/__init__.py0
-rw-r--r--src/netmonitor/__pycache__/__init__.cpython-312.pycbin159 -> 0 bytes
-rw-r--r--src/netmonitor/__pycache__/app.cpython-312.pycbin4846 -> 0 bytes
-rw-r--r--src/netmonitor/app.py74
-rw-r--r--src/netmonitor/back/__pycache__/detector_profile_HST.cpython-312.pycbin11545 -> 0 bytes
-rw-r--r--src/netmonitor/back/__pycache__/detector_profiles_manager.cpython-312.pycbin10030 -> 0 bytes
-rw-r--r--src/netmonitor/back/__pycache__/flow_features.cpython-312.pycbin1145 -> 0 bytes
-rw-r--r--src/netmonitor/back/__pycache__/flow_table.cpython-312.pycbin9184 -> 0 bytes
-rw-r--r--src/netmonitor/back/__pycache__/notification_service.cpython-312.pycbin2912 -> 0 bytes
-rw-r--r--src/netmonitor/back/__pycache__/scanner_profile.cpython-312.pycbin7203 -> 0 bytes
-rw-r--r--src/netmonitor/back/__pycache__/scanner_profiles_manager.cpython-312.pycbin12392 -> 0 bytes
-rw-r--r--src/netmonitor/back/__pycache__/window.cpython-312.pycbin9507 -> 0 bytes
-rw-r--r--src/netmonitor/back/detector_profile_HST.py215
-rw-r--r--src/netmonitor/back/detector_profiles_manager.py153
-rw-r--r--src/netmonitor/back/notification_service.py49
-rw-r--r--src/netmonitor/back/scanner_profile.py138
-rw-r--r--src/netmonitor/back/scanner_profiles_manager.py177
-rw-r--r--src/netmonitor/back/window.py251
-rw-r--r--src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pycbin5924 -> 0 bytes
-rw-r--r--src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pycbin13721 -> 0 bytes
-rw-r--r--src/netmonitor/front/__pycache__/detector_tab.cpython-312.pycbin7672 -> 0 bytes
-rw-r--r--src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pycbin2798 -> 0 bytes
-rw-r--r--src/netmonitor/front/__pycache__/options_tab.cpython-312.pycbin3442 -> 0 bytes
-rw-r--r--src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pycbin6552 -> 0 bytes
-rw-r--r--src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pycbin14133 -> 0 bytes
-rw-r--r--src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pycbin14995 -> 0 bytes
-rw-r--r--src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pycbin3075 -> 0 bytes
-rw-r--r--src/netmonitor/front/detector_profiles_tab.py77
-rw-r--r--src/netmonitor/front/detector_profiles_tab_pushscreens.py187
-rw-r--r--src/netmonitor/front/detector_tab.py135
-rw-r--r--src/netmonitor/front/detector_tab_pushscreens.py31
-rw-r--r--src/netmonitor/front/options_tab.py44
-rw-r--r--src/netmonitor/front/scanner_profiles_tab.py84
-rw-r--r--src/netmonitor/front/scanner_profiles_tab_pushscreens.py180
-rwxr-xr-xsrc/netmonitor/front/scanner_tab.py277
-rw-r--r--src/netmonitor/front/scanner_tab_pushscreens.py35
-rw-r--r--src/netmonitor/styles/styles.css250
37 files changed, 0 insertions, 2357 deletions
diff --git a/src/netmonitor/__init__.py b/src/netmonitor/__init__.py
deleted file mode 100644
index e69de29..0000000
--- a/src/netmonitor/__init__.py
+++ /dev/null
diff --git a/src/netmonitor/__pycache__/__init__.cpython-312.pyc b/src/netmonitor/__pycache__/__init__.cpython-312.pyc
deleted file mode 100644
index bf05138..0000000
--- a/src/netmonitor/__pycache__/__init__.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/__pycache__/app.cpython-312.pyc b/src/netmonitor/__pycache__/app.cpython-312.pyc
deleted file mode 100644
index 38fa57a..0000000
--- a/src/netmonitor/__pycache__/app.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/app.py b/src/netmonitor/app.py
deleted file mode 100644
index e60108f..0000000
--- a/src/netmonitor/app.py
+++ /dev/null
@@ -1,74 +0,0 @@
-from textual.app import App, ComposeResult
-from textual.widgets import TabbedContent, TabPane
-from textual.theme import Theme
-
-from apscheduler.schedulers.background import BackgroundScheduler
-from pathlib import Path
-import os
-
-from .front.scanner_tab import ScannerTab
-from .front.scanner_profiles_tab import ScannerProfilesTab
-from .front.detector_tab import DetectorTab
-from .front.detector_profiles_tab import DetectorProfilesTab
-from .front.options_tab import OptionsTab
-
-from .back.scanner_profiles_manager import ScannerProfilesManager
-from .back.detector_profiles_manager import DetectorProfilesManager
-
-
-XDG_DATA_HOME = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share"))
-
-theme = Theme(
- name="pastel_blue_theme",
- primary="#82A6F2",
- secondary="#778899",
- accent="#E0FFFF",
- # background="#1a1b26",
- surface="#1e1e20",
- error="#ffb3ba",
- success="#baffc9",
- warning="#ffffba",
-)
-
-class NetMonitor(App):
- CSS_PATH = "styles/styles.css"
-
- def __init__(self):
- super().__init__()
- self.scheduler = BackgroundScheduler()
- self.scheduler.start()
- self.scanner_profiles_manager = ScannerProfilesManager(profiles_file=f"{XDG_DATA_HOME}/netmonitor/objects/scanner_profiles_objects", scheduler=self.scheduler)
- self.detector_profiles_manager = DetectorProfilesManager(profiles_file=f"{XDG_DATA_HOME}/netmonitor/objects/detector_profiles_objects")
-
- def compose(self) -> ComposeResult:
- with TabbedContent():
- with TabPane(title="Scanner", id="scanner", classes="scanner-theme"):
- with TabbedContent():
- with TabPane(title="Scan"):
- yield ScannerTab(self.manager.scanner_profiles_manager)
- with TabPane(title="Profiles"):
- yield ScannerProfilesTab(self.manager.scanner_profiles_manager)
-
- with TabPane(title="Detector", id="detector", classes="detector-theme"):
- with TabbedContent():
- with TabPane(title="Models"):
- yield DetectorTab(self.manager.detector_profiles_manager)
- with TabPane(title="Profiles"):
- yield DetectorProfilesTab(self.manager.detector_profiles_manager)
-
- with TabPane(title="Options", id="options", classes="options-theme"):
- yield OptionsTab(self.scanner_profiles_manager, self.detector_profiles_manager)
-
- def on_mount(self):
- self.register_theme(theme)
- self.theme = "pastel_blue_theme"
-
- @property
- def manager(self):
- return self
-
-def main():
- NetMonitor().run()
-
-if __name__ == "__main__":
- main()
diff --git a/src/netmonitor/back/__pycache__/detector_profile_HST.cpython-312.pyc b/src/netmonitor/back/__pycache__/detector_profile_HST.cpython-312.pyc
deleted file mode 100644
index 6dc15c8..0000000
--- a/src/netmonitor/back/__pycache__/detector_profile_HST.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/back/__pycache__/detector_profiles_manager.cpython-312.pyc b/src/netmonitor/back/__pycache__/detector_profiles_manager.cpython-312.pyc
deleted file mode 100644
index b17c763..0000000
--- a/src/netmonitor/back/__pycache__/detector_profiles_manager.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/back/__pycache__/flow_features.cpython-312.pyc b/src/netmonitor/back/__pycache__/flow_features.cpython-312.pyc
deleted file mode 100644
index b5e28d0..0000000
--- a/src/netmonitor/back/__pycache__/flow_features.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/back/__pycache__/flow_table.cpython-312.pyc b/src/netmonitor/back/__pycache__/flow_table.cpython-312.pyc
deleted file mode 100644
index ffb1702..0000000
--- a/src/netmonitor/back/__pycache__/flow_table.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/back/__pycache__/notification_service.cpython-312.pyc b/src/netmonitor/back/__pycache__/notification_service.cpython-312.pyc
deleted file mode 100644
index 71ee4ce..0000000
--- a/src/netmonitor/back/__pycache__/notification_service.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/back/__pycache__/scanner_profile.cpython-312.pyc b/src/netmonitor/back/__pycache__/scanner_profile.cpython-312.pyc
deleted file mode 100644
index 0c8b662..0000000
--- a/src/netmonitor/back/__pycache__/scanner_profile.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/back/__pycache__/scanner_profiles_manager.cpython-312.pyc b/src/netmonitor/back/__pycache__/scanner_profiles_manager.cpython-312.pyc
deleted file mode 100644
index f9dc637..0000000
--- a/src/netmonitor/back/__pycache__/scanner_profiles_manager.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/back/__pycache__/window.cpython-312.pyc b/src/netmonitor/back/__pycache__/window.cpython-312.pyc
deleted file mode 100644
index c6971ec..0000000
--- a/src/netmonitor/back/__pycache__/window.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/back/detector_profile_HST.py b/src/netmonitor/back/detector_profile_HST.py
deleted file mode 100644
index 267b307..0000000
--- a/src/netmonitor/back/detector_profile_HST.py
+++ /dev/null
@@ -1,215 +0,0 @@
-import threading
-import queue
-import time
-import os
-from pathlib import Path
-from scapy.all import wrpcap, AsyncSniffer
-
-from river.anomaly import HalfSpaceTrees
-from tinydb import TinyDB
-from tinydb.table import Document
-
-from .window import Window
-from .notification_service import notification_service
-
-XDG_DATA_HOME = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share"))
-LOGS_PATH = f"{XDG_DATA_HOME}/netmonitor/detector/profiles_logs"
-PCAP_PATH = f"{XDG_DATA_HOME}/netmonitor/detector/profiles_pcaps"
-
-
-class DetectorProfileHST:
-
- def __init__(
- self,
- profile_name: str,
- input_data: dict
- ):
- self.profile_name = profile_name
- self.features = input_data.get("features", [])
- self.params = input_data.get("params", {})
- self.n_trees = int(self.params.get("trees", 10))
- self.height = int(self.params.get("height", 8))
- self.window_size = int(self.params.get("window", 250))
- self.seed = int(self.params.get("seed", 42))
- self.threshold = float(self.params.get("threshold", 0.7))
- self.window_duration = float(self.params.get("window_duration", 10.0))
- self.bpf_filter = self.params.get("bpf_filter", "")
- self.interface = self.params.get("interface", None)
- self.queue_size = int(self.params.get("queue_size", 10000))
- self.logs_path = f"{LOGS_PATH}/{profile_name}.json"
- os.makedirs(os.path.dirname(self.logs_path), exist_ok=True)
-
- self.is_active = False
-
- self.notify_enabled = False
- self._init_runtime_objects()
-
-
- def _init_runtime_objects(self):
- self.queue = queue.Queue(maxsize=self.queue_size)
-
- os.makedirs(f"{LOGS_PATH}", exist_ok=True)
- self.db = TinyDB(f"{LOGS_PATH}/{self.profile_name}.json")
-
- self.model = HalfSpaceTrees(
- n_trees=self.n_trees,
- height=self.height,
- window_size=self.window_size,
- seed=self.seed
- )
-
- self.window = Window(
- window_duration=self.window_duration,
- enabled_features=self.features
- )
-
- self.processor_thread = None
-
- self.packets_read = 0
- self.windows_analyzed = 0
-
- if not hasattr(self, 'plot_data'):
- self.plot_data = []
-
-
- def __getstate__(self):
- state = self.__dict__.copy()
- cols_to_remove = ['sniffer_thread', 'processor_thread', 'queue', 'db', 'window']
- for col in cols_to_remove:
- if col in state:
- del state[col]
- return state
-
- def __setstate__(self, state):
- self.__dict__.update(state)
- self._init_runtime_objects()
- self.is_active = False
-
- def __repr__(self):
- return f"<DetectorProfileHST profile_name={self.profile_name!r}, active={self.is_active}>"
-
- def turn_on(self):
- if self.is_active:
- return
-
- self.is_active = True
- os.makedirs(os.path.dirname(self.logs_path), exist_ok=True)
- self.window.window_duration = self.window_duration
- self.window.window_start = time.time()
-
- self.sniffer = AsyncSniffer(
- iface=self.interface,
- filter=self.bpf_filter,
- store=False,
- prn=self._add_to_queue
- )
- self.sniffer.start()
-
- self.processor_thread = threading.Thread(target=self._process_thread, daemon=True)
-
- self.processor_thread.start()
-
- def turn_off(self):
- self.is_active = False
- if self.sniffer:
- self.sniffer.stop()
-
- time.sleep(0.2)
-
- def _add_to_queue(self, pkt):
- if self.queue:
- try:
- self.queue.put_nowait(pkt)
- self.packets_read += 1
- except queue.Full:
- pass
-
- def _process_thread(self):
- while self.is_active:
- try:
- pkt = self.queue.get(timeout=1)
- except queue.Empty:
- continue
-
- result = self.window.add_packet(pkt)
-
- if result is None:
- continue
-
- features, raw_packets = result
- self.windows_analyzed += 1
-
- if not features:
- continue
-
- sample = {feat: 0.0 for feat in self.features}
- for k, v in features.items():
- if k in sample:
- sample[k] = float(v)
-
- score = self.model.score_one(sample)
- self.model.learn_one(sample)
-
- self.plot_data.append(score)
- if len(self.plot_data) > 30:
- self.plot_data.pop(0)
-
- if score > self.threshold:
- self._handle_anomaly(score, sample, raw_packets)
-
- def _handle_anomaly(self, score: float, features: dict, raw_packets: list):
- timestamp = time.strftime("%Y-%m-%d_%H-%M-%S")
-
- os.makedirs(os.path.dirname(f"{PCAP_PATH}/{self.profile_name}"), exist_ok=True)
- filename = f"{PCAP_PATH}/{self.profile_name}/anom_{timestamp}.pcap"
-
- if raw_packets:
- try:
- wrpcap(filename, raw_packets)
- except Exception as e:
- print(f"Error saving pcap: {e}")
-
- if self.notify_enabled:
- msg = f"*Anomaly detected: {self.profile_name}*\nScore: `{score:.4f}`\nSaved: `{filename}`"
- notification_service.send_message(message=msg)
-
- if self.db:
- self.db.insert(
- Document({
- "ts": time.time(),
- "timestamp": timestamp,
- "profile": self.profile_name,
- "score": float(score),
- "pcap": filename,
- "pkt_rate": features.get("pkt_rate", 0),
- "proto_info": f"TCP:{features.get('proto_tcp_ratio',0):.2f} UDP:{features.get('proto_udp_ratio',0):.2f}",
- }, doc_id=None)
- )
-
- def to_dict(self):
- return {
- "profile name": self.profile_name,
- "logs_path": self.logs_path,
- "pcap_path": f"{PCAP_PATH}/{self.profile_name}",
- "params": self.params,
- "features": self.features,
- }
-
- def get_logs(self):
- if self.db:
- return self.db.all()
- return []
-
- def clear_logs(self):
- if self.db:
- self.db.truncate()
-
- def get_runtime_stats(self):
- return {
- "is_active": self.is_active,
- "notify_enabled": self.notify_enabled,
- "packets_captured": getattr(self, "packets_read", 0),
- "queue_size": self.queue.qsize() if hasattr(self, "queue") and self.queue else 0,
- "windows_processed": getattr(self, "windows_analyzed", 0),
- "window_duration": self.window_duration
- }
diff --git a/src/netmonitor/back/detector_profiles_manager.py b/src/netmonitor/back/detector_profiles_manager.py
deleted file mode 100644
index 1d1c5ea..0000000
--- a/src/netmonitor/back/detector_profiles_manager.py
+++ /dev/null
@@ -1,153 +0,0 @@
-import re
-from pathlib import Path
-from typing import Callable, Literal
-import pickle
-from netmonitor.back.detector_profile_HST import DetectorProfileHST
-
-
-SeverityLevel = Literal["information", "warning", "error"]
-VALID_NAME_REGEX = r"^[a-zA-Z0-9_-]+$"
-
-class DetectorProfilesManager:
- def __init__(self, profiles_file: str):
- self.profiles_file = Path(profiles_file)
- self.profiles: list[DetectorProfileHST] = []
-
- self.on_message: Callable[[str, str, str], None] | None = None
- self.on_refresh: Callable[[], None] | None = None
-
- self.load_profiles()
-
-
- def _notify(self, msg: str, title: str = "Profile Manager", level: SeverityLevel = "information"):
- if self.on_message:
- self.on_message(msg, title, level)
-
-
- def _refresh_front(self):
- if self.on_refresh:
- self.on_refresh()
-
-
- def _fail(self, msg: str, level: SeverityLevel = "error", notify: bool = True) -> bool:
- if notify:
- self._notify(msg, level=level)
- return False
-
-
- def _ok(self, msg: str | None = None, level: SeverityLevel = "information", notify: bool = True) -> bool:
- if msg and notify:
- self._notify(msg, level=level)
- return True
-
-
- def try_save_profiles(self, notify: bool = True) -> bool:
- try:
- self.profiles_file.parent.mkdir(parents=True, exist_ok=True)
-
- with open(self.profiles_file, "wb") as f:
- pickle.dump(self.profiles, f, protocol=pickle.HIGHEST_PROTOCOL)
-
- return self._ok("Profiles saved successfully.", notify=notify)
- except Exception as e:
- import traceback
- traceback.print_exc()
- return self._fail(f"Error saving profiles: {e}", notify=notify)
-
-
- def load_profiles(self, notify: bool = True) -> bool:
- if not self.profiles_file.exists():
- if notify:
- self._notify("Profiles file not found. Creating a new one.", level="warning")
- if not self.try_save_profiles(notify=False):
- return self._fail("Failed to create profiles file!", notify=notify)
- return self._ok("New profiles file created.", notify=notify)
-
- try:
- with open(self.profiles_file, "rb") as f:
- self.profiles = pickle.load(f)
- self._refresh_front()
- return self._ok(f"Loaded {len(self.profiles)} profiles.", notify=notify)
- except Exception as e:
- return self._fail(f"Error loading profiles: {e}", notify=notify)
-
-
- def add_profile(self, profile_name: str, input_data: dict, notify: bool = True) -> bool:
- if not profile_name:
- return self._fail("Name can't be blank.", "error", notify)
-
- if not re.match(VALID_NAME_REGEX, profile_name):
- return self._fail(f"Bad input '{profile_name}'. Use only letters, numbers, '_' and '-'.", "error", notify)
-
- if any(p.profile_name == profile_name for p in self.profiles):
- return self._fail(f"Profile {profile_name} already exists.", "warning", notify)
-
- new_profile = DetectorProfileHST(profile_name=profile_name, input_data = input_data)
- self.profiles.append(new_profile)
-
- if not self.try_save_profiles(notify=False):
- self.profiles.remove(new_profile)
- return self._fail(f"Failed to save profile {profile_name}.", notify=notify)
-
- self._refresh_front()
- return self._ok(f"Added profile {profile_name}.", notify=notify)
-
-
- def delete_profile(self, profile_name: str, notify: bool = True) -> bool:
- before = len(self.profiles)
- self.profiles = [p for p in self.profiles if p.profile_name != profile_name]
-
- if len(self.profiles) == before:
- return self._fail(f"Profile {profile_name} not found.", "warning", notify)
-
- if not self.try_save_profiles(notify=False):
- return self._fail(f"Failed to save changes after deleting {profile_name}.", notify=notify)
-
- self._refresh_front()
- return self._ok(f"Deleted profile {profile_name}.", notify=notify)
-
-
- def get_profile(self, profile_name: str) -> DetectorProfileHST | None:
- return next((p for p in self.profiles if p.profile_name == profile_name), None)
-
-
- def update_profile(self, profile_name: str, field: str, value, notify: bool = True) -> bool:
- p = self.get_profile(profile_name)
- if not p:
- return self._fail(f"Profile {profile_name} does not exist.", notify=notify)
-
- setattr(p, field, value)
- if self.try_save_profiles(notify=False):
- return self._ok(f"Updated profile {profile_name}.", notify=notify)
- else:
- return self._fail(f"Failed to save updated profile {profile_name}.", notify=notify)
-
-
- def turn_on_profile(self, profile_name: str, app=None, notify: bool = True) -> bool:
- p = self.get_profile(profile_name)
- if not p:
- return self._fail(f"Profile {profile_name} not found.", notify=notify)
- try:
- p.turn_on()
- return self._ok(f"Profile {profile_name} activated.", notify=notify)
- except Exception as e:
- return self._fail(f"Error activating profile: {e}", notify=notify)
-
-
- def turn_off_profile(self, profile_name: str, notify: bool = True) -> bool:
- p = self.get_profile(profile_name)
- if not p:
- return self._fail(f"Profile {profile_name} not found.", notify=notify)
- try:
- p.turn_off()
- return self._ok(f"Profile {profile_name} deactivated.", notify=notify)
- except Exception as e:
- return self._fail(f"Error deactivating profile: {e}", notify=notify)
-
-
- def get_profile_logs(self, profile_name: str, notify: bool = True):
- p = self.get_profile(profile_name)
- if not p:
- self._fail(f"Profile {profile_name} not found.", "error", notify)
- return None
- return p.get_logs()
diff --git a/src/netmonitor/back/notification_service.py b/src/netmonitor/back/notification_service.py
deleted file mode 100644
index e347faf..0000000
--- a/src/netmonitor/back/notification_service.py
+++ /dev/null
@@ -1,49 +0,0 @@
-import json
-import requests
-from pathlib import Path
-
-CONFIG_FILE = Path("data/global_config.json")
-
-class NotificationService:
- def __init__(self):
- self.webhook_url = ""
- self.load_config()
-
- def load_config(self):
- if CONFIG_FILE.exists():
- try:
- with open(CONFIG_FILE, "r") as f:
- data = json.load(f)
- self.webhook_url = data.get("discord_webhook_url", "")
- except Exception as e:
- print(f"Error loading notification config: {e}")
-
- def save_config(self, webhook_url: str):
- self.webhook_url = webhook_url
-
- CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True)
- try:
- with open(CONFIG_FILE, "w") as f:
- json.dump({
- "discord_webhook_url": self.webhook_url
- }, f, indent=4)
- return True
- except Exception as e:
- print(f"Error saving config: {e}")
- return False
-
- def send_message(self, message: str) -> bool:
- if not self.webhook_url:
- return False
-
- payload = {
- "content": message
- }
- try:
- response = requests.post(self.webhook_url, json=payload, timeout=5)
- return response.status_code in [200, 204]
- except Exception as e:
- print(f"Discord send error: {e}")
- return False
-
-notification_service = NotificationService()
diff --git a/src/netmonitor/back/scanner_profile.py b/src/netmonitor/back/scanner_profile.py
deleted file mode 100644
index 6a20b5b..0000000
--- a/src/netmonitor/back/scanner_profile.py
+++ /dev/null
@@ -1,138 +0,0 @@
-import os
-from pathlib import Path
-from datetime import datetime
-from tinydb import TinyDB
-import nmap
-from .notification_service import notification_service
-
-
-XDG_DATA_HOME = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share"))
-LOGS_PATH = f"{XDG_DATA_HOME}/netmonitor/scanner/profiles_logs"
-
-class ScannerProfile:
- def __init__(self, profile_name: str, nmap_input=None, scheduler=None, cve=None):
- self.profile_name = profile_name
- self.nmap_input = nmap_input or {}
- self.scheduler = scheduler
- self.cve = cve
- self.is_active = False
-
- self.notify_enabled = False
- self.notify_only_cve = False
-
- self.nm = None
- self.db = None
- self.profile_results_path = f"{LOGS_PATH}/{profile_name}.json"
- os.makedirs(os.path.dirname(self.profile_results_path), exist_ok=True)
-
- @property
- def nmap(self):
- if self.nm is None:
- self.nm = nmap.PortScanner()
- return self.nm
-
- @property
- def tinydb(self):
- if self.db is None:
- self.db = TinyDB(self.profile_results_path)
- return self.db
-
- def __getstate__(self):
- state = self.__dict__.copy()
- state['nm'] = None
- state['db'] = None
- return state
-
- def __setstate__(self, state):
- self.__dict__.update(state)
- self.nm = None
- self.db = None
-
- def __repr__(self):
- return f"<ScannerProfile profile_name={self.profile_name!r}, active={self.is_active}>"
-
-
- def scan(self):
- targets = self.nmap_input.get("targets", "")
- arguments = self.nmap_input.get("arguments", "")
- ports = self.nmap_input.get("ports", "")
-
- if self.cve:
- arguments += " --script=vulners "
-
- if ports == '':
- self.nmap.scan(hosts=targets, arguments=arguments)
- else:
- self.nmap.scan(hosts=targets, ports=ports, arguments=arguments)
-
- xml_result = self.nmap.get_nmap_last_output()
- analyzed_results = self.nmap.analyse_nmap_xml_scan(xml_result)
-
- if isinstance(analyzed_results, dict):
- analyzed_results['_timestamp'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-
- self.tinydb.insert(analyzed_results)
-
- if self.notify_enabled:
- self._handle_notification(analyzed_results, targets)
-
- return analyzed_results
-
- def _handle_notification(self, results, targets):
- scan_str = str(results).lower()
- found_issues = "cve" in scan_str or "vulnerab" in scan_str
- should_send = False
- msg = f"*Scanner report: {self.profile_name}*\nTargets: `{targets}`"
- summary = ""
- try:
- if 'scan' in results:
- for host, data in results['scan'].items():
- summary += f"\n *{host}*"
-
- if 'tcp' in data:
- open_ports = [f"{p}/tcp" for p, info in data['tcp'].items() if info.get('state') == 'open']
- if open_ports:
- summary += f"\n Open ports: {', '.join(open_ports)}"
- if 'udp' in data:
- open_ports = [f"{p}/udp" for p, info in data['udp'].items() if info.get('state') == 'open']
- if open_ports:
- summary += f"\n Open ports (UDP): {', '.join(open_ports)}"
- if 'osmatch' in data and data['osmatch']:
- os_name = data['osmatch'][0].get('name', 'Unknown')
- summary += f"\n OS detected: {os_name}"
- except Exception as e:
- summary += f"\n(Error building summary: {e})"
-
- if self.notify_only_cve:
- if found_issues:
- should_send = True
- msg += "\nCVE detected!"
- msg += "\n---" + summary
- else:
- should_send = True
- msg += "\nScan completed."
- if found_issues:
- msg += "\nPotential vulnerabilities detected."
-
- if summary:
- msg += "\n---" + summary
-
- if should_send:
- if len(msg) > 1900:
- msg = msg[:1900] + "\n...."
- notification_service.send_message(msg)
-
-
- def to_dict(self):
- return {
- "profile_name": self.profile_name,
- "nmap_input": self.nmap_input,
- "scheduler": self.scheduler,
- "is_active": self.is_active,
- "notify_enabled": getattr(self, 'notify_enabled', False),
- "notify_only_cve": getattr(self, 'notify_only_cve', False),
- "results_path": self.profile_results_path,
- }
-
- def get_logs(self):
- return self.tinydb.all()
diff --git a/src/netmonitor/back/scanner_profiles_manager.py b/src/netmonitor/back/scanner_profiles_manager.py
deleted file mode 100644
index 7748835..0000000
--- a/src/netmonitor/back/scanner_profiles_manager.py
+++ /dev/null
@@ -1,177 +0,0 @@
-import re
-from pathlib import Path
-from typing import Callable, Literal
-from apscheduler.schedulers.background import BackgroundScheduler
-from apscheduler.triggers.cron import CronTrigger
-import pickle
-
-from .scanner_profile import ScannerProfile
-
-SeverityLevel = Literal["information", "warning", "error"]
-VALID_NAME_REGEX = r"^[a-zA-Z0-9_-]+$"
-
-class ScannerProfilesManager:
- def __init__(self, profiles_file: str, scheduler: BackgroundScheduler):
- self.profiles_file = Path(profiles_file)
- self.scheduler = scheduler
- self.profiles: list[ScannerProfile] = []
-
- self.on_message: Callable[[str, str, str], None] | None = None
- self.on_refresh: Callable[[], None] | None = None
-
- self.load_profiles()
-
- def _notify(self, msg: str, title: str = "Profile Manager", level: SeverityLevel = "information"):
- if self.on_message:
- self.on_message(msg, title, level)
-
- def _refresh_front(self):
- if self.on_refresh:
- self.on_refresh()
-
- def _fail(self, msg: str, level: SeverityLevel = "error", notify: bool = True) -> bool:
- if notify:
- self._notify(msg, level=level)
- return False
-
- def _ok(self, msg: str | None = None, level: SeverityLevel = "information", notify: bool = True) -> bool:
- if msg and notify:
- self._notify(msg, level=level)
- return True
-
- def try_save_profiles(self, notify: bool = True) -> bool:
- try:
- self.profiles_file.parent.mkdir(parents=True, exist_ok=True)
- with open(self.profiles_file, "wb") as f:
- pickle.dump(self.profiles, f)
- return self._ok("Profiles saved successfully.", notify=notify)
- except Exception as e:
- return self._fail(f"Error saving profiles: {e}", notify=notify)
-
- def load_profiles(self, notify: bool = True) -> bool:
- if not self.profiles_file.exists():
- if notify:
- self._notify("Profiles file not found. Creating a new one.", level="warning")
- if not self.try_save_profiles(notify=False):
- return self._fail("Failed to create profiles file!", notify=notify)
- return self._ok("New profiles file created.", notify=notify)
-
- try:
- with open(self.profiles_file, "rb") as f:
- self.profiles = pickle.load(f)
- self._refresh_front()
- return self._ok(f"Loaded {len(self.profiles)} profiles.", notify=notify)
- except Exception as e:
- return self._fail(f"Error loading profiles: {e}", notify=notify)
-
-
- def add_profile(self, profile_name: str, notify: bool = True) -> bool:
- if not profile_name:
- return self._fail("Name can't be blank.", "error", notify)
-
- if not re.match(VALID_NAME_REGEX, profile_name):
- return self._fail(f"Bad input '{profile_name}'. Use only letters, numbers, '_' and '-'.", "error", notify)
-
- if any(p.profile_name == profile_name for p in self.profiles):
- return self._fail(f"Profile {profile_name} already exists.", "warning", notify)
-
- new_profile = ScannerProfile(profile_name=profile_name)
- self.profiles.append(new_profile)
-
- if not self.try_save_profiles(notify=False):
- self.profiles.remove(new_profile)
- return self._fail(f"Failed to save profile {profile_name}.", notify=notify)
-
- self._refresh_front()
- return self._ok(f"Added profile {profile_name}.", notify=notify)
-
- def delete_profile(self, profile_name: str, notify: bool = True) -> bool:
- before = len(self.profiles)
- self.profiles = [p for p in self.profiles if p.profile_name != profile_name]
-
- if len(self.profiles) == before:
- return self._fail(f"Profile {profile_name} not found.", "warning", notify)
-
- if not self.try_save_profiles(notify=False):
- return self._fail(f"Failed to save changes after deleting {profile_name}.", notify=notify)
-
- self._refresh_front()
- return self._ok(f"Deleted profile {profile_name}.", notify=notify)
-
- def get_profile(self, profile_name: str) -> ScannerProfile | None:
- return next((p for p in self.profiles if p.profile_name == profile_name), None)
-
- def update_profile(self, profile_name: str, field: str, value, notify: bool = True) -> bool:
- p = self.get_profile(profile_name)
- if not p:
- return self._fail(f"Profile {profile_name} does not exist.", notify=notify)
-
- setattr(p, field, value)
- if self.try_save_profiles(notify=False):
- return self._ok(f"Updated profile {profile_name}.", notify=notify)
- else:
- return self._fail(f"Failed to save updated profile {profile_name}.", notify=notify)
-
-
- def run_profile_once(self, profile_name: str, notify: bool = True) -> bool:
- p = self.get_profile(profile_name)
- if not p:
- return self._fail(f"Profile {profile_name} not found.", notify=notify)
-
- try:
- p.scan()
- return self._ok(f"Scan completed for {profile_name}.", notify=notify)
- except Exception as e:
- return self._fail(f"Scan error: {e}", notify=notify)
-
- def turn_on_profile(self, profile_name: str, notify: bool = True) -> bool:
- p = self.get_profile(profile_name)
- if not p:
- return self._fail(f"Profile {profile_name} not found.", notify=notify)
-
- try:
- job_id = f"profile_{p.profile_name}"
- self.scheduler.add_job(
- lambda: self.run_profile_once(p.profile_name, notify=False),
- trigger=CronTrigger.from_crontab(p.scheduler),
- id=job_id,
- replace_existing=True
- )
- p.is_active = True
- return self._ok(f"Profile {profile_name} activated.", notify=notify)
- except Exception as e:
- return self._fail(f"Error activating profile: {e}", notify=notify)
-
- def turn_off_profile(self, profile_name: str, notify: bool = True) -> bool:
- job_id = f"profile_{profile_name}"
- try:
- self.scheduler.remove_job(job_id)
- p = self.get_profile(profile_name)
- if p:
- p.is_active = False
- return self._ok(f"Profile {profile_name} deactivated.", notify=notify)
- except Exception as e:
- return self._fail(f"Error deactivating profile: {e}", notify=notify)
-
- def set_validated_scheduler(self, profile_name: str, cron_str: str, notify: bool = True) -> bool:
- p = self.get_profile(profile_name)
- if not p:
- return self._fail(f"Profile {profile_name} not found.", notify=notify)
-
- try:
- CronTrigger.from_crontab(cron_str)
- p.scheduler = cron_str
- if not self.try_save_profiles(notify=False):
- return self._fail(f"Failed to save scheduler for {profile_name}.", notify=notify)
- return self._ok(f"Scheduler updated for {profile_name}.", notify=notify)
- except ValueError as ve:
- return self._fail(f"Invalid CRON: {ve}", notify=notify)
- except Exception as e:
- return self._fail(f"Unexpected error: {e}", notify=notify)
-
- def get_profile_logs(self, profile_name: str, notify: bool = True):
- p = self.get_profile(profile_name)
- if not p:
- self._fail(f"Profile {profile_name} not found.", "error", notify)
- return None
- return p.get_logs()
diff --git a/src/netmonitor/back/window.py b/src/netmonitor/back/window.py
deleted file mode 100644
index dfa8e7d..0000000
--- a/src/netmonitor/back/window.py
+++ /dev/null
@@ -1,251 +0,0 @@
-import time
-from collections import defaultdict
-from statistics import mean, pstdev
-import math
-
-from scapy.all import IP, IPv6, TCP, UDP, ICMP
-
-class Window:
- def __init__(self, window_duration: float, enabled_features: list[str]):
-
- self.window_duration = float(window_duration)
- self.enabled = set(enabled_features)
-
- self.window_start = time.time()
-
- self.raw_packets_buffer = []
-
- self.flows = defaultdict(lambda: {
- "pkt_count": 0,
- "byte_count": 0,
- "dst_ports": defaultdict(int),
- "src_ports": defaultdict(int),
- "tcp_flags": {
- "syn": 0, "fin": 0, "rst": 0, "ack": 0,
- "psh": 0, "urg": 0, "xmas": 0, "null": 0,
- },
- "sizes": [],
- "start_ts": None,
- "end_ts": None,
- "tcp_pkts": 0,
- "udp_pkts": 0,
- "icmp_pkts": 0,
- })
-
- def add_packet(self, pkt):
- now = time.time()
-
- if now - self.window_start >= self.window_duration:
- features = self._finish_window()
-
- self.window_start = now
-
- raw = list(self.raw_packets_buffer)
-
- self.raw_packets_buffer.clear()
- self.flows.clear()
-
- self._process_single_packet(pkt)
-
- return features, raw
-
- self._process_single_packet(pkt)
- return None
-
- def _process_single_packet(self, pkt):
- self.raw_packets_buffer.append(pkt)
-
- proto = None
- if IP in pkt:
- src = pkt[IP].src
- dst = pkt[IP].dst
- proto = pkt[IP].proto
- elif IPv6 in pkt:
- src = pkt[IPv6].src
- dst = pkt[IPv6].dst
- proto = pkt[IPv6].nh
- else:
- return
-
- key = (src, dst, proto)
- f = self.flows[key]
-
- now = time.time()
- if f["start_ts"] is None:
- f["start_ts"] = now
- f["end_ts"] = now
-
- size = len(pkt)
- f["pkt_count"] += 1
- f["byte_count"] += size
- f["sizes"].append(size)
-
- if TCP in pkt:
- f["tcp_pkts"] += 1
- dport = pkt[TCP].dport
- sport = pkt[TCP].sport
- f["dst_ports"][dport] += 1
- f["src_ports"][sport] += 1
-
- flags = pkt[TCP].flags
- if flags & 0x02: f["tcp_flags"]["syn"] += 1
- if flags & 0x01: f["tcp_flags"]["fin"] += 1
- if flags & 0x04: f["tcp_flags"]["rst"] += 1
- if flags & 0x10: f["tcp_flags"]["ack"] += 1
- if flags & 0x08: f["tcp_flags"]["psh"] += 1
- if flags & 0x20: f["tcp_flags"]["urg"] += 1
-
- if flags in [0x29, 0x3F, 0x3B]:
- f["tcp_flags"]["xmas"] += 1
- if flags == 0:
- f["tcp_flags"]["null"] += 1
-
- elif UDP in pkt:
- f["udp_pkts"] += 1
- dport = pkt[UDP].dport
- sport = pkt[UDP].sport
- f["dst_ports"][dport] += 1
- f["src_ports"][sport] += 1
-
- elif ICMP in pkt:
- f["icmp_pkts"] += 1
-
- def _finish_window(self):
- if not self.flows:
- return {}
-
- total_flows = len(self.flows)
- total_packets = 0
- total_bytes = 0
-
- tcp_flags_global = {
- "syn": 0, "fin": 0, "rst": 0, "ack": 0,
- "psh": 0, "urg": 0, "xmas": 0, "null": 0
- }
-
- all_sizes = []
- proto_tcp = 0
- proto_udp = 0
- proto_icmp = 0
-
- dst_port_counts = defaultdict(int)
- src_port_counts = defaultdict(int)
-
- for f in self.flows.values():
- total_packets += f["pkt_count"]
- total_bytes += f["byte_count"]
-
- for p, c in f["dst_ports"].items():
- dst_port_counts[p] += c
- for p, c in f["src_ports"].items():
- src_port_counts[p] += c
-
- for k in tcp_flags_global:
- tcp_flags_global[k] += f["tcp_flags"][k]
-
- all_sizes.extend(f["sizes"])
- proto_tcp += f["tcp_pkts"]
- proto_udp += f["udp_pkts"]
- proto_icmp += f["icmp_pkts"]
-
- window_len = self.window_duration
- total_pkts = total_packets if total_packets > 0 else 1
-
- feat = {}
-
- if "flow_count" in self.enabled: feat["flow_count"] = total_flows
- if "total_packets" in self.enabled: feat["total_packets"] = total_packets
- if "total_bytes" in self.enabled: feat["total_bytes"] = total_bytes
- if "avg_bytes_per_flow" in self.enabled: feat["avg_bytes_per_flow"] = total_bytes / total_flows if total_flows else 0
- if "pkt_rate" in self.enabled: feat["pkt_rate"] = total_packets / window_len
- if "byte_rate" in self.enabled: feat["byte_rate"] = total_bytes / window_len
-
- if "syn_count" in self.enabled: feat["syn_count"] = tcp_flags_global["syn"]
- if "fin_count" in self.enabled: feat["fin_count"] = tcp_flags_global["fin"]
- if "rst_count" in self.enabled: feat["rst_count"] = tcp_flags_global["rst"]
- if "ack_count" in self.enabled: feat["ack_count"] = tcp_flags_global["ack"]
- if "psh_count" in self.enabled: feat["psh_count"] = tcp_flags_global["psh"]
- if "urg_count" in self.enabled: feat["urg_count"] = tcp_flags_global["urg"]
-
- if "syn_ratio" in self.enabled: feat["syn_ratio"] = tcp_flags_global["syn"] / total_pkts
- if "fin_ratio" in self.enabled: feat["fin_ratio"] = tcp_flags_global["fin"] / total_pkts
- if "xmas_total" in self.enabled: feat["xmas_total"] = tcp_flags_global["xmas"]
- if "null_scan_total" in self.enabled: feat["null_scan_total"] = tcp_flags_global["null"]
-
- if "unique_dst_ports" in self.enabled: feat["unique_dst_ports"] = len(dst_port_counts)
- if "unique_src_ports" in self.enabled: feat["unique_src_ports"] = len(src_port_counts)
- if "port_entropy_dst" in self.enabled: feat["port_entropy_dst"] = entropy(dst_port_counts)
- if "port_entropy_src" in self.enabled: feat["port_entropy_src"] = entropy(src_port_counts)
-
- if all_sizes:
- if "avg_pkt_size" in self.enabled: feat["avg_pkt_size"] = mean(all_sizes)
- if "min_pkt_size" in self.enabled: feat["min_pkt_size"] = min(all_sizes)
- if "max_pkt_size" in self.enabled: feat["max_pkt_size"] = max(all_sizes)
- if "std_pkt_size" in self.enabled: feat["std_pkt_size"] = pstdev(all_sizes)
- else:
- for k in ["avg_pkt_size", "min_pkt_size", "max_pkt_size", "std_pkt_size"]:
- if k in self.enabled: feat[k] = 0
-
- if "avg_packets_per_flow" in self.enabled:
- feat["avg_packets_per_flow"] = total_packets / total_flows if total_flows else 0
- if "avg_bytes_per_packet" in self.enabled:
- feat["avg_bytes_per_packet"] = total_bytes / total_pkts
-
- if "proto_tcp_ratio" in self.enabled: feat["proto_tcp_ratio"] = proto_tcp / total_pkts
- if "proto_udp_ratio" in self.enabled: feat["proto_udp_ratio"] = proto_udp / total_pkts
- if "proto_icmp_ratio" in self.enabled: feat["proto_icmp_ratio"] = proto_icmp / total_pkts
-
- return feat
-
-
-FEATURE_LIST = [
- "flow_count",
- "total_packets",
- "total_bytes",
- "avg_bytes_per_flow",
- "pkt_rate",
- "byte_rate",
-
- "syn_count",
- "fin_count",
- "rst_count",
- "ack_count",
- "psh_count",
- "urg_count",
- "syn_ratio",
- "fin_ratio",
- "xmas_total",
- "null_scan_total",
-
- "unique_dst_ports",
- "unique_src_ports",
- "port_entropy_dst",
- "port_entropy_src",
-
- "avg_pkt_size",
- "min_pkt_size",
- "max_pkt_size",
- "std_pkt_size",
-
- "avg_packets_per_flow",
- "avg_bytes_per_packet",
- "proto_tcp_ratio",
- "proto_udp_ratio",
- "proto_icmp_ratio",
-]
-
-
-def entropy(values):
- if not values:
- return 0.0
-
- total = sum(values.values())
- if total == 0:
- return 0.0
-
- entropy_val = 0.0
- for count in values.values():
- p = count / total
- entropy_val -= p * math.log2(p)
-
- return entropy_val
diff --git a/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc
deleted file mode 100644
index 6a32fac..0000000
--- a/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc
deleted file mode 100644
index a74ee0c..0000000
--- a/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc
deleted file mode 100644
index 0257359..0000000
--- a/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc
deleted file mode 100644
index 8672061..0000000
--- a/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc
deleted file mode 100644
index 227bfa8..0000000
--- a/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc
deleted file mode 100644
index d6fcdfb..0000000
--- a/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc
deleted file mode 100644
index a4db183..0000000
--- a/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc
deleted file mode 100644
index 8a8d03a..0000000
--- a/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc
deleted file mode 100644
index 28cc805..0000000
--- a/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc
+++ /dev/null
Binary files differ
diff --git a/src/netmonitor/front/detector_profiles_tab.py b/src/netmonitor/front/detector_profiles_tab.py
deleted file mode 100644
index 7860e16..0000000
--- a/src/netmonitor/front/detector_profiles_tab.py
+++ /dev/null
@@ -1,77 +0,0 @@
-from textual import on
-from textual.app import ComposeResult
-from textual.widgets import Button, Label, Switch
-from textual.containers import Vertical, Horizontal, VerticalScroll
-
-from ..back.detector_profiles_manager import DetectorProfilesManager
-from .detector_profiles_tab_pushscreens import ConfirmDeletePushScreen, ShowLogsPushScreen, ShowProfilePushScreen, SetDetectorNotificationPushScreen
-
-class DetectorProfilesTab(Vertical):
- def __init__(self, manager: DetectorProfilesManager, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.manager.on_refresh = self.refresh_profiles
- self.manager.on_message = self.on_manager_message
-
- def compose(self) -> ComposeResult:
- yield VerticalScroll(id="profiles-list")
-
- def on_mount(self) -> None:
- self.refresh_profiles()
-
- def refresh_profiles(self) -> None:
- profiles_list = self.query_one("#profiles-list", VerticalScroll)
- profiles_list.remove_children()
-
- if not self.manager.profiles:
- return
-
- for profile in self.manager.profiles:
- row = Horizontal(
- Switch(id=f"activate-profile-switch-{profile.profile_name}", value=profile.is_active),
- Label(f"{profile.profile_name}", classes="profile-profile_name"),
- Button("Show logs", id=f"show-logs-button-{profile.profile_name}", classes="profile-action", variant="primary"),
- Button("Show Profile", id=f"show-profile-button-{profile.profile_name}", classes="profile-action", variant="default"),
- Button("Notifications", id=f"set-notifications-button-{profile.profile_name}", classes="profile-action", variant="default"),
- Button("Delete", id=f"delete-{profile.profile_name}", classes="profile-action button-delete", variant="error"),
- classes="profile-row"
- )
- profiles_list.mount(row)
-
- @on(Switch.Changed)
- def any_switch_changed(self, event: Switch.Changed) -> None:
- switch_id = event.switch.id
- if not switch_id:
- return
- elif switch_id.startswith("activate-profile-switch"):
- profile_name = switch_id.removeprefix("activate-profile-switch-")
-
- if event.switch.value:
- is_turned_on = self.manager.turn_on_profile(profile_name)
- if not is_turned_on:
- event.switch.value = False
- else:
- is_turned_off = self.manager.turn_off_profile(profile_name)
- if not is_turned_off:
- event.switch.value = False
-
- @on(Button.Pressed)
- def on_any_button_pressed(self, event: Button.Pressed) -> None:
- button_id = event.button.id
- if not button_id:
- return
- elif button_id.startswith("show-profile-button-"):
- profile_name = button_id.removeprefix("show-profile-button-")
- self.app.push_screen(ShowProfilePushScreen(self.manager, profile_name))
- elif button_id.startswith("set-notifications-button-"):
- profile_name = button_id.removeprefix("set-notifications-button-")
- self.app.push_screen(SetDetectorNotificationPushScreen(self.manager, profile_name))
- elif button_id.startswith("show-logs-button-"):
- profile_name = button_id.removeprefix("show-logs-button-")
- self.app.push_screen(ShowLogsPushScreen(self.manager, profile_name))
- elif button_id.startswith("delete-"):
- profile_name = button_id.removeprefix("delete-")
- self.app.push_screen(ConfirmDeletePushScreen(self.manager, profile_name))
-
- def on_manager_message(self, msg: str, title: str, severity):
- self.app.notify(message=msg, title=title, severity=severity)
diff --git a/src/netmonitor/front/detector_profiles_tab_pushscreens.py b/src/netmonitor/front/detector_profiles_tab_pushscreens.py
deleted file mode 100644
index fc5b731..0000000
--- a/src/netmonitor/front/detector_profiles_tab_pushscreens.py
+++ /dev/null
@@ -1,187 +0,0 @@
-from textual import on
-from textual.app import ComposeResult
-from textual.screen import ModalScreen
-from textual.widgets import Button, Label, Pretty, DataTable, Switch
-from textual.containers import Vertical, Horizontal, VerticalScroll, Container
-from textual_plotext import PlotextPlot
-
-from datetime import datetime
-
-from ..back.detector_profiles_manager import DetectorProfilesManager
-from ..back.detector_profile_HST import DetectorProfileHST
-
-class PlotTab(Container):
- def __init__(self, profile: DetectorProfileHST, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.profile = profile
- self.classes = "plot-card"
-
- def compose(self):
- yield PlotextPlot()
-
- def on_mount(self):
- self.set_interval(1, self.update_plot)
-
- def update_plot(self):
- plot_widget = self.query_one(PlotextPlot)
- plt = plot_widget.plt
-
- y = list(getattr(self.profile, "plot_data", []))
-
- plt.clear_figure()
- plt.theme("dark")
-
- plt.plot(y, marker="dot", color="green")
- plt.title("Anomaly Score")
- plt.ylabel("last 30 windows")
- plt.ylim(0, 1)
-
- threshold = self.profile.params.get("threshold", 0.7)
- if threshold is not None:
- plt.horizontal_line(float(threshold), color="red")
-
-
- plot_widget.refresh()
-
-
-class ShowProfilePushScreen(ModalScreen[str]):
- def __init__(self, manager, profile_name: str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
- self.profile = self.manager.get_profile(profile_name)
-
- def compose(self) -> ComposeResult:
- with Container(classes="modal-window large-modal"):
- yield Label(f"Profile: {self.profile_name}", classes="modal-header")
-
- with Horizontal(classes="modal-split-container"):
-
- with Vertical(classes="left-panel"):
- yield PlotTab(self.profile)
-
- with Vertical(classes="right-panel"):
- yield Label("Runtime Stats (Live)", classes="section-header")
- with VerticalScroll(classes="info-box", id="stats-box"):
- yield Pretty({}, id="runtime-stats-pretty")
-
- yield Label("Configuration", classes="section-header")
- with VerticalScroll(classes="info-box"):
- yield Pretty(self.profile.to_dict())
-
- with Container(classes="modal-footer"):
- yield Button("Close", id="cancel-button", variant="primary")
-
- def on_mount(self):
- self.set_interval(1.0, self.update_stats)
- self.update_stats()
-
- def update_stats(self):
- if self.profile:
- stats = self.profile.get_runtime_stats()
- self.query_one("#runtime-stats-pretty", Pretty).update(stats)
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed):
- self.dismiss(None)
-
-
-class ShowLogsPushScreen(ModalScreen[str]):
- def __init__(self, manager: DetectorProfilesManager, profile_name: str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
-
- def compose(self) -> ComposeResult:
- with Container(classes="modal-window medium-modal"):
- yield Label(f"Anomaly Logs: {self.profile_name}", classes="modal-header")
-
- with Container(classes="table-container"):
- yield DataTable(id="logs_table", zebra_stripes=True, cursor_type="row")
-
- with Horizontal(classes="modal-footer"):
- yield Button("Clear History", id="clear-button", variant="warning")
- yield Button("Close", id="cancel-button", variant="primary")
-
-
- def on_mount(self):
- table = self.query_one("#logs_table", DataTable)
- table.add_columns("Timestamp", "Score", "Packets Rate", "Protocol Info", "Verdict")
-
- logs = self.manager.get_profile_logs(self.profile_name)
-
- if not logs:
- return
-
- sorted_logs = sorted(logs, key=lambda x: x.get("ts", 0), reverse=True)
-
- for log in sorted_logs:
- dt = datetime.fromtimestamp(log.get("ts", 0)).strftime("%Y-%m-%d %H:%M:%S")
-
- score = f"{log.get('score', 0):.4f}"
- rate = f"{log.get('pkt_rate', 0):.1f}"
- proto = str(log.get("proto_info", "-"))
- verdict = "ANOMALY"
-
- table.add_row(dt, score, rate, proto, verdict)
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed) -> None:
- if event.button.id == "clear-button":
- profile = self.manager.get_profile(self.profile_name)
- if profile:
- profile.clear_logs()
- self.query_one("#logs_table", DataTable).clear()
- else:
- self.dismiss(None)
-
-class SetDetectorNotificationPushScreen(ModalScreen[str]):
- def __init__(self, manager, profile_name: str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
- self.profile = self.manager.get_profile(profile_name)
-
- def compose(self) -> ComposeResult:
- current_val = getattr(self.profile, 'notify_enabled', False)
-
- with Container(classes="modal-window small-modal"):
- yield Label(f"Notification: {self.profile_name}", classes="modal-header")
-
- with Vertical(classes="section-card"):
- yield Label("Enable notification (Discord)")
- yield Switch(value=current_val, id="switch-anomaly")
-
- with Horizontal(classes="modal-footer"):
- yield Button("Close", id="close-button", variant="primary")
-
- @on(Switch.Changed)
- def on_switch_changed(self, event: Switch.Changed):
- if event.switch.id == "switch-anomaly":
- self.profile.notify_enabled = event.value
- self.manager.try_save_profiles(notify=False)
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed):
- self.dismiss(None)
-
-class ConfirmDeletePushScreen(ModalScreen[str]):
- def __init__(self, manager: DetectorProfilesManager, profile_name:str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
-
- def compose(self) -> ComposeResult:
- with Container(classes="modal-window small-modal"):
- yield Label("Are you sure?",classes="modal-header")
- with Horizontal(classes="modal-footer"):
- yield Button("Delete", id="confirm-button", variant="error")
- yield Button("Cancel", id="cancel-button", variant="default")
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed):
- if event.button.id == "confirm-button":
- self.manager.delete_profile(self.profile_name)
- self.dismiss(None)
- else:
- self.dismiss(None)
diff --git a/src/netmonitor/front/detector_tab.py b/src/netmonitor/front/detector_tab.py
deleted file mode 100644
index 53dc908..0000000
--- a/src/netmonitor/front/detector_tab.py
+++ /dev/null
@@ -1,135 +0,0 @@
-from textual import on
-from textual.app import ComposeResult
-from textual.widgets import Input, Select, Button, Label, Checkbox
-from textual.containers import Container, Horizontal, Vertical, VerticalScroll
-
-import psutil
-
-from ..back.detector_profiles_manager import DetectorProfilesManager
-from ..back.window import FEATURE_LIST
-from ..front.detector_tab_pushscreens import SaveProfilePushScreen
-
-class DetectorTab(Container):
- def __init__(self, manager: DetectorProfilesManager, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.manager.on_message = self.on_manager_message
-
- def compose(self) -> ComposeResult:
- with Horizontal(id="top-config-container"):
-
- model_section = Container(id="model-section", classes="section-card")
- model_section.border_title = "Algorithm: HalfSpaceTrees"
- with model_section:
- with VerticalScroll(classes="detector-scroll"):
- yield Label("Select interface:", classes="label")
- try:
- ifaces = list(psutil.net_if_addrs().keys())
- except:
- ifaces = []
-
- yield Select.from_values(
- ifaces,
- id="interface-select",
- allow_blank=False,
- classes="input"
- )
-
- yield Label("Model params:", classes="label")
- yield Input(placeholder="Trees number (int, def: 10)", id="param-trees", classes="input")
- yield Input(placeholder="Height (int, def: 8)", id="param-height", classes="input")
- yield Input(placeholder="Window size (int, def: 250)", id="param-window", classes="input")
- yield Input(placeholder="Seed (int, def: 42)", id="param-seed", classes="input")
- yield Input(placeholder="Window duration (def: 10 sec )", id="param-window_duration", classes="input")
- yield Input(placeholder="Threshold (0.0 - 1.0, def: 0.7)", id="param-threshold", classes="input")
- yield Input(placeholder="Queue size (int, def: 10000)", id="param-queue_size", classes="input")
-
- features_section = Container(id="features-section", classes="section-card")
- features_section.border_title = "Flow-based Features"
- with features_section:
- with VerticalScroll(classes="detector-scroll"):
- yield Label("Select features to include in model:", classes="label")
- self.feature_checkboxes = {}
- for feat in FEATURE_LIST:
- cb = Checkbox(feat, value=True, classes="input")
- self.feature_checkboxes[feat] = cb
- yield cb
-
- bpf_section = Container(id="bpf-section", classes="section-card")
- bpf_section.border_title = "BPF Filter (Optional)"
- with bpf_section:
- yield Input(
- placeholder="BPF Filter (e.g. 'tcp port 80 or udp')",
- id="param-bpf_filter",
- classes="input full"
- )
- with Container(classes="save-button-container"):
- yield Button("Save Profile", id="save-button", variant="success")
-
- def get_inputs(self):
- features = [f for f, cb in self.feature_checkboxes.items() if cb.value]
-
- if not features:
- raise ValueError("Select at least one feature.")
-
- params = {}
-
- try:
- interface = self.query_one("#interface-select", Select).value
- if not interface:
- raise ValueError("Select interface.")
- params["interface"] = interface
- except Exception:
- raise ValueError("Interface selection error.")
-
- bpf_input = self.query_one("#param-bpf_filter", Input)
- if bpf_input.value.strip():
- params["bpf_filter"] = bpf_input.value.strip()
-
- defaults = {
- "trees": 10,
- "height": 8,
- "window": 250,
- "seed": 42,
- "threshold": 0.7,
- "window_duration": 10.0,
- "queue_size": 10000,
- "bpf_filter": ""
- }
-
- model_section = self.query_one("#model-section")
- for inp in model_section.query("Input"):
- if inp.id and inp.id.startswith("param-"):
- key = inp.id.removeprefix("param-")
- if key == "bpf_filter": continue
-
- val_str = inp.value.strip()
-
- if not val_str:
- if key in defaults and defaults[key] is not None:
- params[key] = defaults[key]
- continue
-
- try:
- if key in ["trees", "height", "window", "seed","queue_size"]:
- params[key] = int(val_str)
- elif key in ["threshold", "window_duration"]:
- params[key] = float(val_str)
- except ValueError:
- raise ValueError(f"Param '{key}' must be a number.")
-
- return {
- "features": features,
- "params": params,
- }
-
- @on(Button.Pressed, "#save-button")
- async def handle_save_button(self, event: Button.Pressed):
- try:
- input_data = self.get_inputs()
- self.app.push_screen(SaveProfilePushScreen(self.manager, input_data=input_data))
- except ValueError as e:
- self.app.notify(str(e), title="Validation error", severity="error")
-
- def on_manager_message(self, msg: str, title: str, severity):
- self.app.notify(message=msg, title=title, severity=severity)
diff --git a/src/netmonitor/front/detector_tab_pushscreens.py b/src/netmonitor/front/detector_tab_pushscreens.py
deleted file mode 100644
index 625921b..0000000
--- a/src/netmonitor/front/detector_tab_pushscreens.py
+++ /dev/null
@@ -1,31 +0,0 @@
-from textual.app import ComposeResult
-from textual.widgets import Input, Label, Button
-from textual.containers import Vertical, Horizontal
-from textual.screen import ModalScreen
-from textual import on
-
-from ..back.detector_profiles_manager import DetectorProfilesManager
-
-class SaveProfilePushScreen(ModalScreen[str]):
- def __init__(self, manager: DetectorProfilesManager, input_data , *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.input_data = input_data
-
-
- def compose(self) -> ComposeResult:
- with Vertical(classes="modal-window small-modal"):
- yield Label("Save scan profile", classes="modal-header")
- yield Input(placeholder="Profile name:", id="profile-name", classes="input")
- with Horizontal(id="buttons-row", classes="modal-buttons modal-footer"):
- yield Button("Confirm", id="confirm-button", variant="success", classes="button")
- yield Button("Cancel", id="cancel-button", variant="error", classes="button")
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed):
- if event.button.id == "confirm-button":
- profile_name = self.query_one("#profile-name", Input).value.strip()
- self.manager.add_profile(profile_name,self.input_data)
- self.dismiss(None)
- else:
- self.dismiss(None)
diff --git a/src/netmonitor/front/options_tab.py b/src/netmonitor/front/options_tab.py
deleted file mode 100644
index 6cbd448..0000000
--- a/src/netmonitor/front/options_tab.py
+++ /dev/null
@@ -1,44 +0,0 @@
-from textual.app import ComposeResult
-from textual.containers import Container, Horizontal
-from textual.widgets import Button, Input, Label
-from textual import on
-
-from ..back.notification_service import notification_service
-
-class OptionsTab(Container):
- def __init__(self, scanner_manager, detector_manager, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.scanner_manager = scanner_manager
- self.detector_manager = detector_manager
-
- def compose(self) -> ComposeResult:
- notify_section = Container(id="notify-section", classes="section-card")
- notify_section.border_title = "Notification config"
- with notify_section:
- yield Label("Discord Webhook URL:")
- yield Input(placeholder="https://discord.com/api/webhooks/...", id="input-webhook-url")
-
- with Horizontal(classes="modal-footer"):
- yield Button("Save config", id="save-config", variant="success")
- yield Button("notification test", id="test-notif", variant="primary")
-
- def on_mount(self):
- self.query_one("#input-webhook-url", Input).value = notification_service.webhook_url
-
- @on(Button.Pressed, "#save-config")
- def save_configuration(self):
- url = self.query_one("#input-webhook-url", Input).value.strip()
-
- if notification_service.save_config(url):
- self.app.notify("Config saved", severity="information")
- else:
- self.app.notify("Error during saving", severity="error")
-
- @on(Button.Pressed, "#test-notif")
- def test_notification(self):
- success = notification_service.send_message("**Test NetMonitor**\n")
-
- if success:
- self.app.notify("good", severity="information")
- else:
- self.app.notify("bad", severity="error")
diff --git a/src/netmonitor/front/scanner_profiles_tab.py b/src/netmonitor/front/scanner_profiles_tab.py
deleted file mode 100644
index 8e8610c..0000000
--- a/src/netmonitor/front/scanner_profiles_tab.py
+++ /dev/null
@@ -1,84 +0,0 @@
-from textual.widgets import Button, Label, Switch
-from textual import on
-from textual.app import ComposeResult
-from textual.containers import Vertical, Horizontal, VerticalScroll
-
-from ..back.scanner_profiles_manager import ScannerProfilesManager
-from .scanner_profiles_tab_pushscreens import ScanNowPushScreen, SetSchedulerPushScreen, ShowLogsPushScreen, SetNotifacationOptionsPushScreen, ShowProfilePushScreen, ConfirmDeletePushScreen
-
-class ScannerProfilesTab(Vertical):
- def __init__(self, manager: ScannerProfilesManager, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.manager.on_refresh = self.refresh_profiles
- self.manager.on_message = self.on_manager_message
-
- def compose(self) -> ComposeResult:
- yield VerticalScroll(id="profiles-list")
-
- def on_mount(self) -> None:
- self.refresh_profiles()
-
- def refresh_profiles(self) -> None:
- profiles_list = self.query_one("#profiles-list", VerticalScroll)
- profiles_list.remove_children()
-
- if not self.manager.profiles:
- return
-
- for profile in self.manager.profiles:
- row = Horizontal(
- Switch(id=f"switch-{profile.profile_name}", value=profile.is_active),
- Button("Scan Now", id=f"scan-now-button-{profile.profile_name}", classes="profile-action", variant="success"),
- Label(f"{profile.profile_name}", classes="profile-profile_name"),
- Button("Show Profile", id=f"show-profile-button-{profile.profile_name}", classes="profile-action", variant="primary"),
- Button("Show logs", id=f"show-logs-button-{profile.profile_name}", classes="profile-action", variant="default"),
- Button("Set scheduler", id=f"set-scheduler-button-{profile.profile_name}", classes="profile-action", variant="default"),
- Button("Notifications", id=f"set-notifications-button-{profile.profile_name}", classes="profile-action", variant="default"),
- Button("Delete", id=f"delete-{profile.profile_name}", classes="profile-action", variant="error"),
- classes="profile-row"
- )
- profiles_list.mount(row)
-
- @on(Switch.Changed)
- def switch_changed(self, event: Switch.Changed) -> None:
- switch_id = event.switch.id
- if not switch_id:
- return
- profile_name = switch_id.removeprefix("switch-")
-
- if event.switch.value:
- is_turned_on = self.manager.turn_on_profile(profile_name)
- if not is_turned_on:
- event.switch.value = False
- else:
- is_turned_off = self.manager.turn_off_profile(profile_name)
- if not is_turned_off:
- event.switch.value = False
-
- @on(Button.Pressed)
- def on_any_button_pressed(self, event: Button.Pressed) -> None:
- button_id = event.button.id
- if not button_id:
- return
- if button_id.startswith("scan-now-button-"):
- profile_name = button_id.removeprefix("scan-now-button-")
- self.app.push_screen(ScanNowPushScreen(self.manager, profile_name))
- elif button_id.startswith("show-profile-button-"):
- profile_name = button_id.removeprefix("show-profile-button-")
- self.app.push_screen(ShowProfilePushScreen(self.manager, profile_name))
- elif button_id.startswith("set-scheduler-button-"):
- profile_name = button_id.removeprefix("set-scheduler-button-")
- self.app.push_screen(SetSchedulerPushScreen(self.manager, profile_name))
- elif button_id.startswith("set-notifications-button-"):
- profile_name = button_id.removeprefix("set-notifications-button-")
- self.app.push_screen(SetNotifacationOptionsPushScreen(self.manager, profile_name))
- elif button_id.startswith("delete-"):
- profile_name = button_id.removeprefix("delete-")
- self.app.push_screen(ConfirmDeletePushScreen(self.manager, profile_name))
- elif button_id.startswith("show-logs-button-"):
- profile_name = button_id.removeprefix("show-logs-button-")
- self.app.push_screen(ShowLogsPushScreen(self.manager, profile_name))
-
- def on_manager_message(self, msg: str, title: str, severity):
- self.app.notify(message=msg, title=title, severity=severity)
diff --git a/src/netmonitor/front/scanner_profiles_tab_pushscreens.py b/src/netmonitor/front/scanner_profiles_tab_pushscreens.py
deleted file mode 100644
index 5be9941..0000000
--- a/src/netmonitor/front/scanner_profiles_tab_pushscreens.py
+++ /dev/null
@@ -1,180 +0,0 @@
-from textual.app import ComposeResult
-from textual import on
-from textual.screen import ModalScreen
-from textual.widgets import Input, Button, Label, Pretty, Select, Switch
-from textual.containers import Vertical, Horizontal, VerticalScroll, Container
-
-from ..back.scanner_profiles_manager import ScannerProfilesManager
-
-class ScanNowPushScreen(ModalScreen[str]):
- def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
-
- def compose(self) -> ComposeResult:
- with Container(classes="modal-window large-modal"):
- yield Label(f"Scanning: {self.profile_name}", classes="modal-header")
- with VerticalScroll(classes="info-box"):
- yield Pretty(self.manager.get_profile(self.profile_name).scan())
- with Horizontal(classes="modal-footer"):
- yield Button("Close", variant="primary")
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed) -> None:
- self.dismiss(None)
-
-class ShowProfilePushScreen(ModalScreen[str]):
- def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
-
- def compose(self) -> ComposeResult:
- with Container(classes="modal-window medium-modal"):
- yield Label(f"Profile: {self.profile_name}", classes="modal-header")
- with VerticalScroll(classes="info-box"):
- yield Pretty(self.manager.get_profile(self.profile_name).to_dict())
- with Horizontal(classes="modal-footer"):
- yield Button("Close", id="cancel-button", variant="primary")
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed) -> None:
- self.dismiss(None)
-
-
-class SetSchedulerPushScreen(ModalScreen[str]):
- def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
-
- def compose(self) -> ComposeResult:
- with Container(classes="modal-window small-modal"):
- yield Label("Set CRON Scheduler", classes="modal-header")
- yield Label("Format: min hour day month day_of_week", classes="label")
- yield Input(placeholder="* * * * * ", id="cron-input", classes="input")
-
- with Horizontal(classes="modal-footer"):
- yield Button("Confirm", id="confirm-button", variant="success")
- yield Button("Cancel", id="cancel-button", variant="error")
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed):
- if event.button.id == "confirm-button":
- cron_input = self.query_one("#cron-input", Input).value.strip()
- if cron_input:
- self.manager.set_validated_scheduler(self.profile_name, cron_input)
- self.dismiss(None)
- else:
- self.dismiss(None)
-
-class SetNotifacationOptionsPushScreen(ModalScreen[str]):
- def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
- self.profile = self.manager.get_profile(profile_name)
-
- def compose(self) -> ComposeResult:
- current_enabled = getattr(self.profile, 'notify_enabled', False)
- current_cve_only = getattr(self.profile, 'notify_only_cve', False)
-
- with Container(classes="modal-window small-modal"):
- yield Label(f"Notifications: {self.profile_name}", classes="modal-header")
-
- with Vertical(classes="section-card"):
- yield Label("Enable notifications (Discord):")
- yield Switch(value=current_enabled, id="switch-enable")
-
- yield Label("Notify only when scanner finds CVE")
- yield Switch(value=current_cve_only, id="switch-cve-only")
-
- with Horizontal(classes="modal-footer"):
- yield Button("Close", id="close-button", variant="primary")
-
- @on(Switch.Changed)
- def on_switch_changed(self, event: Switch.Changed):
- if event.switch.id == "switch-enable":
- self.profile.notify_enabled = event.value
- elif event.switch.id == "switch-cve-only":
- self.profile.notify_only_cve = event.value
-
- self.manager.try_save_profiles(notify=False)
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed):
- self.dismiss(None)
-
-class ShowLogsPushScreen(ModalScreen[str]):
- def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
- self.logs = []
-
- def compose(self) -> ComposeResult:
- with Container(classes="modal-window large-modal"):
- yield Label(f"Logs: {self.profile_name}", classes="modal-header")
-
- yield Select([], prompt="Select date", id="scan-date-select")
-
- with VerticalScroll(classes="info-box"):
- yield Pretty({}, id="log-content")
-
- with Horizontal(classes="modal-footer"):
- yield Button("Close", id="cancel-button", variant="primary")
-
- def on_mount(self):
- self.logs = self.manager.get_profile_logs(self.profile_name)
- select = self.query_one("#scan-date-select", Select)
- options = []
-
- if self.logs:
- for index, log in enumerate(reversed(self.logs)):
- real_index = len(self.logs) - 1 - index
- label = log.get('_timestamp', f"Scan #{real_index + 1} (no date)")
-
- options.append((str(label), real_index))
-
- select.set_options(options)
-
- if options:
- select.value = options[0][1]
-
- @on(Select.Changed, "#scan-date-select")
- def on_date_selected(self, event: Select.Changed):
- if event.value is not None:
- index = event.value
- if index == Select.BLANK:
- self.query_one("#log-content", Pretty).update({})
- return
- if 0 <= index < len(self.logs):
- log_entry = self.logs[index]
- self.query_one("#log-content", Pretty).update(log_entry)
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed) -> None:
- self.dismiss(None)
-
-
-class ConfirmDeletePushScreen(ModalScreen[str]):
- def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.profile_name = profile_name
-
- def compose(self) -> ComposeResult:
- with Container(classes="modal-window small-modal"):
- yield Label("Are you sure?",classes="modal-header")
- with Horizontal(classes="modal-footer"):
- yield Button("Delete", id="confirm-button", variant="error")
- yield Button("Cancel", id="cancel-button", variant="default")
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed):
- if event.button.id == "confirm-button":
- self.manager.delete_profile(self.profile_name)
- self.dismiss(None)
- else:
- self.dismiss(None)
diff --git a/src/netmonitor/front/scanner_tab.py b/src/netmonitor/front/scanner_tab.py
deleted file mode 100755
index 868fa74..0000000
--- a/src/netmonitor/front/scanner_tab.py
+++ /dev/null
@@ -1,277 +0,0 @@
-from textual import on
-from textual.app import ComposeResult
-from textual.widgets import Checkbox, Input, Select, Button, Label, SelectionList, Pretty
-from textual.widgets.selection_list import Selection
-from textual.containers import Container, Vertical, Horizontal, VerticalScroll
-from textual.events import Mount
-
-import psutil, ipaddress, shlex
-from typing import List, Dict, Optional
-
-from ..back.scanner_profiles_manager import ScannerProfilesManager
-from .scanner_tab_pushscreens import SaveProfilePushScreen
-
-class ScannerTab(Container):
-
- def __init__(self, manager: ScannerProfilesManager):
- super().__init__()
- self.manager = manager
- self.manager.on_message = self.on_manager_message
-
- def compose(self) -> ComposeResult:
- friendly_section = Container(id="friendly-command-section", classes="section card")
- friendly_section.border_title = "Scanner Configuration"
-
- with friendly_section:
- yield Label("Interface:", classes="label")
- try:
- interfaces = list(psutil.net_if_addrs().keys())
- except Exception:
- interfaces = []
-
- yield Select.from_values(
- interfaces,
- id="interface-select",
- allow_blank=False,
- classes="input"
- )
-
- yield Label("IP address:", classes="label")
- yield Input(
- value="127.0.0.1/32",
- placeholder="192.168.0.0/24",
- name="ip-input",
- id="ip",
- classes="input"
- )
-
- yield Label("Port range (or single port)", classes="label")
- with Horizontal(id="port-range", classes="input-row"):
- yield Input(placeholder="Start / Single", id="low-port-range", classes="input half")
- yield Input(placeholder="End (Optional)", id="high-port-range", classes="input half")
-
- yield Checkbox("Check for CVE's", id="cve-checkbox", classes="checkbox")
-
- yield Label("Scan options:", classes="label")
- with Horizontal(classes="selection-row"):
- yield SelectionList[str](
- Selection("Service version (-sV)", "-sV", False),
- Selection("OS detection (-O)", "-O", False),
- Selection("SYN scan (-sS)", "-sS", False),
- Selection("UDP scan (-sU)", "-sU", False),
- Selection("Fast scan (-F)", "-F", False),
- Selection("Aggressive scan (-A)", "-A", False),
- Selection("Ping skip (-Pn)", "-Pn", False),
- Selection("Timing T4 (-T4)", "-T4", True),
- classes="selection-list"
- )
-
- user_section = Container(id="user-command-section", classes="section card")
- user_section.border_title = "Manual Command"
-
- with user_section:
- yield Input(
- placeholder="sudo nmap -sS 192.168.1.1",
- name="user-nmap-input",
- id="usercommand",
- classes="input full"
- )
-
-
- final_section = Container(id="final-section", classes="section card")
- final_section.border_title = "Execution"
-
- with final_section:
- with Vertical(id="final-command", classes="command-output"):
- self.final_command = Pretty([], id="final-command-output")
- yield self.final_command
- with Vertical(id="buttons", classes="buttons-column"):
- yield Button("Scan now", id="scan-button", variant="success")
- yield Button("Save profile", id="save-button", variant="primary")
-
- results_section = Container(id="results-section", classes="section card")
- results_section.border_title = "Scan Output"
-
- with results_section:
- with VerticalScroll(id="results-scroll"):
- self.results = Pretty([], id="results", classes="results")
- yield self.results
-
- def validate_inputs(self, nmap_input: Dict[str, str]) -> List[str]:
- errors: List[str] = []
-
- if self.query_one("#cve-checkbox", Checkbox).value:
- args = nmap_input.get("arguments", "")
- if "-sV" not in args.split():
- errors.append("To check for CVE's, '-sV' option is required")
- return errors
-
- targets = nmap_input.get("targets", "").strip()
- if not targets:
- errors.append("IP field cannot be empty")
- return errors
-
- if not self.query_one("#usercommand", Input).value.strip():
- try:
- ipaddress.ip_network(targets, strict=False)
- except ValueError:
- errors.append(f"Invalid IP address/CIDR: {targets}")
- return errors
-
- if "ports" in nmap_input:
- port_str = nmap_input["ports"]
- if "-" in port_str:
- try:
- low, high = map(int, port_str.split("-"))
- if not (1 <= low <= 65535 and 1 <= high <= 65535):
- errors.append("Ports must be in range 1-65535")
- if low > high:
- errors.append("Start port cannot be greater than end port")
- except ValueError:
- errors.append("Ports (range) must be numbers")
- else:
- try:
- port = int(port_str)
- if not (1 <= port <= 65535):
- errors.append("Port must be in range 1-65535")
- except ValueError:
- errors.append("Port must be a number")
-
- dry_run_err = self.dry_run_nmap_command(nmap_input)
- if dry_run_err:
- errors.append(dry_run_err)
- return errors
-
- def dry_run_nmap_command(self, nmap_input: Dict[str, str]) -> Optional[str]:
- try:
- cmd = ["nmap"]
- if "arguments" in nmap_input:
- cmd += shlex.split(nmap_input["arguments"])
- if "ports" in nmap_input:
- cmd += ["-p", nmap_input["ports"]]
- cmd.append(nmap_input["targets"])
- return None
- except Exception as e:
- return f"Błąd weryfikacji komendy: {e}"
-
- def get_nmap_input(self, return_dict: bool = False) -> list[str] | dict:
- user_command = self.query_one("#usercommand", Input).value.strip()
- low_port = self.query_one("#low-port-range", Input).value.strip()
- high_port = self.query_one("#high-port-range", Input).value.strip()
- interface = self.query_one("#interface-select", Select).value
- targets = self.query_one("#ip", Input).value.strip()
- options = self.query_one(SelectionList).selected
-
- if return_dict:
- if user_command:
- parts = shlex.split(user_command)
-
- if parts and parts[0] == "sudo": parts.pop(0)
- if parts and parts[0] == "nmap": parts.pop(0)
-
- if not parts:
- return {}
-
- target = parts[-1]
- args = parts[:-1]
-
- return {
- "targets": target,
- "arguments": " ".join(args)
- }
- else:
- nmap_dict = {"targets": targets}
-
- if low_port:
- if high_port:
- nmap_dict["ports"] = f"{low_port}-{high_port}"
- else:
- nmap_dict["ports"] = f"{low_port}"
-
- args: list[str] = []
- if options:
- args.extend(options)
- if interface:
- args.extend(["-e", str(interface)])
- if args:
- nmap_dict["arguments"] = " ".join(args)
-
- return {k: v for k, v in nmap_dict.items() if v is not None}
-
- if user_command:
- return shlex.split(user_command)
-
- parts = ["nmap"]
- if interface:
- parts.extend(["-e", str(interface)])
- if options:
- parts.extend(options)
-
- if low_port:
- if high_port:
- parts.extend(["-p", f"{low_port}-{high_port}"])
- else:
- parts.extend(["-p", f"{low_port}"])
-
- parts.append(targets)
- return parts
-
- @on(Mount)
- @on(SelectionList.SelectedChanged)
- @on(Input.Changed)
- @on(Select.Changed)
- def update_selected_view(self, event=None) -> None:
- self.final_command.update(" ".join(self.get_nmap_input()))
-
- @on(Button.Pressed, "#scan-button")
- async def handle_scan_button(self, event: Button.Pressed) -> None:
- nmap_input = self.get_nmap_input(return_dict=True)
- if not isinstance(nmap_input, dict):
- self.notify("Błąd: dane wejściowe nie są słownikiem", severity="error")
- return
- errors = self.validate_inputs(nmap_input)
- if errors:
- self.notify("\n".join(errors), title="Bad Input", severity="error")
- else:
- try:
- temp_prof_name = "temp_scan_profile"
- self.notify("Start scanning", title="Notification", severity="information")
-
- if not self.manager.add_profile(temp_prof_name, notify=False):
- self.notify("Błąd: Nie udało się utworzyć profilu (problem z zapisem pliku?)", severity="error")
- return
-
- if not self.manager.update_profile(temp_prof_name, "nmap_input", nmap_input, notify=False):
- self.notify("Błąd: Nie udało się zaktualizować parametrów skanowania", severity="error")
- self.manager.delete_profile(temp_prof_name, notify=False)
- return
-
- profile = self.manager.get_profile(temp_prof_name)
- if profile:
- scan_now_results = profile.scan()
- self.results.update(scan_now_results)
- self.manager.delete_profile(temp_prof_name, notify=False)
- self.notify("Scanning completed", title="Notification", severity="information")
- else:
- self.notify("Krytyczny błąd: Profil zniknął po utworzeniu", severity="error")
-
- except Exception as e:
- self.notify(f"Scanner error: {e}", title="Error", severity="error")
-
- @on(Button.Pressed, "#save-button")
- async def handle_save_button(self, event: Button.Pressed):
- nmap_input = self.get_nmap_input(return_dict=True)
- if not isinstance(nmap_input, dict):
- self.notify("Błąd: dane wejściowe nie są słownikiem", severity="error")
- return
-
- errors = self.validate_inputs(nmap_input)
- if errors:
- self.notify("\n".join(errors), title="Bad Input", severity="error")
- return
-
- cve_check = self.query_one("#cve-checkbox", Checkbox).value
- self.app.push_screen(SaveProfilePushScreen(self.manager, nmap_input, cve_check))
-
- def on_manager_message(self, msg: str, title: str, severity):
- self.app.notify(message=msg, title=title, severity=severity)
diff --git a/src/netmonitor/front/scanner_tab_pushscreens.py b/src/netmonitor/front/scanner_tab_pushscreens.py
deleted file mode 100644
index 4e4a564..0000000
--- a/src/netmonitor/front/scanner_tab_pushscreens.py
+++ /dev/null
@@ -1,35 +0,0 @@
-from textual import on
-from textual.app import ComposeResult
-from textual.widgets import Input, Button, Label
-from textual.containers import Vertical, Horizontal
-from textual.screen import ModalScreen
-
-from ..back.scanner_profiles_manager import ScannerProfilesManager
-
-class SaveProfilePushScreen(ModalScreen[str]):
- def __init__(self, manager: ScannerProfilesManager, nmap_input, cve_check, *args, **kwargs):
- super().__init__(*args, **kwargs)
- self.manager = manager
- self.nmap_input = nmap_input
- self.cve_check = cve_check
-
-
- def compose(self) -> ComposeResult:
- with Vertical(classes="modal-window small-modal"):
- yield Label("Save scan profile", classes="modal-header")
- yield Input(placeholder="Profile name:", id="profile-name", classes="input")
- with Horizontal(id="buttons-row", classes="modal-buttons modal-footer"):
- yield Button("Confirm", id="confirm-button", variant="success", classes="button")
- yield Button("Cancel", id="cancel-button", variant="error", classes="button")
-
- @on(Button.Pressed)
- async def on_button_pressed(self, event: Button.Pressed):
- if event.button.id == "confirm-button":
- profile_name = self.query_one("#profile-name", Input).value.strip()
- self.manager.add_profile(profile_name)
- self.manager.update_profile(profile_name,"nmap_input",self.nmap_input,notify=False)
- self.manager.update_profile(profile_name,"cve",self.cve_check, notify=False)
- self.dismiss(None)
- else:
- self.dismiss(None)
-
diff --git a/src/netmonitor/styles/styles.css b/src/netmonitor/styles/styles.css
deleted file mode 100644
index c004b67..0000000
--- a/src/netmonitor/styles/styles.css
+++ /dev/null
@@ -1,250 +0,0 @@
-/* Już sam nie wiem co sie dzieje w tym pliku, ale nie obchodzi mnie to */
-ScannerTab {
- layout: grid;
- grid-size: 3 5;
- grid-gutter: 1;
-}
-
-ScannerTab #friendly-command-section,
-ScannerTab #user-command-section,
-ScannerTab #results-section {
- padding: 1;
- border: round $primary;
-}
-
-ScannerTab #friendly-command-section {
- row-span: 5;
- column-span: 1;
-}
-
-ScannerTab #user-command-section {
- row-span: 1;
- column-span: 2;
-}
-
-ScannerTab #results-section {
- row-span: 3;
- column-span: 2;
-}
-
-ScannerTab Label {
- padding: 0 1;
-}
-
-ScannerTab Input,
-ScannerTab MaskedInput,
-ScannerTab Select {
- width: 1fr;
- margin: 0 0 1 0;
-}
-
-ScannerTab #final-section {
- row-span: 1;
- column-span: 3;
- layout: grid;
- grid-size: 3 1;
- grid-gutter: 1;
- padding: 0;
- border: round $primary;
-}
-
-ScannerTab #final-command {
- column-span: 2;
- padding: 1;
- align: center middle;
-}
-
-ScannerTab #final-command-output {
- border: solid $secondary;
- overflow: auto;
-}
-
-ScannerTab #buttons {
- column-span: 1;
- layout: vertical;
- align: center middle;
- border-left: solid $primary 20%;
-}
-
-ScannerTab #buttons Button {
- width: 90%;
- padding: 1;
-}
-
-ScannerTab #scan-button {
- border: none;
-}
-
-ScannerTab #save-button {
- border: none;
-}
-
-
-
-ScannerTab #results-scroll {
- height: 1fr;
- width: 100%;
- scrollbar-color: $primary ;
- scrollbar-background: $background;
- scrollbar-size-horizontal: 1;
-}
-
-
-
-DetectorTab Button {
- content-align: center middle;
-}
-
-DetectorTab Checkbox {
- width: 100%;
-}
-
-.save-button-container {
- align: center middle;
- height: auto;
- margin-top: 1;
- width: 100%;
-}
-
-.profile-row {
- layout: horizontal;
- align-vertical: middle;
- height: auto;
- min-height: 3;
-
- background: $surface;
- border-left: solid $primary;
- margin-bottom: 1;
- padding: 1;
-}
-
-.profile-row:hover {
- background: $surface-lighten-1;
- border-left: solid $accent;
-}
-
-.profile-profile_name {
- width: 2fr;
- text-style: bold;
- color: $text;
- padding: 1;
- content-align: center middle;
-}
-
-.section-card {
- padding: 1;
- border: round $primary;
- height: auto;
-}
-
-.detector-scroll {
- scrollbar-color: $primary ;
- scrollbar-background: $background;
- scrollbar-size-horizontal: 1;
-}
-
-
-ModalScreen {
- align: center middle;
-
-}
-
-.modal-window {
- background: $surface;
- border: tall $primary;
- padding: 1 2;
- layout: vertical;
-}
-
-.small-modal {
- width: 50;
- height: auto;
-}
-
-.medium-modal {
- width: 80;
- height: 80%;
-}
-
-.large-modal {
- width: 95%;
- height: 95%;
-}
-
-.modal-header {
- width: 100%;
- text-align: center;
- text-style: bold;
- color: $accent;
- border-bottom: solid $secondary;
- padding-bottom: 1;
- margin-bottom: 1;
-}
-
-.modal-footer {
- height: auto;
- width: 100%;
- align: center middle;
- padding-top: 1;
- dock: bottom;
-}
-
-.modal-footer Button {
- margin: 0 1;
- min-width: 15;
-}
-
-.modal-split-container {
- width: 100%;
- height: 1fr;
-}
-
-.left-panel {
- width: 65%;
- height: 100%;
- margin-right: 1;
-}
-
-.right-panel {
- width: 35%;
- height: 100%;
- border-left: solid $secondary;
- padding-left: 1;
-}
-
-.plot-card {
- height: 1fr;
- border: none;
- margin-bottom: 1;
- background: $surface-lighten-1;
-}
-
-.section-header {
- text-align: center;
- color: $text-muted;
- margin-bottom: 1;
-}
-
-.info-box {
- width: 100%;
- height: 1fr;
- border: tall $surface-lighten-2;
- background: $surface-darken-1;
- padding: 1;
- overflow: auto;
- scrollbar-color: $primary ;
- scrollbar-background: $background;
- scrollbar-size-horizontal: 1;
-}
-
-.table-container {
- width: 100%;
- height: 1fr;
- border: solid $secondary;
-}
-
-
-.label-muted {
- color: $text-muted;
- text-align: center;
-}