diff options
Diffstat (limited to 'src/netmonitor/back/scanner_profile.py')
| -rw-r--r-- | src/netmonitor/back/scanner_profile.py | 138 |
1 files changed, 0 insertions, 138 deletions
diff --git a/src/netmonitor/back/scanner_profile.py b/src/netmonitor/back/scanner_profile.py deleted file mode 100644 index 6a20b5b..0000000 --- a/src/netmonitor/back/scanner_profile.py +++ /dev/null @@ -1,138 +0,0 @@ -import os -from pathlib import Path -from datetime import datetime -from tinydb import TinyDB -import nmap -from .notification_service import notification_service - - -XDG_DATA_HOME = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share")) -LOGS_PATH = f"{XDG_DATA_HOME}/netmonitor/scanner/profiles_logs" - -class ScannerProfile: - def __init__(self, profile_name: str, nmap_input=None, scheduler=None, cve=None): - self.profile_name = profile_name - self.nmap_input = nmap_input or {} - self.scheduler = scheduler - self.cve = cve - self.is_active = False - - self.notify_enabled = False - self.notify_only_cve = False - - self.nm = None - self.db = None - self.profile_results_path = f"{LOGS_PATH}/{profile_name}.json" - os.makedirs(os.path.dirname(self.profile_results_path), exist_ok=True) - - @property - def nmap(self): - if self.nm is None: - self.nm = nmap.PortScanner() - return self.nm - - @property - def tinydb(self): - if self.db is None: - self.db = TinyDB(self.profile_results_path) - return self.db - - def __getstate__(self): - state = self.__dict__.copy() - state['nm'] = None - state['db'] = None - return state - - def __setstate__(self, state): - self.__dict__.update(state) - self.nm = None - self.db = None - - def __repr__(self): - return f"<ScannerProfile profile_name={self.profile_name!r}, active={self.is_active}>" - - - def scan(self): - targets = self.nmap_input.get("targets", "") - arguments = self.nmap_input.get("arguments", "") - ports = self.nmap_input.get("ports", "") - - if self.cve: - arguments += " --script=vulners " - - if ports == '': - self.nmap.scan(hosts=targets, arguments=arguments) - else: - self.nmap.scan(hosts=targets, ports=ports, arguments=arguments) - - xml_result = self.nmap.get_nmap_last_output() - analyzed_results = self.nmap.analyse_nmap_xml_scan(xml_result) - - if isinstance(analyzed_results, dict): - analyzed_results['_timestamp'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - self.tinydb.insert(analyzed_results) - - if self.notify_enabled: - self._handle_notification(analyzed_results, targets) - - return analyzed_results - - def _handle_notification(self, results, targets): - scan_str = str(results).lower() - found_issues = "cve" in scan_str or "vulnerab" in scan_str - should_send = False - msg = f"*Scanner report: {self.profile_name}*\nTargets: `{targets}`" - summary = "" - try: - if 'scan' in results: - for host, data in results['scan'].items(): - summary += f"\n *{host}*" - - if 'tcp' in data: - open_ports = [f"{p}/tcp" for p, info in data['tcp'].items() if info.get('state') == 'open'] - if open_ports: - summary += f"\n Open ports: {', '.join(open_ports)}" - if 'udp' in data: - open_ports = [f"{p}/udp" for p, info in data['udp'].items() if info.get('state') == 'open'] - if open_ports: - summary += f"\n Open ports (UDP): {', '.join(open_ports)}" - if 'osmatch' in data and data['osmatch']: - os_name = data['osmatch'][0].get('name', 'Unknown') - summary += f"\n OS detected: {os_name}" - except Exception as e: - summary += f"\n(Error building summary: {e})" - - if self.notify_only_cve: - if found_issues: - should_send = True - msg += "\nCVE detected!" - msg += "\n---" + summary - else: - should_send = True - msg += "\nScan completed." - if found_issues: - msg += "\nPotential vulnerabilities detected." - - if summary: - msg += "\n---" + summary - - if should_send: - if len(msg) > 1900: - msg = msg[:1900] + "\n...." - notification_service.send_message(msg) - - - def to_dict(self): - return { - "profile_name": self.profile_name, - "nmap_input": self.nmap_input, - "scheduler": self.scheduler, - "is_active": self.is_active, - "notify_enabled": getattr(self, 'notify_enabled', False), - "notify_only_cve": getattr(self, 'notify_only_cve', False), - "results_path": self.profile_results_path, - } - - def get_logs(self): - return self.tinydb.all() |