Spaces:
Build error
Build error
| from typing import Optional, Set, Union, List, Dict, Any | |
| from open_webui.models.users import Users, UserModel | |
| from open_webui.models.groups import Groups | |
| from open_webui.config import DEFAULT_USER_PERMISSIONS | |
| import json | |
def fill_missing_permissions(
    permissions: Dict[str, Any], default_permissions: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Recursively fill in keys missing from ``permissions`` using
    ``default_permissions`` as a template.

    ``permissions`` is updated in place and is also returned. Values already
    present in ``permissions`` are never overwritten; when both sides hold a
    dict under the same key, the nested dict is filled recursively.

    Defaults that get inserted are deep-copied, so the result never shares
    nested containers with ``default_permissions``; mutating the result later
    cannot corrupt the template.
    """
    # Local import keeps this function self-contained.
    from copy import deepcopy

    for key, default_value in default_permissions.items():
        if key not in permissions:
            # Copy rather than alias: the template must stay pristine.
            permissions[key] = deepcopy(default_value)
        elif isinstance(default_value, dict) and isinstance(permissions[key], dict):
            # Both sides are nested dicts: fill the existing one in place.
            fill_missing_permissions(permissions[key], default_value)
    return permissions
def get_permissions(
    user_id: str,
    default_permissions: Dict[str, Any],
    db: Optional[Any] = None,
) -> Dict[str, Any]:
    """
    Build the effective permission tree for a user.

    Starts from a copy of ``default_permissions`` and folds in the permissions
    of every group the user belongs to. When the same leaf is defined more
    than once, the most permissive value wins (True > False). Any key that is
    still absent afterwards is filled in from ``default_permissions``.
    """

    def merge_most_permissive(
        target: Dict[str, Any], incoming: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Fold ``incoming`` into ``target`` in place, keeping the most permissive leaves."""
        for name, incoming_value in incoming.items():
            if isinstance(incoming_value, dict):
                # Nested section: merge into the existing sub-tree, creating it if absent.
                target[name] = merge_most_permissive(
                    target.setdefault(name, {}), incoming_value
                )
            elif name in target:
                # Leaf defined on both sides: a truthy value on either side wins.
                target[name] = target[name] or incoming_value
            else:
                target[name] = incoming_value
        return target

    memberships = Groups.get_groups_by_member_id(user_id, db=db)

    # JSON round-trip gives an independent deep copy of the defaults.
    effective = json.loads(json.dumps(default_permissions))

    for membership in memberships:
        effective = merge_most_permissive(effective, membership.permissions or {})

    # Guarantee every field of the defaults is present in the result.
    return fill_missing_permissions(effective, default_permissions)
def has_permission(
    user_id: str,
    permission_key: str,
    default_permissions: Optional[Dict[str, Any]] = None,
    db: Optional[Any] = None,
) -> bool:
    """
    Check whether a user holds a specific permission.

    The permissions of each group the user belongs to are checked first; if no
    group grants the permission, the default permissions are consulted, with
    any keys missing from ``default_permissions`` filled in from
    ``DEFAULT_USER_PERMISSIONS``.

    ``permission_key`` may be hierarchical, with levels separated by dots
    ('.'). A missing level, or a level that is not a dict, denies access.

    ``default_permissions`` is never modified: the fallback is computed on a
    private copy. Omitting it (or passing ``None``) is equivalent to passing an
    empty dict.
    """

    def get_permission(permissions: Any, keys: List[str]) -> bool:
        """Walk ``permissions`` along ``keys``; return the truthiness of the final value."""
        for key in keys:
            # Fail closed when the path is missing or descends past a leaf value.
            if not isinstance(permissions, dict) or key not in permissions:
                return False
            permissions = permissions[key]  # Traverse one level deeper
        return bool(permissions)

    permission_hierarchy = permission_key.split(".")

    # Any single group granting the permission is sufficient.
    for group in Groups.get_groups_by_member_id(user_id, db=db):
        if get_permission(group.permissions or {}, permission_hierarchy):
            return True

    # fill_missing_permissions works in place, so hand it a deep copy (same
    # JSON round-trip idiom used elsewhere in this module) instead of the
    # caller's dict.
    effective_defaults = fill_missing_permissions(
        json.loads(json.dumps(default_permissions or {})),
        DEFAULT_USER_PERMISSIONS,
    )
    return get_permission(effective_defaults, permission_hierarchy)
def has_access(
    user_id: str,
    permission: str = "read",
    access_grants: Optional[list] = None,
    user_group_ids: Optional[Set[str]] = None,
    db: Optional[Any] = None,
) -> bool:
    """
    Check if a user has the specified permission using an in-memory access_grants list.

    Used for config-driven resources (arena models, tool servers) that store
    access control as JSON in PersistentConfig rather than in the access_grant DB table.

    Semantics:
    - None or [] → private (owner-only, deny all)
    - [{"principal_type": "user", "principal_id": "*", "permission": "read"}] → public read
    - Specific grants → check user/group membership

    Entries that are not dicts, or whose "permission" differs from
    ``permission``, are ignored. When ``user_group_ids`` is not supplied, the
    user's groups are looked up only if a matching group grant is actually
    encountered, so wildcard and direct user grants need no group query.
    """
    if not access_grants:
        return False

    for grant in access_grants:
        if not isinstance(grant, dict) or grant.get("permission") != permission:
            continue

        principal_type = grant.get("principal_type")
        principal_id = grant.get("principal_id")

        if principal_type == "user":
            if principal_id == "*" or principal_id == user_id:
                return True
        elif principal_type == "group":
            if user_group_ids is None:
                # Deferred lookup: resolved at most once, and only when needed.
                user_group_ids = {
                    group.id
                    for group in Groups.get_groups_by_member_id(user_id, db=db)
                }
            if user_group_ids and principal_id in user_group_ids:
                return True

    return False
def migrate_access_control(
    data: dict, ac_key: str = "access_control", grants_key: str = "access_grants"
) -> None:
    """
    Auto-migrate a config dict in-place from legacy access_control dict to access_grants list.

    If `grants_key` already exists, or `ac_key` is absent, does nothing.
    If `ac_key` exists (old format), converts it and stores as `grants_key`, then removes `ac_key`.

    For each of "read" and "write", group grants are emitted before user
    grants. A legacy value that is not a dict (including None) migrates to an
    empty grant list. Malformed pieces of the legacy structure (a non-dict
    "read"/"write" entry, or a null "group_ids"/"user_ids") are treated as
    empty instead of aborting the migration.
    """
    if grants_key in data or ac_key not in data:
        return

    access_control = data[ac_key]
    grants: List[Dict[str, str]] = []

    if isinstance(access_control, dict):
        for perm in ("read", "write"):
            perm_data = access_control.get(perm)
            if not isinstance(perm_data, dict):
                continue
            # Order matters for the resulting list: groups first, then users.
            for principal_type, ids_key in (
                ("group", "group_ids"),
                ("user", "user_ids"),
            ):
                # `or []` tolerates an explicit null stored under the key.
                for principal_id in perm_data.get(ids_key) or []:
                    grants.append(
                        {
                            "principal_type": principal_type,
                            "principal_id": principal_id,
                            "permission": perm,
                        }
                    )

    data[grants_key] = grants
    data.pop(ac_key, None)