Spaces:
Build error
Build error
| import logging | |
| import time | |
| import uuid | |
| from typing import Optional | |
| from sqlalchemy.orm import Session | |
| from open_webui.internal.db import Base, get_db_context | |
| from pydantic import BaseModel, ConfigDict | |
| from sqlalchemy import BigInteger, Column, Text, UniqueConstraint, or_, and_ | |
| from sqlalchemy.dialects.postgresql import JSONB | |
| log = logging.getLogger(__name__) | |
| #################### | |
| # AccessGrant DB Schema | |
| #################### | |
class AccessGrant(Base):
    """ORM mapping for one row of the ``access_grant`` table.

    A row states that a single principal holds a single permission on a
    single resource. The unique constraint below rejects a second row with
    the same (resource, principal, permission) combination.
    """

    __tablename__ = "access_grant"

    id = Column(Text, primary_key=True)

    # Kind of resource the grant applies to:
    # "knowledge", "model", "prompt", "tool", "note", "channel", "file".
    resource_type = Column(Text, nullable=False)
    resource_id = Column(Text, nullable=False)

    # "user" or "group".
    principal_type = Column(Text, nullable=False)
    # A user_id, a group_id, or "*" (wildcard for public).
    principal_id = Column(Text, nullable=False)

    # "read" or "write".
    permission = Column(Text, nullable=False)

    # Writers in this module store int(time.time()) here.
    created_at = Column(BigInteger, nullable=False)

    __table_args__ = (
        UniqueConstraint(
            "resource_type",
            "resource_id",
            "principal_type",
            "principal_id",
            "permission",
            name="uq_access_grant_grant",
        ),
    )
class AccessGrantModel(BaseModel):
    """Pydantic view of a full ``AccessGrant`` row.

    ``from_attributes=True`` lets ``model_validate`` read the fields straight
    off an ORM instance instead of requiring a dict.
    """

    model_config = ConfigDict(from_attributes=True)

    id: str

    # Resource the grant is attached to.
    resource_type: str
    resource_id: str

    # Who receives the permission.
    principal_type: str
    principal_id: str

    permission: str
    created_at: int
class AccessGrantResponse(BaseModel):
    """Slim grant model for API responses — resource context is implicit from the parent."""

    id: str
    principal_type: str
    principal_id: str
    permission: str

    @classmethod
    def from_grant(cls, grant: "AccessGrantModel") -> "AccessGrantResponse":
        """Build a response object from a full grant, dropping the resource fields.

        This is an alternate constructor, so it must be a classmethod:
        without the decorator ``AccessGrantResponse.from_grant(grant)`` would
        bind ``grant`` to ``cls`` and fail with a missing-argument TypeError.

        Args:
            grant: The full grant whose id, principal and permission are copied.

        Returns:
            A new instance of ``cls`` carrying only the principal-facing fields.
        """
        return cls(
            id=grant.id,
            principal_type=grant.principal_type,
            principal_id=grant.principal_id,
            permission=grant.permission,
        )
| #################### | |
| # Conversion utilities | |
| #################### | |
def access_control_to_grants(
    resource_type: str,
    resource_id: str,
    access_control: Optional[dict],
) -> list[dict]:
    """
    Convert an old-style access_control JSON dict to a flat list of grant dicts.

    Semantics:
    - None → public read (user:* read) — except files, which are private
    - {} → private/owner-only (no grants)
    - {read: {group_ids, user_ids}, write: {group_ids, user_ids}} → specific grants

    Ids repeated within the same permission/principal list produce a single
    grant, because identical grants would violate the table's unique
    constraint when inserted. A ``None`` value for a permission entry or for
    one of its id lists is treated as empty.

    Args:
        resource_type: Kind of resource the grants belong to.
        resource_id: Id of the resource the grants belong to.
        access_control: Legacy access-control value (see semantics above).

    Returns:
        A list of dicts with keys: resource_type, resource_id,
        principal_type, principal_id, permission. Order is read before
        write, and groups before users within each permission.
    """

    def make_grant(principal_type: str, principal_id: str, permission: str) -> dict:
        return {
            "resource_type": resource_type,
            "resource_id": resource_id,
            "principal_type": principal_type,
            "principal_id": principal_id,
            "permission": permission,
        }

    if access_control is None:
        # NULL → public read; files with NULL stay private (owner-only).
        if resource_type == "file":
            return []
        return [make_grant("user", "*", "read")]

    # {} → private/owner-only, no grants.
    if not access_control:
        return []

    grants: list[dict] = []
    seen: set = set()
    for permission in ("read", "write"):
        # `or {}` / `or []` tolerate explicit JSON nulls in legacy payloads.
        perm_data = access_control.get(permission) or {}
        for principal_type, ids_key in (("group", "group_ids"), ("user", "user_ids")):
            for principal_id in perm_data.get(ids_key) or []:
                grant_key = (principal_type, principal_id, permission)
                if grant_key in seen:
                    continue
                seen.add(grant_key)
                grants.append(make_grant(principal_type, principal_id, permission))
    return grants
def normalize_access_grants(access_grants: Optional[list]) -> list[dict]:
    """
    Clean up an access_grants payload received from an API form.

    Entries may be dicts or pydantic models (dumped to dicts first); anything
    else is skipped. An entry is kept only when its principal_type is "user"
    or "group", its permission is "read" or "write", and its principal_id is
    a non-empty string. Entries sharing the same
    (principal_type, principal_id, permission) collapse into one; the later
    entry's values win. A missing or non-string/empty "id" is replaced by a
    fresh UUID4 string.
    """
    if not access_grants:
        return []

    unique: dict = {}
    for entry in access_grants:
        candidate = entry.model_dump() if isinstance(entry, BaseModel) else entry
        if not isinstance(candidate, dict):
            continue

        principal_type = candidate.get("principal_type")
        principal_id = candidate.get("principal_id")
        permission = candidate.get("permission")

        is_valid = (
            principal_type in ("user", "group")
            and permission in ("read", "write")
            and isinstance(principal_id, str)
            and bool(principal_id)
        )
        if not is_valid:
            continue

        supplied_id = candidate.get("id")
        if isinstance(supplied_id, str) and supplied_id:
            grant_id = supplied_id
        else:
            grant_id = str(uuid.uuid4())

        unique[(principal_type, principal_id, permission)] = {
            "id": grant_id,
            "principal_type": principal_type,
            "principal_id": principal_id,
            "permission": permission,
        }

    return list(unique.values())
def has_public_read_access_grant(access_grants: Optional[list]) -> bool:
    """
    Tell whether a direct grant list contains the wildcard public-read grant,
    i.e. a normalized entry with principal "user" / "*" and permission "read".
    """
    return any(
        entry["principal_type"] == "user"
        and entry["principal_id"] == "*"
        and entry["permission"] == "read"
        for entry in normalize_access_grants(access_grants)
    )
def grants_to_access_control(grants: list) -> Optional[dict]:
    """
    Rebuild the legacy access_control JSON value from grant objects.

    Each grant must expose ``principal_type``, ``principal_id`` and
    ``permission`` attributes (AccessGrantModel or AccessGrantResponse).

    Semantics:
    - no grants → {} (private/owner-only)
    - any user:*:read grant → None (public)
    - otherwise → {read: {group_ids, user_ids}, write: {group_ids, user_ids}}

    Because a public resource collapses to None, any write grants that
    coexist with the wildcard read are not visible in the return value;
    callers needing them should inspect the grants list itself.
    """
    if not grants:
        return {}  # private/owner-only

    bucket_for = {"group": "group_ids", "user": "user_ids"}
    legacy = {
        "read": {"group_ids": [], "user_ids": []},
        "write": {"group_ids": [], "user_ids": []},
    }
    public = False

    for entry in grants:
        wildcard_read = (
            entry.principal_type == "user"
            and entry.principal_id == "*"
            and entry.permission == "read"
        )
        if wildcard_read:
            # The wildcard marks the resource public; it is never listed as a user id.
            public = True
            continue

        if entry.permission not in ("read", "write"):
            continue

        bucket = bucket_for.get(entry.principal_type)
        if bucket is None:
            continue

        ids = legacy[entry.permission][bucket]
        if entry.principal_id not in ids:
            ids.append(entry.principal_id)

    return None if public else legacy
| #################### | |
| # Table Operations | |
| #################### | |
class AccessGrantsTable:
    """Query and mutation helpers for rows of the ``access_grant`` table.

    Every public method takes an optional ``db`` session and hands it to
    ``get_db_context``; the session yielded by that context manager is the
    one used for the call.
    """

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _resource_grants(db, resource_type: str, resource_id: str):
        """Return the query selecting every grant attached to one resource."""
        return db.query(AccessGrant).filter_by(
            resource_type=resource_type,
            resource_id=resource_id,
        )

    @staticmethod
    def _wildcard_predicate():
        """Return the predicate matching the public wildcard principal (user:*)."""
        return and_(
            AccessGrant.principal_type == "user",
            AccessGrant.principal_id == "*",
        )

    @staticmethod
    def _principal_predicates(user_id, group_ids) -> list:
        """Return predicates matching a direct user grant and/or group grants.

        A predicate is produced only for the parts that are truthy, so the
        result may be empty.
        """
        predicates = []
        if user_id:
            predicates.append(
                and_(
                    AccessGrant.principal_type == "user",
                    AccessGrant.principal_id == user_id,
                )
            )
        if group_ids:
            predicates.append(
                and_(
                    AccessGrant.principal_type == "group",
                    AccessGrant.principal_id.in_(group_ids),
                )
            )
        return predicates

    @staticmethod
    def _grant_exists(DocumentModel, resource_type: str, permission: str, principal_clause):
        """Return a correlated EXISTS over grants of one permission on ``DocumentModel.id``."""
        from sqlalchemy import select

        return (
            select(AccessGrant.id)
            .where(
                AccessGrant.resource_type == resource_type,
                AccessGrant.resource_id == DocumentModel.id,
                AccessGrant.permission == permission,
                principal_clause,
            )
            .correlate(DocumentModel)
            .exists()
        )

    # ------------------------------------------------------------------
    # Single-grant operations
    # ------------------------------------------------------------------

    def grant_access(
        self,
        resource_type: str,
        resource_id: str,
        principal_type: str,
        principal_id: str,
        permission: str,
        db: Optional[Session] = None,
    ) -> Optional[AccessGrantModel]:
        """Add a single access grant, returning the existing one if already present.

        NOTE(review): the lookup and the insert are separate statements, so
        two concurrent callers could both pass the lookup; the unique
        constraint would then reject the second insert.
        """
        with get_db_context(db) as db:
            existing = (
                self._resource_grants(db, resource_type, resource_id)
                .filter_by(
                    principal_type=principal_type,
                    principal_id=principal_id,
                    permission=permission,
                )
                .first()
            )
            if existing:
                return AccessGrantModel.model_validate(existing)

            grant = AccessGrant(
                id=str(uuid.uuid4()),
                resource_type=resource_type,
                resource_id=resource_id,
                principal_type=principal_type,
                principal_id=principal_id,
                permission=permission,
                created_at=int(time.time()),
            )
            db.add(grant)
            db.commit()
            db.refresh(grant)
            return AccessGrantModel.model_validate(grant)

    def revoke_access(
        self,
        resource_type: str,
        resource_id: str,
        principal_type: str,
        principal_id: str,
        permission: str,
        db: Optional[Session] = None,
    ) -> bool:
        """Remove a single access grant; return True when a row was deleted."""
        with get_db_context(db) as db:
            deleted = (
                self._resource_grants(db, resource_type, resource_id)
                .filter_by(
                    principal_type=principal_type,
                    principal_id=principal_id,
                    permission=permission,
                )
                .delete()
            )
            db.commit()
            return deleted > 0

    def revoke_all_access(
        self,
        resource_type: str,
        resource_id: str,
        db: Optional[Session] = None,
    ) -> int:
        """Remove all access grants for a resource; return the number deleted."""
        with get_db_context(db) as db:
            deleted = self._resource_grants(db, resource_type, resource_id).delete()
            db.commit()
            return deleted

    # ------------------------------------------------------------------
    # Bulk replacement
    # ------------------------------------------------------------------

    def set_access_control(
        self,
        resource_type: str,
        resource_id: str,
        access_control: Optional[dict],
        db: Optional[Session] = None,
    ) -> list[AccessGrantModel]:
        """
        Replace all grants for a resource from an access_control JSON dict.

        Existing grants are deleted, the dict is converted with
        ``access_control_to_grants`` and the resulting grants are inserted,
        all before a single commit. This is the primary bridge for backward
        compatibility with the frontend.
        """
        with get_db_context(db) as db:
            self._resource_grants(db, resource_type, resource_id).delete()

            grant_dicts = access_control_to_grants(
                resource_type, resource_id, access_control
            )

            # One timestamp for the whole batch.
            now = int(time.time())
            rows = [
                AccessGrant(id=str(uuid.uuid4()), **grant_dict, created_at=now)
                for grant_dict in grant_dicts
            ]
            db.add_all(rows)
            db.commit()
            return [AccessGrantModel.model_validate(row) for row in rows]

    def set_access_grants(
        self,
        resource_type: str,
        resource_id: str,
        access_grants: Optional[list],
        db: Optional[Session] = None,
    ) -> list[AccessGrantModel]:
        """
        Replace all grants for a resource from a direct access_grants list.

        The list is passed through ``normalize_access_grants`` first; the id
        of each normalized entry is used as the row id.
        """
        with get_db_context(db) as db:
            self._resource_grants(db, resource_type, resource_id).delete()

            # One timestamp for the whole batch.
            now = int(time.time())
            rows = [
                AccessGrant(
                    id=grant_dict["id"],
                    resource_type=resource_type,
                    resource_id=resource_id,
                    principal_type=grant_dict["principal_type"],
                    principal_id=grant_dict["principal_id"],
                    permission=grant_dict["permission"],
                    created_at=now,
                )
                for grant_dict in normalize_access_grants(access_grants)
            ]
            db.add_all(rows)
            db.commit()
            return [AccessGrantModel.model_validate(row) for row in rows]

    # ------------------------------------------------------------------
    # Reads
    # ------------------------------------------------------------------

    def get_access_control(
        self,
        resource_type: str,
        resource_id: str,
        db: Optional[Session] = None,
    ) -> Optional[dict]:
        """
        Reconstruct the old-style access_control JSON dict from grants.
        For backward compatibility with the frontend.
        """
        with get_db_context(db) as db:
            grants = self._resource_grants(db, resource_type, resource_id).all()
            grant_models = [AccessGrantModel.model_validate(g) for g in grants]
            return grants_to_access_control(grant_models)

    def get_grants_by_resource(
        self,
        resource_type: str,
        resource_id: str,
        db: Optional[Session] = None,
    ) -> list[AccessGrantModel]:
        """Get all grants for a specific resource."""
        with get_db_context(db) as db:
            grants = self._resource_grants(db, resource_type, resource_id).all()
            return [AccessGrantModel.model_validate(g) for g in grants]

    def has_access(
        self,
        user_id: str,
        resource_type: str,
        resource_id: str,
        permission: str = "read",
        user_group_ids: Optional[set[str]] = None,
        db: Optional[Session] = None,
    ) -> bool:
        """
        Check if a user has the specified permission on a resource.

        Access is granted if any of the following is true:
        - There's a grant for user:* (public) with the requested permission
        - There's a grant for the specific user with the requested permission
        - There's a grant for any of the user's groups with the requested permission

        When ``user_group_ids`` is None the user's groups are looked up via
        ``Groups.get_groups_by_member_id``; pass a set (possibly empty) to
        skip that lookup.
        """
        with get_db_context(db) as db:
            conditions = [
                # Public access
                self._wildcard_predicate(),
                # Direct user access
                and_(
                    AccessGrant.principal_type == "user",
                    AccessGrant.principal_id == user_id,
                ),
            ]

            # Group access
            if user_group_ids is None:
                from open_webui.models.groups import Groups

                user_groups = Groups.get_groups_by_member_id(user_id, db=db)
                user_group_ids = {group.id for group in user_groups}

            if user_group_ids:
                conditions.append(
                    and_(
                        AccessGrant.principal_type == "group",
                        AccessGrant.principal_id.in_(user_group_ids),
                    )
                )

            match = (
                db.query(AccessGrant)
                .filter(
                    AccessGrant.resource_type == resource_type,
                    AccessGrant.resource_id == resource_id,
                    AccessGrant.permission == permission,
                    or_(*conditions),
                )
                .first()
            )
            return match is not None

    def get_users_with_access(
        self,
        resource_type: str,
        resource_id: str,
        permission: str = "read",
        db: Optional[Session] = None,
    ) -> list:
        """
        Get all users who have the specified permission on a resource.

        If a wildcard (user:*) grant exists for the permission, the result of
        ``Users.get_users(filter={"roles": ["!pending"]})`` is returned
        (presumably every non-pending user — confirm against ``Users``).
        Otherwise directly granted users and the members of granted groups
        are collected and fetched by id.
        """
        # Imported lazily, as in the rest of this module's cross-model calls.
        from open_webui.models.groups import Groups
        from open_webui.models.users import Users

        with get_db_context(db) as db:
            grants = (
                self._resource_grants(db, resource_type, resource_id)
                .filter_by(permission=permission)
                .all()
            )

            # Public access short-circuits the per-principal expansion.
            if any(
                g.principal_type == "user" and g.principal_id == "*" for g in grants
            ):
                result = Users.get_users(filter={"roles": ["!pending"]}, db=db)
                return result.get("users", [])

            user_ids_with_access = set()
            for g in grants:
                if g.principal_type == "user":
                    user_ids_with_access.add(g.principal_id)
                elif g.principal_type == "group":
                    group_user_ids = Groups.get_group_user_ids_by_id(
                        g.principal_id, db=db
                    )
                    if group_user_ids:
                        user_ids_with_access.update(group_user_ids)

            if not user_ids_with_access:
                return []

            return Users.get_users_by_user_ids(list(user_ids_with_access), db=db)

    # ------------------------------------------------------------------
    # Query filters
    # ------------------------------------------------------------------

    def has_permission_filter(
        self,
        db,
        query,
        DocumentModel,
        filter: dict,
        resource_type: str,
        permission: str = "read",
    ):
        """
        Restrict a SQLAlchemy query to rows the caller may access.

        A row passes when ``DocumentModel.user_id`` equals the caller's
        user id (owner), or when a grant with the requested permission exists
        for the wildcard principal, the user, or one of the user's groups.
        The grant check is a correlated EXISTS subquery, so multiple matching
        grants cannot duplicate result rows.

        Args:
            db: Session; only forwarded to the read-only variant.
            query: Query over ``DocumentModel`` to restrict.
            DocumentModel: Mapped class exposing ``id`` and ``user_id``.
            filter: Dict read for the keys "user_id" and "group_ids".
            resource_type: Value matched against ``AccessGrant.resource_type``.
            permission: Permission to require; the special value "read_only"
                delegates to ``_has_read_only_permission_filter``.

        Returns:
            The filtered query, or ``query`` unchanged when ``filter`` carries
            neither a user id nor group ids.
        """
        group_ids = filter.get("group_ids", [])
        user_id = filter.get("user_id")

        if permission == "read_only":
            return self._has_read_only_permission_filter(
                db, query, DocumentModel, filter, resource_type
            )

        # No principal information at all: leave the query unrestricted.
        if not (group_ids or user_id):
            return query

        grant_exists = self._grant_exists(
            DocumentModel,
            resource_type,
            permission,
            or_(
                self._wildcard_predicate(),
                *self._principal_predicates(user_id, group_ids),
            ),
        )

        # Owner OR has a matching grant
        owner_or_grant = [grant_exists]
        if user_id:
            owner_or_grant.append(DocumentModel.user_id == user_id)

        return query.filter(or_(*owner_or_grant))

    def _has_read_only_permission_filter(
        self,
        db,
        query,
        DocumentModel,
        filter: dict,
        resource_type: str,
    ):
        """
        Filter for items where the user has read BUT NOT write access.

        A row passes when the user (directly or via a group) holds a read
        grant, holds no write grant, the resource has no wildcard read grant
        (public items are NOT considered read-only) and, when a user id is
        given, the user is not the owner.

        When ``filter`` carries neither a user id nor group ids, the
        principal clause is a constant false, so no row is matched.
        """
        from sqlalchemy import false

        group_ids = filter.get("group_ids", [])
        user_id = filter.get("user_id")

        def principal_clause():
            # `false()` keeps the OR well-formed when there are no predicates;
            # an argument-less or_() is deprecated and drops the clause.
            return or_(false(), *self._principal_predicates(user_id, group_ids))

        # Has read grant (not public)
        read_grant_exists = self._grant_exists(
            DocumentModel, resource_type, "read", principal_clause()
        )
        # Has write grant (negated below)
        write_grant_exists = self._grant_exists(
            DocumentModel, resource_type, "write", principal_clause()
        )
        # Is public (negated below)
        public_grant_exists = self._grant_exists(
            DocumentModel, resource_type, "read", self._wildcard_predicate()
        )

        conditions = [read_grant_exists, ~write_grant_exists, ~public_grant_exists]

        # Not owner
        if user_id:
            conditions.append(DocumentModel.user_id != user_id)

        return query.filter(and_(*conditions))


AccessGrants = AccessGrantsTable()