From c2f5fbe7fb93ce420caf23c5c0e06144cf953bb8 Mon Sep 17 00:00:00 2001 From: EnricoGuccii Date: Sat, 10 Jan 2026 22:12:49 +0100 Subject: yy --- src/netmonitor/front/scanner_tab.py | 277 ++++++++++++++++++++++++++++++++++++ 1 file changed, 277 insertions(+) create mode 100755 src/netmonitor/front/scanner_tab.py (limited to 'src/netmonitor/front/scanner_tab.py') diff --git a/src/netmonitor/front/scanner_tab.py b/src/netmonitor/front/scanner_tab.py new file mode 100755 index 0000000..868fa74 --- /dev/null +++ b/src/netmonitor/front/scanner_tab.py @@ -0,0 +1,277 @@ +from textual import on +from textual.app import ComposeResult +from textual.widgets import Checkbox, Input, Select, Button, Label, SelectionList, Pretty +from textual.widgets.selection_list import Selection +from textual.containers import Container, Vertical, Horizontal, VerticalScroll +from textual.events import Mount + +import psutil, ipaddress, shlex +from typing import List, Dict, Optional + +from ..back.scanner_profiles_manager import ScannerProfilesManager +from .scanner_tab_pushscreens import SaveProfilePushScreen + +class ScannerTab(Container): + + def __init__(self, manager: ScannerProfilesManager): + super().__init__() + self.manager = manager + self.manager.on_message = self.on_manager_message + + def compose(self) -> ComposeResult: + friendly_section = Container(id="friendly-command-section", classes="section card") + friendly_section.border_title = "Scanner Configuration" + + with friendly_section: + yield Label("Interface:", classes="label") + try: + interfaces = list(psutil.net_if_addrs().keys()) + except Exception: + interfaces = [] + + yield Select.from_values( + interfaces, + id="interface-select", + allow_blank=False, + classes="input" + ) + + yield Label("IP address:", classes="label") + yield Input( + value="127.0.0.1/32", + placeholder="192.168.0.0/24", + name="ip-input", + id="ip", + classes="input" + ) + + yield Label("Port range (or single port)", classes="label") + with Horizontal(id="port-range", classes="input-row"): + yield Input(placeholder="Start / Single", id="low-port-range", classes="input half") + yield Input(placeholder="End (Optional)", id="high-port-range", classes="input half") + + yield Checkbox("Check for CVE's", id="cve-checkbox", classes="checkbox") + + yield Label("Scan options:", classes="label") + with Horizontal(classes="selection-row"): + yield SelectionList[str]( + Selection("Service version (-sV)", "-sV", False), + Selection("OS detection (-O)", "-O", False), + Selection("SYN scan (-sS)", "-sS", False), + Selection("UDP scan (-sU)", "-sU", False), + Selection("Fast scan (-F)", "-F", False), + Selection("Aggressive scan (-A)", "-A", False), + Selection("Ping skip (-Pn)", "-Pn", False), + Selection("Timing T4 (-T4)", "-T4", True), + classes="selection-list" + ) + + user_section = Container(id="user-command-section", classes="section card") + user_section.border_title = "Manual Command" + + with user_section: + yield Input( + placeholder="sudo nmap -sS 192.168.1.1", + name="user-nmap-input", + id="usercommand", + classes="input full" + ) + + + final_section = Container(id="final-section", classes="section card") + final_section.border_title = "Execution" + + with final_section: + with Vertical(id="final-command", classes="command-output"): + self.final_command = Pretty([], id="final-command-output") + yield self.final_command + with Vertical(id="buttons", classes="buttons-column"): + yield Button("Scan now", id="scan-button", variant="success") + yield Button("Save profile", id="save-button", variant="primary") + + results_section = Container(id="results-section", classes="section card") + results_section.border_title = "Scan Output" + + with results_section: + with VerticalScroll(id="results-scroll"): + self.results = Pretty([], id="results", classes="results") + yield self.results + + def validate_inputs(self, nmap_input: Dict[str, str]) -> List[str]: + errors: List[str] = [] + + if self.query_one("#cve-checkbox", Checkbox).value: + args = nmap_input.get("arguments", "") + if "-sV" not in args.split(): + errors.append("To check for CVE's, '-sV' option is required") + return errors + + targets = nmap_input.get("targets", "").strip() + if not targets: + errors.append("IP field cannot be empty") + return errors + + if not self.query_one("#usercommand", Input).value.strip(): + try: + ipaddress.ip_network(targets, strict=False) + except ValueError: + errors.append(f"Invalid IP address/CIDR: {targets}") + return errors + + if "ports" in nmap_input: + port_str = nmap_input["ports"] + if "-" in port_str: + try: + low, high = map(int, port_str.split("-")) + if not (1 <= low <= 65535 and 1 <= high <= 65535): + errors.append("Ports must be in range 1-65535") + if low > high: + errors.append("Start port cannot be greater than end port") + except ValueError: + errors.append("Ports (range) must be numbers") + else: + try: + port = int(port_str) + if not (1 <= port <= 65535): + errors.append("Port must be in range 1-65535") + except ValueError: + errors.append("Port must be a number") + + dry_run_err = self.dry_run_nmap_command(nmap_input) + if dry_run_err: + errors.append(dry_run_err) + return errors + + def dry_run_nmap_command(self, nmap_input: Dict[str, str]) -> Optional[str]: + try: + cmd = ["nmap"] + if "arguments" in nmap_input: + cmd += shlex.split(nmap_input["arguments"]) + if "ports" in nmap_input: + cmd += ["-p", nmap_input["ports"]] + cmd.append(nmap_input["targets"]) + return None + except Exception as e: + return f"Błąd weryfikacji komendy: {e}" + + def get_nmap_input(self, return_dict: bool = False) -> list[str] | dict: + user_command = self.query_one("#usercommand", Input).value.strip() + low_port = self.query_one("#low-port-range", Input).value.strip() + high_port = self.query_one("#high-port-range", Input).value.strip() + interface = self.query_one("#interface-select", Select).value + targets = self.query_one("#ip", Input).value.strip() + options = self.query_one(SelectionList).selected + + if return_dict: + if user_command: + parts = shlex.split(user_command) + + if parts and parts[0] == "sudo": parts.pop(0) + if parts and parts[0] == "nmap": parts.pop(0) + + if not parts: + return {} + + target = parts[-1] + args = parts[:-1] + + return { + "targets": target, + "arguments": " ".join(args) + } + else: + nmap_dict = {"targets": targets} + + if low_port: + if high_port: + nmap_dict["ports"] = f"{low_port}-{high_port}" + else: + nmap_dict["ports"] = f"{low_port}" + + args: list[str] = [] + if options: + args.extend(options) + if interface: + args.extend(["-e", str(interface)]) + if args: + nmap_dict["arguments"] = " ".join(args) + + return {k: v for k, v in nmap_dict.items() if v is not None} + + if user_command: + return shlex.split(user_command) + + parts = ["nmap"] + if interface: + parts.extend(["-e", str(interface)]) + if options: + parts.extend(options) + + if low_port: + if high_port: + parts.extend(["-p", f"{low_port}-{high_port}"]) + else: + parts.extend(["-p", f"{low_port}"]) + + parts.append(targets) + return parts + + @on(Mount) + @on(SelectionList.SelectedChanged) + @on(Input.Changed) + @on(Select.Changed) + def update_selected_view(self, event=None) -> None: + self.final_command.update(" ".join(self.get_nmap_input())) + + @on(Button.Pressed, "#scan-button") + async def handle_scan_button(self, event: Button.Pressed) -> None: + nmap_input = self.get_nmap_input(return_dict=True) + if not isinstance(nmap_input, dict): + self.notify("Błąd: dane wejściowe nie są słownikiem", severity="error") + return + errors = self.validate_inputs(nmap_input) + if errors: + self.notify("\n".join(errors), title="Bad Input", severity="error") + else: + try: + temp_prof_name = "temp_scan_profile" + self.notify("Start scanning", title="Notification", severity="information") + + if not self.manager.add_profile(temp_prof_name, notify=False): + self.notify("Błąd: Nie udało się utworzyć profilu (problem z zapisem pliku?)", severity="error") + return + + if not self.manager.update_profile(temp_prof_name, "nmap_input", nmap_input, notify=False): + self.notify("Błąd: Nie udało się zaktualizować parametrów skanowania", severity="error") + self.manager.delete_profile(temp_prof_name, notify=False) + return + + profile = self.manager.get_profile(temp_prof_name) + if profile: + scan_now_results = profile.scan() + self.results.update(scan_now_results) + self.manager.delete_profile(temp_prof_name, notify=False) + self.notify("Scanning completed", title="Notification", severity="information") + else: + self.notify("Krytyczny błąd: Profil zniknął po utworzeniu", severity="error") + + except Exception as e: + self.notify(f"Scanner error: {e}", title="Error", severity="error") + + @on(Button.Pressed, "#save-button") + async def handle_save_button(self, event: Button.Pressed): + nmap_input = self.get_nmap_input(return_dict=True) + if not isinstance(nmap_input, dict): + self.notify("Błąd: dane wejściowe nie są słownikiem", severity="error") + return + + errors = self.validate_inputs(nmap_input) + if errors: + self.notify("\n".join(errors), title="Bad Input", severity="error") + return + + cve_check = self.query_one("#cve-checkbox", Checkbox).value + self.app.push_screen(SaveProfilePushScreen(self.manager, nmap_input, cve_check)) + + def on_manager_message(self, msg: str, title: str, severity): + self.app.notify(message=msg, title=title, severity=severity) -- cgit v1.2.3