Spaces:
Running
Running
"""Policy engine — rule-based tool approval system.

Inspired by Gemini CLI's PolicyEngine with priority-sorted rules
and pattern matching. Also incorporates Claude Code's allow/ask/deny model.
"""
| from __future__ import annotations | |
| import re | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| from typing import Any | |
class ApprovalDecision(Enum):
    """Possible outcomes of a policy check for a tool call.

    The member values are the lowercase strings used in the ``"approval"``
    field of rule dicts, so ``ApprovalDecision(rule["approval"])`` converts
    a rule's setting into a member.
    """

    ALLOW = "allow"
    ASK = "ask"
    DENY = "deny"
@dataclass
class PolicyCheckResult:
    """Result of a policy check.

    Declared as a dataclass so it can be built with keyword arguments
    (``decision=``, ``reason=``, ``matched_rule=``).
    """

    # Final decision for the checked tool call.
    decision: ApprovalDecision
    # Human-readable explanation of the decision; empty when none was given.
    reason: str = ""
    # Description of the rule that produced the decision, or None when the
    # decision did not come from a rule with a description.
    matched_rule: str | None = None
class PolicyEngine:
    """Rule-based policy engine with priority ordering.

    Each rule is a plain dict with these keys:

    * ``tool_name``    -- exact tool name, or a glob using ``*`` wildcards
    * ``args_pattern`` -- regex searched (case-insensitively) in ``str(args)``;
      ``None``/empty means "match any arguments"
    * ``approval``     -- ``"allow"``, ``"ask"`` or ``"deny"``
    * ``priority``     -- integer; higher values are evaluated first
    * ``description``  -- label used as the reason / matched-rule name

    Rules are evaluated in priority order (highest first).
    First matching rule wins.
    Default: ASK for interactive, DENY for non-interactive.
    """

    # Built-in safety rules (always evaluated, highest priority)
    BUILTIN_RULES = [
        {
            "tool_name": "bash",
            "args_pattern": r"rm\s+-rf\s+(/|~|\*)",
            "approval": "deny",
            "priority": 1000,
            "description": "Block recursive delete of root/home",
        },
        {
            "tool_name": "bash",
            "args_pattern": r"(shutdown|reboot|halt|poweroff)",
            "approval": "deny",
            "priority": 1000,
            "description": "Block system shutdown commands",
        },
        {
            "tool_name": "bash",
            "args_pattern": r"(mkfs|fdisk|parted|dd\s+if=/dev/zero)",
            "approval": "deny",
            "priority": 1000,
            "description": "Block disk destruction commands",
        },
        {
            "tool_name": "bash",
            "args_pattern": r"(chmod\s+-R\s+777|chown\s+-R)",
            "approval": "deny",
            "priority": 900,
            "description": "Block dangerous permission changes",
        },
        {
            "tool_name": "write_file",
            "args_pattern": None,
            "approval": "allow",
            "priority": -100,
            "description": "Allow file writes in sandboxed workspace",
        },
        {
            "tool_name": "edit_file",
            "args_pattern": None,
            "approval": "allow",
            "priority": -100,
            "description": "Allow file edits in sandboxed workspace",
        },
        {
            "tool_name": "multi_edit",
            "args_pattern": None,
            "approval": "allow",
            "priority": -100,
            "description": "Allow multi-file edits in sandboxed workspace",
        },
        {
            "tool_name": "read_file",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow file reads",
        },
        {
            "tool_name": "glob",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow glob searches",
        },
        {
            "tool_name": "grep",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow grep searches",
        },
        {
            "tool_name": "list_dir",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow directory listing",
        },
        {
            "tool_name": "web_search",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow web search",
        },
        {
            "tool_name": "web_fetch",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow web fetch",
        },
        {
            "tool_name": "todo_read",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow todo reads",
        },
        {
            "tool_name": "todo_write",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow todo writes",
        },
        {
            "tool_name": "todo_update",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow todo updates",
        },
        {
            "tool_name": "snapshot_workspace",
            "args_pattern": None,
            "approval": "allow",
            "priority": -200,
            "description": "Allow workspace snapshot",
        },
    ]

    # Values accepted by the ``approval_mode`` setter.
    _APPROVAL_MODES = ("default", "auto_edit", "yolo")
    # Tools whose ASK decisions are auto-approved in "auto_edit" mode.
    _FILE_EDIT_TOOLS = ("write_file", "edit_file", "multi_edit")

    def __init__(self, interactive: bool = True):
        """Create an engine pre-loaded with the built-in rules.

        Args:
            interactive: Selects the fallback decision used when no rule
                matches: ASK when True, DENY when False.
        """
        self._rules: list[dict] = self._builtin_rules()
        self._interactive = interactive
        self._approval_mode: str = "default"  # default | auto_edit | yolo

    @classmethod
    def _builtin_rules(cls) -> list[dict]:
        """Return per-engine copies of ``BUILTIN_RULES``, highest priority first."""
        # Copy each dict so edits made through get_rules() on one engine
        # cannot leak into the class-level table shared by every engine.
        rules = [dict(rule) for rule in cls.BUILTIN_RULES]
        # list.sort is stable, so equal-priority rules keep declaration order.
        rules.sort(key=lambda r: r["priority"], reverse=True)
        return rules

    @property
    def approval_mode(self) -> str:
        """Current approval mode: ``"default"``, ``"auto_edit"`` or ``"yolo"``."""
        return self._approval_mode

    @approval_mode.setter
    def approval_mode(self, mode: str) -> None:
        # Unknown modes are ignored and the previous mode stays in effect.
        if mode in self._APPROVAL_MODES:
            self._approval_mode = mode

    def add_rule(self, rule: dict) -> None:
        """Add a custom policy rule.

        Missing ``priority`` / ``approval`` keys are filled in on the passed
        dict (0 and ``"ask"``). The rule list is then re-sorted so that
        higher-priority rules are evaluated first.
        """
        rule.setdefault("priority", 0)
        rule.setdefault("approval", "ask")
        self._rules.append(rule)
        self._rules.sort(key=lambda r: r["priority"], reverse=True)

    def remove_rule(self, description: str) -> bool:
        """Remove the first rule whose description equals *description*.

        The search covers every rule currently held by the engine.

        Returns:
            True if a rule was removed, False if none matched.
        """
        for index, rule in enumerate(self._rules):
            if rule.get("description") == description:
                # Safe to mutate here: we return right after the pop.
                self._rules.pop(index)
                return True
        return False

    def set_rules(self, rules: list[dict]) -> None:
        """Replace all custom rules (keeps built-in rules)."""
        self._rules = self._builtin_rules()
        for rule in rules:
            self.add_rule(rule)

    def check(
        self, tool_name: str, args: dict[str, Any] | None = None
    ) -> PolicyCheckResult:
        """Check if a tool call is allowed.

        Args:
            tool_name: Name of the tool being invoked.
            args: Tool arguments; their ``str()`` form is what each rule's
                ``args_pattern`` is searched in. ``None``/empty gives ``""``.

        Returns:
            The decision and reason from the highest-priority matching rule,
            adjusted for the current approval mode. When no rule matches, the
            fallback is ASK for interactive engines and DENY otherwise.
        """
        args_str = str(args) if args else ""

        for rule in self._rules:
            if not self._rule_matches(rule, tool_name, args_str):
                continue
            # Match found — return this rule's (mode-adjusted) decision.
            approval = self._effective_approval(rule, tool_name)
            return PolicyCheckResult(
                decision=ApprovalDecision(approval),
                reason=rule.get(
                    "description", f"Rule matched: {rule['tool_name']}"
                ),
                matched_rule=rule.get("description"),
            )

        # Default: ASK for interactive, DENY for non-interactive
        default = ApprovalDecision.ASK if self._interactive else ApprovalDecision.DENY
        return PolicyCheckResult(
            decision=default,
            reason=f"No matching rule for {tool_name} (default: {default.value})",
        )

    def _rule_matches(self, rule: dict, tool_name: str, args_str: str) -> bool:
        """Return True when *rule* applies to this tool name and argument string."""
        # Check tool name match (supports wildcards)
        if not self._match_tool_name(tool_name, rule["tool_name"]):
            return False
        # Check args pattern if specified
        pattern = rule.get("args_pattern")
        if not pattern:
            return True
        try:
            return re.search(pattern, args_str, re.IGNORECASE) is not None
        except re.error:
            # A rule whose regex does not compile is skipped rather than
            # allowed to break every policy check.
            return False

    def _effective_approval(self, rule: dict, tool_name: str) -> str:
        """Apply the current approval mode to a matched rule's approval value."""
        approval = rule["approval"]
        mode = self._approval_mode
        # YOLO mode: soften DENY to ASK, except for rules with priority >= 900.
        if mode == "yolo" and rule["priority"] < 900 and approval == "deny":
            return "ask"
        # Auto-edit mode: auto-approve file writes/edits that would otherwise
        # prompt. An explicit DENY is left intact so auto-approval cannot be
        # used to bypass a deny rule.
        if (
            mode == "auto_edit"
            and tool_name in self._FILE_EDIT_TOOLS
            and approval == "ask"
        ):
            return "allow"
        return approval

    def _match_tool_name(self, name: str, pattern: str) -> bool:
        """Match tool name against pattern (supports * wildcards).

        Only ``*`` is special; every other character in *pattern* is matched
        literally.
        """
        if pattern == "*":
            return True
        if "*" not in pattern:
            return name == pattern
        # Convert glob to regex: escape the literal pieces so characters such
        # as "." or "(" are not interpreted as regex syntax.
        regex = ".*".join(re.escape(piece) for piece in pattern.split("*"))
        return re.fullmatch(regex, name) is not None

    def get_rules(self) -> list[dict]:
        """Get all rules for display.

        Returns a new list, so adding or removing list items does not affect
        the engine; the rule dicts themselves are not copied.
        """
        return list(self._rules)
# ── Global Engine ───────────────────────────────────────────────────────

# Process-wide engine instance; created on first use by get_policy_engine().
_global_engine: PolicyEngine | None = None


def get_policy_engine() -> PolicyEngine:
    """Get or create the global policy engine.

    The first call builds a ``PolicyEngine`` with its default arguments;
    later calls return that same instance.
    """
    global _global_engine
    if _global_engine is None:
        _global_engine = PolicyEngine()
    return _global_engine
# Names exported by ``from <module> import *``.
__all__ = [
    "ApprovalDecision",
    "PolicyCheckResult",
    "PolicyEngine",
    "get_policy_engine",
]