github-actions[bot]
GitHub deploy: 4d024c91d61f15a8b39171610ab1406915ef598d
d6703a1
import logging
import os
import uuid
import json
from pathlib import Path
from typing import Optional
from urllib.parse import quote
import asyncio
from fastapi import (
BackgroundTasks,
APIRouter,
Depends,
File,
Form,
HTTPException,
Request,
UploadFile,
status,
Query,
)
from fastapi.responses import FileResponse, StreamingResponse
from sqlalchemy.orm import Session
from open_webui.internal.db import get_session, SessionLocal
from open_webui.constants import ERROR_MESSAGES
from open_webui.retrieval.vector.factory import VECTOR_DB_CLIENT
from open_webui.models.channels import Channels
from open_webui.models.users import Users
from open_webui.models.files import (
FileForm,
FileModel,
FileModelResponse,
Files,
)
from open_webui.models.chats import Chats
from open_webui.models.knowledge import Knowledges
from open_webui.models.groups import Groups
from open_webui.models.access_grants import AccessGrants
from open_webui.routers.retrieval import ProcessFileForm, process_file
from open_webui.routers.audio import transcribe
from open_webui.storage.provider import Storage
from open_webui.utils.auth import get_admin_user, get_verified_user
from open_webui.utils.misc import strict_match_mime_type
from pydantic import BaseModel
log = logging.getLogger(__name__)
router = APIRouter()
############################
# Check if the current user has access to a file through any knowledge bases the user may be in.
############################
# TODO: Optimize this function to use the knowledge_file table for faster lookups.
def has_access_to_file(
    file_id: Optional[str],
    access_type: str,
    user=Depends(get_verified_user),
    db: Optional[Session] = None,
) -> bool:
    """Decide whether ``user`` can reach a file indirectly.

    Ownership and admin role are NOT evaluated here; this only looks at
    knowledge bases, channels and shared chats that reference the file.

    Args:
        file_id: Identifier of the file being checked.
        access_type: Permission name forwarded to the access-grant lookups
            (callers in this module pass ``"read"`` or ``"write"``).
        user: The requesting user.
        db: Optional session forwarded to every model call.

    Returns:
        ``True`` as soon as one of the checks grants access, else ``False``.

    Raises:
        HTTPException: 404 when no file with ``file_id`` exists.
    """
    target = Files.get_file_by_id(file_id, db=db)
    log.debug(f"Checking if user has {access_type} access to file")
    if not target:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # 1) Knowledge bases the file is attached to: owner or explicit grant.
    linked_kbs = Knowledges.get_knowledges_by_file_id(file_id, db=db)
    member_group_ids = set()
    for group in Groups.get_groups_by_member_id(user.id, db=db):
        member_group_ids.add(group.id)

    if any(
        kb.user_id == user.id
        or AccessGrants.has_access(
            user_id=user.id,
            resource_type="knowledge",
            resource_id=kb.id,
            permission=access_type,
            user_group_ids=member_group_ids,
            db=db,
        )
        for kb in linked_kbs
    ):
        return True

    # 2) Knowledge base referenced through the file's own metadata.
    collection_id = None
    if target.meta:
        collection_id = target.meta.get("collection_name")
    if collection_id:
        reachable_kbs = Knowledges.get_knowledge_bases_by_user_id(
            user.id, access_type, db=db
        )
        if any(kb.id == collection_id for kb in reachable_kbs):
            return True

    # 3) Channels the user can see that contain the file (read access only).
    member_channels = Channels.get_channels_by_file_id_and_user_id(
        file_id, user.id, db=db
    )
    if member_channels and access_type == "read":
        return True

    # 4) Shared chats referencing the file grant access for any access_type.
    # TODO: Granular access control for chats
    return bool(Chats.get_shared_chats_by_file_id(file_id, db=db))
############################
# Upload File
############################
def process_uploaded_file(
    request,
    file,
    file_path,
    file_item,
    file_metadata,
    user,
    db: Optional[Session] = None,
):
    """Run content extraction for a freshly uploaded file.

    Depending on the upload's content type the file is either transcribed
    (speech-to-text) and the transcript handed to ``process_file``, or passed
    to ``process_file`` directly. Any failure is recorded on the file record
    (``status: "failed"`` plus the error text) instead of being raised.

    Args:
        request: Current request; used for ``request.app.state.config``.
        file: The uploaded file object; only ``content_type`` is read.
        file_path: Storage path returned when the upload was stored.
        file_item: The persisted file record; only ``id`` is read.
        file_metadata: Client-supplied metadata, forwarded to ``transcribe``.
        user: The uploading user.
        db: Session to reuse. When falsy, a new ``SessionLocal`` session is
            opened for the duration of the processing.
    """

    def _process_handler(db_session):
        try:
            content_type = file.content_type
            if not content_type:
                log.info(
                    f"File type {file.content_type} is not provided, but trying to process anyway"
                )
                process_file(
                    request,
                    ProcessFileForm(file_id=file_item.id),
                    user=user,
                    db=db_session,
                )
                return

            stt_supported_content_types = getattr(
                request.app.state.config, "STT_SUPPORTED_CONTENT_TYPES", []
            )
            if strict_match_mime_type(stt_supported_content_types, content_type):
                # Audio-like upload: transcribe first, then index the text.
                file_path_processed = Storage.get_file(file_path)
                result = transcribe(request, file_path_processed, file_metadata, user)
                process_file(
                    request,
                    ProcessFileForm(
                        file_id=file_item.id, content=result.get("text", "")
                    ),
                    user=user,
                    db=db_session,
                )
            elif (not content_type.startswith(("image/", "video/"))) or (
                request.app.state.config.CONTENT_EXTRACTION_ENGINE == "external"
            ):
                # Images/videos are only processed when the extraction engine
                # is configured as "external".
                process_file(
                    request,
                    ProcessFileForm(file_id=file_item.id),
                    user=user,
                    db=db_session,
                )
            else:
                raise Exception(
                    f"File type {file.content_type} is not supported for processing"
                )
        except Exception as e:
            # log.exception keeps the traceback; the previous log.error call
            # recorded only the file id, hiding the actual cause.
            log.exception(f"Error processing file: {file_item.id}")
            Files.update_file_data_by_id(
                file_item.id,
                {
                    "status": "failed",
                    "error": str(e.detail) if hasattr(e, "detail") else str(e),
                },
                db=db_session,
            )

    if db:
        _process_handler(db)
    else:
        with SessionLocal() as db_session:
            _process_handler(db_session)
@router.post("/", response_model=FileModelResponse)
def upload_file(
    request: Request,
    background_tasks: BackgroundTasks,
    file: UploadFile = File(...),
    metadata: Optional[dict | str] = Form(None),
    process: bool = Query(True),
    process_in_background: bool = Query(True),
    user=Depends(get_verified_user),
    db: Session = Depends(get_session),
):
    """Upload a file; all work is delegated to ``upload_file_handler``."""
    # The route only resolves FastAPI dependencies and forwards them unchanged.
    handler_kwargs = dict(
        file=file,
        metadata=metadata,
        process=process,
        process_in_background=process_in_background,
        user=user,
        background_tasks=background_tasks,
        db=db,
    )
    return upload_file_handler(request, **handler_kwargs)
def upload_file_handler(
    request: Request,
    file: UploadFile = File(...),
    metadata: Optional[dict | str] = Form(None),
    process: bool = Query(True),
    process_in_background: bool = Query(True),
    user=Depends(get_verified_user),
    background_tasks: Optional[BackgroundTasks] = None,
    db: Optional[Session] = None,
):
    """Store an uploaded file, create its record and optionally process it.

    Args:
        request: Current request; used for ``request.app.state.config``.
        file: The uploaded file.
        metadata: Extra metadata, either a dict or a JSON-encoded string.
        process: When truthy, the allowed-extension check is applied and the
            file content is processed after storage.
        process_in_background: When truthy and ``background_tasks`` is
            provided, processing is scheduled as a background task instead of
            running before the response is returned.
        user: The uploading user.
        background_tasks: Task queue used for deferred processing.
        db: Optional session forwarded to the model calls.

    Returns:
        The stored file record when ``process`` is falsy; otherwise a dict
        with ``"status": True`` merged with the record's ``model_dump()``.

    Raises:
        HTTPException: 400 for invalid metadata JSON, a disallowed file
            extension, or any other failure during upload.
    """
    log.info(f"file.content_type: {file.content_type} {process}")

    if isinstance(metadata, str):
        try:
            metadata = json.loads(metadata)
        except json.JSONDecodeError:
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=ERROR_MESSAGES.DEFAULT("Invalid metadata format"),
            )
    file_metadata = metadata if metadata else {}

    try:
        unsanitized_filename = file.filename
        filename = os.path.basename(unsanitized_filename)

        file_extension = os.path.splitext(filename)[1]
        # Remove the leading dot from the file extension
        file_extension = file_extension[1:] if file_extension else ""

        if process:
            # Filter blank entries into a LOCAL list. The configuration object
            # is shared application state and must not be rewritten per
            # request; doing so also made the first upload behave differently
            # from later ones when every configured entry was blank.
            allowed_extensions = [
                ext
                for ext in (request.app.state.config.ALLOWED_FILE_EXTENSIONS or [])
                if ext
            ]
            if allowed_extensions and file_extension not in allowed_extensions:
                raise HTTPException(
                    status_code=status.HTTP_400_BAD_REQUEST,
                    detail=ERROR_MESSAGES.DEFAULT(
                        f"File type {file_extension} is not allowed"
                    ),
                )

        # Prefix the stored filename with a fresh uuid; keep the original
        # name for display.
        file_id = str(uuid.uuid4())
        name = filename
        filename = f"{file_id}_{filename}"
        contents, file_path = Storage.upload_file(
            file.file,
            filename,
            {
                "OpenWebUI-User-Email": user.email,
                "OpenWebUI-User-Id": user.id,
                "OpenWebUI-User-Name": user.name,
                "OpenWebUI-File-Id": file_id,
            },
        )

        file_item = Files.insert_new_file(
            user.id,
            FileForm(
                id=file_id,
                filename=name,
                path=file_path,
                data={"status": "pending"} if process else {},
                meta={
                    "name": name,
                    "content_type": (
                        file.content_type
                        if isinstance(file.content_type, str)
                        else None
                    ),
                    "size": len(contents),
                    "data": file_metadata,
                },
            ),
            db=db,
        )
        if not file_item:
            # Fail explicitly instead of relying on a later AttributeError.
            raise HTTPException(
                status_code=status.HTTP_400_BAD_REQUEST,
                detail=ERROR_MESSAGES.DEFAULT("Error uploading file"),
            )

        if "channel_id" in file_metadata:
            # Only attach to the channel if the lookup for this user succeeds.
            channel = Channels.get_channel_by_id_and_user_id(
                file_metadata["channel_id"], user.id, db=db
            )
            if channel:
                Channels.add_file_to_channel_by_id(
                    channel.id, file_item.id, user.id, db=db
                )

        if not process:
            return file_item

        if background_tasks and process_in_background:
            # No session is passed: the request's session is not handed to
            # the deferred task.
            background_tasks.add_task(
                process_uploaded_file,
                request,
                file,
                file_path,
                file_item,
                file_metadata,
                user,
            )
        else:
            process_uploaded_file(
                request,
                file,
                file_path,
                file_item,
                file_metadata,
                user,
                db=db,
            )
        return {"status": True, **file_item.model_dump()}

    except HTTPException:
        # Preserve deliberate HTTP errors raised above.
        raise
    except Exception as e:
        log.exception(e)
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error uploading file"),
        )
############################
# List Files
############################
@router.get("/", response_model=list[FileModelResponse])
async def list_files(
    user=Depends(get_verified_user),
    content: bool = Query(True),
    db: Session = Depends(get_session),
):
    """List files: every file for admins, only the caller's own otherwise.

    When ``content`` is false, the ``"content"`` entry is removed from each
    file's ``data`` before returning.
    """
    if user.role == "admin":
        files = Files.get_files(db=db)
    else:
        files = Files.get_files_by_user_id(user.id, db=db)

    if not content:
        for file in files:
            # ``data`` can be empty/None; guard before touching it, matching
            # the check used by the search endpoint.
            if file.data:
                file.data.pop("content", None)

    return files
############################
# Search Files
############################
@router.get("/search", response_model=list[FileModelResponse])
async def search_files(
    filename: str = Query(
        ...,
        description="Filename pattern to search for. Supports wildcards such as '*.txt'",
    ),
    content: bool = Query(True),
    skip: int = Query(0, ge=0, description="Number of files to skip"),
    limit: int = Query(
        100, ge=1, le=1000, description="Maximum number of files to return"
    ),
    user=Depends(get_verified_user),
    db: Session = Depends(get_session),
):
    """
    Search for files by filename with support for wildcard patterns.
    Uses SQL-based filtering with pagination for better performance.
    """
    # Admins search across all users (no owner filter); everyone else is
    # restricted to their own files.
    owner_filter = user.id if user.role != "admin" else None

    matches = Files.search_files(
        user_id=owner_filter,
        filename=filename,
        skip=skip,
        limit=limit,
        db=db,
    )
    if not matches:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No files found matching the pattern.",
        )

    if content:
        return matches

    # Caller asked for metadata only: drop the extracted text from each hit.
    for match in matches:
        if match.data and "content" in match.data:
            del match.data["content"]
    return matches
############################
# Delete All Files
############################
@router.delete("/all")
async def delete_all_files(
    user=Depends(get_admin_user), db: Session = Depends(get_session)
):
    """Admin-only: delete every file record, then wipe storage and vectors.

    Raises:
        HTTPException: 400 when the record deletion reports failure or the
            storage / vector-store cleanup raises.
    """
    if not Files.delete_all_files(db=db):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting files"),
        )

    # Records are gone; now remove the stored blobs and reset the vector DB.
    try:
        Storage.delete_all_files()
        VECTOR_DB_CLIENT.reset()
    except Exception as e:
        log.exception(e)
        log.error("Error deleting files")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting files"),
        )

    return {"message": "All files deleted successfully"}
############################
# Get File By Id
############################
@router.get("/{id}", response_model=Optional[FileModel])
async def get_file_by_id(
    id: str, user=Depends(get_verified_user), db: Session = Depends(get_session)
):
    """Return the file record for ``id`` if the caller may read it.

    Raises:
        HTTPException: 404 when the file does not exist or the caller is not
            its owner, an admin, or granted read access.
    """
    file = Files.get_file_by_id(id, db=db)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Cheap checks first; the indirect-access lookup only runs when needed.
    owner_or_admin = file.user_id == user.id or user.role == "admin"
    if owner_or_admin or has_access_to_file(id, "read", user, db=db):
        return file

    # A missing permission is reported as 404, same as a missing file.
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND,
        detail=ERROR_MESSAGES.NOT_FOUND,
    )
@router.get("/{id}/process/status")
async def get_file_process_status(
    id: str,
    stream: bool = Query(False),
    user=Depends(get_verified_user),
    db: Session = Depends(get_session),
):
    """Report the processing status of a file, once or as an event stream.

    Args:
        id: File identifier.
        stream: When true, respond with ``text/event-stream`` and emit the
            status about once per second until it is ``completed`` or
            ``failed`` (or the polling limit is reached).

    Returns:
        ``{"status": ...}`` (defaulting to ``"pending"``) or a
        ``StreamingResponse``.

    Raises:
        HTTPException: 404 when the file does not exist or is not readable by
            the caller.
    """
    file = Files.get_file_by_id(id, db=db)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if not (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "read", user, db=db)
    ):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if not stream:
        # ``data`` may be None on a record; treat that as "no status yet".
        return {"status": (file.data or {}).get("status", "pending")}

    # Upper bound on the number of one-second polls.
    MAX_FILE_PROCESSING_DURATION = 3600 * 2

    async def event_stream(file_id):
        # NOTE: We intentionally do NOT capture the request's db session here.
        # Each poll creates its own short-lived session to avoid holding a
        # connection for hours. A WebSocket push would be more efficient.
        for _ in range(MAX_FILE_PROCESSING_DURATION):
            file_item = Files.get_file_by_id(file_id)  # Creates own session
            if not file_item:
                yield f"data: {json.dumps({'status': 'not_found'})}\n\n"
                break

            # ``or {}`` also covers a present-but-None ``data`` field.
            data = file_item.model_dump().get("data") or {}
            # Named ``file_status`` so the imported ``status`` module is not
            # shadowed inside this generator.
            file_status = data.get("status")
            if not file_status:
                # Legacy
                break

            event = {"status": file_status}
            if file_status == "failed":
                event["error"] = data.get("error")
            yield f"data: {json.dumps(event)}\n\n"

            if file_status in ("completed", "failed"):
                break

            await asyncio.sleep(1)

    return StreamingResponse(
        event_stream(file.id),
        media_type="text/event-stream",
    )
############################
# Get File Data Content By Id
############################
@router.get("/{id}/data/content")
async def get_file_data_content_by_id(
    id: str, user=Depends(get_verified_user), db: Session = Depends(get_session)
):
    """Return the ``"content"`` entry stored in the file's ``data``.

    Returns:
        ``{"content": <text>}``; the text is ``""`` when nothing is stored.

    Raises:
        HTTPException: 404 when the file does not exist or is not readable by
            the caller.
    """
    file = Files.get_file_by_id(id, db=db)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if not (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "read", user, db=db)
    ):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # ``data`` may be None; fall back to an empty mapping instead of raising.
    return {"content": (file.data or {}).get("content", "")}
############################
# Update File Data Content By Id
############################
class ContentForm(BaseModel):
    """Request body for the content-update endpoint."""

    # New text content for the file; handed to the file processing step.
    content: str
@router.post("/{id}/data/content/update")
def update_file_data_content_by_id(
    request: Request,
    id: str,
    form_data: ContentForm,
    user=Depends(get_verified_user),
    db: Session = Depends(get_session),
):
    """Replace a file's text content and re-run processing on it.

    Processing errors are logged and NOT raised; in that case the response
    carries the content of the record as it was loaded before processing.

    Returns:
        ``{"content": <text>}`` from the (re-read) file record.

    Raises:
        HTTPException: 404 when the file does not exist or the caller lacks
            write access.
    """
    file = Files.get_file_by_id(id, db=db)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if not (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "write", user, db=db)
    ):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    try:
        # Use the request's session for processing, like the other
        # ``process_file`` call sites in this module, so the re-read below
        # goes through the same session that performed the update.
        process_file(
            request,
            ProcessFileForm(file_id=id, content=form_data.content),
            user=user,
            db=db,
        )
        file = Files.get_file_by_id(id=id, db=db)
    except Exception as e:
        # Best effort: keep serving the previously loaded record.
        log.exception(e)
        log.error(f"Error processing file: {file.id}")

    # ``data`` may be None; fall back to an empty mapping instead of raising.
    return {"content": (file.data or {}).get("content", "")}
############################
# Get File Content By Id
############################
@router.get("/{id}/content")
async def get_file_content_by_id(
    id: str,
    user=Depends(get_verified_user),
    attachment: bool = Query(False),
    db: Session = Depends(get_session),
):
    """Serve the stored file for ``id``.

    ``Content-Disposition`` is chosen as follows: forced ``attachment`` when
    ``attachment`` is true; ``inline`` for PDFs (by content type or ``.pdf``
    suffix); no header for ``text/plain``; ``attachment`` for everything else.

    Raises:
        HTTPException: 404 when the file record or the stored file is
            missing, or the caller may not read it; 400 when retrieving the
            file fails for any other reason.
    """
    file = Files.get_file_by_id(id, db=db)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if not (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "read", user, db=db)
    ):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    try:
        file_path = Path(Storage.get_file(file.path))

        # Check if the file already exists in the cache
        if not file_path.is_file():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=ERROR_MESSAGES.NOT_FOUND,
            )

        meta = file.meta or {}
        # Handle Unicode filenames
        filename = meta.get("name", file.filename)
        encoded_filename = quote(filename)  # RFC5987 encoding
        content_type = meta.get("content_type")

        headers = {}
        if attachment:
            headers["Content-Disposition"] = (
                f"attachment; filename*=UTF-8''{encoded_filename}"
            )
        elif content_type == "application/pdf" or filename.lower().endswith(".pdf"):
            headers["Content-Disposition"] = (
                f"inline; filename*=UTF-8''{encoded_filename}"
            )
            content_type = "application/pdf"
        elif content_type != "text/plain":
            headers["Content-Disposition"] = (
                f"attachment; filename*=UTF-8''{encoded_filename}"
            )

        return FileResponse(file_path, headers=headers, media_type=content_type)
    except HTTPException:
        # Let the deliberate 404 above through; the generic handler below
        # would otherwise turn it into a 400.
        raise
    except Exception as e:
        log.exception(e)
        log.error("Error getting file content")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error getting file content"),
        )
@router.get("/{id}/content/html")
async def get_html_file_content_by_id(
    id: str, user=Depends(get_verified_user), db: Session = Depends(get_session)
):
    """Serve a stored file as-is, but only if its owner is an admin.

    Raises:
        HTTPException: 404 when the file record or stored file is missing,
            the file's owner is not an admin (or no longer exists), or the
            caller may not read it; 400 when retrieving the file fails for
            any other reason.
    """
    file = Files.get_file_by_id(id, db=db)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    file_user = Users.get_user_by_id(file.user_id, db=db)
    # The owner lookup can come back empty; treat that like a non-admin owner
    # instead of failing on ``None.role``.
    if not file_user or file_user.role != "admin":
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if not (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "read", user, db=db)
    ):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    try:
        file_path = Path(Storage.get_file(file.path))

        # Check if the file already exists in the cache
        if not file_path.is_file():
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=ERROR_MESSAGES.NOT_FOUND,
            )

        log.info(f"file_path: {file_path}")
        return FileResponse(file_path)
    except HTTPException:
        # Let the deliberate 404 above through; the generic handler below
        # would otherwise turn it into a 400.
        raise
    except Exception as e:
        log.exception(e)
        log.error("Error getting file content")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error getting file content"),
        )
@router.get("/{id}/content/{file_name}")
async def get_file_content_by_id(
    id: str, user=Depends(get_verified_user), db: Session = Depends(get_session)
):
    """Download a file as an attachment, falling back to its extracted text.

    If the record has a storage path, the stored file is returned. Otherwise
    the text kept under ``data["content"]`` is streamed as ``text/plain``.

    NOTE(review): this function shares its name with the ``/{id}/content``
    handler in this module, so it rebinds the module-level name. Both routes
    stay registered through their decorators; the name is kept unchanged here
    to avoid altering the module's interface.

    Raises:
        HTTPException: 404 when the file record or stored file is missing, or
            the caller may not read it.
    """
    file = Files.get_file_by_id(id, db=db)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    if not (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "read", user, db=db)
    ):
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Handle Unicode filenames
    filename = (file.meta or {}).get("name", file.filename)
    encoded_filename = quote(filename)  # RFC5987 encoding
    headers = {
        "Content-Disposition": f"attachment; filename*=UTF-8''{encoded_filename}"
    }

    if file.path:
        file_path = Path(Storage.get_file(file.path))

        # Check if the file already exists in the cache
        if file_path.is_file():
            return FileResponse(file_path, headers=headers)

        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # No storage path: return the extracted text as .txt if possible. The
    # text lives under ``data["content"]`` (as read by the data/content
    # endpoints), not on a ``content`` attribute of the record.
    file_content = (file.data or {}).get("content", "")

    # Create a generator that encodes the file content
    def generator():
        yield file_content.encode("utf-8")

    return StreamingResponse(
        generator(),
        media_type="text/plain",
        headers=headers,
    )
############################
# Delete File By Id
############################
@router.delete("/{id}")
async def delete_file_by_id(
    id: str, user=Depends(get_verified_user), db: Session = Depends(get_session)
):
    """Delete a file, its knowledge-base links, embeddings and stored blob.

    Raises:
        HTTPException: 404 when the file does not exist or the caller lacks
            write access; 400 when the record deletion reports failure or the
            storage / vector cleanup afterwards raises.
    """
    file = Files.get_file_by_id(id, db=db)
    if not file:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    may_delete = (
        file.user_id == user.id
        or user.role == "admin"
        or has_access_to_file(id, "write", user, db=db)
    )
    if not may_delete:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Detach the file from every knowledge base before removing the record.
    for kb in Knowledges.get_knowledges_by_file_id(id, db=db):
        Knowledges.remove_file_from_knowledge_by_id(kb.id, id, db=db)

        # Embedding cleanup is best effort: failures are logged at debug
        # level and do not stop the deletion.
        try:
            VECTOR_DB_CLIENT.delete(collection_name=kb.id, filter={"file_id": id})
            if file.hash:
                VECTOR_DB_CLIENT.delete(
                    collection_name=kb.id, filter={"hash": file.hash}
                )
        except Exception as e:
            log.debug(f"KB embedding cleanup for {kb.id}: {e}")

    if not Files.delete_file_by_id(id, db=db):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting file"),
        )

    # Record is gone; remove the blob and the file's own vector collection.
    try:
        Storage.delete_file(file.path)
        VECTOR_DB_CLIENT.delete(collection_name=f"file-{id}")
    except Exception as e:
        log.exception(e)
        log.error("Error deleting files")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT("Error deleting files"),
        )

    return {"message": "File deleted successfully"}