Spaces:
Build error
Build error
| import logging | |
| import time | |
| import uuid | |
| from typing import Optional | |
| import re | |
| from pydantic import BaseModel, ConfigDict | |
| from sqlalchemy import BigInteger, Column, Text, JSON, Boolean, func | |
| from sqlalchemy.orm import Session | |
| from open_webui.internal.db import Base, JSONField, get_db, get_db_context | |
# Module-level logger; named after this module so records can be filtered per file.
log = logging.getLogger(__name__)
| #################### | |
| # Folder DB Schema | |
| #################### | |
class Folder(Base):
    """ORM mapping for one row of the ``folder`` table."""

    __tablename__ = "folder"

    # --- identity / ownership -------------------------------------------
    id = Column(Text, primary_key=True, unique=True)
    # Nullable: a folder created without a parent stores NULL here.
    parent_id = Column(Text, nullable=True)
    user_id = Column(Text)

    # --- user-visible content -------------------------------------------
    name = Column(Text)
    # Free-form JSON payloads; all three are optional.
    items = Column(JSON, nullable=True)
    meta = Column(JSON, nullable=True)
    data = Column(JSON, nullable=True)

    # --- UI state -------------------------------------------------------
    is_expanded = Column(Boolean, default=False)

    # --- bookkeeping ----------------------------------------------------
    # Integer timestamps; this module writes int(time.time()) into them.
    created_at = Column(BigInteger)
    updated_at = Column(BigInteger)
class FolderModel(BaseModel):
    """Pydantic view of a ``Folder`` row, built directly from ORM objects."""

    # from_attributes lets model_validate() read fields off an ORM instance.
    model_config = ConfigDict(from_attributes=True)

    id: str
    parent_id: Optional[str] = None
    user_id: str
    name: str

    # Optional JSON payloads mirrored from the table.
    items: Optional[dict] = None
    meta: Optional[dict] = None
    data: Optional[dict] = None

    is_expanded: bool = False

    # Integer timestamps (the table layer stores int(time.time())).
    created_at: int
    updated_at: int
class FolderMetadataResponse(BaseModel):
    """Subset of a folder's ``meta`` payload exposed in responses."""

    # Optional icon identifier; absent when the folder has none.
    icon: Optional[str] = None
class FolderNameIdResponse(BaseModel):
    """Lightweight folder representation: identity, name, and tree/UI state.

    Omits ``user_id`` and the ``items`` / ``data`` payloads that
    ``FolderModel`` carries.
    """

    id: str
    name: str
    meta: Optional[FolderMetadataResponse] = None
    parent_id: Optional[str] = None
    is_expanded: bool = False

    # Integer timestamps.
    created_at: int
    updated_at: int
| #################### | |
| # Forms | |
| #################### | |
class FolderForm(BaseModel):
    """Payload for creating a folder; ``name`` is the only required field."""

    # extra="allow": keys not declared below are accepted and kept on the model.
    model_config = ConfigDict(extra="allow")

    name: str
    data: Optional[dict] = None
    meta: Optional[dict] = None
class FolderUpdateForm(BaseModel):
    """Payload for updating a folder; every field is optional."""

    # extra="allow": keys not declared below are accepted and kept on the model.
    model_config = ConfigDict(extra="allow")

    name: Optional[str] = None
    data: Optional[dict] = None
    meta: Optional[dict] = None
class FolderTable:
    """Data-access helpers for the ``folder`` table.

    Every method accepts an optional ``db`` session and hands it to
    ``get_db_context``; all lookups are scoped to a ``user_id``.
    """

    def insert_new_folder(
        self,
        user_id: str,
        form_data: FolderForm,
        parent_id: Optional[str] = None,
        db: Optional[Session] = None,
    ) -> Optional[FolderModel]:
        """Create a folder for ``user_id`` and return it.

        Args:
            user_id: Owner of the new folder.
            form_data: Creation payload (``name`` plus optional fields).
            parent_id: Parent folder id, or ``None`` for no parent.
            db: Optional session to reuse.

        Returns:
            The stored folder, or ``None`` if the insert failed.
        """
        with get_db_context(db) as db:
            now = int(time.time())
            folder = FolderModel(
                **{
                    # The form allows extra keys, so it is spread FIRST: the
                    # server-controlled fields below must always win and can
                    # never be overridden by a client-supplied "id"/"user_id".
                    **(form_data.model_dump(exclude_unset=True) or {}),
                    "id": str(uuid.uuid4()),
                    "user_id": user_id,
                    "parent_id": parent_id,
                    "created_at": now,
                    "updated_at": now,
                }
            )
            try:
                result = Folder(**folder.model_dump())
                db.add(result)
                db.commit()
                db.refresh(result)
                return FolderModel.model_validate(result)
            except Exception as e:
                log.exception(f"Error inserting a new folder: {e}")
                return None

    def get_folder_by_id_and_user_id(
        self, id: str, user_id: str, db: Optional[Session] = None
    ) -> Optional[FolderModel]:
        """Return the folder ``id`` owned by ``user_id``, or ``None``.

        ``None`` is returned both when no such folder exists and when the
        lookup raised (the error is logged).
        """
        try:
            with get_db_context(db) as db:
                folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
                if not folder:
                    return None
                return FolderModel.model_validate(folder)
        except Exception as e:
            log.error(f"get_folder_by_id_and_user_id: {e}")
            return None

    def get_children_folders_by_id_and_user_id(
        self, id: str, user_id: str, db: Optional[Session] = None
    ) -> Optional[list[FolderModel]]:
        """Return every descendant of folder ``id`` (the folder itself excluded).

        Descendants are listed depth-first with each folder appearing after
        its own descendants.

        Returns:
            The list of descendants, or ``None`` when the folder does not
            exist for this user or the traversal raised.
        """
        try:
            with get_db_context(db) as db:
                folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
                if not folder:
                    return None

                descendants = []
                # Guards against parent_id cycles, which would otherwise
                # recurse until RecursionError.
                seen = {folder.id}

                def collect(parent_id):
                    children = self.get_folders_by_parent_id_and_user_id(
                        parent_id, user_id, db=db
                    )
                    for child in children:
                        if child.id in seen:
                            continue
                        seen.add(child.id)
                        collect(child.id)
                        descendants.append(child)

                collect(folder.id)
                return descendants
        except Exception as e:
            log.error(f"get_children_folders_by_id_and_user_id: {e}")
            return None

    def get_folders_by_user_id(
        self, user_id: str, db: Optional[Session] = None
    ) -> list[FolderModel]:
        """Return all folders owned by ``user_id``."""
        with get_db_context(db) as db:
            return [
                FolderModel.model_validate(folder)
                for folder in db.query(Folder).filter_by(user_id=user_id).all()
            ]

    def get_folder_by_parent_id_and_user_id_and_name(
        self,
        parent_id: Optional[str],
        user_id: str,
        name: str,
        db: Optional[Session] = None,
    ) -> Optional[FolderModel]:
        """Find a folder under ``parent_id`` whose name equals ``name``, ignoring case.

        Returns:
            The first match, or ``None`` if there is none or the query raised.
        """
        try:
            with get_db_context(db) as db:
                # Compare lower-cased values instead of ILIKE: with ILIKE the
                # name is a pattern, so "_" and "%" inside a folder name would
                # act as wildcards and match unrelated folders.
                folder = (
                    db.query(Folder)
                    .filter_by(parent_id=parent_id, user_id=user_id)
                    .filter(func.lower(Folder.name) == func.lower(name))
                    .first()
                )
                if not folder:
                    return None
                return FolderModel.model_validate(folder)
        except Exception as e:
            log.error(f"get_folder_by_parent_id_and_user_id_and_name: {e}")
            return None

    def get_folders_by_parent_id_and_user_id(
        self, parent_id: Optional[str], user_id: str, db: Optional[Session] = None
    ) -> list[FolderModel]:
        """Return the direct children of ``parent_id`` owned by ``user_id``."""
        with get_db_context(db) as db:
            rows = (
                db.query(Folder)
                .filter_by(parent_id=parent_id, user_id=user_id)
                .all()
            )
            return [FolderModel.model_validate(row) for row in rows]

    def update_folder_parent_id_by_id_and_user_id(
        self,
        id: str,
        user_id: str,
        parent_id: Optional[str],
        db: Optional[Session] = None,
    ) -> Optional[FolderModel]:
        """Move folder ``id`` under ``parent_id`` (``None`` detaches it from any parent).

        Returns:
            The updated folder, or ``None`` when the folder is missing, the
            move would make the folder its own parent, or the update raised.
        """
        try:
            with get_db_context(db) as db:
                folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
                if not folder:
                    return None
                # A folder that is its own parent drops out of the tree and
                # makes the recursive traversals loop.
                if parent_id is not None and parent_id == id:
                    return None
                folder.parent_id = parent_id
                folder.updated_at = int(time.time())
                db.commit()
                return FolderModel.model_validate(folder)
        except Exception as e:
            log.error(f"update_folder: {e}")
            return None

    def update_folder_by_id_and_user_id(
        self,
        id: str,
        user_id: str,
        form_data: FolderUpdateForm,
        db: Optional[Session] = None,
    ) -> Optional[FolderModel]:
        """Apply a partial update (name / data / meta) to folder ``id``.

        Only fields explicitly set on ``form_data`` are considered. ``data``
        and ``meta`` are shallow-merged into the stored dicts. A field sent
        as ``None`` is treated as "no change".

        Returns:
            The updated folder, or ``None`` when the folder is missing, a
            sibling already uses the requested name, or the update raised.
        """
        try:
            with get_db_context(db) as db:
                folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
                if not folder:
                    return None

                updates = form_data.model_dump(exclude_unset=True)

                new_name = updates.get("name")
                # Only check for a clash (and only rename) when a real name was
                # supplied; otherwise the lookup would run with name=None and
                # a null name could be written to the row.
                if new_name is not None:
                    existing_folder = (
                        db.query(Folder)
                        .filter_by(
                            name=new_name,
                            parent_id=folder.parent_id,
                            user_id=user_id,
                        )
                        .first()
                    )
                    if existing_folder and existing_folder.id != id:
                        return None
                    folder.name = new_name

                # Reassign a new dict so the change is seen on the JSON column.
                if updates.get("data") is not None:
                    folder.data = {**(folder.data or {}), **updates["data"]}
                if updates.get("meta") is not None:
                    folder.meta = {**(folder.meta or {}), **updates["meta"]}

                folder.updated_at = int(time.time())
                db.commit()
                return FolderModel.model_validate(folder)
        except Exception as e:
            log.error(f"update_folder: {e}")
            return None

    def update_folder_is_expanded_by_id_and_user_id(
        self, id: str, user_id: str, is_expanded: bool, db: Optional[Session] = None
    ) -> Optional[FolderModel]:
        """Set the ``is_expanded`` flag of folder ``id``.

        Returns:
            The updated folder, or ``None`` if it is missing or the update raised.
        """
        try:
            with get_db_context(db) as db:
                folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
                if not folder:
                    return None
                folder.is_expanded = is_expanded
                folder.updated_at = int(time.time())
                db.commit()
                return FolderModel.model_validate(folder)
        except Exception as e:
            log.error(f"update_folder: {e}")
            return None

    def delete_folder_by_id_and_user_id(
        self, id: str, user_id: str, db: Optional[Session] = None
    ) -> list[str]:
        """Delete folder ``id`` and all of its descendants.

        Returns:
            The ids of the folders removed (the requested folder first, then
            its descendants). An empty list means the folder was not found
            or the deletion raised.
        """
        try:
            folder_ids = []
            with get_db_context(db) as db:
                folder = db.query(Folder).filter_by(id=id, user_id=user_id).first()
                if not folder:
                    return folder_ids
                folder_ids.append(folder.id)

                # Guards against parent_id cycles during the recursive walk.
                seen = {folder.id}

                def delete_children(parent_id):
                    children = self.get_folders_by_parent_id_and_user_id(
                        parent_id, user_id, db=db
                    )
                    for child in children:
                        if child.id in seen:
                            continue
                        seen.add(child.id)
                        # Remove the child's own subtree before the child.
                        delete_children(child.id)
                        folder_ids.append(child.id)
                        row = (
                            db.query(Folder)
                            .filter_by(id=child.id, user_id=user_id)
                            .first()
                        )
                        # The row may already be gone; db.delete(None) raises.
                        if row:
                            db.delete(row)
                            db.commit()

                delete_children(folder.id)
                db.delete(folder)
                db.commit()
                return folder_ids
        except Exception as e:
            log.error(f"delete_folder: {e}")
            return []

    def normalize_folder_name(self, name: str) -> str:
        """Return ``name`` lower-cased, with runs of whitespace/underscores
        collapsed to one space and the ends trimmed.

        A ``None`` name (the column is nullable) normalizes to ``""``.
        """
        collapsed = re.sub(r"[\s_]+", " ", name or "")
        return collapsed.strip().lower()

    def search_folders_by_names(
        self, user_id: str, queries: list[str], db: Optional[Session] = None
    ) -> list[FolderModel]:
        """
        Search for folders for a user where the name matches any of the queries, treating _ and space as equivalent, case-insensitive.

        Every matching folder is returned together with all of its
        descendants; each folder appears at most once.
        """
        # A set gives O(1) membership tests inside the loop below.
        normalized_queries = {self.normalize_folder_name(q) for q in queries or []}
        if not normalized_queries:
            return []

        # Keyed by folder id so a folder reached twice is only reported once.
        results = {}
        with get_db_context(db) as db:
            for folder in db.query(Folder).filter_by(user_id=user_id).all():
                if self.normalize_folder_name(folder.name) not in normalized_queries:
                    continue
                results[folder.id] = FolderModel.model_validate(folder)
                children = self.get_children_folders_by_id_and_user_id(
                    folder.id, user_id, db=db
                )
                # The helper returns None on failure; treat that as no children.
                for child in children or []:
                    results[child.id] = child

        return list(results.values())

    def search_folders_by_name_contains(
        self, user_id: str, query: str, db: Optional[Session] = None
    ) -> list[FolderModel]:
        """
        Partial match: normalized name contains (as substring) the normalized query.

        Note: a query that normalizes to the empty string matches every folder.
        """
        normalized_query = self.normalize_folder_name(query)
        with get_db_context(db) as db:
            return [
                FolderModel.model_validate(folder)
                for folder in db.query(Folder).filter_by(user_id=user_id).all()
                if normalized_query in self.normalize_folder_name(folder.name)
            ]
# Shared module-level instance of the folder data-access helpers.
Folders = FolderTable()