File size: 8,853 Bytes
24ceec5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e3e7994
24ceec5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e3e7994
 
 
 
 
 
 
24ceec5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e3e7994
 
 
 
 
 
 
 
24ceec5
 
 
e3e7994
 
 
 
 
 
 
 
24ceec5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e3e7994
 
 
24ceec5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e3e7994
 
 
 
24ceec5
 
 
 
e3e7994
 
 
24ceec5
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e3e7994
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
"""Policy engine β€” rule-based tool approval system.

Inspired by Gemini CLI's PolicyEngine with priority-sorted rules
and pattern matching. Also incorporates Claude Code's allow/ask/deny model.
"""

from __future__ import annotations

import re
from dataclasses import dataclass
from enum import Enum
from typing import Any


class ApprovalDecision(Enum):
    """Outcome of a policy check for a single tool call.

    Each member's value is the lowercase string used in the ``"approval"``
    field of rule dicts, so ``ApprovalDecision("deny")`` resolves to ``DENY``.
    """

    # Tool call may proceed.
    ALLOW = "allow"
    # Tool call needs confirmation before proceeding.
    ASK = "ask"
    # Tool call is rejected.
    DENY = "deny"


@dataclass
class PolicyCheckResult:
    """Outcome of evaluating one tool call against the policy rules.

    Attributes:
        decision: The resulting approval decision.
        reason: Human-readable explanation; empty string when not supplied.
        matched_rule: Description of the rule that produced the decision,
            or ``None`` when not supplied.
    """

    # Required: the verdict itself.
    decision: ApprovalDecision
    # Optional explanatory text.
    reason: str = ""
    # Optional identifier (description) of the rule that matched.
    matched_rule: str | None = None


class PolicyEngine:
    """Rule-based policy engine with priority ordering.

    A rule is a plain dict with these keys:

    * ``tool_name`` -- exact tool name, or a glob in which ``*`` matches any
      run of characters (``"*"`` alone matches every tool).
    * ``args_pattern`` -- optional regex searched case-insensitively in
      ``str(args)``; a falsy value matches any arguments.
    * ``approval`` -- ``"allow"``, ``"ask"`` or ``"deny"``.
    * ``priority`` -- higher values are evaluated first.
    * ``description`` -- human-readable label, also the key ``remove_rule``
      looks up.

    Rules are evaluated in priority order (highest first).
    First matching rule wins.
    Default: ASK for interactive, DENY for non-interactive.
    """

    # Built-in safety rules (always evaluated, highest priority)
    BUILTIN_RULES = [
        {
            "tool_name": "bash",
            "args_pattern": r"rm\s+-rf\s+(/|~|\*)",
            "approval": "deny",
            "priority": 1000,
            "description": "Block recursive delete of root/home",
        },
        {
            "tool_name": "bash",
            "args_pattern": r"(shutdown|reboot|halt|poweroff)",
            "approval": "deny",
            "priority": 1000,
            "description": "Block system shutdown commands",
        },
        {
            "tool_name": "bash",
            "args_pattern": r"(mkfs|fdisk|parted|dd\s+if=/dev/zero)",
            "approval": "deny",
            "priority": 1000,
            "description": "Block disk destruction commands",
        },
        {
            "tool_name": "bash",
            "args_pattern": r"(chmod\s+-R\s+777|chown\s+-R)",
            "approval": "deny",
            "priority": 900,
            "description": "Block dangerous permission changes",
        },
        {
            "tool_name": "write_file",
            "args_pattern": None,
            "approval": "allow",
            "priority": -100,
            "description": "Allow file writes in sandboxed workspace",
        },
        {
            "tool_name": "edit_file",
            "args_pattern": None,
            "approval": "allow",
            "priority": -100,
            "description": "Allow file edits in sandboxed workspace",
        },
        {
            "tool_name": "multi_edit",
            "args_pattern": None,
            "approval": "allow",
            "priority": -100,
            "description": "Allow multi-file edits in sandboxed workspace",
        },
        {
            "tool_name": "read_file",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow file reads",
        },
        {
            "tool_name": "glob",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow glob searches",
        },
        {
            "tool_name": "grep",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow grep searches",
        },
        {
            "tool_name": "list_dir",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow directory listing",
        },
        {
            "tool_name": "web_search",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow web search",
        },
        {
            "tool_name": "web_fetch",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow web fetch",
        },
        {
            "tool_name": "todo_read",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow todo reads",
        },
        {
            "tool_name": "todo_write",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow todo writes",
        },
        {
            "tool_name": "todo_update",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow todo updates",
        },
        {
            "tool_name": "snapshot_workspace",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow workspace snapshot",
        },
    ]

    # Values accepted by the ``approval_mode`` setter.
    _APPROVAL_MODES = ("default", "auto_edit", "yolo")
    # Tools whose "ask" verdict is promoted to "allow" in auto_edit mode.
    _EDIT_TOOLS = ("write_file", "edit_file", "multi_edit")

    def __init__(self, interactive: bool = True):
        """Create an engine pre-loaded with the built-in rules.

        Args:
            interactive: Selects the fallback decision used by ``check`` when
                no rule matches: ASK when true, DENY when false.
        """
        # Copy every built-in rule: the class-level table is shared by all
        # engines, so handing out its dicts would let one engine (or a caller
        # editing the result of get_rules) silently change every other engine.
        self._rules: list[dict] = [dict(rule) for rule in self.BUILTIN_RULES]
        self._interactive = interactive
        self._approval_mode: str = "default"  # default | auto_edit | yolo

    @property
    def approval_mode(self) -> str:
        """Current approval mode: ``"default"``, ``"auto_edit"`` or ``"yolo"``."""
        return self._approval_mode

    @approval_mode.setter
    def approval_mode(self, mode: str) -> None:
        # Unknown modes are ignored and the previous mode stays in effect.
        if mode in self._APPROVAL_MODES:
            self._approval_mode = mode

    def add_rule(self, rule: dict) -> None:
        """Add a custom policy rule.

        The rule is stored as a copy, so the caller's dict is left untouched.
        Missing ``priority`` defaults to 0 and missing ``approval`` to
        ``"ask"``. The rule list is then re-sorted by descending priority;
        the sort is stable, so rules of equal priority keep insertion order.
        """
        entry = dict(rule)
        entry.setdefault("priority", 0)
        entry.setdefault("approval", "ask")
        self._rules.append(entry)
        self._rules.sort(key=lambda r: r["priority"], reverse=True)

    def remove_rule(self, description: str) -> bool:
        """Remove the first rule whose description equals *description*.

        The search covers every rule held by the engine, built-in ones
        included, in evaluation order.

        Returns:
            True if a rule was removed, False if none matched.
        """
        for index, candidate in enumerate(self._rules):
            if candidate.get("description") == description:
                del self._rules[index]
                return True
        return False

    def set_rules(self, rules: list[dict]) -> None:
        """Replace all custom rules (keeps built-in rules).

        The rule list is reset to a fresh copy of ``BUILTIN_RULES`` and each
        entry of *rules* is then added through ``add_rule``.
        """
        self._rules = [dict(rule) for rule in self.BUILTIN_RULES]
        for rule in rules:
            self.add_rule(rule)

    def check(
        self, tool_name: str, args: dict[str, Any] | None = None
    ) -> PolicyCheckResult:
        """Check if a tool call is allowed.

        Walks the rules in order and returns the verdict of the first one
        whose ``tool_name`` matches and whose ``args_pattern`` (if any) is
        found in ``str(args)``. A rule with an invalid regex is skipped.

        The matched rule's approval is adjusted by the approval mode:

        * ``yolo``: a ``deny`` from a rule with priority below 900 becomes
          ``ask``.
        * ``auto_edit``: an ``ask`` for ``write_file`` / ``edit_file`` /
          ``multi_edit`` becomes ``allow``. An explicit ``deny`` is kept.

        Args:
            tool_name: Name of the tool being invoked.
            args: Tool arguments; ``None`` or an empty dict is matched as the
                empty string.

        Returns:
            The decision with the matched rule's description as reason, or
            the interactive/non-interactive default when no rule matches.
        """
        args_str = str(args) if args else ""

        for rule in self._rules:
            # Check tool name match (supports wildcards)
            if not self._match_tool_name(tool_name, rule["tool_name"]):
                continue

            # Check args pattern if specified
            if rule.get("args_pattern"):
                try:
                    if not re.search(rule["args_pattern"], args_str, re.IGNORECASE):
                        continue
                except re.error:
                    # A malformed pattern cannot match anything; skip the rule
                    # rather than abort the whole check.
                    continue

            # Match found — return this rule's decision
            approval = rule["approval"]

            # YOLO mode: upgrade DENY to ASK (except built-in high-priority)
            if (
                self._approval_mode == "yolo"
                and rule["priority"] < 900
                and approval == "deny"
            ):
                approval = "ask"

            # Auto-edit mode: skip the confirmation prompt for file
            # writes/edits. Only "ask" is promoted; a rule that explicitly
            # denies the edit must not be overridden by a convenience mode.
            if (
                self._approval_mode == "auto_edit"
                and tool_name in self._EDIT_TOOLS
                and approval == "ask"
            ):
                approval = "allow"

            return PolicyCheckResult(
                decision=ApprovalDecision(approval),
                reason=rule.get(
                    "description", f"Rule matched: {rule['tool_name']}"
                ),
                matched_rule=rule.get("description"),
            )

        # Default: ASK for interactive, DENY for non-interactive
        default = ApprovalDecision.ASK if self._interactive else ApprovalDecision.DENY
        return PolicyCheckResult(
            decision=default,
            reason=f"No matching rule for {tool_name} (default: {default.value})",
        )

    def _match_tool_name(self, name: str, pattern: str) -> bool:
        """Match tool name against pattern (supports * wildcards).

        ``*`` matches any run of characters; every other character in
        *pattern* is matched literally. The whole name must match.
        """
        if pattern == "*":
            return True
        if "*" not in pattern:
            return name == pattern
        # Escape the literal segments so characters such as "." or "(" in a
        # tool name are not interpreted as regex syntax (which could match
        # unintended tools or raise re.error), then rejoin with ".*".
        regex = ".*".join(re.escape(segment) for segment in pattern.split("*"))
        return re.fullmatch(regex, name) is not None

    def get_rules(self) -> list[dict]:
        """Get all rules for display.

        Returns a new list in evaluation order; the rule dicts inside it are
        the engine's own objects, not copies.
        """
        return list(self._rules)


# ── Global Engine ───────────────────────────────────────────────────────

# Lazily created module-level engine; stays None until first requested.
_global_engine: PolicyEngine | None = None


def get_policy_engine() -> PolicyEngine:
    """Return the shared module-level engine, constructing it on first use.

    The engine is built with ``PolicyEngine()`` defaults and cached in
    ``_global_engine``; later calls return that same instance.
    """
    global _global_engine
    if _global_engine is not None:
        return _global_engine
    _global_engine = PolicyEngine()
    return _global_engine


# Public names of this module; this list is what a star-import exposes.
__all__ = [
    "ApprovalDecision",
    "PolicyCheckResult",
    "PolicyEngine",
    "get_policy_engine",
]