summaryrefslogtreecommitdiff
path: root/src/netmonitor/back/scanner_profile.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/netmonitor/back/scanner_profile.py')
-rw-r--r--src/netmonitor/back/scanner_profile.py138
1 files changed, 0 insertions, 138 deletions
diff --git a/src/netmonitor/back/scanner_profile.py b/src/netmonitor/back/scanner_profile.py
deleted file mode 100644
index 6a20b5b..0000000
--- a/src/netmonitor/back/scanner_profile.py
+++ /dev/null
@@ -1,138 +0,0 @@
-import os
-from pathlib import Path
-from datetime import datetime
-from tinydb import TinyDB
-import nmap
-from .notification_service import notification_service
-
-
-XDG_DATA_HOME = Path(os.environ.get("XDG_DATA_HOME", Path.home() / ".local/share"))
-LOGS_PATH = f"{XDG_DATA_HOME}/netmonitor/scanner/profiles_logs"
-
-class ScannerProfile:
- def __init__(self, profile_name: str, nmap_input=None, scheduler=None, cve=None):
- self.profile_name = profile_name
- self.nmap_input = nmap_input or {}
- self.scheduler = scheduler
- self.cve = cve
- self.is_active = False
-
- self.notify_enabled = False
- self.notify_only_cve = False
-
- self.nm = None
- self.db = None
- self.profile_results_path = f"{LOGS_PATH}/{profile_name}.json"
- os.makedirs(os.path.dirname(self.profile_results_path), exist_ok=True)
-
- @property
- def nmap(self):
- if self.nm is None:
- self.nm = nmap.PortScanner()
- return self.nm
-
- @property
- def tinydb(self):
- if self.db is None:
- self.db = TinyDB(self.profile_results_path)
- return self.db
-
- def __getstate__(self):
- state = self.__dict__.copy()
- state['nm'] = None
- state['db'] = None
- return state
-
- def __setstate__(self, state):
- self.__dict__.update(state)
- self.nm = None
- self.db = None
-
- def __repr__(self):
- return f"<ScannerProfile profile_name={self.profile_name!r}, active={self.is_active}>"
-
-
- def scan(self):
- targets = self.nmap_input.get("targets", "")
- arguments = self.nmap_input.get("arguments", "")
- ports = self.nmap_input.get("ports", "")
-
- if self.cve:
- arguments += " --script=vulners "
-
- if ports == '':
- self.nmap.scan(hosts=targets, arguments=arguments)
- else:
- self.nmap.scan(hosts=targets, ports=ports, arguments=arguments)
-
- xml_result = self.nmap.get_nmap_last_output()
- analyzed_results = self.nmap.analyse_nmap_xml_scan(xml_result)
-
- if isinstance(analyzed_results, dict):
- analyzed_results['_timestamp'] = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-
- self.tinydb.insert(analyzed_results)
-
- if self.notify_enabled:
- self._handle_notification(analyzed_results, targets)
-
- return analyzed_results
-
- def _handle_notification(self, results, targets):
- scan_str = str(results).lower()
- found_issues = "cve" in scan_str or "vulnerab" in scan_str
- should_send = False
- msg = f"*Scanner report: {self.profile_name}*\nTargets: `{targets}`"
- summary = ""
- try:
- if 'scan' in results:
- for host, data in results['scan'].items():
- summary += f"\n *{host}*"
-
- if 'tcp' in data:
- open_ports = [f"{p}/tcp" for p, info in data['tcp'].items() if info.get('state') == 'open']
- if open_ports:
- summary += f"\n Open ports: {', '.join(open_ports)}"
- if 'udp' in data:
- open_ports = [f"{p}/udp" for p, info in data['udp'].items() if info.get('state') == 'open']
- if open_ports:
- summary += f"\n Open ports (UDP): {', '.join(open_ports)}"
- if 'osmatch' in data and data['osmatch']:
- os_name = data['osmatch'][0].get('name', 'Unknown')
- summary += f"\n OS detected: {os_name}"
- except Exception as e:
- summary += f"\n(Error building summary: {e})"
-
- if self.notify_only_cve:
- if found_issues:
- should_send = True
- msg += "\nCVE detected!"
- msg += "\n---" + summary
- else:
- should_send = True
- msg += "\nScan completed."
- if found_issues:
- msg += "\nPotential vulnerabilities detected."
-
- if summary:
- msg += "\n---" + summary
-
- if should_send:
- if len(msg) > 1900:
- msg = msg[:1900] + "\n...."
- notification_service.send_message(msg)
-
-
- def to_dict(self):
- return {
- "profile_name": self.profile_name,
- "nmap_input": self.nmap_input,
- "scheduler": self.scheduler,
- "is_active": self.is_active,
- "notify_enabled": getattr(self, 'notify_enabled', False),
- "notify_only_cve": getattr(self, 'notify_only_cve', False),
- "results_path": self.profile_results_path,
- }
-
- def get_logs(self):
- return self.tinydb.all()