summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/netmonitor/__init__.py0
-rw-r--r--src/netmonitor/__pycache__/__init__.cpython-312.pycbin0 -> 159 bytes
-rw-r--r--src/netmonitor/__pycache__/app.cpython-312.pycbin0 -> 4846 bytes
-rw-r--r--src/netmonitor/app.py74
-rw-r--r--src/netmonitor/back/__pycache__/detector_profile_HST.cpython-312.pycbin0 -> 11545 bytes
-rw-r--r--src/netmonitor/back/__pycache__/detector_profiles_manager.cpython-312.pycbin0 -> 10030 bytes
-rw-r--r--src/netmonitor/back/__pycache__/flow_features.cpython-312.pycbin0 -> 1145 bytes
-rw-r--r--src/netmonitor/back/__pycache__/flow_table.cpython-312.pycbin0 -> 9184 bytes
-rw-r--r--src/netmonitor/back/__pycache__/notification_service.cpython-312.pycbin0 -> 2912 bytes
-rw-r--r--src/netmonitor/back/__pycache__/scanner_profile.cpython-312.pycbin0 -> 7203 bytes
-rw-r--r--src/netmonitor/back/__pycache__/scanner_profiles_manager.cpython-312.pycbin0 -> 12392 bytes
-rw-r--r--src/netmonitor/back/__pycache__/window.cpython-312.pycbin0 -> 9507 bytes
-rw-r--r--src/netmonitor/back/detector_profile_HST.py215
-rw-r--r--src/netmonitor/back/detector_profiles_manager.py153
-rw-r--r--src/netmonitor/back/notification_service.py49
-rw-r--r--src/netmonitor/back/scanner_profile.py138
-rw-r--r--src/netmonitor/back/scanner_profiles_manager.py177
-rw-r--r--src/netmonitor/back/window.py251
-rw-r--r--src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pycbin0 -> 5924 bytes
-rw-r--r--src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pycbin0 -> 13721 bytes
-rw-r--r--src/netmonitor/front/__pycache__/detector_tab.cpython-312.pycbin0 -> 7672 bytes
-rw-r--r--src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pycbin0 -> 2798 bytes
-rw-r--r--src/netmonitor/front/__pycache__/options_tab.cpython-312.pycbin0 -> 3442 bytes
-rw-r--r--src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pycbin0 -> 6552 bytes
-rw-r--r--src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pycbin0 -> 14133 bytes
-rw-r--r--src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pycbin0 -> 14995 bytes
-rw-r--r--src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pycbin0 -> 3075 bytes
-rw-r--r--src/netmonitor/front/detector_profiles_tab.py77
-rw-r--r--src/netmonitor/front/detector_profiles_tab_pushscreens.py187
-rw-r--r--src/netmonitor/front/detector_tab.py135
-rw-r--r--src/netmonitor/front/detector_tab_pushscreens.py31
-rw-r--r--src/netmonitor/front/options_tab.py44
-rw-r--r--src/netmonitor/front/scanner_profiles_tab.py84
-rw-r--r--src/netmonitor/front/scanner_profiles_tab_pushscreens.py180
-rwxr-xr-xsrc/netmonitor/front/scanner_tab.py277
-rw-r--r--src/netmonitor/front/scanner_tab_pushscreens.py35
-rw-r--r--src/netmonitor/styles/styles.css250
37 files changed, 2357 insertions, 0 deletions
diff --git a/src/netmonitor/__init__.py b/src/netmonitor/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/src/netmonitor/__init__.py
diff --git a/src/netmonitor/__pycache__/__init__.cpython-312.pyc b/src/netmonitor/__pycache__/__init__.cpython-312.pyc
new file mode 100644
index 0000000..bf05138
--- /dev/null
+++ b/src/netmonitor/__pycache__/__init__.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/__pycache__/app.cpython-312.pyc b/src/netmonitor/__pycache__/app.cpython-312.pyc
new file mode 100644
index 0000000..38fa57a
--- /dev/null
+++ b/src/netmonitor/__pycache__/app.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/app.py b/src/netmonitor/app.py
new file mode 100644
index 0000000..e60108f
--- /dev/null
+++ b/src/netmonitor/app.py
@@ -0,0 +1,74 @@
+from textual.app import App, ComposeResult
+from textual.widgets import TabbedContent, TabPane
+from textual.theme import Theme
+
+from apscheduler.schedulers.background import BackgroundScheduler
+from pathlib import Path
+import os
+
+from .front.scanner_tab import ScannerTab
+from .front.scanner_profiles_tab import ScannerProfilesTab
+from .front.detector_tab import DetectorTab
+from .front.detector_profiles_tab import DetectorProfilesTab
+from .front.options_tab import OptionsTab
+
+from .back.scanner_profiles_manager import ScannerProfilesManager
+from .back.detector_profiles_manager import DetectorProfilesManager
+
+
+XDG_DATA_HOME = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share"))
+
+theme = Theme(
+ name="pastel_blue_theme",
+ primary="#82A6F2",
+ secondary="#778899",
+ accent="#E0FFFF",
+ # background="#1a1b26",
+ surface="#1e1e20",
+ error="#ffb3ba",
+ success="#baffc9",
+ warning="#ffffba",
+)
+
+class NetMonitor(App):
+ CSS_PATH = "styles/styles.css"
+
+ def __init__(self):
+ super().__init__()
+ self.scheduler = BackgroundScheduler()
+ self.scheduler.start()
+ self.scanner_profiles_manager = ScannerProfilesManager(profiles_file=f"{XDG_DATA_HOME}/netmonitor/objects/scanner_profiles_objects", scheduler=self.scheduler)
+ self.detector_profiles_manager = DetectorProfilesManager(profiles_file=f"{XDG_DATA_HOME}/netmonitor/objects/detector_profiles_objects")
+
+ def compose(self) -> ComposeResult:
+ with TabbedContent():
+ with TabPane(title="Scanner", id="scanner", classes="scanner-theme"):
+ with TabbedContent():
+ with TabPane(title="Scan"):
+ yield ScannerTab(self.manager.scanner_profiles_manager)
+ with TabPane(title="Profiles"):
+ yield ScannerProfilesTab(self.manager.scanner_profiles_manager)
+
+ with TabPane(title="Detector", id="detector", classes="detector-theme"):
+ with TabbedContent():
+ with TabPane(title="Models"):
+ yield DetectorTab(self.manager.detector_profiles_manager)
+ with TabPane(title="Profiles"):
+ yield DetectorProfilesTab(self.manager.detector_profiles_manager)
+
+ with TabPane(title="Options", id="options", classes="options-theme"):
+ yield OptionsTab(self.scanner_profiles_manager, self.detector_profiles_manager)
+
+ def on_mount(self):
+ self.register_theme(theme)
+ self.theme = "pastel_blue_theme"
+
+ @property
+ def manager(self):
+ return self
+
+def main():
+ NetMonitor().run()
+
+if __name__ == "__main__":
+ main()
diff --git a/src/netmonitor/back/__pycache__/detector_profile_HST.cpython-312.pyc b/src/netmonitor/back/__pycache__/detector_profile_HST.cpython-312.pyc
new file mode 100644
index 0000000..6dc15c8
--- /dev/null
+++ b/src/netmonitor/back/__pycache__/detector_profile_HST.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/back/__pycache__/detector_profiles_manager.cpython-312.pyc b/src/netmonitor/back/__pycache__/detector_profiles_manager.cpython-312.pyc
new file mode 100644
index 0000000..b17c763
--- /dev/null
+++ b/src/netmonitor/back/__pycache__/detector_profiles_manager.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/back/__pycache__/flow_features.cpython-312.pyc b/src/netmonitor/back/__pycache__/flow_features.cpython-312.pyc
new file mode 100644
index 0000000..b5e28d0
--- /dev/null
+++ b/src/netmonitor/back/__pycache__/flow_features.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/back/__pycache__/flow_table.cpython-312.pyc b/src/netmonitor/back/__pycache__/flow_table.cpython-312.pyc
new file mode 100644
index 0000000..ffb1702
--- /dev/null
+++ b/src/netmonitor/back/__pycache__/flow_table.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/back/__pycache__/notification_service.cpython-312.pyc b/src/netmonitor/back/__pycache__/notification_service.cpython-312.pyc
new file mode 100644
index 0000000..71ee4ce
--- /dev/null
+++ b/src/netmonitor/back/__pycache__/notification_service.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/back/__pycache__/scanner_profile.cpython-312.pyc b/src/netmonitor/back/__pycache__/scanner_profile.cpython-312.pyc
new file mode 100644
index 0000000..0c8b662
--- /dev/null
+++ b/src/netmonitor/back/__pycache__/scanner_profile.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/back/__pycache__/scanner_profiles_manager.cpython-312.pyc b/src/netmonitor/back/__pycache__/scanner_profiles_manager.cpython-312.pyc
new file mode 100644
index 0000000..f9dc637
--- /dev/null
+++ b/src/netmonitor/back/__pycache__/scanner_profiles_manager.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/back/__pycache__/window.cpython-312.pyc b/src/netmonitor/back/__pycache__/window.cpython-312.pyc
new file mode 100644
index 0000000..c6971ec
--- /dev/null
+++ b/src/netmonitor/back/__pycache__/window.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/back/detector_profile_HST.py b/src/netmonitor/back/detector_profile_HST.py
new file mode 100644
index 0000000..267b307
--- /dev/null
+++ b/src/netmonitor/back/detector_profile_HST.py
@@ -0,0 +1,215 @@
+import threading
+import queue
+import time
+import os
+from pathlib import Path
+from scapy.all import wrpcap, AsyncSniffer
+
+from river.anomaly import HalfSpaceTrees
+from tinydb import TinyDB
+from tinydb.table import Document
+
+from .window import Window
+from .notification_service import notification_service
+
+XDG_DATA_HOME = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share"))
+LOGS_PATH = f"{XDG_DATA_HOME}/netmonitor/detector/profiles_logs"
+PCAP_PATH = f"{XDG_DATA_HOME}/netmonitor/detector/profiles_pcaps"
+
+
+class DetectorProfileHST:
+
+ def __init__(
+ self,
+ profile_name: str,
+ input_data: dict
+ ):
+ self.profile_name = profile_name
+ self.features = input_data.get("features", [])
+ self.params = input_data.get("params", {})
+ self.n_trees = int(self.params.get("trees", 10))
+ self.height = int(self.params.get("height", 8))
+ self.window_size = int(self.params.get("window", 250))
+ self.seed = int(self.params.get("seed", 42))
+ self.threshold = float(self.params.get("threshold", 0.7))
+ self.window_duration = float(self.params.get("window_duration", 10.0))
+ self.bpf_filter = self.params.get("bpf_filter", "")
+ self.interface = self.params.get("interface", None)
+ self.queue_size = int(self.params.get("queue_size", 10000))
+ self.logs_path = f"{LOGS_PATH}/{profile_name}.json"
+ os.makedirs(os.path.dirname(self.logs_path), exist_ok=True)
+
+ self.is_active = False
+
+ self.notify_enabled = False
+ self._init_runtime_objects()
+
+
+ def _init_runtime_objects(self):
+ self.queue = queue.Queue(maxsize=self.queue_size)
+
+ os.makedirs(f"{LOGS_PATH}", exist_ok=True)
+ self.db = TinyDB(f"{LOGS_PATH}/{self.profile_name}.json")
+
+ self.model = HalfSpaceTrees(
+ n_trees=self.n_trees,
+ height=self.height,
+ window_size=self.window_size,
+ seed=self.seed
+ )
+
+ self.window = Window(
+ window_duration=self.window_duration,
+ enabled_features=self.features
+ )
+
+ self.processor_thread = None
+
+ self.packets_read = 0
+ self.windows_analyzed = 0
+
+ if not hasattr(self, 'plot_data'):
+ self.plot_data = []
+
+
+ def __getstate__(self):
+ state = self.__dict__.copy()
+ cols_to_remove = ['sniffer_thread', 'processor_thread', 'queue', 'db', 'window']
+ for col in cols_to_remove:
+ if col in state:
+ del state[col]
+ return state
+
+ def __setstate__(self, state):
+ self.__dict__.update(state)
+ self._init_runtime_objects()
+ self.is_active = False
+
+ def __repr__(self):
+ return f"<DetectorProfileHST profile_name={self.profile_name!r}, active={self.is_active}>"
+
+ def turn_on(self):
+ if self.is_active:
+ return
+
+ self.is_active = True
+ os.makedirs(os.path.dirname(self.logs_path), exist_ok=True)
+ self.window.window_duration = self.window_duration
+ self.window.window_start = time.time()
+
+ self.sniffer = AsyncSniffer(
+ iface=self.interface,
+ filter=self.bpf_filter,
+ store=False,
+ prn=self._add_to_queue
+ )
+ self.sniffer.start()
+
+ self.processor_thread = threading.Thread(target=self._process_thread, daemon=True)
+
+ self.processor_thread.start()
+
+ def turn_off(self):
+ self.is_active = False
+ if self.sniffer:
+ self.sniffer.stop()
+
+ time.sleep(0.2)
+
+ def _add_to_queue(self, pkt):
+ if self.queue:
+ try:
+ self.queue.put_nowait(pkt)
+ self.packets_read += 1
+ except queue.Full:
+ pass
+
+ def _process_thread(self):
+ while self.is_active:
+ try:
+ pkt = self.queue.get(timeout=1)
+ except queue.Empty:
+ continue
+
+ result = self.window.add_packet(pkt)
+
+ if result is None:
+ continue
+
+ features, raw_packets = result
+ self.windows_analyzed += 1
+
+ if not features:
+ continue
+
+ sample = {feat: 0.0 for feat in self.features}
+ for k, v in features.items():
+ if k in sample:
+ sample[k] = float(v)
+
+ score = self.model.score_one(sample)
+ self.model.learn_one(sample)
+
+ self.plot_data.append(score)
+ if len(self.plot_data) > 30:
+ self.plot_data.pop(0)
+
+ if score > self.threshold:
+ self._handle_anomaly(score, sample, raw_packets)
+
+ def _handle_anomaly(self, score: float, features: dict, raw_packets: list):
+ timestamp = time.strftime("%Y-%m-%d_%H-%M-%S")
+
+ os.makedirs(os.path.dirname(f"{PCAP_PATH}/{self.profile_name}"), exist_ok=True)
+ filename = f"{PCAP_PATH}/{self.profile_name}/anom_{timestamp}.pcap"
+
+ if raw_packets:
+ try:
+ wrpcap(filename, raw_packets)
+ except Exception as e:
+ print(f"Error saving pcap: {e}")
+
+ if self.notify_enabled:
+ msg = f"*Anomaly detected: {self.profile_name}*\nScore: `{score:.4f}`\nSaved: `{filename}`"
+ notification_service.send_message(message=msg)
+
+ if self.db:
+ self.db.insert(
+ Document({
+ "ts": time.time(),
+ "timestamp": timestamp,
+ "profile": self.profile_name,
+ "score": float(score),
+ "pcap": filename,
+ "pkt_rate": features.get("pkt_rate", 0),
+ "proto_info": f"TCP:{features.get('proto_tcp_ratio',0):.2f} UDP:{features.get('proto_udp_ratio',0):.2f}",
+ }, doc_id=None)
+ )
+
+ def to_dict(self):
+ return {
+ "profile name": self.profile_name,
+ "logs_path": self.logs_path,
+ "pcap_path": f"{PCAP_PATH}/{self.profile_name}",
+ "params": self.params,
+ "features": self.features,
+ }
+
+ def get_logs(self):
+ if self.db:
+ return self.db.all()
+ return []
+
+ def clear_logs(self):
+ if self.db:
+ self.db.truncate()
+
+ def get_runtime_stats(self):
+ return {
+ "is_active": self.is_active,
+ "notify_enabled": self.notify_enabled,
+ "packets_captured": getattr(self, "packets_read", 0),
+ "queue_size": self.queue.qsize() if hasattr(self, "queue") and self.queue else 0,
+ "windows_processed": getattr(self, "windows_analyzed", 0),
+ "window_duration": self.window_duration
+ }
diff --git a/src/netmonitor/back/detector_profiles_manager.py b/src/netmonitor/back/detector_profiles_manager.py
new file mode 100644
index 0000000..1d1c5ea
--- /dev/null
+++ b/src/netmonitor/back/detector_profiles_manager.py
@@ -0,0 +1,153 @@
+import re
+from pathlib import Path
+from typing import Callable, Literal
+import pickle
+from netmonitor.back.detector_profile_HST import DetectorProfileHST
+
+
+SeverityLevel = Literal["information", "warning", "error"]
+VALID_NAME_REGEX = r"^[a-zA-Z0-9_-]+$"
+
+class DetectorProfilesManager:
+ def __init__(self, profiles_file: str):
+ self.profiles_file = Path(profiles_file)
+ self.profiles: list[DetectorProfileHST] = []
+
+ self.on_message: Callable[[str, str, str], None] | None = None
+ self.on_refresh: Callable[[], None] | None = None
+
+ self.load_profiles()
+
+
+ def _notify(self, msg: str, title: str = "Profile Manager", level: SeverityLevel = "information"):
+ if self.on_message:
+ self.on_message(msg, title, level)
+
+
+ def _refresh_front(self):
+ if self.on_refresh:
+ self.on_refresh()
+
+
+ def _fail(self, msg: str, level: SeverityLevel = "error", notify: bool = True) -> bool:
+ if notify:
+ self._notify(msg, level=level)
+ return False
+
+
+ def _ok(self, msg: str | None = None, level: SeverityLevel = "information", notify: bool = True) -> bool:
+ if msg and notify:
+ self._notify(msg, level=level)
+ return True
+
+
+ def try_save_profiles(self, notify: bool = True) -> bool:
+ try:
+ self.profiles_file.parent.mkdir(parents=True, exist_ok=True)
+
+ with open(self.profiles_file, "wb") as f:
+ pickle.dump(self.profiles, f, protocol=pickle.HIGHEST_PROTOCOL)
+
+ return self._ok("Profiles saved successfully.", notify=notify)
+ except Exception as e:
+ import traceback
+ traceback.print_exc()
+ return self._fail(f"Error saving profiles: {e}", notify=notify)
+
+
+ def load_profiles(self, notify: bool = True) -> bool:
+ if not self.profiles_file.exists():
+ if notify:
+ self._notify("Profiles file not found. Creating a new one.", level="warning")
+ if not self.try_save_profiles(notify=False):
+ return self._fail("Failed to create profiles file!", notify=notify)
+ return self._ok("New profiles file created.", notify=notify)
+
+ try:
+ with open(self.profiles_file, "rb") as f:
+ self.profiles = pickle.load(f)
+ self._refresh_front()
+ return self._ok(f"Loaded {len(self.profiles)} profiles.", notify=notify)
+ except Exception as e:
+ return self._fail(f"Error loading profiles: {e}", notify=notify)
+
+
+ def add_profile(self, profile_name: str, input_data: dict, notify: bool = True) -> bool:
+ if not profile_name:
+ return self._fail("Name can't be blank.", "error", notify)
+
+ if not re.match(VALID_NAME_REGEX, profile_name):
+ return self._fail(f"Bad input '{profile_name}'. Use only letters, numbers, '_' and '-'.", "error", notify)
+
+ if any(p.profile_name == profile_name for p in self.profiles):
+ return self._fail(f"Profile {profile_name} already exists.", "warning", notify)
+
+ new_profile = DetectorProfileHST(profile_name=profile_name, input_data = input_data)
+ self.profiles.append(new_profile)
+
+ if not self.try_save_profiles(notify=False):
+ self.profiles.remove(new_profile)
+ return self._fail(f"Failed to save profile {profile_name}.", notify=notify)
+
+ self._refresh_front()
+ return self._ok(f"Added profile {profile_name}.", notify=notify)
+
+
+ def delete_profile(self, profile_name: str, notify: bool = True) -> bool:
+ before = len(self.profiles)
+ self.profiles = [p for p in self.profiles if p.profile_name != profile_name]
+
+ if len(self.profiles) == before:
+ return self._fail(f"Profile {profile_name} not found.", "warning", notify)
+
+ if not self.try_save_profiles(notify=False):
+ return self._fail(f"Failed to save changes after deleting {profile_name}.", notify=notify)
+
+ self._refresh_front()
+ return self._ok(f"Deleted profile {profile_name}.", notify=notify)
+
+
+ def get_profile(self, profile_name: str) -> DetectorProfileHST | None:
+ return next((p for p in self.profiles if p.profile_name == profile_name), None)
+
+
+ def update_profile(self, profile_name: str, field: str, value, notify: bool = True) -> bool:
+ p = self.get_profile(profile_name)
+ if not p:
+ return self._fail(f"Profile {profile_name} does not exist.", notify=notify)
+
+ setattr(p, field, value)
+ if self.try_save_profiles(notify=False):
+ return self._ok(f"Updated profile {profile_name}.", notify=notify)
+ else:
+ return self._fail(f"Failed to save updated profile {profile_name}.", notify=notify)
+
+
+ def turn_on_profile(self, profile_name: str, app=None, notify: bool = True) -> bool:
+ p = self.get_profile(profile_name)
+ if not p:
+ return self._fail(f"Profile {profile_name} not found.", notify=notify)
+ try:
+ p.turn_on()
+ return self._ok(f"Profile {profile_name} activated.", notify=notify)
+ except Exception as e:
+ return self._fail(f"Error activating profile: {e}", notify=notify)
+
+
+ def turn_off_profile(self, profile_name: str, notify: bool = True) -> bool:
+ p = self.get_profile(profile_name)
+ if not p:
+ return self._fail(f"Profile {profile_name} not found.", notify=notify)
+ try:
+ p.turn_off()
+ return self._ok(f"Profile {profile_name} deactivated.", notify=notify)
+ except Exception as e:
+ return self._fail(f"Error deactivating profile: {e}", notify=notify)
+
+
+ def get_profile_logs(self, profile_name: str, notify: bool = True):
+ p = self.get_profile(profile_name)
+ if not p:
+ self._fail(f"Profile {profile_name} not found.", "error", notify)
+ return None
+ return p.get_logs()
diff --git a/src/netmonitor/back/notification_service.py b/src/netmonitor/back/notification_service.py
new file mode 100644
index 0000000..e347faf
--- /dev/null
+++ b/src/netmonitor/back/notification_service.py
@@ -0,0 +1,49 @@
+import json
+import requests
+from pathlib import Path
+
+CONFIG_FILE = Path("data/global_config.json")
+
+class NotificationService:
+ def __init__(self):
+ self.webhook_url = ""
+ self.load_config()
+
+ def load_config(self):
+ if CONFIG_FILE.exists():
+ try:
+ with open(CONFIG_FILE, "r") as f:
+ data = json.load(f)
+ self.webhook_url = data.get("discord_webhook_url", "")
+ except Exception as e:
+ print(f"Error loading notification config: {e}")
+
+ def save_config(self, webhook_url: str):
+ self.webhook_url = webhook_url
+
+ CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True)
+ try:
+ with open(CONFIG_FILE, "w") as f:
+ json.dump({
+ "discord_webhook_url": self.webhook_url
+ }, f, indent=4)
+ return True
+ except Exception as e:
+ print(f"Error saving config: {e}")
+ return False
+
+ def send_message(self, message: str) -> bool:
+ if not self.webhook_url:
+ return False
+
+ payload = {
+ "content": message
+ }
+ try:
+ response = requests.post(self.webhook_url, json=payload, timeout=5)
+ return response.status_code in [200, 204]
+ except Exception as e:
+ print(f"Discord send error: {e}")
+ return False
+
+notification_service = NotificationService()
diff --git a/src/netmonitor/back/scanner_profile.py b/src/netmonitor/back/scanner_profile.py
new file mode 100644
index 0000000..6a20b5b
--- /dev/null
+++ b/src/netmonitor/back/scanner_profile.py
@@ -0,0 +1,138 @@
+import os
+from pathlib import Path
+from datetime import datetime
+from tinydb import TinyDB
+import nmap
+from .notification_service import notification_service
+
+
+XDG_DATA_HOME = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share"))
+LOGS_PATH = f"{XDG_DATA_HOME}/netmonitor/scanner/profiles_logs"
+
+class ScannerProfile:
+ def __init__(self, profile_name: str, nmap_input=None, scheduler=None, cve=None):
+ self.profile_name = profile_name
+ self.nmap_input = nmap_input or {}
+ self.scheduler = scheduler
+ self.cve = cve
+ self.is_active = False
+
+ self.notify_enabled = False
+ self.notify_only_cve = False
+
+ self.nm = None
+ self.db = None
+ self.profile_results_path = f"{LOGS_PATH}/{profile_name}.json"
+ os.makedirs(os.path.dirname(self.profile_results_path), exist_ok=True)
+
+ @property
+ def nmap(self):
+ if self.nm is None:
+ self.nm = nmap.PortScanner()
+ return self.nm
+
+ @property
+ def tinydb(self):
+ if self.db is None:
+ self.db = TinyDB(self.profile_results_path)
+ return self.db
+
+ def __getstate__(self):
+ state = self.__dict__.copy()
+ state['nm'] = None
+ state['db'] = None
+ return state
+
+ def __setstate__(self, state):
+ self.__dict__.update(state)
+ self.nm = None
+ self.db = None
+
+ def __repr__(self):
+ return f"<ScannerProfile profile_name={self.profile_name!r}, active={self.is_active}>"
+
+
+ def scan(self):
+ targets = self.nmap_input.get("targets", "")
+ arguments = self.nmap_input.get("arguments", "")
+ ports = self.nmap_input.get("ports", "")
+
+ if self.cve:
+ arguments += " --script=vulners "
+
+ if ports == '':
+ self.nmap.scan(hosts=targets, arguments=arguments)
+ else:
+ self.nmap.scan(hosts=targets, ports=ports, arguments=arguments)
+
+ xml_result = self.nmap.get_nmap_last_output()
+ analyzed_results = self.nmap.analyse_nmap_xml_scan(xml_result)
+
+ if isinstance(analyzed_results, dict):
+ analyzed_results['_timestamp'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+
+ self.tinydb.insert(analyzed_results)
+
+ if self.notify_enabled:
+ self._handle_notification(analyzed_results, targets)
+
+ return analyzed_results
+
+ def _handle_notification(self, results, targets):
+ scan_str = str(results).lower()
+ found_issues = "cve" in scan_str or "vulnerab" in scan_str
+ should_send = False
+ msg = f"*Scanner report: {self.profile_name}*\nTargets: `{targets}`"
+ summary = ""
+ try:
+ if 'scan' in results:
+ for host, data in results['scan'].items():
+ summary += f"\n *{host}*"
+
+ if 'tcp' in data:
+ open_ports = [f"{p}/tcp" for p, info in data['tcp'].items() if info.get('state') == 'open']
+ if open_ports:
+ summary += f"\n Open ports: {', '.join(open_ports)}"
+ if 'udp' in data:
+ open_ports = [f"{p}/udp" for p, info in data['udp'].items() if info.get('state') == 'open']
+ if open_ports:
+ summary += f"\n Open ports (UDP): {', '.join(open_ports)}"
+ if 'osmatch' in data and data['osmatch']:
+ os_name = data['osmatch'][0].get('name', 'Unknown')
+ summary += f"\n OS detected: {os_name}"
+ except Exception as e:
+ summary += f"\n(Error building summary: {e})"
+
+ if self.notify_only_cve:
+ if found_issues:
+ should_send = True
+ msg += "\nCVE detected!"
+ msg += "\n---" + summary
+ else:
+ should_send = True
+ msg += "\nScan completed."
+ if found_issues:
+ msg += "\nPotential vulnerabilities detected."
+
+ if summary:
+ msg += "\n---" + summary
+
+ if should_send:
+ if len(msg) > 1900:
+ msg = msg[:1900] + "\n...."
+ notification_service.send_message(msg)
+
+
+ def to_dict(self):
+ return {
+ "profile_name": self.profile_name,
+ "nmap_input": self.nmap_input,
+ "scheduler": self.scheduler,
+ "is_active": self.is_active,
+ "notify_enabled": getattr(self, 'notify_enabled', False),
+ "notify_only_cve": getattr(self, 'notify_only_cve', False),
+ "results_path": self.profile_results_path,
+ }
+
+ def get_logs(self):
+ return self.tinydb.all()
diff --git a/src/netmonitor/back/scanner_profiles_manager.py b/src/netmonitor/back/scanner_profiles_manager.py
new file mode 100644
index 0000000..7748835
--- /dev/null
+++ b/src/netmonitor/back/scanner_profiles_manager.py
@@ -0,0 +1,177 @@
+import re
+from pathlib import Path
+from typing import Callable, Literal
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.triggers.cron import CronTrigger
+import pickle
+
+from .scanner_profile import ScannerProfile
+
+SeverityLevel = Literal["information", "warning", "error"]
+VALID_NAME_REGEX = r"^[a-zA-Z0-9_-]+$"
+
+class ScannerProfilesManager:
+ def __init__(self, profiles_file: str, scheduler: BackgroundScheduler):
+ self.profiles_file = Path(profiles_file)
+ self.scheduler = scheduler
+ self.profiles: list[ScannerProfile] = []
+
+ self.on_message: Callable[[str, str, str], None] | None = None
+ self.on_refresh: Callable[[], None] | None = None
+
+ self.load_profiles()
+
+ def _notify(self, msg: str, title: str = "Profile Manager", level: SeverityLevel = "information"):
+ if self.on_message:
+ self.on_message(msg, title, level)
+
+ def _refresh_front(self):
+ if self.on_refresh:
+ self.on_refresh()
+
+ def _fail(self, msg: str, level: SeverityLevel = "error", notify: bool = True) -> bool:
+ if notify:
+ self._notify(msg, level=level)
+ return False
+
+ def _ok(self, msg: str | None = None, level: SeverityLevel = "information", notify: bool = True) -> bool:
+ if msg and notify:
+ self._notify(msg, level=level)
+ return True
+
+ def try_save_profiles(self, notify: bool = True) -> bool:
+ try:
+ self.profiles_file.parent.mkdir(parents=True, exist_ok=True)
+ with open(self.profiles_file, "wb") as f:
+ pickle.dump(self.profiles, f)
+ return self._ok("Profiles saved successfully.", notify=notify)
+ except Exception as e:
+ return self._fail(f"Error saving profiles: {e}", notify=notify)
+
+ def load_profiles(self, notify: bool = True) -> bool:
+ if not self.profiles_file.exists():
+ if notify:
+ self._notify("Profiles file not found. Creating a new one.", level="warning")
+ if not self.try_save_profiles(notify=False):
+ return self._fail("Failed to create profiles file!", notify=notify)
+ return self._ok("New profiles file created.", notify=notify)
+
+ try:
+ with open(self.profiles_file, "rb") as f:
+ self.profiles = pickle.load(f)
+ self._refresh_front()
+ return self._ok(f"Loaded {len(self.profiles)} profiles.", notify=notify)
+ except Exception as e:
+ return self._fail(f"Error loading profiles: {e}", notify=notify)
+
+
+ def add_profile(self, profile_name: str, notify: bool = True) -> bool:
+ if not profile_name:
+ return self._fail("Name can't be blank.", "error", notify)
+
+ if not re.match(VALID_NAME_REGEX, profile_name):
+ return self._fail(f"Bad input '{profile_name}'. Use only letters, numbers, '_' and '-'.", "error", notify)
+
+ if any(p.profile_name == profile_name for p in self.profiles):
+ return self._fail(f"Profile {profile_name} already exists.", "warning", notify)
+
+ new_profile = ScannerProfile(profile_name=profile_name)
+ self.profiles.append(new_profile)
+
+ if not self.try_save_profiles(notify=False):
+ self.profiles.remove(new_profile)
+ return self._fail(f"Failed to save profile {profile_name}.", notify=notify)
+
+ self._refresh_front()
+ return self._ok(f"Added profile {profile_name}.", notify=notify)
+
+ def delete_profile(self, profile_name: str, notify: bool = True) -> bool:
+ before = len(self.profiles)
+ self.profiles = [p for p in self.profiles if p.profile_name != profile_name]
+
+ if len(self.profiles) == before:
+ return self._fail(f"Profile {profile_name} not found.", "warning", notify)
+
+ if not self.try_save_profiles(notify=False):
+ return self._fail(f"Failed to save changes after deleting {profile_name}.", notify=notify)
+
+ self._refresh_front()
+ return self._ok(f"Deleted profile {profile_name}.", notify=notify)
+
+ def get_profile(self, profile_name: str) -> ScannerProfile | None:
+ return next((p for p in self.profiles if p.profile_name == profile_name), None)
+
+ def update_profile(self, profile_name: str, field: str, value, notify: bool = True) -> bool:
+ p = self.get_profile(profile_name)
+ if not p:
+ return self._fail(f"Profile {profile_name} does not exist.", notify=notify)
+
+ setattr(p, field, value)
+ if self.try_save_profiles(notify=False):
+ return self._ok(f"Updated profile {profile_name}.", notify=notify)
+ else:
+ return self._fail(f"Failed to save updated profile {profile_name}.", notify=notify)
+
+
+ def run_profile_once(self, profile_name: str, notify: bool = True) -> bool:
+ p = self.get_profile(profile_name)
+ if not p:
+ return self._fail(f"Profile {profile_name} not found.", notify=notify)
+
+ try:
+ p.scan()
+ return self._ok(f"Scan completed for {profile_name}.", notify=notify)
+ except Exception as e:
+ return self._fail(f"Scan error: {e}", notify=notify)
+
+ def turn_on_profile(self, profile_name: str, notify: bool = True) -> bool:
+ p = self.get_profile(profile_name)
+ if not p:
+ return self._fail(f"Profile {profile_name} not found.", notify=notify)
+
+ try:
+ job_id = f"profile_{p.profile_name}"
+ self.scheduler.add_job(
+ lambda: self.run_profile_once(p.profile_name, notify=False),
+ trigger=CronTrigger.from_crontab(p.scheduler),
+ id=job_id,
+ replace_existing=True
+ )
+ p.is_active = True
+ return self._ok(f"Profile {profile_name} activated.", notify=notify)
+ except Exception as e:
+ return self._fail(f"Error activating profile: {e}", notify=notify)
+
+ def turn_off_profile(self, profile_name: str, notify: bool = True) -> bool:
+ job_id = f"profile_{profile_name}"
+ try:
+ self.scheduler.remove_job(job_id)
+ p = self.get_profile(profile_name)
+ if p:
+ p.is_active = False
+ return self._ok(f"Profile {profile_name} deactivated.", notify=notify)
+ except Exception as e:
+ return self._fail(f"Error deactivating profile: {e}", notify=notify)
+
+ def set_validated_scheduler(self, profile_name: str, cron_str: str, notify: bool = True) -> bool:
+ p = self.get_profile(profile_name)
+ if not p:
+ return self._fail(f"Profile {profile_name} not found.", notify=notify)
+
+ try:
+ CronTrigger.from_crontab(cron_str)
+ p.scheduler = cron_str
+ if not self.try_save_profiles(notify=False):
+ return self._fail(f"Failed to save scheduler for {profile_name}.", notify=notify)
+ return self._ok(f"Scheduler updated for {profile_name}.", notify=notify)
+ except ValueError as ve:
+ return self._fail(f"Invalid CRON: {ve}", notify=notify)
+ except Exception as e:
+ return self._fail(f"Unexpected error: {e}", notify=notify)
+
+ def get_profile_logs(self, profile_name: str, notify: bool = True):
+ p = self.get_profile(profile_name)
+ if not p:
+ self._fail(f"Profile {profile_name} not found.", "error", notify)
+ return None
+ return p.get_logs()
diff --git a/src/netmonitor/back/window.py b/src/netmonitor/back/window.py
new file mode 100644
index 0000000..dfa8e7d
--- /dev/null
+++ b/src/netmonitor/back/window.py
@@ -0,0 +1,251 @@
+import time
+from collections import defaultdict
+from statistics import mean, pstdev
+import math
+
+from scapy.all import IP, IPv6, TCP, UDP, ICMP
+
+class Window:
+ def __init__(self, window_duration: float, enabled_features: list[str]):
+
+ self.window_duration = float(window_duration)
+ self.enabled = set(enabled_features)
+
+ self.window_start = time.time()
+
+ self.raw_packets_buffer = []
+
+ self.flows = defaultdict(lambda: {
+ "pkt_count": 0,
+ "byte_count": 0,
+ "dst_ports": defaultdict(int),
+ "src_ports": defaultdict(int),
+ "tcp_flags": {
+ "syn": 0, "fin": 0, "rst": 0, "ack": 0,
+ "psh": 0, "urg": 0, "xmas": 0, "null": 0,
+ },
+ "sizes": [],
+ "start_ts": None,
+ "end_ts": None,
+ "tcp_pkts": 0,
+ "udp_pkts": 0,
+ "icmp_pkts": 0,
+ })
+
+ def add_packet(self, pkt):
+ now = time.time()
+
+ if now - self.window_start >= self.window_duration:
+ features = self._finish_window()
+
+ self.window_start = now
+
+ raw = list(self.raw_packets_buffer)
+
+ self.raw_packets_buffer.clear()
+ self.flows.clear()
+
+ self._process_single_packet(pkt)
+
+ return features, raw
+
+ self._process_single_packet(pkt)
+ return None
+
+ def _process_single_packet(self, pkt):
+ self.raw_packets_buffer.append(pkt)
+
+ proto = None
+ if IP in pkt:
+ src = pkt[IP].src
+ dst = pkt[IP].dst
+ proto = pkt[IP].proto
+ elif IPv6 in pkt:
+ src = pkt[IPv6].src
+ dst = pkt[IPv6].dst
+ proto = pkt[IPv6].nh
+ else:
+ return
+
+ key = (src, dst, proto)
+ f = self.flows[key]
+
+ now = time.time()
+ if f["start_ts"] is None:
+ f["start_ts"] = now
+ f["end_ts"] = now
+
+ size = len(pkt)
+ f["pkt_count"] += 1
+ f["byte_count"] += size
+ f["sizes"].append(size)
+
+ if TCP in pkt:
+ f["tcp_pkts"] += 1
+ dport = pkt[TCP].dport
+ sport = pkt[TCP].sport
+ f["dst_ports"][dport] += 1
+ f["src_ports"][sport] += 1
+
+ flags = pkt[TCP].flags
+ if flags & 0x02: f["tcp_flags"]["syn"] += 1
+ if flags & 0x01: f["tcp_flags"]["fin"] += 1
+ if flags & 0x04: f["tcp_flags"]["rst"] += 1
+ if flags & 0x10: f["tcp_flags"]["ack"] += 1
+ if flags & 0x08: f["tcp_flags"]["psh"] += 1
+ if flags & 0x20: f["tcp_flags"]["urg"] += 1
+
+ if flags in [0x29, 0x3F, 0x3B]:
+ f["tcp_flags"]["xmas"] += 1
+ if flags == 0:
+ f["tcp_flags"]["null"] += 1
+
+ elif UDP in pkt:
+ f["udp_pkts"] += 1
+ dport = pkt[UDP].dport
+ sport = pkt[UDP].sport
+ f["dst_ports"][dport] += 1
+ f["src_ports"][sport] += 1
+
+ elif ICMP in pkt:
+ f["icmp_pkts"] += 1
+
+ def _finish_window(self):
+ if not self.flows:
+ return {}
+
+ total_flows = len(self.flows)
+ total_packets = 0
+ total_bytes = 0
+
+ tcp_flags_global = {
+ "syn": 0, "fin": 0, "rst": 0, "ack": 0,
+ "psh": 0, "urg": 0, "xmas": 0, "null": 0
+ }
+
+ all_sizes = []
+ proto_tcp = 0
+ proto_udp = 0
+ proto_icmp = 0
+
+ dst_port_counts = defaultdict(int)
+ src_port_counts = defaultdict(int)
+
+ for f in self.flows.values():
+ total_packets += f["pkt_count"]
+ total_bytes += f["byte_count"]
+
+ for p, c in f["dst_ports"].items():
+ dst_port_counts[p] += c
+ for p, c in f["src_ports"].items():
+ src_port_counts[p] += c
+
+ for k in tcp_flags_global:
+ tcp_flags_global[k] += f["tcp_flags"][k]
+
+ all_sizes.extend(f["sizes"])
+ proto_tcp += f["tcp_pkts"]
+ proto_udp += f["udp_pkts"]
+ proto_icmp += f["icmp_pkts"]
+
+ window_len = self.window_duration
+ total_pkts = total_packets if total_packets > 0 else 1
+
+ feat = {}
+
+ if "flow_count" in self.enabled: feat["flow_count"] = total_flows
+ if "total_packets" in self.enabled: feat["total_packets"] = total_packets
+ if "total_bytes" in self.enabled: feat["total_bytes"] = total_bytes
+ if "avg_bytes_per_flow" in self.enabled: feat["avg_bytes_per_flow"] = total_bytes / total_flows if total_flows else 0
+ if "pkt_rate" in self.enabled: feat["pkt_rate"] = total_packets / window_len
+ if "byte_rate" in self.enabled: feat["byte_rate"] = total_bytes / window_len
+
+ if "syn_count" in self.enabled: feat["syn_count"] = tcp_flags_global["syn"]
+ if "fin_count" in self.enabled: feat["fin_count"] = tcp_flags_global["fin"]
+ if "rst_count" in self.enabled: feat["rst_count"] = tcp_flags_global["rst"]
+ if "ack_count" in self.enabled: feat["ack_count"] = tcp_flags_global["ack"]
+ if "psh_count" in self.enabled: feat["psh_count"] = tcp_flags_global["psh"]
+ if "urg_count" in self.enabled: feat["urg_count"] = tcp_flags_global["urg"]
+
+ if "syn_ratio" in self.enabled: feat["syn_ratio"] = tcp_flags_global["syn"] / total_pkts
+ if "fin_ratio" in self.enabled: feat["fin_ratio"] = tcp_flags_global["fin"] / total_pkts
+ if "xmas_total" in self.enabled: feat["xmas_total"] = tcp_flags_global["xmas"]
+ if "null_scan_total" in self.enabled: feat["null_scan_total"] = tcp_flags_global["null"]
+
+ if "unique_dst_ports" in self.enabled: feat["unique_dst_ports"] = len(dst_port_counts)
+ if "unique_src_ports" in self.enabled: feat["unique_src_ports"] = len(src_port_counts)
+ if "port_entropy_dst" in self.enabled: feat["port_entropy_dst"] = entropy(dst_port_counts)
+ if "port_entropy_src" in self.enabled: feat["port_entropy_src"] = entropy(src_port_counts)
+
+ if all_sizes:
+ if "avg_pkt_size" in self.enabled: feat["avg_pkt_size"] = mean(all_sizes)
+ if "min_pkt_size" in self.enabled: feat["min_pkt_size"] = min(all_sizes)
+ if "max_pkt_size" in self.enabled: feat["max_pkt_size"] = max(all_sizes)
+ if "std_pkt_size" in self.enabled: feat["std_pkt_size"] = pstdev(all_sizes)
+ else:
+ for k in ["avg_pkt_size", "min_pkt_size", "max_pkt_size", "std_pkt_size"]:
+ if k in self.enabled: feat[k] = 0
+
+ if "avg_packets_per_flow" in self.enabled:
+ feat["avg_packets_per_flow"] = total_packets / total_flows if total_flows else 0
+ if "avg_bytes_per_packet" in self.enabled:
+ feat["avg_bytes_per_packet"] = total_bytes / total_pkts
+
+ if "proto_tcp_ratio" in self.enabled: feat["proto_tcp_ratio"] = proto_tcp / total_pkts
+ if "proto_udp_ratio" in self.enabled: feat["proto_udp_ratio"] = proto_udp / total_pkts
+ if "proto_icmp_ratio" in self.enabled: feat["proto_icmp_ratio"] = proto_icmp / total_pkts
+
+ return feat
+
+
+FEATURE_LIST = [
+ "flow_count",
+ "total_packets",
+ "total_bytes",
+ "avg_bytes_per_flow",
+ "pkt_rate",
+ "byte_rate",
+
+ "syn_count",
+ "fin_count",
+ "rst_count",
+ "ack_count",
+ "psh_count",
+ "urg_count",
+ "syn_ratio",
+ "fin_ratio",
+ "xmas_total",
+ "null_scan_total",
+
+ "unique_dst_ports",
+ "unique_src_ports",
+ "port_entropy_dst",
+ "port_entropy_src",
+
+ "avg_pkt_size",
+ "min_pkt_size",
+ "max_pkt_size",
+ "std_pkt_size",
+
+ "avg_packets_per_flow",
+ "avg_bytes_per_packet",
+ "proto_tcp_ratio",
+ "proto_udp_ratio",
+ "proto_icmp_ratio",
+]
+
+
+def entropy(values):
+ if not values:
+ return 0.0
+
+ total = sum(values.values())
+ if total == 0:
+ return 0.0
+
+ entropy_val = 0.0
+ for count in values.values():
+ p = count / total
+ entropy_val -= p * math.log2(p)
+
+ return entropy_val
diff --git a/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc
new file mode 100644
index 0000000..6a32fac
--- /dev/null
+++ b/src/netmonitor/front/__pycache__/detector_profiles_tab.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc
new file mode 100644
index 0000000..a74ee0c
--- /dev/null
+++ b/src/netmonitor/front/__pycache__/detector_profiles_tab_pushscreens.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc
new file mode 100644
index 0000000..0257359
--- /dev/null
+++ b/src/netmonitor/front/__pycache__/detector_tab.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc
new file mode 100644
index 0000000..8672061
--- /dev/null
+++ b/src/netmonitor/front/__pycache__/detector_tab_pushscreens.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc
new file mode 100644
index 0000000..227bfa8
--- /dev/null
+++ b/src/netmonitor/front/__pycache__/options_tab.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc
new file mode 100644
index 0000000..d6fcdfb
--- /dev/null
+++ b/src/netmonitor/front/__pycache__/scanner_profiles_tab.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc
new file mode 100644
index 0000000..a4db183
--- /dev/null
+++ b/src/netmonitor/front/__pycache__/scanner_profiles_tab_pushscreens.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc
new file mode 100644
index 0000000..8a8d03a
--- /dev/null
+++ b/src/netmonitor/front/__pycache__/scanner_tab.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc b/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc
new file mode 100644
index 0000000..28cc805
--- /dev/null
+++ b/src/netmonitor/front/__pycache__/scanner_tab_pushscreens.cpython-312.pyc
Binary files differ
diff --git a/src/netmonitor/front/detector_profiles_tab.py b/src/netmonitor/front/detector_profiles_tab.py
new file mode 100644
index 0000000..7860e16
--- /dev/null
+++ b/src/netmonitor/front/detector_profiles_tab.py
@@ -0,0 +1,77 @@
+from textual import on
+from textual.app import ComposeResult
+from textual.widgets import Button, Label, Switch
+from textual.containers import Vertical, Horizontal, VerticalScroll
+
+from ..back.detector_profiles_manager import DetectorProfilesManager
+from .detector_profiles_tab_pushscreens import ConfirmDeletePushScreen, ShowLogsPushScreen, ShowProfilePushScreen, SetDetectorNotificationPushScreen
+
+class DetectorProfilesTab(Vertical):
+ def __init__(self, manager: DetectorProfilesManager, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.manager.on_refresh = self.refresh_profiles
+ self.manager.on_message = self.on_manager_message
+
+ def compose(self) -> ComposeResult:
+ yield VerticalScroll(id="profiles-list")
+
+ def on_mount(self) -> None:
+ self.refresh_profiles()
+
+ def refresh_profiles(self) -> None:
+ profiles_list = self.query_one("#profiles-list", VerticalScroll)
+ profiles_list.remove_children()
+
+ if not self.manager.profiles:
+ return
+
+ for profile in self.manager.profiles:
+ row = Horizontal(
+ Switch(id=f"activate-profile-switch-{profile.profile_name}", value=profile.is_active),
+ Label(f"{profile.profile_name}", classes="profile-profile_name"),
+ Button("Show logs", id=f"show-logs-button-{profile.profile_name}", classes="profile-action", variant="primary"),
+ Button("Show Profile", id=f"show-profile-button-{profile.profile_name}", classes="profile-action", variant="default"),
+ Button("Notifications", id=f"set-notifications-button-{profile.profile_name}", classes="profile-action", variant="default"),
+ Button("Delete", id=f"delete-{profile.profile_name}", classes="profile-action button-delete", variant="error"),
+ classes="profile-row"
+ )
+ profiles_list.mount(row)
+
+ @on(Switch.Changed)
+ def any_switch_changed(self, event: Switch.Changed) -> None:
+ switch_id = event.switch.id
+ if not switch_id:
+ return
+ elif switch_id.startswith("activate-profile-switch"):
+ profile_name = switch_id.removeprefix("activate-profile-switch-")
+
+ if event.switch.value:
+ is_turned_on = self.manager.turn_on_profile(profile_name)
+ if not is_turned_on:
+ event.switch.value = False
+ else:
+ is_turned_off = self.manager.turn_off_profile(profile_name)
+ if not is_turned_off:
+ event.switch.value = False
+
+ @on(Button.Pressed)
+ def on_any_button_pressed(self, event: Button.Pressed) -> None:
+ button_id = event.button.id
+ if not button_id:
+ return
+ elif button_id.startswith("show-profile-button-"):
+ profile_name = button_id.removeprefix("show-profile-button-")
+ self.app.push_screen(ShowProfilePushScreen(self.manager, profile_name))
+ elif button_id.startswith("set-notifications-button-"):
+ profile_name = button_id.removeprefix("set-notifications-button-")
+ self.app.push_screen(SetDetectorNotificationPushScreen(self.manager, profile_name))
+ elif button_id.startswith("show-logs-button-"):
+ profile_name = button_id.removeprefix("show-logs-button-")
+ self.app.push_screen(ShowLogsPushScreen(self.manager, profile_name))
+ elif button_id.startswith("delete-"):
+ profile_name = button_id.removeprefix("delete-")
+ self.app.push_screen(ConfirmDeletePushScreen(self.manager, profile_name))
+
+ def on_manager_message(self, msg: str, title: str, severity):
+ self.app.notify(message=msg, title=title, severity=severity)
diff --git a/src/netmonitor/front/detector_profiles_tab_pushscreens.py b/src/netmonitor/front/detector_profiles_tab_pushscreens.py
new file mode 100644
index 0000000..fc5b731
--- /dev/null
+++ b/src/netmonitor/front/detector_profiles_tab_pushscreens.py
@@ -0,0 +1,187 @@
+from textual import on
+from textual.app import ComposeResult
+from textual.screen import ModalScreen
+from textual.widgets import Button, Label, Pretty, DataTable, Switch
+from textual.containers import Vertical, Horizontal, VerticalScroll, Container
+from textual_plotext import PlotextPlot
+
+from datetime import datetime
+
+from ..back.detector_profiles_manager import DetectorProfilesManager
+from ..back.detector_profile_HST import DetectorProfileHST
+
+class PlotTab(Container):
+ def __init__(self, profile: DetectorProfileHST, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.profile = profile
+ self.classes = "plot-card"
+
+ def compose(self):
+ yield PlotextPlot()
+
+ def on_mount(self):
+ self.set_interval(1, self.update_plot)
+
+ def update_plot(self):
+ plot_widget = self.query_one(PlotextPlot)
+ plt = plot_widget.plt
+
+ y = list(getattr(self.profile, "plot_data", []))
+
+ plt.clear_figure()
+ plt.theme("dark")
+
+ plt.plot(y, marker="dot", color="green")
+ plt.title("Anomaly Score")
+ plt.ylabel("last 30 windows")
+ plt.ylim(0, 1)
+
+ threshold = self.profile.params.get("threshold", 0.7)
+ if threshold is not None:
+ plt.horizontal_line(float(threshold), color="red")
+
+
+ plot_widget.refresh()
+
+
+class ShowProfilePushScreen(ModalScreen[str]):
+ def __init__(self, manager, profile_name: str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+ self.profile = self.manager.get_profile(profile_name)
+
+ def compose(self) -> ComposeResult:
+ with Container(classes="modal-window large-modal"):
+ yield Label(f"Profile: {self.profile_name}", classes="modal-header")
+
+ with Horizontal(classes="modal-split-container"):
+
+ with Vertical(classes="left-panel"):
+ yield PlotTab(self.profile)
+
+ with Vertical(classes="right-panel"):
+ yield Label("Runtime Stats (Live)", classes="section-header")
+ with VerticalScroll(classes="info-box", id="stats-box"):
+ yield Pretty({}, id="runtime-stats-pretty")
+
+ yield Label("Configuration", classes="section-header")
+ with VerticalScroll(classes="info-box"):
+ yield Pretty(self.profile.to_dict())
+
+ with Container(classes="modal-footer"):
+ yield Button("Close", id="cancel-button", variant="primary")
+
+ def on_mount(self):
+ self.set_interval(1.0, self.update_stats)
+ self.update_stats()
+
+ def update_stats(self):
+ if self.profile:
+ stats = self.profile.get_runtime_stats()
+ self.query_one("#runtime-stats-pretty", Pretty).update(stats)
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed):
+ self.dismiss(None)
+
+
+class ShowLogsPushScreen(ModalScreen[str]):
+ def __init__(self, manager: DetectorProfilesManager, profile_name: str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+
+ def compose(self) -> ComposeResult:
+ with Container(classes="modal-window medium-modal"):
+ yield Label(f"Anomaly Logs: {self.profile_name}", classes="modal-header")
+
+ with Container(classes="table-container"):
+ yield DataTable(id="logs_table", zebra_stripes=True, cursor_type="row")
+
+ with Horizontal(classes="modal-footer"):
+ yield Button("Clear History", id="clear-button", variant="warning")
+ yield Button("Close", id="cancel-button", variant="primary")
+
+
+ def on_mount(self):
+ table = self.query_one("#logs_table", DataTable)
+ table.add_columns("Timestamp", "Score", "Packets Rate", "Protocol Info", "Verdict")
+
+ logs = self.manager.get_profile_logs(self.profile_name)
+
+ if not logs:
+ return
+
+ sorted_logs = sorted(logs, key=lambda x: x.get("ts", 0), reverse=True)
+
+ for log in sorted_logs:
+ dt = datetime.fromtimestamp(log.get("ts", 0)).strftime("%Y-%m-%d %H:%M:%S")
+
+ score = f"{log.get('score', 0):.4f}"
+ rate = f"{log.get('pkt_rate', 0):.1f}"
+ proto = str(log.get("proto_info", "-"))
+ verdict = "ANOMALY"
+
+ table.add_row(dt, score, rate, proto, verdict)
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed) -> None:
+ if event.button.id == "clear-button":
+ profile = self.manager.get_profile(self.profile_name)
+ if profile:
+ profile.clear_logs()
+ self.query_one("#logs_table", DataTable).clear()
+ else:
+ self.dismiss(None)
+
+class SetDetectorNotificationPushScreen(ModalScreen[str]):
+ def __init__(self, manager, profile_name: str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+ self.profile = self.manager.get_profile(profile_name)
+
+ def compose(self) -> ComposeResult:
+ current_val = getattr(self.profile, 'notify_enabled', False)
+
+ with Container(classes="modal-window small-modal"):
+ yield Label(f"Notification: {self.profile_name}", classes="modal-header")
+
+ with Vertical(classes="section-card"):
+ yield Label("Enable notification (Discord)")
+ yield Switch(value=current_val, id="switch-anomaly")
+
+ with Horizontal(classes="modal-footer"):
+ yield Button("Close", id="close-button", variant="primary")
+
+ @on(Switch.Changed)
+ def on_switch_changed(self, event: Switch.Changed):
+ if event.switch.id == "switch-anomaly":
+ self.profile.notify_enabled = event.value
+ self.manager.try_save_profiles(notify=False)
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed):
+ self.dismiss(None)
+
+class ConfirmDeletePushScreen(ModalScreen[str]):
+ def __init__(self, manager: DetectorProfilesManager, profile_name:str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+
+ def compose(self) -> ComposeResult:
+ with Container(classes="modal-window small-modal"):
+ yield Label("Are you sure?",classes="modal-header")
+ with Horizontal(classes="modal-footer"):
+ yield Button("Delete", id="confirm-button", variant="error")
+ yield Button("Cancel", id="cancel-button", variant="default")
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed):
+ if event.button.id == "confirm-button":
+ self.manager.delete_profile(self.profile_name)
+ self.dismiss(None)
+ else:
+ self.dismiss(None)
diff --git a/src/netmonitor/front/detector_tab.py b/src/netmonitor/front/detector_tab.py
new file mode 100644
index 0000000..53dc908
--- /dev/null
+++ b/src/netmonitor/front/detector_tab.py
@@ -0,0 +1,135 @@
+from textual import on
+from textual.app import ComposeResult
+from textual.widgets import Input, Select, Button, Label, Checkbox
+from textual.containers import Container, Horizontal, Vertical, VerticalScroll
+
+import psutil
+
+from ..back.detector_profiles_manager import DetectorProfilesManager
+from ..back.window import FEATURE_LIST
+from ..front.detector_tab_pushscreens import SaveProfilePushScreen
+
+class DetectorTab(Container):
+ def __init__(self, manager: DetectorProfilesManager, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.manager.on_message = self.on_manager_message
+
+ def compose(self) -> ComposeResult:
+ with Horizontal(id="top-config-container"):
+
+ model_section = Container(id="model-section", classes="section-card")
+ model_section.border_title = "Algorithm: HalfSpaceTrees"
+ with model_section:
+ with VerticalScroll(classes="detector-scroll"):
+ yield Label("Select interface:", classes="label")
+ try:
+ ifaces = list(psutil.net_if_addrs().keys())
+ except:
+ ifaces = []
+
+ yield Select.from_values(
+ ifaces,
+ id="interface-select",
+ allow_blank=False,
+ classes="input"
+ )
+
+ yield Label("Model params:", classes="label")
+ yield Input(placeholder="Trees number (int, def: 10)", id="param-trees", classes="input")
+ yield Input(placeholder="Height (int, def: 8)", id="param-height", classes="input")
+ yield Input(placeholder="Window size (int, def: 250)", id="param-window", classes="input")
+ yield Input(placeholder="Seed (int, def: 42)", id="param-seed", classes="input")
+ yield Input(placeholder="Window duration (def: 10 sec )", id="param-window_duration", classes="input")
+ yield Input(placeholder="Threshold (0.0 - 1.0, def: 0.7)", id="param-threshold", classes="input")
+ yield Input(placeholder="Queue size (int, def: 10000)", id="param-queue_size", classes="input")
+
+ features_section = Container(id="features-section", classes="section-card")
+ features_section.border_title = "Flow-based Features"
+ with features_section:
+ with VerticalScroll(classes="detector-scroll"):
+ yield Label("Select features to include in model:", classes="label")
+ self.feature_checkboxes = {}
+ for feat in FEATURE_LIST:
+ cb = Checkbox(feat, value=True, classes="input")
+ self.feature_checkboxes[feat] = cb
+ yield cb
+
+ bpf_section = Container(id="bpf-section", classes="section-card")
+ bpf_section.border_title = "BPF Filter (Optional)"
+ with bpf_section:
+ yield Input(
+ placeholder="BPF Filter (e.g. 'tcp port 80 or udp')",
+ id="param-bpf_filter",
+ classes="input full"
+ )
+ with Container(classes="save-button-container"):
+ yield Button("Save Profile", id="save-button", variant="success")
+
+ def get_inputs(self):
+ features = [f for f, cb in self.feature_checkboxes.items() if cb.value]
+
+ if not features:
+ raise ValueError("Select at least one feature.")
+
+ params = {}
+
+ try:
+ interface = self.query_one("#interface-select", Select).value
+ if not interface:
+ raise ValueError("Select interface.")
+ params["interface"] = interface
+ except Exception:
+ raise ValueError("Interface selection error.")
+
+ bpf_input = self.query_one("#param-bpf_filter", Input)
+ if bpf_input.value.strip():
+ params["bpf_filter"] = bpf_input.value.strip()
+
+ defaults = {
+ "trees": 10,
+ "height": 8,
+ "window": 250,
+ "seed": 42,
+ "threshold": 0.7,
+ "window_duration": 10.0,
+ "queue_size": 10000,
+ "bpf_filter": ""
+ }
+
+ model_section = self.query_one("#model-section")
+ for inp in model_section.query("Input"):
+ if inp.id and inp.id.startswith("param-"):
+ key = inp.id.removeprefix("param-")
+ if key == "bpf_filter": continue
+
+ val_str = inp.value.strip()
+
+ if not val_str:
+ if key in defaults and defaults[key] is not None:
+ params[key] = defaults[key]
+ continue
+
+ try:
+ if key in ["trees", "height", "window", "seed","queue_size"]:
+ params[key] = int(val_str)
+ elif key in ["threshold", "window_duration"]:
+ params[key] = float(val_str)
+ except ValueError:
+ raise ValueError(f"Param '{key}' must be a number.")
+
+ return {
+ "features": features,
+ "params": params,
+ }
+
+ @on(Button.Pressed, "#save-button")
+ async def handle_save_button(self, event: Button.Pressed):
+ try:
+ input_data = self.get_inputs()
+ self.app.push_screen(SaveProfilePushScreen(self.manager, input_data=input_data))
+ except ValueError as e:
+ self.app.notify(str(e), title="Validation error", severity="error")
+
+ def on_manager_message(self, msg: str, title: str, severity):
+ self.app.notify(message=msg, title=title, severity=severity)
diff --git a/src/netmonitor/front/detector_tab_pushscreens.py b/src/netmonitor/front/detector_tab_pushscreens.py
new file mode 100644
index 0000000..625921b
--- /dev/null
+++ b/src/netmonitor/front/detector_tab_pushscreens.py
@@ -0,0 +1,31 @@
+from textual.app import ComposeResult
+from textual.widgets import Input, Label, Button
+from textual.containers import Vertical, Horizontal
+from textual.screen import ModalScreen
+from textual import on
+
+from ..back.detector_profiles_manager import DetectorProfilesManager
+
+class SaveProfilePushScreen(ModalScreen[str]):
+ def __init__(self, manager: DetectorProfilesManager, input_data , *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.input_data = input_data
+
+
+ def compose(self) -> ComposeResult:
+ with Vertical(classes="modal-window small-modal"):
+ yield Label("Save scan profile", classes="modal-header")
+ yield Input(placeholder="Profile name:", id="profile-name", classes="input")
+ with Horizontal(id="buttons-row", classes="modal-buttons modal-footer"):
+ yield Button("Confirm", id="confirm-button", variant="success", classes="button")
+ yield Button("Cancel", id="cancel-button", variant="error", classes="button")
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed):
+ if event.button.id == "confirm-button":
+ profile_name = self.query_one("#profile-name", Input).value.strip()
+ self.manager.add_profile(profile_name,self.input_data)
+ self.dismiss(None)
+ else:
+ self.dismiss(None)
diff --git a/src/netmonitor/front/options_tab.py b/src/netmonitor/front/options_tab.py
new file mode 100644
index 0000000..6cbd448
--- /dev/null
+++ b/src/netmonitor/front/options_tab.py
@@ -0,0 +1,44 @@
+from textual.app import ComposeResult
+from textual.containers import Container, Horizontal
+from textual.widgets import Button, Input, Label
+from textual import on
+
+from ..back.notification_service import notification_service
+
+class OptionsTab(Container):
+ def __init__(self, scanner_manager, detector_manager, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.scanner_manager = scanner_manager
+ self.detector_manager = detector_manager
+
+ def compose(self) -> ComposeResult:
+ notify_section = Container(id="notify-section", classes="section-card")
+ notify_section.border_title = "Notification config"
+ with notify_section:
+ yield Label("Discord Webhook URL:")
+ yield Input(placeholder="https://discord.com/api/webhooks/...", id="input-webhook-url")
+
+ with Horizontal(classes="modal-footer"):
+ yield Button("Save config", id="save-config", variant="success")
+ yield Button("notification test", id="test-notif", variant="primary")
+
+ def on_mount(self):
+ self.query_one("#input-webhook-url", Input).value = notification_service.webhook_url
+
+ @on(Button.Pressed, "#save-config")
+ def save_configuration(self):
+ url = self.query_one("#input-webhook-url", Input).value.strip()
+
+ if notification_service.save_config(url):
+ self.app.notify("Config saved", severity="information")
+ else:
+ self.app.notify("Error during saving", severity="error")
+
+ @on(Button.Pressed, "#test-notif")
+ def test_notification(self):
+ success = notification_service.send_message("**Test NetMonitor**\n")
+
+ if success:
+ self.app.notify("good", severity="information")
+ else:
+ self.app.notify("bad", severity="error")
diff --git a/src/netmonitor/front/scanner_profiles_tab.py b/src/netmonitor/front/scanner_profiles_tab.py
new file mode 100644
index 0000000..8e8610c
--- /dev/null
+++ b/src/netmonitor/front/scanner_profiles_tab.py
@@ -0,0 +1,84 @@
+from textual.widgets import Button, Label, Switch
+from textual import on
+from textual.app import ComposeResult
+from textual.containers import Vertical, Horizontal, VerticalScroll
+
+from ..back.scanner_profiles_manager import ScannerProfilesManager
+from .scanner_profiles_tab_pushscreens import ScanNowPushScreen, SetSchedulerPushScreen, ShowLogsPushScreen, SetNotifacationOptionsPushScreen, ShowProfilePushScreen, ConfirmDeletePushScreen
+
+class ScannerProfilesTab(Vertical):
+ def __init__(self, manager: ScannerProfilesManager, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.manager.on_refresh = self.refresh_profiles
+ self.manager.on_message = self.on_manager_message
+
+ def compose(self) -> ComposeResult:
+ yield VerticalScroll(id="profiles-list")
+
+ def on_mount(self) -> None:
+ self.refresh_profiles()
+
+ def refresh_profiles(self) -> None:
+ profiles_list = self.query_one("#profiles-list", VerticalScroll)
+ profiles_list.remove_children()
+
+ if not self.manager.profiles:
+ return
+
+ for profile in self.manager.profiles:
+ row = Horizontal(
+ Switch(id=f"switch-{profile.profile_name}", value=profile.is_active),
+ Button("Scan Now", id=f"scan-now-button-{profile.profile_name}", classes="profile-action", variant="success"),
+ Label(f"{profile.profile_name}", classes="profile-profile_name"),
+ Button("Show Profile", id=f"show-profile-button-{profile.profile_name}", classes="profile-action", variant="primary"),
+ Button("Show logs", id=f"show-logs-button-{profile.profile_name}", classes="profile-action", variant="default"),
+ Button("Set scheduler", id=f"set-scheduler-button-{profile.profile_name}", classes="profile-action", variant="default"),
+ Button("Notifications", id=f"set-notifications-button-{profile.profile_name}", classes="profile-action", variant="default"),
+ Button("Delete", id=f"delete-{profile.profile_name}", classes="profile-action", variant="error"),
+ classes="profile-row"
+ )
+ profiles_list.mount(row)
+
+ @on(Switch.Changed)
+ def switch_changed(self, event: Switch.Changed) -> None:
+ switch_id = event.switch.id
+ if not switch_id:
+ return
+ profile_name = switch_id.removeprefix("switch-")
+
+ if event.switch.value:
+ is_turned_on = self.manager.turn_on_profile(profile_name)
+ if not is_turned_on:
+ event.switch.value = False
+ else:
+ is_turned_off = self.manager.turn_off_profile(profile_name)
+ if not is_turned_off:
+ event.switch.value = False
+
+ @on(Button.Pressed)
+ def on_any_button_pressed(self, event: Button.Pressed) -> None:
+ button_id = event.button.id
+ if not button_id:
+ return
+ if button_id.startswith("scan-now-button-"):
+ profile_name = button_id.removeprefix("scan-now-button-")
+ self.app.push_screen(ScanNowPushScreen(self.manager, profile_name))
+ elif button_id.startswith("show-profile-button-"):
+ profile_name = button_id.removeprefix("show-profile-button-")
+ self.app.push_screen(ShowProfilePushScreen(self.manager, profile_name))
+ elif button_id.startswith("set-scheduler-button-"):
+ profile_name = button_id.removeprefix("set-scheduler-button-")
+ self.app.push_screen(SetSchedulerPushScreen(self.manager, profile_name))
+ elif button_id.startswith("set-notifications-button-"):
+ profile_name = button_id.removeprefix("set-notifications-button-")
+ self.app.push_screen(SetNotifacationOptionsPushScreen(self.manager, profile_name))
+ elif button_id.startswith("delete-"):
+ profile_name = button_id.removeprefix("delete-")
+ self.app.push_screen(ConfirmDeletePushScreen(self.manager, profile_name))
+ elif button_id.startswith("show-logs-button-"):
+ profile_name = button_id.removeprefix("show-logs-button-")
+ self.app.push_screen(ShowLogsPushScreen(self.manager, profile_name))
+
+ def on_manager_message(self, msg: str, title: str, severity):
+ self.app.notify(message=msg, title=title, severity=severity)
diff --git a/src/netmonitor/front/scanner_profiles_tab_pushscreens.py b/src/netmonitor/front/scanner_profiles_tab_pushscreens.py
new file mode 100644
index 0000000..5be9941
--- /dev/null
+++ b/src/netmonitor/front/scanner_profiles_tab_pushscreens.py
@@ -0,0 +1,180 @@
+from textual.app import ComposeResult
+from textual import on
+from textual.screen import ModalScreen
+from textual.widgets import Input, Button, Label, Pretty, Select, Switch
+from textual.containers import Vertical, Horizontal, VerticalScroll, Container
+
+from ..back.scanner_profiles_manager import ScannerProfilesManager
+
+class ScanNowPushScreen(ModalScreen[str]):
+ def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+
+ def compose(self) -> ComposeResult:
+ with Container(classes="modal-window large-modal"):
+ yield Label(f"Scanning: {self.profile_name}", classes="modal-header")
+ with VerticalScroll(classes="info-box"):
+ yield Pretty(self.manager.get_profile(self.profile_name).scan())
+ with Horizontal(classes="modal-footer"):
+ yield Button("Close", variant="primary")
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed) -> None:
+ self.dismiss(None)
+
+class ShowProfilePushScreen(ModalScreen[str]):
+ def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+
+ def compose(self) -> ComposeResult:
+ with Container(classes="modal-window medium-modal"):
+ yield Label(f"Profile: {self.profile_name}", classes="modal-header")
+ with VerticalScroll(classes="info-box"):
+ yield Pretty(self.manager.get_profile(self.profile_name).to_dict())
+ with Horizontal(classes="modal-footer"):
+ yield Button("Close", id="cancel-button", variant="primary")
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed) -> None:
+ self.dismiss(None)
+
+
+class SetSchedulerPushScreen(ModalScreen[str]):
+ def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+
+ def compose(self) -> ComposeResult:
+ with Container(classes="modal-window small-modal"):
+ yield Label("Set CRON Scheduler", classes="modal-header")
+ yield Label("Format: min hour day month day_of_week", classes="label")
+ yield Input(placeholder="* * * * * ", id="cron-input", classes="input")
+
+ with Horizontal(classes="modal-footer"):
+ yield Button("Confirm", id="confirm-button", variant="success")
+ yield Button("Cancel", id="cancel-button", variant="error")
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed):
+ if event.button.id == "confirm-button":
+ cron_input = self.query_one("#cron-input", Input).value.strip()
+ if cron_input:
+ self.manager.set_validated_scheduler(self.profile_name, cron_input)
+ self.dismiss(None)
+ else:
+ self.dismiss(None)
+
+class SetNotifacationOptionsPushScreen(ModalScreen[str]):
+ def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+ self.profile = self.manager.get_profile(profile_name)
+
+ def compose(self) -> ComposeResult:
+ current_enabled = getattr(self.profile, 'notify_enabled', False)
+ current_cve_only = getattr(self.profile, 'notify_only_cve', False)
+
+ with Container(classes="modal-window small-modal"):
+ yield Label(f"Notifications: {self.profile_name}", classes="modal-header")
+
+ with Vertical(classes="section-card"):
+ yield Label("Enable notifications (Discord):")
+ yield Switch(value=current_enabled, id="switch-enable")
+
+ yield Label("Notify only when scanner finds CVE")
+ yield Switch(value=current_cve_only, id="switch-cve-only")
+
+ with Horizontal(classes="modal-footer"):
+ yield Button("Close", id="close-button", variant="primary")
+
+ @on(Switch.Changed)
+ def on_switch_changed(self, event: Switch.Changed):
+ if event.switch.id == "switch-enable":
+ self.profile.notify_enabled = event.value
+ elif event.switch.id == "switch-cve-only":
+ self.profile.notify_only_cve = event.value
+
+ self.manager.try_save_profiles(notify=False)
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed):
+ self.dismiss(None)
+
+class ShowLogsPushScreen(ModalScreen[str]):
+ def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+ self.logs = []
+
+ def compose(self) -> ComposeResult:
+ with Container(classes="modal-window large-modal"):
+ yield Label(f"Logs: {self.profile_name}", classes="modal-header")
+
+ yield Select([], prompt="Select date", id="scan-date-select")
+
+ with VerticalScroll(classes="info-box"):
+ yield Pretty({}, id="log-content")
+
+ with Horizontal(classes="modal-footer"):
+ yield Button("Close", id="cancel-button", variant="primary")
+
+ def on_mount(self):
+ self.logs = self.manager.get_profile_logs(self.profile_name)
+ select = self.query_one("#scan-date-select", Select)
+ options = []
+
+ if self.logs:
+ for index, log in enumerate(reversed(self.logs)):
+ real_index = len(self.logs) - 1 - index
+ label = log.get('_timestamp', f"Scan #{real_index + 1} (no date)")
+
+ options.append((str(label), real_index))
+
+ select.set_options(options)
+
+ if options:
+ select.value = options[0][1]
+
+ @on(Select.Changed, "#scan-date-select")
+ def on_date_selected(self, event: Select.Changed):
+ if event.value is not None:
+ index = event.value
+ if index == Select.BLANK:
+ self.query_one("#log-content", Pretty).update({})
+ return
+ if 0 <= index < len(self.logs):
+ log_entry = self.logs[index]
+ self.query_one("#log-content", Pretty).update(log_entry)
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed) -> None:
+ self.dismiss(None)
+
+
+class ConfirmDeletePushScreen(ModalScreen[str]):
+ def __init__(self, manager: ScannerProfilesManager, profile_name:str, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.profile_name = profile_name
+
+ def compose(self) -> ComposeResult:
+ with Container(classes="modal-window small-modal"):
+ yield Label("Are you sure?",classes="modal-header")
+ with Horizontal(classes="modal-footer"):
+ yield Button("Delete", id="confirm-button", variant="error")
+ yield Button("Cancel", id="cancel-button", variant="default")
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed):
+ if event.button.id == "confirm-button":
+ self.manager.delete_profile(self.profile_name)
+ self.dismiss(None)
+ else:
+ self.dismiss(None)
diff --git a/src/netmonitor/front/scanner_tab.py b/src/netmonitor/front/scanner_tab.py
new file mode 100755
index 0000000..868fa74
--- /dev/null
+++ b/src/netmonitor/front/scanner_tab.py
@@ -0,0 +1,277 @@
+from textual import on
+from textual.app import ComposeResult
+from textual.widgets import Checkbox, Input, Select, Button, Label, SelectionList, Pretty
+from textual.widgets.selection_list import Selection
+from textual.containers import Container, Vertical, Horizontal, VerticalScroll
+from textual.events import Mount
+
+import psutil, ipaddress, shlex
+from typing import List, Dict, Optional
+
+from ..back.scanner_profiles_manager import ScannerProfilesManager
+from .scanner_tab_pushscreens import SaveProfilePushScreen
+
+class ScannerTab(Container):
+
+ def __init__(self, manager: ScannerProfilesManager):
+ super().__init__()
+ self.manager = manager
+ self.manager.on_message = self.on_manager_message
+
+ def compose(self) -> ComposeResult:
+ friendly_section = Container(id="friendly-command-section", classes="section card")
+ friendly_section.border_title = "Scanner Configuration"
+
+ with friendly_section:
+ yield Label("Interface:", classes="label")
+ try:
+ interfaces = list(psutil.net_if_addrs().keys())
+ except Exception:
+ interfaces = []
+
+ yield Select.from_values(
+ interfaces,
+ id="interface-select",
+ allow_blank=False,
+ classes="input"
+ )
+
+ yield Label("IP address:", classes="label")
+ yield Input(
+ value="127.0.0.1/32",
+ placeholder="192.168.0.0/24",
+ name="ip-input",
+ id="ip",
+ classes="input"
+ )
+
+ yield Label("Port range (or single port)", classes="label")
+ with Horizontal(id="port-range", classes="input-row"):
+ yield Input(placeholder="Start / Single", id="low-port-range", classes="input half")
+ yield Input(placeholder="End (Optional)", id="high-port-range", classes="input half")
+
+ yield Checkbox("Check for CVE's", id="cve-checkbox", classes="checkbox")
+
+ yield Label("Scan options:", classes="label")
+ with Horizontal(classes="selection-row"):
+ yield SelectionList[str](
+ Selection("Service version (-sV)", "-sV", False),
+ Selection("OS detection (-O)", "-O", False),
+ Selection("SYN scan (-sS)", "-sS", False),
+ Selection("UDP scan (-sU)", "-sU", False),
+ Selection("Fast scan (-F)", "-F", False),
+ Selection("Aggressive scan (-A)", "-A", False),
+ Selection("Ping skip (-Pn)", "-Pn", False),
+ Selection("Timing T4 (-T4)", "-T4", True),
+ classes="selection-list"
+ )
+
+ user_section = Container(id="user-command-section", classes="section card")
+ user_section.border_title = "Manual Command"
+
+ with user_section:
+ yield Input(
+ placeholder="sudo nmap -sS 192.168.1.1",
+ name="user-nmap-input",
+ id="usercommand",
+ classes="input full"
+ )
+
+
+ final_section = Container(id="final-section", classes="section card")
+ final_section.border_title = "Execution"
+
+ with final_section:
+ with Vertical(id="final-command", classes="command-output"):
+ self.final_command = Pretty([], id="final-command-output")
+ yield self.final_command
+ with Vertical(id="buttons", classes="buttons-column"):
+ yield Button("Scan now", id="scan-button", variant="success")
+ yield Button("Save profile", id="save-button", variant="primary")
+
+ results_section = Container(id="results-section", classes="section card")
+ results_section.border_title = "Scan Output"
+
+ with results_section:
+ with VerticalScroll(id="results-scroll"):
+ self.results = Pretty([], id="results", classes="results")
+ yield self.results
+
+ def validate_inputs(self, nmap_input: Dict[str, str]) -> List[str]:
+ errors: List[str] = []
+
+ if self.query_one("#cve-checkbox", Checkbox).value:
+ args = nmap_input.get("arguments", "")
+ if "-sV" not in args.split():
+ errors.append("To check for CVE's, '-sV' option is required")
+ return errors
+
+ targets = nmap_input.get("targets", "").strip()
+ if not targets:
+ errors.append("IP field cannot be empty")
+ return errors
+
+ if not self.query_one("#usercommand", Input).value.strip():
+ try:
+ ipaddress.ip_network(targets, strict=False)
+ except ValueError:
+ errors.append(f"Invalid IP address/CIDR: {targets}")
+ return errors
+
+ if "ports" in nmap_input:
+ port_str = nmap_input["ports"]
+ if "-" in port_str:
+ try:
+ low, high = map(int, port_str.split("-"))
+ if not (1 <= low <= 65535 and 1 <= high <= 65535):
+ errors.append("Ports must be in range 1-65535")
+ if low > high:
+ errors.append("Start port cannot be greater than end port")
+ except ValueError:
+ errors.append("Ports (range) must be numbers")
+ else:
+ try:
+ port = int(port_str)
+ if not (1 <= port <= 65535):
+ errors.append("Port must be in range 1-65535")
+ except ValueError:
+ errors.append("Port must be a number")
+
+ dry_run_err = self.dry_run_nmap_command(nmap_input)
+ if dry_run_err:
+ errors.append(dry_run_err)
+ return errors
+
+ def dry_run_nmap_command(self, nmap_input: Dict[str, str]) -> Optional[str]:
+ try:
+ cmd = ["nmap"]
+ if "arguments" in nmap_input:
+ cmd += shlex.split(nmap_input["arguments"])
+ if "ports" in nmap_input:
+ cmd += ["-p", nmap_input["ports"]]
+ cmd.append(nmap_input["targets"])
+ return None
+ except Exception as e:
+ return f"Błąd weryfikacji komendy: {e}"
+
+ def get_nmap_input(self, return_dict: bool = False) -> list[str] | dict:
+ user_command = self.query_one("#usercommand", Input).value.strip()
+ low_port = self.query_one("#low-port-range", Input).value.strip()
+ high_port = self.query_one("#high-port-range", Input).value.strip()
+ interface = self.query_one("#interface-select", Select).value
+ targets = self.query_one("#ip", Input).value.strip()
+ options = self.query_one(SelectionList).selected
+
+ if return_dict:
+ if user_command:
+ parts = shlex.split(user_command)
+
+ if parts and parts[0] == "sudo": parts.pop(0)
+ if parts and parts[0] == "nmap": parts.pop(0)
+
+ if not parts:
+ return {}
+
+ target = parts[-1]
+ args = parts[:-1]
+
+ return {
+ "targets": target,
+ "arguments": " ".join(args)
+ }
+ else:
+ nmap_dict = {"targets": targets}
+
+ if low_port:
+ if high_port:
+ nmap_dict["ports"] = f"{low_port}-{high_port}"
+ else:
+ nmap_dict["ports"] = f"{low_port}"
+
+ args: list[str] = []
+ if options:
+ args.extend(options)
+ if interface:
+ args.extend(["-e", str(interface)])
+ if args:
+ nmap_dict["arguments"] = " ".join(args)
+
+ return {k: v for k, v in nmap_dict.items() if v is not None}
+
+ if user_command:
+ return shlex.split(user_command)
+
+ parts = ["nmap"]
+ if interface:
+ parts.extend(["-e", str(interface)])
+ if options:
+ parts.extend(options)
+
+ if low_port:
+ if high_port:
+ parts.extend(["-p", f"{low_port}-{high_port}"])
+ else:
+ parts.extend(["-p", f"{low_port}"])
+
+ parts.append(targets)
+ return parts
+
+ @on(Mount)
+ @on(SelectionList.SelectedChanged)
+ @on(Input.Changed)
+ @on(Select.Changed)
+ def update_selected_view(self, event=None) -> None:
+ self.final_command.update(" ".join(self.get_nmap_input()))
+
+ @on(Button.Pressed, "#scan-button")
+ async def handle_scan_button(self, event: Button.Pressed) -> None:
+ nmap_input = self.get_nmap_input(return_dict=True)
+ if not isinstance(nmap_input, dict):
+ self.notify("Błąd: dane wejściowe nie są słownikiem", severity="error")
+ return
+ errors = self.validate_inputs(nmap_input)
+ if errors:
+ self.notify("\n".join(errors), title="Bad Input", severity="error")
+ else:
+ try:
+ temp_prof_name = "temp_scan_profile"
+ self.notify("Start scanning", title="Notification", severity="information")
+
+ if not self.manager.add_profile(temp_prof_name, notify=False):
+ self.notify("Błąd: Nie udało się utworzyć profilu (problem z zapisem pliku?)", severity="error")
+ return
+
+ if not self.manager.update_profile(temp_prof_name, "nmap_input", nmap_input, notify=False):
+ self.notify("Błąd: Nie udało się zaktualizować parametrów skanowania", severity="error")
+ self.manager.delete_profile(temp_prof_name, notify=False)
+ return
+
+ profile = self.manager.get_profile(temp_prof_name)
+ if profile:
+ scan_now_results = profile.scan()
+ self.results.update(scan_now_results)
+ self.manager.delete_profile(temp_prof_name, notify=False)
+ self.notify("Scanning completed", title="Notification", severity="information")
+ else:
+ self.notify("Krytyczny błąd: Profil zniknął po utworzeniu", severity="error")
+
+ except Exception as e:
+ self.notify(f"Scanner error: {e}", title="Error", severity="error")
+
+ @on(Button.Pressed, "#save-button")
+ async def handle_save_button(self, event: Button.Pressed):
+ nmap_input = self.get_nmap_input(return_dict=True)
+ if not isinstance(nmap_input, dict):
+ self.notify("Błąd: dane wejściowe nie są słownikiem", severity="error")
+ return
+
+ errors = self.validate_inputs(nmap_input)
+ if errors:
+ self.notify("\n".join(errors), title="Bad Input", severity="error")
+ return
+
+ cve_check = self.query_one("#cve-checkbox", Checkbox).value
+ self.app.push_screen(SaveProfilePushScreen(self.manager, nmap_input, cve_check))
+
+ def on_manager_message(self, msg: str, title: str, severity):
+ self.app.notify(message=msg, title=title, severity=severity)
diff --git a/src/netmonitor/front/scanner_tab_pushscreens.py b/src/netmonitor/front/scanner_tab_pushscreens.py
new file mode 100644
index 0000000..4e4a564
--- /dev/null
+++ b/src/netmonitor/front/scanner_tab_pushscreens.py
@@ -0,0 +1,35 @@
+from textual import on
+from textual.app import ComposeResult
+from textual.widgets import Input, Button, Label
+from textual.containers import Vertical, Horizontal
+from textual.screen import ModalScreen
+
+from ..back.scanner_profiles_manager import ScannerProfilesManager
+
+class SaveProfilePushScreen(ModalScreen[str]):
+ def __init__(self, manager: ScannerProfilesManager, nmap_input, cve_check, *args, **kwargs):
+ super().__init__(*args, **kwargs)
+ self.manager = manager
+ self.nmap_input = nmap_input
+ self.cve_check = cve_check
+
+
+ def compose(self) -> ComposeResult:
+ with Vertical(classes="modal-window small-modal"):
+ yield Label("Save scan profile", classes="modal-header")
+ yield Input(placeholder="Profile name:", id="profile-name", classes="input")
+ with Horizontal(id="buttons-row", classes="modal-buttons modal-footer"):
+ yield Button("Confirm", id="confirm-button", variant="success", classes="button")
+ yield Button("Cancel", id="cancel-button", variant="error", classes="button")
+
+ @on(Button.Pressed)
+ async def on_button_pressed(self, event: Button.Pressed):
+ if event.button.id == "confirm-button":
+ profile_name = self.query_one("#profile-name", Input).value.strip()
+ self.manager.add_profile(profile_name)
+ self.manager.update_profile(profile_name,"nmap_input",self.nmap_input,notify=False)
+ self.manager.update_profile(profile_name,"cve",self.cve_check, notify=False)
+ self.dismiss(None)
+ else:
+ self.dismiss(None)
+
diff --git a/src/netmonitor/styles/styles.css b/src/netmonitor/styles/styles.css
new file mode 100644
index 0000000..c004b67
--- /dev/null
+++ b/src/netmonitor/styles/styles.css
@@ -0,0 +1,250 @@
+/* Już sam nie wiem co sie dzieje w tym pliku, ale nie obchodzi mnie to */
+ScannerTab {
+ layout: grid;
+ grid-size: 3 5;
+ grid-gutter: 1;
+}
+
+ScannerTab #friendly-command-section,
+ScannerTab #user-command-section,
+ScannerTab #results-section {
+ padding: 1;
+ border: round $primary;
+}
+
+ScannerTab #friendly-command-section {
+ row-span: 5;
+ column-span: 1;
+}
+
+ScannerTab #user-command-section {
+ row-span: 1;
+ column-span: 2;
+}
+
+ScannerTab #results-section {
+ row-span: 3;
+ column-span: 2;
+}
+
+ScannerTab Label {
+ padding: 0 1;
+}
+
+ScannerTab Input,
+ScannerTab MaskedInput,
+ScannerTab Select {
+ width: 1fr;
+ margin: 0 0 1 0;
+}
+
+ScannerTab #final-section {
+ row-span: 1;
+ column-span: 3;
+ layout: grid;
+ grid-size: 3 1;
+ grid-gutter: 1;
+ padding: 0;
+ border: round $primary;
+}
+
+ScannerTab #final-command {
+ column-span: 2;
+ padding: 1;
+ align: center middle;
+}
+
+ScannerTab #final-command-output {
+ border: solid $secondary;
+ overflow: auto;
+}
+
+ScannerTab #buttons {
+ column-span: 1;
+ layout: vertical;
+ align: center middle;
+ border-left: solid $primary 20%;
+}
+
+ScannerTab #buttons Button {
+ width: 90%;
+ padding: 1;
+}
+
+ScannerTab #scan-button {
+ border: none;
+}
+
+ScannerTab #save-button {
+ border: none;
+}
+
+
+
+ScannerTab #results-scroll {
+ height: 1fr;
+ width: 100%;
+ scrollbar-color: $primary ;
+ scrollbar-background: $background;
+ scrollbar-size-horizontal: 1;
+}
+
+
+
+DetectorTab Button {
+ content-align: center middle;
+}
+
+DetectorTab Checkbox {
+ width: 100%;
+}
+
+.save-button-container {
+ align: center middle;
+ height: auto;
+ margin-top: 1;
+ width: 100%;
+}
+
+.profile-row {
+ layout: horizontal;
+ align-vertical: middle;
+ height: auto;
+ min-height: 3;
+
+ background: $surface;
+ border-left: solid $primary;
+ margin-bottom: 1;
+ padding: 1;
+}
+
+.profile-row:hover {
+ background: $surface-lighten-1;
+ border-left: solid $accent;
+}
+
+.profile-profile_name {
+ width: 2fr;
+ text-style: bold;
+ color: $text;
+ padding: 1;
+ content-align: center middle;
+}
+
+.section-card {
+ padding: 1;
+ border: round $primary;
+ height: auto;
+}
+
+.detector-scroll {
+ scrollbar-color: $primary ;
+ scrollbar-background: $background;
+ scrollbar-size-horizontal: 1;
+}
+
+
+ModalScreen {
+ align: center middle;
+
+}
+
+.modal-window {
+ background: $surface;
+ border: tall $primary;
+ padding: 1 2;
+ layout: vertical;
+}
+
+.small-modal {
+ width: 50;
+ height: auto;
+}
+
+.medium-modal {
+ width: 80;
+ height: 80%;
+}
+
+.large-modal {
+ width: 95%;
+ height: 95%;
+}
+
+.modal-header {
+ width: 100%;
+ text-align: center;
+ text-style: bold;
+ color: $accent;
+ border-bottom: solid $secondary;
+ padding-bottom: 1;
+ margin-bottom: 1;
+}
+
+.modal-footer {
+ height: auto;
+ width: 100%;
+ align: center middle;
+ padding-top: 1;
+ dock: bottom;
+}
+
+.modal-footer Button {
+ margin: 0 1;
+ min-width: 15;
+}
+
+.modal-split-container {
+ width: 100%;
+ height: 1fr;
+}
+
+.left-panel {
+ width: 65%;
+ height: 100%;
+ margin-right: 1;
+}
+
+.right-panel {
+ width: 35%;
+ height: 100%;
+ border-left: solid $secondary;
+ padding-left: 1;
+}
+
+.plot-card {
+ height: 1fr;
+ border: none;
+ margin-bottom: 1;
+ background: $surface-lighten-1;
+}
+
+.section-header {
+ text-align: center;
+ color: $text-muted;
+ margin-bottom: 1;
+}
+
+.info-box {
+ width: 100%;
+ height: 1fr;
+ border: tall $surface-lighten-2;
+ background: $surface-darken-1;
+ padding: 1;
+ overflow: auto;
+ scrollbar-color: $primary ;
+ scrollbar-background: $background;
+ scrollbar-size-horizontal: 1;
+}
+
+.table-container {
+ width: 100%;
+ height: 1fr;
+ border: solid $secondary;
+}
+
+
+.label-muted {
+ color: $text-muted;
+ text-align: center;
+}