Spaces:
Build error
Build error
| import os | |
| import re | |
| import subprocess | |
| import sys | |
| from importlib import util | |
| import types | |
| import tempfile | |
| import logging | |
| from typing import Any | |
| from open_webui.env import PIP_OPTIONS, PIP_PACKAGE_INDEX_OPTIONS, OFFLINE_MODE | |
| from open_webui.models.functions import Functions | |
| from open_webui.models.tools import Tools | |
| log = logging.getLogger(__name__) | |
| def resolve_valves_schema_options( | |
| valves_class: type, schema: dict, user: Any = None | |
| ) -> dict: | |
| """ | |
| Resolve dynamic options in a Valves schema. | |
| For properties with `input.options`, this function handles two cases: | |
| - List: Used directly as dropdown options | |
| - String: Treated as method name, called to get options dynamically | |
| Usage in Valves: | |
| class UserValves(BaseModel): | |
| # Static options | |
| priority: str = Field( | |
| default="medium", | |
| json_schema_extra={ | |
| "input": { | |
| "type": "select", | |
| "options": ["low", "medium", "high"] | |
| } | |
| } | |
| ) | |
| # Dynamic options (method name) | |
| model: str = Field( | |
| default="", | |
| json_schema_extra={ | |
| "input": { | |
| "type": "select", | |
| "options": "get_model_options" | |
| } | |
| } | |
| ) | |
| @classmethod | |
| def get_model_options(cls, __user__=None) -> list[dict]: | |
| return [{"value": "gpt-4", "label": "GPT-4"}] | |
| Args: | |
| valves_class: The Valves or UserValves Pydantic model class | |
| schema: The JSON schema dict from valves_class.schema() | |
| user: Optional user object passed to methods that accept __user__ | |
| Returns: | |
| Modified schema dict with resolved options | |
| """ | |
| if not schema or "properties" not in schema: | |
| return schema | |
| # Make a copy to avoid mutating the original | |
| schema = dict(schema) | |
| schema["properties"] = dict(schema.get("properties", {})) | |
| for prop_name, prop_schema in list(schema["properties"].items()): | |
| # Get the original field info from the Pydantic model | |
| if not hasattr(valves_class, "model_fields"): | |
| continue | |
| field_info = valves_class.model_fields.get(prop_name) | |
| if not field_info: | |
| continue | |
| # Check json_schema_extra for options | |
| json_schema_extra = field_info.json_schema_extra | |
| if not json_schema_extra or not isinstance(json_schema_extra, dict): | |
| continue | |
| input_config = json_schema_extra.get("input") | |
| if not input_config or not isinstance(input_config, dict): | |
| continue | |
| options = input_config.get("options") | |
| if options is None: | |
| continue | |
| resolved_options = None | |
| # Case 1: options is already a list - use directly | |
| if isinstance(options, list): | |
| resolved_options = options | |
| # Case 2: options is a string - treat as method name | |
| elif isinstance(options, str) and options: | |
| method = getattr(valves_class, options, None) | |
| if method is None or not callable(method): | |
| log.warning( | |
| f"options '{options}' not found or not callable on {valves_class.__name__}" | |
| ) | |
| continue | |
| try: | |
| import inspect | |
| sig = inspect.signature(method) | |
| params = sig.parameters | |
| # Prepare kwargs based on what the method accepts | |
| kwargs = {} | |
| if "__user__" in params and user is not None: | |
| kwargs["__user__"] = ( | |
| user.model_dump() if hasattr(user, "model_dump") else user | |
| ) | |
| if "user" in params and user is not None: | |
| kwargs["user"] = ( | |
| user.model_dump() if hasattr(user, "model_dump") else user | |
| ) | |
| resolved_options = method(**kwargs) if kwargs else method() | |
| # Validate return type | |
| if not isinstance(resolved_options, list): | |
| log.warning( | |
| f"Method '{options}' did not return a list for {prop_name}" | |
| ) | |
| continue | |
| except Exception as e: | |
| log.warning(f"Failed to resolve options for {prop_name}: {e}") | |
| continue | |
| else: | |
| # Invalid options type - skip | |
| continue | |
| # Update the schema with resolved options | |
| schema["properties"][prop_name] = dict(prop_schema) | |
| if "input" not in schema["properties"][prop_name]: | |
| schema["properties"][prop_name]["input"] = {"type": "select"} | |
| else: | |
| schema["properties"][prop_name]["input"] = dict( | |
| schema["properties"][prop_name].get("input", {}) | |
| ) | |
| schema["properties"][prop_name]["input"]["options"] = resolved_options | |
| return schema | |
| def extract_frontmatter(content): | |
| """ | |
| Extract frontmatter as a dictionary from the provided content string. | |
| """ | |
| frontmatter = {} | |
| frontmatter_started = False | |
| frontmatter_ended = False | |
| frontmatter_pattern = re.compile(r"^\s*([a-z_]+):\s*(.*)\s*$", re.IGNORECASE) | |
| try: | |
| lines = content.splitlines() | |
| if len(lines) < 1 or lines[0].strip() != '"""': | |
| # The content doesn't start with triple quotes | |
| return {} | |
| frontmatter_started = True | |
| for line in lines[1:]: | |
| if '"""' in line: | |
| if frontmatter_started: | |
| frontmatter_ended = True | |
| break | |
| if frontmatter_started and not frontmatter_ended: | |
| match = frontmatter_pattern.match(line) | |
| if match: | |
| key, value = match.groups() | |
| frontmatter[key.strip()] = value.strip() | |
| except Exception as e: | |
| log.exception(f"Failed to extract frontmatter: {e}") | |
| return {} | |
| return frontmatter | |
| def replace_imports(content): | |
| """ | |
| Replace the import paths in the content. | |
| """ | |
| replacements = { | |
| "from utils": "from open_webui.utils", | |
| "from apps": "from open_webui.apps", | |
| "from main": "from open_webui.main", | |
| "from config": "from open_webui.config", | |
| } | |
| for old, new in replacements.items(): | |
| content = content.replace(old, new) | |
| return content | |
| def load_tool_module_by_id(tool_id, content=None): | |
| if content is None: | |
| tool = Tools.get_tool_by_id(tool_id) | |
| if not tool: | |
| raise Exception(f"Toolkit not found: {tool_id}") | |
| content = tool.content | |
| content = replace_imports(content) | |
| Tools.update_tool_by_id(tool_id, {"content": content}) | |
| else: | |
| frontmatter = extract_frontmatter(content) | |
| # Install required packages found within the frontmatter | |
| install_frontmatter_requirements(frontmatter.get("requirements", "")) | |
| module_name = f"tool_{tool_id}" | |
| module = types.ModuleType(module_name) | |
| sys.modules[module_name] = module | |
| # Create a temporary file and use it to define `__file__` so | |
| # that it works as expected from the module's perspective. | |
| temp_file = tempfile.NamedTemporaryFile(delete=False) | |
| temp_file.close() | |
| try: | |
| with open(temp_file.name, "w", encoding="utf-8") as f: | |
| f.write(content) | |
| module.__dict__["__file__"] = temp_file.name | |
| # Executing the modified content in the created module's namespace | |
| exec(content, module.__dict__) | |
| frontmatter = extract_frontmatter(content) | |
| log.info(f"Loaded module: {module.__name__}") | |
| # Create and return the object if the class 'Tools' is found in the module | |
| if hasattr(module, "Tools"): | |
| return module.Tools(), frontmatter | |
| else: | |
| raise Exception("No Tools class found in the module") | |
| except Exception as e: | |
| log.error(f"Error loading module: {tool_id}: {e}") | |
| del sys.modules[module_name] # Clean up | |
| raise e | |
| finally: | |
| os.unlink(temp_file.name) | |
| def load_function_module_by_id(function_id: str, content: str | None = None): | |
| if content is None: | |
| function = Functions.get_function_by_id(function_id) | |
| if not function: | |
| raise Exception(f"Function not found: {function_id}") | |
| content = function.content | |
| content = replace_imports(content) | |
| Functions.update_function_by_id(function_id, {"content": content}) | |
| else: | |
| frontmatter = extract_frontmatter(content) | |
| install_frontmatter_requirements(frontmatter.get("requirements", "")) | |
| module_name = f"function_{function_id}" | |
| module = types.ModuleType(module_name) | |
| sys.modules[module_name] = module | |
| # Create a temporary file and use it to define `__file__` so | |
| # that it works as expected from the module's perspective. | |
| temp_file = tempfile.NamedTemporaryFile(delete=False) | |
| temp_file.close() | |
| try: | |
| with open(temp_file.name, "w", encoding="utf-8") as f: | |
| f.write(content) | |
| module.__dict__["__file__"] = temp_file.name | |
| # Execute the modified content in the created module's namespace | |
| exec(content, module.__dict__) | |
| frontmatter = extract_frontmatter(content) | |
| log.info(f"Loaded module: {module.__name__}") | |
| # Create appropriate object based on available class type in the module | |
| if hasattr(module, "Pipe"): | |
| return module.Pipe(), "pipe", frontmatter | |
| elif hasattr(module, "Filter"): | |
| return module.Filter(), "filter", frontmatter | |
| elif hasattr(module, "Action"): | |
| return module.Action(), "action", frontmatter | |
| else: | |
| raise Exception("No Function class found in the module") | |
| except Exception as e: | |
| log.error(f"Error loading module: {function_id}: {e}") | |
| # Cleanup by removing the module in case of error | |
| del sys.modules[module_name] | |
| Functions.update_function_by_id(function_id, {"is_active": False}) | |
| raise e | |
| finally: | |
| os.unlink(temp_file.name) | |
| def get_tool_module_from_cache(request, tool_id, load_from_db=True): | |
| if load_from_db: | |
| # Always load from the database by default | |
| tool = Tools.get_tool_by_id(tool_id) | |
| if not tool: | |
| raise Exception(f"Tool not found: {tool_id}") | |
| content = tool.content | |
| new_content = replace_imports(content) | |
| if new_content != content: | |
| content = new_content | |
| # Update the tool content in the database | |
| Tools.update_tool_by_id(tool_id, {"content": content}) | |
| if ( | |
| hasattr(request.app.state, "TOOL_CONTENTS") | |
| and tool_id in request.app.state.TOOL_CONTENTS | |
| ) and ( | |
| hasattr(request.app.state, "TOOLS") and tool_id in request.app.state.TOOLS | |
| ): | |
| if request.app.state.TOOL_CONTENTS[tool_id] == content: | |
| return request.app.state.TOOLS[tool_id], None | |
| tool_module, frontmatter = load_tool_module_by_id(tool_id, content) | |
| else: | |
| if hasattr(request.app.state, "TOOLS") and tool_id in request.app.state.TOOLS: | |
| return request.app.state.TOOLS[tool_id], None | |
| tool_module, frontmatter = load_tool_module_by_id(tool_id) | |
| if not hasattr(request.app.state, "TOOLS"): | |
| request.app.state.TOOLS = {} | |
| if not hasattr(request.app.state, "TOOL_CONTENTS"): | |
| request.app.state.TOOL_CONTENTS = {} | |
| request.app.state.TOOLS[tool_id] = tool_module | |
| request.app.state.TOOL_CONTENTS[tool_id] = content | |
| return tool_module, frontmatter | |
| def get_function_module_from_cache(request, function_id, load_from_db=True): | |
| if load_from_db: | |
| # Always load from the database by default | |
| # This is useful for hooks like "inlet" or "outlet" where the content might change | |
| # and we want to ensure the latest content is used. | |
| function = Functions.get_function_by_id(function_id) | |
| if not function: | |
| raise Exception(f"Function not found: {function_id}") | |
| content = function.content | |
| new_content = replace_imports(content) | |
| if new_content != content: | |
| content = new_content | |
| # Update the function content in the database | |
| Functions.update_function_by_id(function_id, {"content": content}) | |
| if ( | |
| hasattr(request.app.state, "FUNCTION_CONTENTS") | |
| and function_id in request.app.state.FUNCTION_CONTENTS | |
| ) and ( | |
| hasattr(request.app.state, "FUNCTIONS") | |
| and function_id in request.app.state.FUNCTIONS | |
| ): | |
| if request.app.state.FUNCTION_CONTENTS[function_id] == content: | |
| return request.app.state.FUNCTIONS[function_id], None, None | |
| function_module, function_type, frontmatter = load_function_module_by_id( | |
| function_id, content | |
| ) | |
| else: | |
| # Load from cache (e.g. "stream" hook) | |
| # This is useful for performance reasons | |
| if ( | |
| hasattr(request.app.state, "FUNCTIONS") | |
| and function_id in request.app.state.FUNCTIONS | |
| ): | |
| return request.app.state.FUNCTIONS[function_id], None, None | |
| function_module, function_type, frontmatter = load_function_module_by_id( | |
| function_id | |
| ) | |
| if not hasattr(request.app.state, "FUNCTIONS"): | |
| request.app.state.FUNCTIONS = {} | |
| if not hasattr(request.app.state, "FUNCTION_CONTENTS"): | |
| request.app.state.FUNCTION_CONTENTS = {} | |
| request.app.state.FUNCTIONS[function_id] = function_module | |
| request.app.state.FUNCTION_CONTENTS[function_id] = content | |
| return function_module, function_type, frontmatter | |
| def install_frontmatter_requirements(requirements: str): | |
| if OFFLINE_MODE: | |
| log.info("Offline mode enabled, skipping installation of requirements.") | |
| return | |
| if requirements: | |
| try: | |
| req_list = [req.strip() for req in requirements.split(",")] | |
| log.info(f"Installing requirements: {' '.join(req_list)}") | |
| subprocess.check_call( | |
| [sys.executable, "-m", "pip", "install"] | |
| + PIP_OPTIONS | |
| + req_list | |
| + PIP_PACKAGE_INDEX_OPTIONS | |
| ) | |
| except Exception as e: | |
| log.error(f"Error installing packages: {' '.join(req_list)}") | |
| raise e | |
| else: | |
| log.info("No requirements found in frontmatter.") | |
| def install_tool_and_function_dependencies(): | |
| """ | |
| Install all dependencies for all admin tools and active functions. | |
| By first collecting all dependencies from the frontmatter of each tool and function, | |
| and then installing them using pip. Duplicates or similar version specifications are | |
| handled by pip as much as possible. | |
| """ | |
| function_list = Functions.get_functions(active_only=True) | |
| tool_list = Tools.get_tools() | |
| all_dependencies = "" | |
| try: | |
| for function in function_list: | |
| frontmatter = extract_frontmatter(replace_imports(function.content)) | |
| if dependencies := frontmatter.get("requirements"): | |
| all_dependencies += f"{dependencies}, " | |
| for tool in tool_list: | |
| # Only install requirements for admin tools | |
| if tool.user and tool.user.role == "admin": | |
| frontmatter = extract_frontmatter(replace_imports(tool.content)) | |
| if dependencies := frontmatter.get("requirements"): | |
| all_dependencies += f"{dependencies}, " | |
| install_frontmatter_requirements(all_dependencies.strip(", ")) | |
| except Exception as e: | |
| log.error(f"Error installing requirements: {e}") | |