summaryrefslogtreecommitdiff
path: root/src/netmonitor/back/scanner_profiles_manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/netmonitor/back/scanner_profiles_manager.py')
-rw-r--r--src/netmonitor/back/scanner_profiles_manager.py177
1 files changed, 0 insertions, 177 deletions
diff --git a/src/netmonitor/back/scanner_profiles_manager.py b/src/netmonitor/back/scanner_profiles_manager.py
deleted file mode 100644
index 7748835..0000000
--- a/src/netmonitor/back/scanner_profiles_manager.py
+++ /dev/null
@@ -1,177 +0,0 @@
-import re
-from pathlib import Path
-from typing import Callable, Literal
-from apscheduler.schedulers.background import BackgroundScheduler
-from apscheduler.triggers.cron import CronTrigger
-import pickle
-
-from .scanner_profile import ScannerProfile
-
-SeverityLevel = Literal["information", "warning", "error"]
-VALID_NAME_REGEX = r"^[a-zA-Z0-9_-]+$"
-
-class ScannerProfilesManager:
- def __init__(self, profiles_file: str, scheduler: BackgroundScheduler):
- self.profiles_file = Path(profiles_file)
- self.scheduler = scheduler
- self.profiles: list[ScannerProfile] = []
-
- self.on_message: Callable[[str, str, str], None] | None = None
- self.on_refresh: Callable[[], None] | None = None
-
- self.load_profiles()
-
- def _notify(self, msg: str, title: str = "Profile Manager", level: SeverityLevel = "information"):
- if self.on_message:
- self.on_message(msg, title, level)
-
- def _refresh_front(self):
- if self.on_refresh:
- self.on_refresh()
-
- def _fail(self, msg: str, level: SeverityLevel = "error", notify: bool = True) -> bool:
- if notify:
- self._notify(msg, level=level)
- return False
-
- def _ok(self, msg: str | None = None, level: SeverityLevel = "information", notify: bool = True) -> bool:
- if msg and notify:
- self._notify(msg, level=level)
- return True
-
- def try_save_profiles(self, notify: bool = True) -> bool:
- try:
- self.profiles_file.parent.mkdir(parents=True, exist_ok=True)
- with open(self.profiles_file, "wb") as f:
- pickle.dump(self.profiles, f)
- return self._ok("Profiles saved successfully.", notify=notify)
- except Exception as e:
- return self._fail(f"Error saving profiles: {e}", notify=notify)
-
- def load_profiles(self, notify: bool = True) -> bool:
- if not self.profiles_file.exists():
- if notify:
- self._notify("Profiles file not found. Creating a new one.", level="warning")
- if not self.try_save_profiles(notify=False):
- return self._fail("Failed to create profiles file!", notify=notify)
- return self._ok("New profiles file created.", notify=notify)
-
- try:
- with open(self.profiles_file, "rb") as f:
- self.profiles = pickle.load(f)
- self._refresh_front()
- return self._ok(f"Loaded {len(self.profiles)} profiles.", notify=notify)
- except Exception as e:
- return self._fail(f"Error loading profiles: {e}", notify=notify)
-
-
- def add_profile(self, profile_name: str, notify: bool = True) -> bool:
- if not profile_name:
- return self._fail("Name can't be blank.", "error", notify)
-
- if not re.match(VALID_NAME_REGEX, profile_name):
- return self._fail(f"Bad input '{profile_name}'. Use only letters, numbers, '_' and '-'.", "error", notify)
-
- if any(p.profile_name == profile_name for p in self.profiles):
- return self._fail(f"Profile {profile_name} already exists.", "warning", notify)
-
- new_profile = ScannerProfile(profile_name=profile_name)
- self.profiles.append(new_profile)
-
- if not self.try_save_profiles(notify=False):
- self.profiles.remove(new_profile)
- return self._fail(f"Failed to save profile {profile_name}.", notify=notify)
-
- self._refresh_front()
- return self._ok(f"Added profile {profile_name}.", notify=notify)
-
- def delete_profile(self, profile_name: str, notify: bool = True) -> bool:
- before = len(self.profiles)
- self.profiles = [p for p in self.profiles if p.profile_name != profile_name]
-
- if len(self.profiles) == before:
- return self._fail(f"Profile {profile_name} not found.", "warning", notify)
-
- if not self.try_save_profiles(notify=False):
- return self._fail(f"Failed to save changes after deleting {profile_name}.", notify=notify)
-
- self._refresh_front()
- return self._ok(f"Deleted profile {profile_name}.", notify=notify)
-
- def get_profile(self, profile_name: str) -> ScannerProfile | None:
- return next((p for p in self.profiles if p.profile_name == profile_name), None)
-
- def update_profile(self, profile_name: str, field: str, value, notify: bool = True) -> bool:
- p = self.get_profile(profile_name)
- if not p:
- return self._fail(f"Profile {profile_name} does not exist.", notify=notify)
-
- setattr(p, field, value)
- if self.try_save_profiles(notify=False):
- return self._ok(f"Updated profile {profile_name}.", notify=notify)
- else:
- return self._fail(f"Failed to save updated profile {profile_name}.", notify=notify)
-
-
- def run_profile_once(self, profile_name: str, notify: bool = True) -> bool:
- p = self.get_profile(profile_name)
- if not p:
- return self._fail(f"Profile {profile_name} not found.", notify=notify)
-
- try:
- p.scan()
- return self._ok(f"Scan completed for {profile_name}.", notify=notify)
- except Exception as e:
- return self._fail(f"Scan error: {e}", notify=notify)
-
- def turn_on_profile(self, profile_name: str, notify: bool = True) -> bool:
- p = self.get_profile(profile_name)
- if not p:
- return self._fail(f"Profile {profile_name} not found.", notify=notify)
-
- try:
- job_id = f"profile_{p.profile_name}"
- self.scheduler.add_job(
- lambda: self.run_profile_once(p.profile_name, notify=False),
- trigger=CronTrigger.from_crontab(p.scheduler),
- id=job_id,
- replace_existing=True
- )
- p.is_active = True
- return self._ok(f"Profile {profile_name} activated.", notify=notify)
- except Exception as e:
- return self._fail(f"Error activating profile: {e}", notify=notify)
-
- def turn_off_profile(self, profile_name: str, notify: bool = True) -> bool:
- job_id = f"profile_{profile_name}"
- try:
- self.scheduler.remove_job(job_id)
- p = self.get_profile(profile_name)
- if p:
- p.is_active = False
- return self._ok(f"Profile {profile_name} deactivated.", notify=notify)
- except Exception as e:
- return self._fail(f"Error deactivating profile: {e}", notify=notify)
-
- def set_validated_scheduler(self, profile_name: str, cron_str: str, notify: bool = True) -> bool:
- p = self.get_profile(profile_name)
- if not p:
- return self._fail(f"Profile {profile_name} not found.", notify=notify)
-
- try:
- CronTrigger.from_crontab(cron_str)
- p.scheduler = cron_str
- if not self.try_save_profiles(notify=False):
- return self._fail(f"Failed to save scheduler for {profile_name}.", notify=notify)
- return self._ok(f"Scheduler updated for {profile_name}.", notify=notify)
- except ValueError as ve:
- return self._fail(f"Invalid CRON: {ve}", notify=notify)
- except Exception as e:
- return self._fail(f"Unexpected error: {e}", notify=notify)
-
- def get_profile_logs(self, profile_name: str, notify: bool = True):
- p = self.get_profile(profile_name)
- if not p:
- self._fail(f"Profile {profile_name} not found.", "error", notify)
- return None
- return p.get_logs()