summaryrefslogtreecommitdiff
path: root/src/netmonitor/front/scanner_tab.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/netmonitor/front/scanner_tab.py')
-rwxr-xr-xsrc/netmonitor/front/scanner_tab.py277
1 files changed, 277 insertions, 0 deletions
diff --git a/src/netmonitor/front/scanner_tab.py b/src/netmonitor/front/scanner_tab.py
new file mode 100755
index 0000000..868fa74
--- /dev/null
+++ b/src/netmonitor/front/scanner_tab.py
@@ -0,0 +1,277 @@
+from textual import on
+from textual.app import ComposeResult
+from textual.widgets import Checkbox, Input, Select, Button, Label, SelectionList, Pretty
+from textual.widgets.selection_list import Selection
+from textual.containers import Container, Vertical, Horizontal, VerticalScroll
+from textual.events import Mount
+
+import psutil, ipaddress, shlex
+from typing import List, Dict, Optional
+
+from ..back.scanner_profiles_manager import ScannerProfilesManager
+from .scanner_tab_pushscreens import SaveProfilePushScreen
+
+class ScannerTab(Container):
+
+ def __init__(self, manager: ScannerProfilesManager):
+ super().__init__()
+ self.manager = manager
+ self.manager.on_message = self.on_manager_message
+
+ def compose(self) -> ComposeResult:
+ friendly_section = Container(id="friendly-command-section", classes="section card")
+ friendly_section.border_title = "Scanner Configuration"
+
+ with friendly_section:
+ yield Label("Interface:", classes="label")
+ try:
+ interfaces = list(psutil.net_if_addrs().keys())
+ except Exception:
+ interfaces = []
+
+ yield Select.from_values(
+ interfaces,
+ id="interface-select",
+ allow_blank=False,
+ classes="input"
+ )
+
+ yield Label("IP address:", classes="label")
+ yield Input(
+ value="127.0.0.1/32",
+ placeholder="192.168.0.0/24",
+ name="ip-input",
+ id="ip",
+ classes="input"
+ )
+
+ yield Label("Port range (or single port)", classes="label")
+ with Horizontal(id="port-range", classes="input-row"):
+ yield Input(placeholder="Start / Single", id="low-port-range", classes="input half")
+ yield Input(placeholder="End (Optional)", id="high-port-range", classes="input half")
+
+ yield Checkbox("Check for CVE's", id="cve-checkbox", classes="checkbox")
+
+ yield Label("Scan options:", classes="label")
+ with Horizontal(classes="selection-row"):
+ yield SelectionList[str](
+ Selection("Service version (-sV)", "-sV", False),
+ Selection("OS detection (-O)", "-O", False),
+ Selection("SYN scan (-sS)", "-sS", False),
+ Selection("UDP scan (-sU)", "-sU", False),
+ Selection("Fast scan (-F)", "-F", False),
+ Selection("Aggressive scan (-A)", "-A", False),
+ Selection("Ping skip (-Pn)", "-Pn", False),
+ Selection("Timing T4 (-T4)", "-T4", True),
+ classes="selection-list"
+ )
+
+ user_section = Container(id="user-command-section", classes="section card")
+ user_section.border_title = "Manual Command"
+
+ with user_section:
+ yield Input(
+ placeholder="sudo nmap -sS 192.168.1.1",
+ name="user-nmap-input",
+ id="usercommand",
+ classes="input full"
+ )
+
+
+ final_section = Container(id="final-section", classes="section card")
+ final_section.border_title = "Execution"
+
+ with final_section:
+ with Vertical(id="final-command", classes="command-output"):
+ self.final_command = Pretty([], id="final-command-output")
+ yield self.final_command
+ with Vertical(id="buttons", classes="buttons-column"):
+ yield Button("Scan now", id="scan-button", variant="success")
+ yield Button("Save profile", id="save-button", variant="primary")
+
+ results_section = Container(id="results-section", classes="section card")
+ results_section.border_title = "Scan Output"
+
+ with results_section:
+ with VerticalScroll(id="results-scroll"):
+ self.results = Pretty([], id="results", classes="results")
+ yield self.results
+
+ def validate_inputs(self, nmap_input: Dict[str, str]) -> List[str]:
+ errors: List[str] = []
+
+ if self.query_one("#cve-checkbox", Checkbox).value:
+ args = nmap_input.get("arguments", "")
+ if "-sV" not in args.split():
+ errors.append("To check for CVE's, '-sV' option is required")
+ return errors
+
+ targets = nmap_input.get("targets", "").strip()
+ if not targets:
+ errors.append("IP field cannot be empty")
+ return errors
+
+ if not self.query_one("#usercommand", Input).value.strip():
+ try:
+ ipaddress.ip_network(targets, strict=False)
+ except ValueError:
+ errors.append(f"Invalid IP address/CIDR: {targets}")
+ return errors
+
+ if "ports" in nmap_input:
+ port_str = nmap_input["ports"]
+ if "-" in port_str:
+ try:
+ low, high = map(int, port_str.split("-"))
+ if not (1 <= low <= 65535 and 1 <= high <= 65535):
+ errors.append("Ports must be in range 1-65535")
+ if low > high:
+ errors.append("Start port cannot be greater than end port")
+ except ValueError:
+ errors.append("Ports (range) must be numbers")
+ else:
+ try:
+ port = int(port_str)
+ if not (1 <= port <= 65535):
+ errors.append("Port must be in range 1-65535")
+ except ValueError:
+ errors.append("Port must be a number")
+
+ dry_run_err = self.dry_run_nmap_command(nmap_input)
+ if dry_run_err:
+ errors.append(dry_run_err)
+ return errors
+
+ def dry_run_nmap_command(self, nmap_input: Dict[str, str]) -> Optional[str]:
+ try:
+ cmd = ["nmap"]
+ if "arguments" in nmap_input:
+ cmd += shlex.split(nmap_input["arguments"])
+ if "ports" in nmap_input:
+ cmd += ["-p", nmap_input["ports"]]
+ cmd.append(nmap_input["targets"])
+ return None
+ except Exception as e:
+ return f"Błąd weryfikacji komendy: {e}"
+
+ def get_nmap_input(self, return_dict: bool = False) -> list[str] | dict:
+ user_command = self.query_one("#usercommand", Input).value.strip()
+ low_port = self.query_one("#low-port-range", Input).value.strip()
+ high_port = self.query_one("#high-port-range", Input).value.strip()
+ interface = self.query_one("#interface-select", Select).value
+ targets = self.query_one("#ip", Input).value.strip()
+ options = self.query_one(SelectionList).selected
+
+ if return_dict:
+ if user_command:
+ parts = shlex.split(user_command)
+
+ if parts and parts[0] == "sudo": parts.pop(0)
+ if parts and parts[0] == "nmap": parts.pop(0)
+
+ if not parts:
+ return {}
+
+ target = parts[-1]
+ args = parts[:-1]
+
+ return {
+ "targets": target,
+ "arguments": " ".join(args)
+ }
+ else:
+ nmap_dict = {"targets": targets}
+
+ if low_port:
+ if high_port:
+ nmap_dict["ports"] = f"{low_port}-{high_port}"
+ else:
+ nmap_dict["ports"] = f"{low_port}"
+
+ args: list[str] = []
+ if options:
+ args.extend(options)
+ if interface:
+ args.extend(["-e", str(interface)])
+ if args:
+ nmap_dict["arguments"] = " ".join(args)
+
+ return {k: v for k, v in nmap_dict.items() if v is not None}
+
+ if user_command:
+ return shlex.split(user_command)
+
+ parts = ["nmap"]
+ if interface:
+ parts.extend(["-e", str(interface)])
+ if options:
+ parts.extend(options)
+
+ if low_port:
+ if high_port:
+ parts.extend(["-p", f"{low_port}-{high_port}"])
+ else:
+ parts.extend(["-p", f"{low_port}"])
+
+ parts.append(targets)
+ return parts
+
+ @on(Mount)
+ @on(SelectionList.SelectedChanged)
+ @on(Input.Changed)
+ @on(Select.Changed)
+ def update_selected_view(self, event=None) -> None:
+ self.final_command.update(" ".join(self.get_nmap_input()))
+
+ @on(Button.Pressed, "#scan-button")
+ async def handle_scan_button(self, event: Button.Pressed) -> None:
+ nmap_input = self.get_nmap_input(return_dict=True)
+ if not isinstance(nmap_input, dict):
+ self.notify("Błąd: dane wejściowe nie są słownikiem", severity="error")
+ return
+ errors = self.validate_inputs(nmap_input)
+ if errors:
+ self.notify("\n".join(errors), title="Bad Input", severity="error")
+ else:
+ try:
+ temp_prof_name = "temp_scan_profile"
+ self.notify("Start scanning", title="Notification", severity="information")
+
+ if not self.manager.add_profile(temp_prof_name, notify=False):
+ self.notify("Błąd: Nie udało się utworzyć profilu (problem z zapisem pliku?)", severity="error")
+ return
+
+ if not self.manager.update_profile(temp_prof_name, "nmap_input", nmap_input, notify=False):
+ self.notify("Błąd: Nie udało się zaktualizować parametrów skanowania", severity="error")
+ self.manager.delete_profile(temp_prof_name, notify=False)
+ return
+
+ profile = self.manager.get_profile(temp_prof_name)
+ if profile:
+ scan_now_results = profile.scan()
+ self.results.update(scan_now_results)
+ self.manager.delete_profile(temp_prof_name, notify=False)
+ self.notify("Scanning completed", title="Notification", severity="information")
+ else:
+ self.notify("Krytyczny błąd: Profil zniknął po utworzeniu", severity="error")
+
+ except Exception as e:
+ self.notify(f"Scanner error: {e}", title="Error", severity="error")
+
+ @on(Button.Pressed, "#save-button")
+ async def handle_save_button(self, event: Button.Pressed):
+ nmap_input = self.get_nmap_input(return_dict=True)
+ if not isinstance(nmap_input, dict):
+ self.notify("Błąd: dane wejściowe nie są słownikiem", severity="error")
+ return
+
+ errors = self.validate_inputs(nmap_input)
+ if errors:
+ self.notify("\n".join(errors), title="Bad Input", severity="error")
+ return
+
+ cve_check = self.query_one("#cve-checkbox", Checkbox).value
+ self.app.push_screen(SaveProfilePushScreen(self.manager, nmap_input, cve_check))
+
+ def on_manager_message(self, msg: str, title: str, severity):
+ self.app.notify(message=msg, title=title, severity=severity)