github-actions[bot]
GitHub deploy: 4d024c91d61f15a8b39171610ab1406915ef598d
d6703a1
import logging
from pathlib import Path
from typing import Optional
import time
import re
import aiohttp
from open_webui.env import AIOHTTP_CLIENT_TIMEOUT
from open_webui.models.groups import Groups
from pydantic import BaseModel, HttpUrl
from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlalchemy.orm import Session
from open_webui.internal.db import get_session
from open_webui.models.oauth_sessions import OAuthSessions
from open_webui.models.tools import (
ToolForm,
ToolModel,
ToolResponse,
ToolUserResponse,
ToolAccessResponse,
Tools,
)
from open_webui.models.access_grants import AccessGrants
from open_webui.utils.plugin import (
load_tool_module_by_id,
replace_imports,
get_tool_module_from_cache,
resolve_valves_schema_options,
)
from open_webui.utils.tools import get_tool_specs
from open_webui.utils.auth import get_admin_user, get_verified_user
from open_webui.utils.access_control import has_access, has_permission
from open_webui.utils.tools import get_tool_servers
from open_webui.config import CACHE_DIR, BYPASS_ADMIN_ACCESS_CONTROL
from open_webui.constants import ERROR_MESSAGES
log = logging.getLogger(__name__)
router = APIRouter()
def get_tool_module(request, tool_id, load_from_db=True):
"""
Get the tool module by its ID.
"""
tool_module, _ = get_tool_module_from_cache(request, tool_id, load_from_db)
return tool_module
############################
# GetTools
############################
@router.get("/", response_model=list[ToolUserResponse])
async def get_tools(
request: Request,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
tools = []
# Local Tools
for tool in Tools.get_tools(db=db):
tool_module = get_tool_module(request, tool.id)
tools.append(
ToolUserResponse(
**{
**tool.model_dump(),
"has_user_valves": hasattr(tool_module, "UserValves"),
}
)
)
# OpenAPI Tool Servers
server_access_grants = {}
for server in await get_tool_servers(request):
connection = request.app.state.config.TOOL_SERVER_CONNECTIONS[
server.get("idx", 0)
]
server_config = connection.get("config", {})
server_id = f"server:{server.get('id')}"
server_access_grants[server_id] = server_config.get("access_grants", [])
tools.append(
ToolUserResponse(
**{
"id": server_id,
"user_id": server_id,
"name": server.get("openapi", {})
.get("info", {})
.get("title", "Tool Server"),
"meta": {
"description": server.get("openapi", {})
.get("info", {})
.get("description", ""),
},
"updated_at": int(time.time()),
"created_at": int(time.time()),
}
)
)
# MCP Tool Servers
for server in request.app.state.config.TOOL_SERVER_CONNECTIONS:
if server.get("type", "openapi") == "mcp":
server_id = server.get("info", {}).get("id")
auth_type = server.get("auth_type", "none")
session_token = None
if auth_type == "oauth_2.1":
splits = server_id.split(":")
server_id = splits[-1] if len(splits) > 1 else server_id
session_token = (
await request.app.state.oauth_client_manager.get_oauth_token(
user.id, f"mcp:{server_id}"
)
)
server_config = server.get("config", {})
tool_id = f"server:mcp:{server.get('info', {}).get('id')}"
server_access_grants[tool_id] = server_config.get("access_grants", [])
tools.append(
ToolUserResponse(
**{
"id": tool_id,
"user_id": tool_id,
"name": server.get("info", {}).get("name", "MCP Tool Server"),
"meta": {
"description": server.get("info", {}).get(
"description", ""
),
},
"updated_at": int(time.time()),
"created_at": int(time.time()),
**(
{
"authenticated": session_token is not None,
}
if auth_type == "oauth_2.1"
else {}
),
}
)
)
if user.role == "admin" and BYPASS_ADMIN_ACCESS_CONTROL:
# Admin can see all tools
return tools
else:
user_group_ids = {
group.id for group in Groups.get_groups_by_member_id(user.id, db=db)
}
tools = [
tool
for tool in tools
if tool.user_id == user.id
or (
has_access(
user.id,
"read",
server_access_grants.get(str(tool.id), []),
user_group_ids,
db=db,
)
if str(tool.id).startswith("server:")
else AccessGrants.has_access(
user_id=user.id,
resource_type="tool",
resource_id=tool.id,
permission="read",
user_group_ids=user_group_ids,
db=db,
)
)
]
return tools
############################
# GetToolList
############################
@router.get("/list", response_model=list[ToolAccessResponse])
async def get_tool_list(
user=Depends(get_verified_user), db: Session = Depends(get_session)
):
if user.role == "admin" and BYPASS_ADMIN_ACCESS_CONTROL:
tools = Tools.get_tools(db=db)
else:
tools = Tools.get_tools_by_user_id(user.id, "read", db=db)
return [
ToolAccessResponse(
**tool.model_dump(),
write_access=(
(user.role == "admin" and BYPASS_ADMIN_ACCESS_CONTROL)
or user.id == tool.user_id
or AccessGrants.has_access(
user_id=user.id,
resource_type="tool",
resource_id=tool.id,
permission="write",
db=db,
)
),
)
for tool in tools
]
############################
# LoadFunctionFromLink
############################
class LoadUrlForm(BaseModel):
url: HttpUrl
def github_url_to_raw_url(url: str) -> str:
# Handle 'tree' (folder) URLs (add main.py at the end)
m1 = re.match(r"https://github\.com/([^/]+)/([^/]+)/tree/([^/]+)/(.*)", url)
if m1:
org, repo, branch, path = m1.groups()
return f"https://raw.githubusercontent.com/{org}/{repo}/refs/heads/{branch}/{path.rstrip('/')}/main.py"
# Handle 'blob' (file) URLs
m2 = re.match(r"https://github\.com/([^/]+)/([^/]+)/blob/([^/]+)/(.*)", url)
if m2:
org, repo, branch, path = m2.groups()
return (
f"https://raw.githubusercontent.com/{org}/{repo}/refs/heads/{branch}/{path}"
)
# No match; return as-is
return url
@router.post("/load/url", response_model=Optional[dict])
async def load_tool_from_url(
request: Request, form_data: LoadUrlForm, user=Depends(get_admin_user)
):
# NOTE: This is NOT a SSRF vulnerability:
# This endpoint is admin-only (see get_admin_user), meant for *trusted* internal use,
# and does NOT accept untrusted user input. Access is enforced by authentication.
url = str(form_data.url)
if not url:
raise HTTPException(status_code=400, detail="Please enter a valid URL")
url = github_url_to_raw_url(url)
url_parts = url.rstrip("/").split("/")
file_name = url_parts[-1]
tool_name = (
file_name[:-3]
if (
file_name.endswith(".py")
and (not file_name.startswith(("main.py", "index.py", "__init__.py")))
)
else url_parts[-2] if len(url_parts) > 1 else "function"
)
try:
async with aiohttp.ClientSession(
trust_env=True, timeout=aiohttp.ClientTimeout(total=AIOHTTP_CLIENT_TIMEOUT)
) as session:
async with session.get(
url, headers={"Content-Type": "application/json"}
) as resp:
if resp.status != 200:
raise HTTPException(
status_code=resp.status, detail="Failed to fetch the tool"
)
data = await resp.text()
if not data:
raise HTTPException(
status_code=400, detail="No data received from the URL"
)
return {
"name": tool_name,
"content": data,
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error importing tool: {e}")
############################
# ExportTools
############################
@router.get("/export", response_model=list[ToolModel])
async def export_tools(
request: Request,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
if user.role != "admin" and not has_permission(
user.id,
"workspace.tools_export",
request.app.state.config.USER_PERMISSIONS,
db=db,
):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.UNAUTHORIZED,
)
if user.role == "admin" and BYPASS_ADMIN_ACCESS_CONTROL:
return Tools.get_tools(db=db)
else:
return Tools.get_tools_by_user_id(user.id, "read", db=db)
############################
# CreateNewTools
############################
@router.post("/create", response_model=Optional[ToolResponse])
async def create_new_tools(
request: Request,
form_data: ToolForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
if user.role != "admin" and not (
has_permission(
user.id, "workspace.tools", request.app.state.config.USER_PERMISSIONS, db=db
)
or has_permission(
user.id,
"workspace.tools_import",
request.app.state.config.USER_PERMISSIONS,
db=db,
)
):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.UNAUTHORIZED,
)
if not form_data.id.isidentifier():
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Only alphanumeric characters and underscores are allowed in the id",
)
form_data.id = form_data.id.lower()
tools = Tools.get_tool_by_id(form_data.id, db=db)
if tools is None:
try:
form_data.content = replace_imports(form_data.content)
tool_module, frontmatter = load_tool_module_by_id(
form_data.id, content=form_data.content
)
form_data.meta.manifest = frontmatter
TOOLS = request.app.state.TOOLS
TOOLS[form_data.id] = tool_module
specs = get_tool_specs(TOOLS[form_data.id])
tools = Tools.insert_new_tool(user.id, form_data, specs, db=db)
tool_cache_dir = CACHE_DIR / "tools" / form_data.id
tool_cache_dir.mkdir(parents=True, exist_ok=True)
if tools:
return tools
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.DEFAULT("Error creating tools"),
)
except Exception as e:
log.exception(f"Failed to load the tool by id {form_data.id}: {e}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.DEFAULT(str(e)),
)
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ID_TAKEN,
)
############################
# GetToolsById
############################
@router.get("/id/{id}", response_model=Optional[ToolAccessResponse])
async def get_tools_by_id(
id: str, user=Depends(get_verified_user), db: Session = Depends(get_session)
):
tools = Tools.get_tool_by_id(id, db=db)
if tools:
if (
user.role == "admin"
or tools.user_id == user.id
or AccessGrants.has_access(
user_id=user.id,
resource_type="tool",
resource_id=tools.id,
permission="read",
db=db,
)
):
return ToolAccessResponse(
**tools.model_dump(),
write_access=(
(user.role == "admin" and BYPASS_ADMIN_ACCESS_CONTROL)
or user.id == tools.user_id
or AccessGrants.has_access(
user_id=user.id,
resource_type="tool",
resource_id=tools.id,
permission="write",
db=db,
)
),
)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
else:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=ERROR_MESSAGES.NOT_FOUND,
)
############################
# UpdateToolsById
############################
@router.post("/id/{id}/update", response_model=Optional[ToolModel])
async def update_tools_by_id(
request: Request,
id: str,
form_data: ToolForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
tools = Tools.get_tool_by_id(id, db=db)
if not tools:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.NOT_FOUND,
)
# Is the user the original creator, in a group with write access, or an admin
if (
tools.user_id != user.id
and not AccessGrants.has_access(
user_id=user.id,
resource_type="tool",
resource_id=tools.id,
permission="write",
db=db,
)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.UNAUTHORIZED,
)
try:
form_data.content = replace_imports(form_data.content)
tool_module, frontmatter = load_tool_module_by_id(id, content=form_data.content)
form_data.meta.manifest = frontmatter
TOOLS = request.app.state.TOOLS
TOOLS[id] = tool_module
specs = get_tool_specs(TOOLS[id])
updated = {
**form_data.model_dump(exclude={"id"}),
"specs": specs,
}
log.debug(updated)
tools = Tools.update_tool_by_id(id, updated, db=db)
if tools:
return tools
else:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.DEFAULT("Error updating tools"),
)
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.DEFAULT(str(e)),
)
############################
# UpdateToolAccessById
############################
class ToolAccessGrantsForm(BaseModel):
access_grants: list[dict]
@router.post("/id/{id}/access/update", response_model=Optional[ToolModel])
async def update_tool_access_by_id(
id: str,
form_data: ToolAccessGrantsForm,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
tools = Tools.get_tool_by_id(id, db=db)
if not tools:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if (
tools.user_id != user.id
and not AccessGrants.has_access(
user_id=user.id,
resource_type="tool",
resource_id=tools.id,
permission="write",
db=db,
)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.UNAUTHORIZED,
)
AccessGrants.set_access_grants("tool", id, form_data.access_grants, db=db)
return Tools.get_tool_by_id(id, db=db)
############################
# DeleteToolsById
############################
@router.delete("/id/{id}/delete", response_model=bool)
async def delete_tools_by_id(
request: Request,
id: str,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
tools = Tools.get_tool_by_id(id, db=db)
if not tools:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if (
tools.user_id != user.id
and not AccessGrants.has_access(
user_id=user.id,
resource_type="tool",
resource_id=tools.id,
permission="write",
db=db,
)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.UNAUTHORIZED,
)
result = Tools.delete_tool_by_id(id, db=db)
if result:
TOOLS = request.app.state.TOOLS
if id in TOOLS:
del TOOLS[id]
return result
############################
# GetToolValves
############################
@router.get("/id/{id}/valves", response_model=Optional[dict])
async def get_tools_valves_by_id(
id: str, user=Depends(get_verified_user), db: Session = Depends(get_session)
):
tools = Tools.get_tool_by_id(id, db=db)
if tools:
try:
valves = Tools.get_tool_valves_by_id(id, db=db)
return valves
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.DEFAULT(str(e)),
)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.NOT_FOUND,
)
############################
# GetToolValvesSpec
############################
@router.get("/id/{id}/valves/spec", response_model=Optional[dict])
async def get_tools_valves_spec_by_id(
request: Request,
id: str,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
tools = Tools.get_tool_by_id(id, db=db)
if tools:
if id in request.app.state.TOOLS:
tools_module = request.app.state.TOOLS[id]
else:
tools_module, _ = load_tool_module_by_id(id)
request.app.state.TOOLS[id] = tools_module
if hasattr(tools_module, "Valves"):
Valves = tools_module.Valves
schema = Valves.schema()
# Resolve dynamic options for select dropdowns
schema = resolve_valves_schema_options(Valves, schema, user)
return schema
return None
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.NOT_FOUND,
)
############################
# UpdateToolValves
############################
@router.post("/id/{id}/valves/update", response_model=Optional[dict])
async def update_tools_valves_by_id(
request: Request,
id: str,
form_data: dict,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
tools = Tools.get_tool_by_id(id, db=db)
if not tools:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.NOT_FOUND,
)
if (
tools.user_id != user.id
and not AccessGrants.has_access(
user_id=user.id,
resource_type="tool",
resource_id=tools.id,
permission="write",
db=db,
)
and user.role != "admin"
):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
)
if id in request.app.state.TOOLS:
tools_module = request.app.state.TOOLS[id]
else:
tools_module, _ = load_tool_module_by_id(id)
request.app.state.TOOLS[id] = tools_module
if not hasattr(tools_module, "Valves"):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.NOT_FOUND,
)
Valves = tools_module.Valves
try:
form_data = {k: v for k, v in form_data.items() if v is not None}
valves = Valves(**form_data)
valves_dict = valves.model_dump(exclude_unset=True)
Tools.update_tool_valves_by_id(id, valves_dict, db=db)
return valves_dict
except Exception as e:
log.exception(f"Failed to update tool valves by id {id}: {e}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.DEFAULT(str(e)),
)
############################
# ToolUserValves
############################
@router.get("/id/{id}/valves/user", response_model=Optional[dict])
async def get_tools_user_valves_by_id(
id: str, user=Depends(get_verified_user), db: Session = Depends(get_session)
):
tools = Tools.get_tool_by_id(id, db=db)
if tools:
try:
user_valves = Tools.get_user_valves_by_id_and_user_id(id, user.id, db=db)
return user_valves
except Exception as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.DEFAULT(str(e)),
)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.NOT_FOUND,
)
@router.get("/id/{id}/valves/user/spec", response_model=Optional[dict])
async def get_tools_user_valves_spec_by_id(
request: Request,
id: str,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
tools = Tools.get_tool_by_id(id, db=db)
if tools:
if id in request.app.state.TOOLS:
tools_module = request.app.state.TOOLS[id]
else:
tools_module, _ = load_tool_module_by_id(id)
request.app.state.TOOLS[id] = tools_module
if hasattr(tools_module, "UserValves"):
UserValves = tools_module.UserValves
schema = UserValves.schema()
# Resolve dynamic options for select dropdowns
schema = resolve_valves_schema_options(UserValves, schema, user)
return schema
return None
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.NOT_FOUND,
)
@router.post("/id/{id}/valves/user/update", response_model=Optional[dict])
async def update_tools_user_valves_by_id(
request: Request,
id: str,
form_data: dict,
user=Depends(get_verified_user),
db: Session = Depends(get_session),
):
tools = Tools.get_tool_by_id(id, db=db)
if tools:
if id in request.app.state.TOOLS:
tools_module = request.app.state.TOOLS[id]
else:
tools_module, _ = load_tool_module_by_id(id)
request.app.state.TOOLS[id] = tools_module
if hasattr(tools_module, "UserValves"):
UserValves = tools_module.UserValves
try:
form_data = {k: v for k, v in form_data.items() if v is not None}
user_valves = UserValves(**form_data)
user_valves_dict = user_valves.model_dump(exclude_unset=True)
Tools.update_user_valves_by_id_and_user_id(
id, user.id, user_valves_dict, db=db
)
return user_valves_dict
except Exception as e:
log.exception(f"Failed to update user valves by id {id}: {e}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=ERROR_MESSAGES.DEFAULT(str(e)),
)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.NOT_FOUND,
)
else:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=ERROR_MESSAGES.NOT_FOUND,
)