summaryrefslogtreecommitdiff
path: root/src/streamml/front/detector_tab.py
blob: f93a8cbbdb09f7dfb4f7214d2da330320b0a71db (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
from textual import on
from textual.app import ComposeResult
from textual.widgets import Input, Select, Button, Label, Checkbox
from textual.containers import Container, Horizontal, VerticalScroll

import psutil

from ..back.detector_profiles_manager import DetectorProfilesManager
from ..back.window import FEATURE_LIST
from ..front.detector_tab_pushscreens import SaveProfilePushScreen

class DetectorTab(Container):
    """Tab that collects detector settings and opens the save-profile screen.

    The tab renders three sections: model parameters (with the capture
    interface selector), a list of feature checkboxes built from
    ``FEATURE_LIST``, and an optional BPF filter input. Pressing the
    "Save Profile" button validates the form via :meth:`get_inputs` and
    pushes a ``SaveProfilePushScreen`` with the collected data.
    """

    def __init__(self, manager: DetectorProfilesManager, *args, **kwargs):
        """Store the profiles manager and register this tab as its message sink.

        Args:
            manager: Profiles manager; its ``on_message`` attribute is set to
                :meth:`on_manager_message`.
            *args: Forwarded to ``Container``.
            **kwargs: Forwarded to ``Container``.
        """
        super().__init__(*args, **kwargs)
        self.manager = manager
        self.manager.on_message = self.on_manager_message
        # Filled in by compose(); created here so get_inputs() never hits a
        # missing attribute if it is called before the widget is composed.
        self.feature_checkboxes = {}

    def compose(self) -> ComposeResult:
        """Yield the widgets that make up the tab."""
        with Horizontal(id="top-config-container"):

            model_section = Container(id="model-section", classes="section-card")
            model_section.border_title = "Algorithm: HalfSpaceTrees"
            with model_section:
                with VerticalScroll(classes="detector-scroll"):
                    yield Label("Select interface:", classes="label")
                    # Best effort: fall back to an empty list when the
                    # interfaces cannot be enumerated, without swallowing
                    # KeyboardInterrupt/SystemExit like a bare except would.
                    try:
                        ifaces = list(psutil.net_if_addrs().keys())
                    except Exception:
                        ifaces = []

                    yield Select.from_values(
                        ifaces,
                        id="interface-select",
                        # A Select with no options and no blank entry cannot
                        # be constructed, so only forbid the blank entry when
                        # there is at least one interface to choose from.
                        allow_blank=not ifaces,
                        classes="input"
                    )

                    yield Label("Model params:", classes="label")
                    yield Input(placeholder="Trees number (int, def: 10)", id="param-trees", classes="input")
                    yield Input(placeholder="Height (int, def: 8)", id="param-height", classes="input")
                    yield Input(placeholder="Window size (int, def: 250)", id="param-window", classes="input")
                    yield Input(placeholder="Seed (int, def: 42)", id="param-seed", classes="input")
                    yield Input(placeholder="Window duration (def: 10 sec )", id="param-window_duration", classes="input")
                    yield Input(placeholder="Threshold (0.0 - 1.0, def: 0.7)", id="param-threshold", classes="input")
                    yield Input(placeholder="Queue size (int, def: 10000)", id="param-queue_size", classes="input")

            features_section = Container(id="features-section", classes="section-card")
            features_section.border_title = "Flow-based Features"
            with features_section:
                with VerticalScroll(classes="detector-scroll"):
                    yield Label("Select features to include in model:", classes="label")
                    # Rebuilt on every compose so the mapping always refers to
                    # the checkboxes that are actually mounted.
                    self.feature_checkboxes = {}
                    for feat in FEATURE_LIST:
                        cb = Checkbox(feat, value=True, classes="input")
                        self.feature_checkboxes[feat] = cb
                        yield cb

        bpf_section = Container(id="bpf-section", classes="section-card")
        bpf_section.border_title = "BPF Filter (Optional)"
        with bpf_section:
            yield Input(
                placeholder="BPF Filter (e.g. 'tcp port 80 or udp')",
                id="param-bpf_filter",
                classes="input full"
            )
        with Container(classes="save-button-container"):
            yield Button("Save Profile", id="save-button", variant="success")

    def get_inputs(self):
        """Read and validate the form.

        Returns:
            A dict with two keys: ``"features"`` (list of checked feature
            names) and ``"params"`` (dict holding ``"interface"``, the
            numeric model parameters, and ``"bpf_filter"`` only when the
            filter input is non-empty).

        Raises:
            ValueError: If no feature is checked, no interface is selected,
                the interface widget cannot be read, a numeric parameter
                does not parse, or the threshold is outside 0.0 - 1.0.
        """
        features = [name for name, cb in self.feature_checkboxes.items() if cb.value]
        if not features:
            raise ValueError("Select at least one feature.")

        params = {}

        # Only the widget lookup is guarded; the "nothing selected" check is
        # done outside the try so its message is not replaced by the generic
        # one.
        try:
            interface = self.query_one("#interface-select", Select).value
        except Exception as exc:
            raise ValueError("Interface selection error.") from exc
        # Interface names come from psutil as strings; anything else is the
        # Select's "no selection" sentinel.
        if not isinstance(interface, str) or not interface:
            raise ValueError("Select interface.")
        params["interface"] = interface

        bpf_filter = self.query_one("#param-bpf_filter", Input).value.strip()
        if bpf_filter:
            params["bpf_filter"] = bpf_filter

        # Values used when the matching "param-<key>" input is left empty.
        defaults = {
            "trees": 10,
            "height": 8,
            "window": 250,
            "seed": 42,
            "threshold": 0.7,
            "window_duration": 10.0,
            "queue_size": 10000,
        }
        # Converter applied to a non-empty input, keyed by parameter name.
        casters = {
            "trees": int,
            "height": int,
            "window": int,
            "seed": int,
            "queue_size": int,
            "threshold": float,
            "window_duration": float,
        }

        model_section = self.query_one("#model-section")
        for inp in model_section.query("Input"):
            if not (inp.id and inp.id.startswith("param-")):
                continue
            key = inp.id.removeprefix("param-")
            if key == "bpf_filter":
                continue

            raw = inp.value.strip()
            if not raw:
                if key in defaults:
                    params[key] = defaults[key]
                continue

            caster = casters.get(key)
            if caster is None:
                # Unknown parameter inputs are ignored.
                continue
            try:
                value = caster(raw)
            except ValueError as exc:
                raise ValueError(f"Param '{key}' must be a number.") from exc

            # The placeholder advertises 0.0 - 1.0; this comparison also
            # rejects NaN and infinities.
            if key == "threshold" and not 0.0 <= value <= 1.0:
                raise ValueError("Param 'threshold' must be between 0.0 and 1.0.")
            params[key] = value

        return {
            "features": features,
            "params": params,
        }

    @on(Button.Pressed, "#save-button")
    async def handle_save_button(self, event: Button.Pressed):
        """Validate the form and open the save-profile screen.

        Validation failures raised by :meth:`get_inputs` are shown as an
        error notification instead of propagating.
        """
        try:
            input_data = self.get_inputs()
            self.app.push_screen(SaveProfilePushScreen(self.manager, input_data=input_data))
        except ValueError as e:
            self.app.notify(str(e), title="Validation error", severity="error")

    def on_manager_message(self, msg: str, title: str, severity):
        """Relay a message from the profiles manager as an app notification."""
        self.app.notify(message=msg, title=title, severity=severity)