diff options
| author | EnricoGuccii <partyka.003@proton.me> | 2026-01-10 22:43:36 +0100 |
|---|---|---|
| committer | EnricoGuccii <partyka.003@proton.me> | 2026-01-10 22:43:36 +0100 |
| commit | 6d7410e286ce0fde31f89185c095fe90e85597f3 (patch) | |
| tree | 5092c99d353382a71f00d8fe18d53b8073cf3f58 /src/streamml/back | |
| parent | c2f5fbe7fb93ce420caf23c5c0e06144cf953bb8 (diff) | |
bloat removed
Diffstat (limited to 'src/streamml/back')
| -rw-r--r-- | src/streamml/back/__pycache__/detector_profile_HST.cpython-312.pyc | bin | 0 -> 11541 bytes | |||
| -rw-r--r-- | src/streamml/back/__pycache__/detector_profiles_manager.cpython-312.pyc | bin | 0 -> 10024 bytes | |||
| -rw-r--r-- | src/streamml/back/__pycache__/notification_service.cpython-312.pyc | bin | 0 -> 2908 bytes | |||
| -rw-r--r-- | src/streamml/back/__pycache__/window.cpython-312.pyc | bin | 0 -> 9503 bytes | |||
| -rw-r--r-- | src/streamml/back/detector_profile_HST.py | 215 | ||||
| -rw-r--r-- | src/streamml/back/detector_profiles_manager.py | 153 | ||||
| -rw-r--r-- | src/streamml/back/notification_service.py | 49 | ||||
| -rw-r--r-- | src/streamml/back/window.py | 251 |
8 files changed, 668 insertions, 0 deletions
diff --git a/src/streamml/back/__pycache__/detector_profile_HST.cpython-312.pyc b/src/streamml/back/__pycache__/detector_profile_HST.cpython-312.pyc Binary files differnew file mode 100644 index 0000000..72054e8 --- /dev/null +++ b/src/streamml/back/__pycache__/detector_profile_HST.cpython-312.pyc diff --git a/src/streamml/back/__pycache__/detector_profiles_manager.cpython-312.pyc b/src/streamml/back/__pycache__/detector_profiles_manager.cpython-312.pyc Binary files differnew file mode 100644 index 0000000..598bd22 --- /dev/null +++ b/src/streamml/back/__pycache__/detector_profiles_manager.cpython-312.pyc diff --git a/src/streamml/back/__pycache__/notification_service.cpython-312.pyc b/src/streamml/back/__pycache__/notification_service.cpython-312.pyc Binary files differnew file mode 100644 index 0000000..d19dcaa --- /dev/null +++ b/src/streamml/back/__pycache__/notification_service.cpython-312.pyc diff --git a/src/streamml/back/__pycache__/window.cpython-312.pyc b/src/streamml/back/__pycache__/window.cpython-312.pyc Binary files differnew file mode 100644 index 0000000..49f5f63 --- /dev/null +++ b/src/streamml/back/__pycache__/window.cpython-312.pyc diff --git a/src/streamml/back/detector_profile_HST.py b/src/streamml/back/detector_profile_HST.py new file mode 100644 index 0000000..6583769 --- /dev/null +++ b/src/streamml/back/detector_profile_HST.py @@ -0,0 +1,215 @@ +import threading +import queue +import time +import os +from pathlib import Path +from scapy.all import wrpcap, AsyncSniffer + +from river.anomaly import HalfSpaceTrees +from tinydb import TinyDB +from tinydb.table import Document + +from .window import Window +from .notification_service import notification_service + +XDG_DATA_HOME = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share")) +LOGS_PATH = f"{XDG_DATA_HOME}/netmonitor/detector/profiles_logs" +PCAP_PATH = f"{XDG_DATA_HOME}/netmonitor/detector/profiles_pcaps" + + +class DetectorProfileHST: + + def __init__( + self, + profile_name: str, + input_data: dict + ): + self.profile_name = profile_name + self.features = input_data.get("features", []) + self.params = input_data.get("params", {}) + self.n_trees = int(self.params.get("trees", 10)) + self.height = int(self.params.get("height", 8)) + self.window_size = int(self.params.get("window", 250)) + self.seed = int(self.params.get("seed", 42)) + self.threshold = float(self.params.get("threshold", 0.7)) + self.window_duration = float(self.params.get("window_duration", 10.0)) + self.bpf_filter = self.params.get("bpf_filter", "") + self.interface = self.params.get("interface", None) + self.queue_size = int(self.params.get("queue_size", 10000)) + self.logs_path = f"{LOGS_PATH}/{profile_name}.json" + os.makedirs(os.path.dirname(self.logs_path), exist_ok=True) + + self.is_active = False + + self.notify_enabled = False + self._init_runtime_objects() + + + def _init_runtime_objects(self): + self.queue = queue.Queue(maxsize=self.queue_size) + + os.makedirs(f"{LOGS_PATH}", exist_ok=True) + self.db = TinyDB(f"{LOGS_PATH}/{self.profile_name}.json") + + self.model = HalfSpaceTrees( + n_trees=self.n_trees, + height=self.height, + window_size=self.window_size, + seed=self.seed + ) + + self.window = Window( + window_duration=self.window_duration, + enabled_features=self.features + ) + + self.processor_thread = None + + self.packets_read = 0 + self.windows_analyzed = 0 + + if not hasattr(self, 'plot_data'): + self.plot_data = [] + + + def __getstate__(self): + state = self.__dict__.copy() + cols_to_remove = ['sniffer_thread', 'processor_thread', 'queue', 'db', 'window'] + for col in cols_to_remove: + if col in state: + del state[col] + return state + + def __setstate__(self, state): + self.__dict__.update(state) + self._init_runtime_objects() + self.is_active = False + + def __repr__(self): + return f"<DetectorProfileHST profile_name={self.profile_name!r}, active={self.is_active}>" + + def turn_on(self): + if self.is_active: + return + + self.is_active = True + os.makedirs(os.path.dirname(self.logs_path), exist_ok=True) + self.window.window_duration = self.window_duration + self.window.window_start = time.time() + + self.sniffer = AsyncSniffer( + iface=self.interface, + filter=self.bpf_filter, + store=False, + prn=self._add_to_queue + ) + self.sniffer.start() + + self.processor_thread = threading.Thread(target=self._process_thread, daemon=True) + + self.processor_thread.start() + + def turn_off(self): + self.is_active = False + if self.sniffer: + self.sniffer.stop() + + time.sleep(0.2) + + def _add_to_queue(self, pkt): + if self.queue: + try: + self.queue.put_nowait(pkt) + self.packets_read += 1 + except queue.Full: + pass + + def _process_thread(self): + while self.is_active: + try: + pkt = self.queue.get(timeout=1) + except queue.Empty: + continue + + result = self.window.add_packet(pkt) + + if result is None: + continue + + features, raw_packets = result + self.windows_analyzed += 1 + + if not features: + continue + + sample = {feat: 0.0 for feat in self.features} + for k, v in features.items(): + if k in sample: + sample[k] = float(v) + + score = self.model.score_one(sample) + self.model.learn_one(sample) + + self.plot_data.append(score) + if len(self.plot_data) > 30: + self.plot_data.pop(0) + + if score > self.threshold: + self._handle_anomaly(score, sample, raw_packets) + + def _handle_anomaly(self, score: float, features: dict, raw_packets: list): + timestamp = time.strftime("%Y-%m-%d_%H-%M-%S") + + os.makedirs(os.path.dirname(f"{PCAP_PATH}/{self.profile_name}"), exist_ok=True) + filename = f"{PCAP_PATH}/{self.profile_name}/anom_{timestamp}.pcap" + + if raw_packets: + try: + wrpcap(filename, raw_packets) + except Exception as e: + print(f"Error saving pcap: {e}") + + if self.notify_enabled: + msg = f"*Anomaly detected: {self.profile_name}*\nScore: `{score:.4f}`\nSaved: `{filename}`" + notification_service.send_message(message=msg) + + if self.db: + self.db.insert( + Document({ + "ts": time.time(), + "timestamp": timestamp, + "profile": self.profile_name, + "score": float(score), + "pcap": filename, + "pkt_rate": features.get("pkt_rate", 0), + "proto_info": f"TCP:{features.get('proto_tcp_ratio',0):.2f} UDP:{features.get('proto_udp_ratio',0):.2f}", + }, doc_id=None) + ) + + def to_dict(self): + return { + "profile name": self.profile_name, + "logs_path": self.logs_path, + "pcap_path": f"{PCAP_PATH}/{self.profile_name}", + "params": self.params, + "features": self.features, + } + + def get_logs(self): + if self.db: + return self.db.all() + return [] + + def clear_logs(self): + if self.db: + self.db.truncate() + + def get_runtime_stats(self): + return { + "is_active": self.is_active, + "notify_enabled": self.notify_enabled, + "packets_sniffed": getattr(self, "packets_read", 0), + "queue_size": self.queue.qsize() if hasattr(self, "queue") and self.queue else 0, + "windows_processed": getattr(self, "windows_analyzed", 0), + "window_duration": self.window_duration + } diff --git a/src/streamml/back/detector_profiles_manager.py b/src/streamml/back/detector_profiles_manager.py new file mode 100644 index 0000000..2f1543c --- /dev/null +++ b/src/streamml/back/detector_profiles_manager.py @@ -0,0 +1,153 @@ +import re +from pathlib import Path +from typing import Callable, Literal +import pickle +from streamml.back.detector_profile_HST import DetectorProfileHST + + +SeverityLevel = Literal["information", "warning", "error"] +VALID_NAME_REGEX = r"^[a-zA-Z0-9_-]+$" + +class DetectorProfilesManager: + def __init__(self, profiles_file: str): + self.profiles_file = Path(profiles_file) + self.profiles: list[DetectorProfileHST] = [] + + self.on_message: Callable[[str, str, str], None] | None = None + self.on_refresh: Callable[[], None] | None = None + + self.load_profiles() + + + def _notify(self, msg: str, title: str = "Profile Manager", level: SeverityLevel = "information"): + if self.on_message: + self.on_message(msg, title, level) + + + def _refresh_front(self): + if self.on_refresh: + self.on_refresh() + + + def _fail(self, msg: str, level: SeverityLevel = "error", notify: bool = True) -> bool: + if notify: + self._notify(msg, level=level) + return False + + + def _ok(self, msg: str | None = None, level: SeverityLevel = "information", notify: bool = True) -> bool: + if msg and notify: + self._notify(msg, level=level) + return True + + + def try_save_profiles(self, notify: bool = True) -> bool: + try: + self.profiles_file.parent.mkdir(parents=True, exist_ok=True) + + with open(self.profiles_file, "wb") as f: + pickle.dump(self.profiles, f, protocol=pickle.HIGHEST_PROTOCOL) + + return self._ok("Profiles saved successfully.", notify=notify) + except Exception as e: + import traceback + traceback.print_exc() + return self._fail(f"Error saving profiles: {e}", notify=notify) + + + def load_profiles(self, notify: bool = True) -> bool: + if not self.profiles_file.exists(): + if notify: + self._notify("Profiles file not found. Creating a new one.", level="warning") + if not self.try_save_profiles(notify=False): + return self._fail("Failed to create profiles file!", notify=notify) + return self._ok("New profiles file created.", notify=notify) + + try: + with open(self.profiles_file, "rb") as f: + self.profiles = pickle.load(f) + self._refresh_front() + return self._ok(f"Loaded {len(self.profiles)} profiles.", notify=notify) + except Exception as e: + return self._fail(f"Error loading profiles: {e}", notify=notify) + + + def add_profile(self, profile_name: str, input_data: dict, notify: bool = True) -> bool: + if not profile_name: + return self._fail("Name can't be blank.", "error", notify) + + if not re.match(VALID_NAME_REGEX, profile_name): + return self._fail(f"Bad input '{profile_name}'. Use only letters, numbers, '_' and '-'.", "error", notify) + + if any(p.profile_name == profile_name for p in self.profiles): + return self._fail(f"Profile {profile_name} already exists.", "warning", notify) + + new_profile = DetectorProfileHST(profile_name=profile_name, input_data = input_data) + self.profiles.append(new_profile) + + if not self.try_save_profiles(notify=False): + self.profiles.remove(new_profile) + return self._fail(f"Failed to save profile {profile_name}.", notify=notify) + + self._refresh_front() + return self._ok(f"Added profile {profile_name}.", notify=notify) + + + def delete_profile(self, profile_name: str, notify: bool = True) -> bool: + before = len(self.profiles) + self.profiles = [p for p in self.profiles if p.profile_name != profile_name] + + if len(self.profiles) == before: + return self._fail(f"Profile {profile_name} not found.", "warning", notify) + + if not self.try_save_profiles(notify=False): + return self._fail(f"Failed to save changes after deleting {profile_name}.", notify=notify) + + self._refresh_front() + return self._ok(f"Deleted profile {profile_name}.", notify=notify) + + + def get_profile(self, profile_name: str) -> DetectorProfileHST | None: + return next((p for p in self.profiles if p.profile_name == profile_name), None) + + + def update_profile(self, profile_name: str, field: str, value, notify: bool = True) -> bool: + p = self.get_profile(profile_name) + if not p: + return self._fail(f"Profile {profile_name} does not exist.", notify=notify) + + setattr(p, field, value) + if self.try_save_profiles(notify=False): + return self._ok(f"Updated profile {profile_name}.", notify=notify) + else: + return self._fail(f"Failed to save updated profile {profile_name}.", notify=notify) + + + def turn_on_profile(self, profile_name: str, app=None, notify: bool = True) -> bool: + p = self.get_profile(profile_name) + if not p: + return self._fail(f"Profile {profile_name} not found.", notify=notify) + try: + p.turn_on() + return self._ok(f"Profile {profile_name} activated.", notify=notify) + except Exception as e: + return self._fail(f"Error activating profile: {e}", notify=notify) + + + def turn_off_profile(self, profile_name: str, notify: bool = True) -> bool: + p = self.get_profile(profile_name) + if not p: + return self._fail(f"Profile {profile_name} not found.", notify=notify) + try: + p.turn_off() + return self._ok(f"Profile {profile_name} deactivated.", notify=notify) + except Exception as e: + return self._fail(f"Error deactivating profile: {e}", notify=notify) + + + def get_profile_logs(self, profile_name: str, notify: bool = True): + p = self.get_profile(profile_name) + if not p: + self._fail(f"Profile {profile_name} not found.", "error", notify) + return None + return p.get_logs() diff --git a/src/streamml/back/notification_service.py b/src/streamml/back/notification_service.py new file mode 100644 index 0000000..e347faf --- /dev/null +++ b/src/streamml/back/notification_service.py @@ -0,0 +1,49 @@ +import json +import requests +from pathlib import Path + +CONFIG_FILE = Path("data/global_config.json") + +class NotificationService: + def __init__(self): + self.webhook_url = "" + self.load_config() + + def load_config(self): + if CONFIG_FILE.exists(): + try: + with open(CONFIG_FILE, "r") as f: + data = json.load(f) + self.webhook_url = data.get("discord_webhook_url", "") + except Exception as e: + print(f"Error loading notification config: {e}") + + def save_config(self, webhook_url: str): + self.webhook_url = webhook_url + + CONFIG_FILE.parent.mkdir(parents=True, exist_ok=True) + try: + with open(CONFIG_FILE, "w") as f: + json.dump({ + "discord_webhook_url": self.webhook_url + }, f, indent=4) + return True + except Exception as e: + print(f"Error saving config: {e}") + return False + + def send_message(self, message: str) -> bool: + if not self.webhook_url: + return False + + payload = { + "content": message + } + try: + response = requests.post(self.webhook_url, json=payload, timeout=5) + return response.status_code in [200, 204] + except Exception as e: + print(f"Discord send error: {e}") + return False + +notification_service = NotificationService() diff --git a/src/streamml/back/window.py b/src/streamml/back/window.py new file mode 100644 index 0000000..dfa8e7d --- /dev/null +++ b/src/streamml/back/window.py @@ -0,0 +1,251 @@ +import time +from collections import defaultdict +from statistics import mean, pstdev +import math + +from scapy.all import IP, IPv6, TCP, UDP, ICMP + +class Window: + def __init__(self, window_duration: float, enabled_features: list[str]): + + self.window_duration = float(window_duration) + self.enabled = set(enabled_features) + + self.window_start = time.time() + + self.raw_packets_buffer = [] + + self.flows = defaultdict(lambda: { + "pkt_count": 0, + "byte_count": 0, + "dst_ports": defaultdict(int), + "src_ports": defaultdict(int), + "tcp_flags": { + "syn": 0, "fin": 0, "rst": 0, "ack": 0, + "psh": 0, "urg": 0, "xmas": 0, "null": 0, + }, + "sizes": [], + "start_ts": None, + "end_ts": None, + "tcp_pkts": 0, + "udp_pkts": 0, + "icmp_pkts": 0, + }) + + def add_packet(self, pkt): + now = time.time() + + if now - self.window_start >= self.window_duration: + features = self._finish_window() + + self.window_start = now + + raw = list(self.raw_packets_buffer) + + self.raw_packets_buffer.clear() + self.flows.clear() + + self._process_single_packet(pkt) + + return features, raw + + self._process_single_packet(pkt) + return None + + def _process_single_packet(self, pkt): + self.raw_packets_buffer.append(pkt) + + proto = None + if IP in pkt: + src = pkt[IP].src + dst = pkt[IP].dst + proto = pkt[IP].proto + elif IPv6 in pkt: + src = pkt[IPv6].src + dst = pkt[IPv6].dst + proto = pkt[IPv6].nh + else: + return + + key = (src, dst, proto) + f = self.flows[key] + + now = time.time() + if f["start_ts"] is None: + f["start_ts"] = now + f["end_ts"] = now + + size = len(pkt) + f["pkt_count"] += 1 + f["byte_count"] += size + f["sizes"].append(size) + + if TCP in pkt: + f["tcp_pkts"] += 1 + dport = pkt[TCP].dport + sport = pkt[TCP].sport + f["dst_ports"][dport] += 1 + f["src_ports"][sport] += 1 + + flags = pkt[TCP].flags + if flags & 0x02: f["tcp_flags"]["syn"] += 1 + if flags & 0x01: f["tcp_flags"]["fin"] += 1 + if flags & 0x04: f["tcp_flags"]["rst"] += 1 + if flags & 0x10: f["tcp_flags"]["ack"] += 1 + if flags & 0x08: f["tcp_flags"]["psh"] += 1 + if flags & 0x20: f["tcp_flags"]["urg"] += 1 + + if flags in [0x29, 0x3F, 0x3B]: + f["tcp_flags"]["xmas"] += 1 + if flags == 0: + f["tcp_flags"]["null"] += 1 + + elif UDP in pkt: + f["udp_pkts"] += 1 + dport = pkt[UDP].dport + sport = pkt[UDP].sport + f["dst_ports"][dport] += 1 + f["src_ports"][sport] += 1 + + elif ICMP in pkt: + f["icmp_pkts"] += 1 + + def _finish_window(self): + if not self.flows: + return {} + + total_flows = len(self.flows) + total_packets = 0 + total_bytes = 0 + + tcp_flags_global = { + "syn": 0, "fin": 0, "rst": 0, "ack": 0, + "psh": 0, "urg": 0, "xmas": 0, "null": 0 + } + + all_sizes = [] + proto_tcp = 0 + proto_udp = 0 + proto_icmp = 0 + + dst_port_counts = defaultdict(int) + src_port_counts = defaultdict(int) + + for f in self.flows.values(): + total_packets += f["pkt_count"] + total_bytes += f["byte_count"] + + for p, c in f["dst_ports"].items(): + dst_port_counts[p] += c + for p, c in f["src_ports"].items(): + src_port_counts[p] += c + + for k in tcp_flags_global: + tcp_flags_global[k] += f["tcp_flags"][k] + + all_sizes.extend(f["sizes"]) + proto_tcp += f["tcp_pkts"] + proto_udp += f["udp_pkts"] + proto_icmp += f["icmp_pkts"] + + window_len = self.window_duration + total_pkts = total_packets if total_packets > 0 else 1 + + feat = {} + + if "flow_count" in self.enabled: feat["flow_count"] = total_flows + if "total_packets" in self.enabled: feat["total_packets"] = total_packets + if "total_bytes" in self.enabled: feat["total_bytes"] = total_bytes + if "avg_bytes_per_flow" in self.enabled: feat["avg_bytes_per_flow"] = total_bytes / total_flows if total_flows else 0 + if "pkt_rate" in self.enabled: feat["pkt_rate"] = total_packets / window_len + if "byte_rate" in self.enabled: feat["byte_rate"] = total_bytes / window_len + + if "syn_count" in self.enabled: feat["syn_count"] = tcp_flags_global["syn"] + if "fin_count" in self.enabled: feat["fin_count"] = tcp_flags_global["fin"] + if "rst_count" in self.enabled: feat["rst_count"] = tcp_flags_global["rst"] + if "ack_count" in self.enabled: feat["ack_count"] = tcp_flags_global["ack"] + if "psh_count" in self.enabled: feat["psh_count"] = tcp_flags_global["psh"] + if "urg_count" in self.enabled: feat["urg_count"] = tcp_flags_global["urg"] + + if "syn_ratio" in self.enabled: feat["syn_ratio"] = tcp_flags_global["syn"] / total_pkts + if "fin_ratio" in self.enabled: feat["fin_ratio"] = tcp_flags_global["fin"] / total_pkts + if "xmas_total" in self.enabled: feat["xmas_total"] = tcp_flags_global["xmas"] + if "null_scan_total" in self.enabled: feat["null_scan_total"] = tcp_flags_global["null"] + + if "unique_dst_ports" in self.enabled: feat["unique_dst_ports"] = len(dst_port_counts) + if "unique_src_ports" in self.enabled: feat["unique_src_ports"] = len(src_port_counts) + if "port_entropy_dst" in self.enabled: feat["port_entropy_dst"] = entropy(dst_port_counts) + if "port_entropy_src" in self.enabled: feat["port_entropy_src"] = entropy(src_port_counts) + + if all_sizes: + if "avg_pkt_size" in self.enabled: feat["avg_pkt_size"] = mean(all_sizes) + if "min_pkt_size" in self.enabled: feat["min_pkt_size"] = min(all_sizes) + if "max_pkt_size" in self.enabled: feat["max_pkt_size"] = max(all_sizes) + if "std_pkt_size" in self.enabled: feat["std_pkt_size"] = pstdev(all_sizes) + else: + for k in ["avg_pkt_size", "min_pkt_size", "max_pkt_size", "std_pkt_size"]: + if k in self.enabled: feat[k] = 0 + + if "avg_packets_per_flow" in self.enabled: + feat["avg_packets_per_flow"] = total_packets / total_flows if total_flows else 0 + if "avg_bytes_per_packet" in self.enabled: + feat["avg_bytes_per_packet"] = total_bytes / total_pkts + + if "proto_tcp_ratio" in self.enabled: feat["proto_tcp_ratio"] = proto_tcp / total_pkts + if "proto_udp_ratio" in self.enabled: feat["proto_udp_ratio"] = proto_udp / total_pkts + if "proto_icmp_ratio" in self.enabled: feat["proto_icmp_ratio"] = proto_icmp / total_pkts + + return feat + + +FEATURE_LIST = [ + "flow_count", + "total_packets", + "total_bytes", + "avg_bytes_per_flow", + "pkt_rate", + "byte_rate", + + "syn_count", + "fin_count", + "rst_count", + "ack_count", + "psh_count", + "urg_count", + "syn_ratio", + "fin_ratio", + "xmas_total", + "null_scan_total", + + "unique_dst_ports", + "unique_src_ports", + "port_entropy_dst", + "port_entropy_src", + + "avg_pkt_size", + "min_pkt_size", + "max_pkt_size", + "std_pkt_size", + + "avg_packets_per_flow", + "avg_bytes_per_packet", + "proto_tcp_ratio", + "proto_udp_ratio", + "proto_icmp_ratio", +] + + +def entropy(values): + if not values: + return 0.0 + + total = sum(values.values()) + if total == 0: + return 0.0 + + entropy_val = 0.0 + for count in values.values(): + p = count / total + entropy_val -= p * math.log2(p) + + return entropy_val |