diff options
Diffstat (limited to 'src/netmonitor/back/scanner_profile.py')
| -rw-r--r-- | src/netmonitor/back/scanner_profile.py | 138 |
1 files changed, 138 insertions, 0 deletions
diff --git a/src/netmonitor/back/scanner_profile.py b/src/netmonitor/back/scanner_profile.py new file mode 100644 index 0000000..6a20b5b --- /dev/null +++ b/src/netmonitor/back/scanner_profile.py @@ -0,0 +1,138 @@ +import os +from pathlib import Path +from datetime import datetime +from tinydb import TinyDB +import nmap +from .notification_service import notification_service + + +XDG_DATA_HOME = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share")) +LOGS_PATH = f"{XDG_DATA_HOME}/netmonitor/scanner/profiles_logs" + +class ScannerProfile: + def __init__(self, profile_name: str, nmap_input=None, scheduler=None, cve=None): + self.profile_name = profile_name + self.nmap_input = nmap_input or {} + self.scheduler = scheduler + self.cve = cve + self.is_active = False + + self.notify_enabled = False + self.notify_only_cve = False + + self.nm = None + self.db = None + self.profile_results_path = f"{LOGS_PATH}/{profile_name}.json" + os.makedirs(os.path.dirname(self.profile_results_path), exist_ok=True) + + @property + def nmap(self): + if self.nm is None: + self.nm = nmap.PortScanner() + return self.nm + + @property + def tinydb(self): + if self.db is None: + self.db = TinyDB(self.profile_results_path) + return self.db + + def __getstate__(self): + state = self.__dict__.copy() + state['nm'] = None + state['db'] = None + return state + + def __setstate__(self, state): + self.__dict__.update(state) + self.nm = None + self.db = None + + def __repr__(self): + return f"<ScannerProfile profile_name={self.profile_name!r}, active={self.is_active}>" + + + def scan(self): + targets = self.nmap_input.get("targets", "") + arguments = self.nmap_input.get("arguments", "") + ports = self.nmap_input.get("ports", "") + + if self.cve: + arguments += " --script=vulners " + + if ports == '': + self.nmap.scan(hosts=targets, arguments=arguments) + else: + self.nmap.scan(hosts=targets, ports=ports, arguments=arguments) + + xml_result = self.nmap.get_nmap_last_output() + analyzed_results = self.nmap.analyse_nmap_xml_scan(xml_result) + + if isinstance(analyzed_results, dict): + analyzed_results['_timestamp'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + self.tinydb.insert(analyzed_results) + + if self.notify_enabled: + self._handle_notification(analyzed_results, targets) + + return analyzed_results + + def _handle_notification(self, results, targets): + scan_str = str(results).lower() + found_issues = "cve" in scan_str or "vulnerab" in scan_str + should_send = False + msg = f"*Scanner report: {self.profile_name}*\nTargets: `{targets}`" + summary = "" + try: + if 'scan' in results: + for host, data in results['scan'].items(): + summary += f"\n *{host}*" + + if 'tcp' in data: + open_ports = [f"{p}/tcp" for p, info in data['tcp'].items() if info.get('state') == 'open'] + if open_ports: + summary += f"\n Open ports: {', '.join(open_ports)}" + if 'udp' in data: + open_ports = [f"{p}/udp" for p, info in data['udp'].items() if info.get('state') == 'open'] + if open_ports: + summary += f"\n Open ports (UDP): {', '.join(open_ports)}" + if 'osmatch' in data and data['osmatch']: + os_name = data['osmatch'][0].get('name', 'Unknown') + summary += f"\n OS detected: {os_name}" + except Exception as e: + summary += f"\n(Error building summary: {e})" + + if self.notify_only_cve: + if found_issues: + should_send = True + msg += "\nCVE detected!" + msg += "\n---" + summary + else: + should_send = True + msg += "\nScan completed." + if found_issues: + msg += "\nPotential vulnerabilities detected." + + if summary: + msg += "\n---" + summary + + if should_send: + if len(msg) > 1900: + msg = msg[:1900] + "\n...." + notification_service.send_message(msg) + + + def to_dict(self): + return { + "profile_name": self.profile_name, + "nmap_input": self.nmap_input, + "scheduler": self.scheduler, + "is_active": self.is_active, + "notify_enabled": getattr(self, 'notify_enabled', False), + "notify_only_cve": getattr(self, 'notify_only_cve', False), + "results_path": self.profile_results_path, + } + + def get_logs(self): + return self.tinydb.all() |