|
|
import random |
|
|
import inspect |
|
|
import re |
|
|
|
|
|
from llama_cpp import Llama |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_weather(location: str) -> str:
    """This tool returns the current weather situation.

    Args:
        location: The city or place to check

    Returns:
        str: Weather situation (e.g. cloudy, rainy, sunny)
    """
    # NOTE: this docstring is forwarded to the model as the tool description
    # (function_to_json reads ``__doc__``), so its wording is part of the
    # program's behavior, not just documentation.
    #
    # Stub implementation: ``location`` is ignored and one of the canned
    # values below is picked at random. "foobar" is presumably a deliberate
    # sentinel for testing -- TODO confirm before removing.
    weather_situations = ["cloudy", "rainy", "sunny", "foobar"]
    return random.choice(weather_situations)
|
|
|
|
|
def get_temperature(location: str) -> str:
    """This tool returns the current temperature.

    Args:
        location: The city or place to check

    Returns:
        str: Temperature
    """
    # The docstring wording is kept as-is: it is read at runtime and sent to
    # the model as this tool's description.
    #
    # Stub: the requested location is not consulted; a canned reading
    # (returned as a string) is drawn at random.
    canned_readings = ["-10", "0", "20", "30"]
    reading = random.choice(canned_readings)
    return reading
|
|
|
|
|
def get_weather_forecast(location: str, days_ahead: int) -> str:
    """This tool returns the weather forecast for the specified days ahead.

    Args:
        location: The city or place to check
        days_ahead: How many days ahead of today

    Returns:
        str: Weather situation (e.g. Storm, Thunderstorm, Hurricane)
    """
    # ``days_ahead`` is annotated as a number because it is compared
    # numerically below; the annotation also drives the JSON type advertised
    # to the model. Numeric strings are still accepted so callers that pass
    # e.g. "3" keep working (a non-numeric string raises ValueError).
    days = float(days_ahead) if isinstance(days_ahead, str) else days_ahead

    # Test the larger threshold first; checking ``> 0`` first would make the
    # ``> 10`` branch unreachable.
    if days > 10:
        return "Hurricane"
    if days > 0:
        return "Thunderstorm"
    # Today or a non-positive offset. ``location`` is not consulted (stub).
    return "Storm"
|
|
|
|
|
|
|
|
# Callables that are exposed to the model as tools.
TOOLS = [
    get_weather,
    get_temperature,
    get_weather_forecast,
]

# Name -> callable lookup, keyed by each function's ``__name__``.
TOOL_REGISTRY = {tool.__name__: tool for tool in TOOLS}
|
|
|
|
|
|
|
|
def function_to_json(func) -> dict:
    """
    Converts a Python function into a JSON-serializable dictionary
    that describes the function's signature, including its name,
    description, and parameters.

    Args:
        func: The callable to describe. Its ``__name__`` becomes the tool
            name and its ``__doc__`` (or "" when missing) the description.

    Returns:
        dict: ``{"type": "function", "function": {...}}`` where the inner
        dict holds ``name``, ``description`` and a ``parameters`` object
        with ``properties`` (one entry per named parameter) and
        ``required`` (names of parameters without a default).

    Raises:
        ValueError: If ``inspect.signature`` cannot provide a signature.
    """
    # Annotation -> JSON type name. Anything not listed here (including a
    # missing annotation) falls back to "string".
    type_map = {
        str: "string",
        int: "integer",
        float: "number",
        bool: "boolean",
        list: "array",
        dict: "object",
        type(None): "null",
    }

    try:
        signature = inspect.signature(func)
    except ValueError as e:
        # Chain the original error so the root cause stays in the traceback.
        raise ValueError(
            f"Failed to get signature for function {func.__name__}: {str(e)}"
        ) from e

    # ``*args`` / ``**kwargs`` have no fixed name a caller could fill in, so
    # they must not show up as (required) named properties.
    variadic_kinds = (
        inspect.Parameter.VAR_POSITIONAL,
        inspect.Parameter.VAR_KEYWORD,
    )
    named_params = [
        param
        for param in signature.parameters.values()
        if param.kind not in variadic_kinds
    ]

    parameters = {
        param.name: {"type": type_map.get(param.annotation, "string")}
        for param in named_params
    }

    # Identity check against the public sentinel: ``==`` would invoke the
    # default value's own ``__eq__``, which may misbehave for some objects.
    required = [
        param.name
        for param in named_params
        if param.default is inspect.Parameter.empty
    ]

    return {
        "type": "function",
        "function": {
            "name": func.__name__,
            "description": func.__doc__ or "",
            "parameters": {
                "type": "object",
                "properties": parameters,
                "required": required,
            },
        },
    }
|
|
|
|
|
|
|
|
# JSON-style descriptions of every registered tool, in TOOLS order; built
# once at import time.
TOOLS_SCHEMA = list(map(function_to_json, TOOLS))
|
|
|
|
|
|
|
|
# Start of a call expression: an identifier immediately followed by "(".
_CALL_START = re.compile(r"(\w+)\(")


def _extract_arg_string(text, start):
    """Return ``(arg_str, end)`` for the argument list starting at ``start``.

    ``end`` is the index just past the closing ")". A ")" inside a quoted
    value does not terminate the call. Returns None when no ")" follows.
    """
    quote = None
    for index in range(start, len(text)):
        char = text[index]
        if quote is not None:
            if char == quote:
                quote = None
        elif char in "\"'":
            quote = char
        elif char == ")":
            return text[start:index], index + 1

    # No ")" outside quotes (e.g. a stray apostrophe opened a "quote" that
    # never closes): fall back to the first ")" after ``start``.
    close = text.find(")", start)
    if close == -1:
        return None
    return text[start:close], close + 1


def _split_arguments(arg_str):
    """Split ``arg_str`` on commas that are not inside a quoted value."""
    parts = []
    current = []
    quote = None
    for char in arg_str:
        if quote is not None:
            if char == quote:
                quote = None
        elif char in "\"'":
            quote = char
        elif char == ",":
            parts.append("".join(current))
            current = []
            continue
        current.append(char)

    if quote is not None:
        # Unbalanced quote: quote tracking is unreliable, split on every comma.
        return arg_str.split(",")
    parts.append("".join(current))
    return parts


def _coerce_value(raw):
    """Turn ``raw`` into a float (if it contains ".") or int; else keep the str."""
    try:
        return float(raw) if "." in raw else int(raw)
    except ValueError:
        return raw


def parse_tool_calls(tool_output: str):
    """
    Very simple parser for outputs like:
        [get_weather(location="Berlin")]
    Returns a list of (func_name, kwargs) tuples.

    Only ``key=value`` arguments are kept; positional arguments are ignored.
    Surrounding quotes are stripped from values, and values that look
    numeric are converted to ``int`` / ``float`` (also when they were
    quoted). Commas and parentheses inside a quoted value are treated as
    part of that value.

    Args:
        tool_output: Raw text produced by the model.

    Returns:
        list: ``(func_name, kwargs)`` tuples in order of appearance; empty
        when no call expression is found or the input is empty.
    """
    calls = []
    if not tool_output:
        return calls

    position = 0
    while True:
        match = _CALL_START.search(tool_output, position)
        if match is None:
            break
        extracted = _extract_arg_string(tool_output, match.end())
        if extracted is None:
            # No closing ")" anywhere after this point: nothing more to parse.
            break
        arg_str, position = extracted

        kwargs = {}
        for part in _split_arguments(arg_str.strip()):
            if "=" not in part:
                continue
            key, val = part.split("=", 1)
            cleaned = val.strip().strip('"').strip("'")
            kwargs[key.strip()] = _coerce_value(cleaned)

        calls.append((match.group(1), kwargs))

    return calls
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_non_stream(llm, prompt, max_tokens=256, temperature=0.2, top_p=0.95):
    """Run a single, non-streaming completion and return its text.

    Used for internal agent/tool prompts. ``llm`` is invoked as a callable
    with the prompt plus sampling options; generation is stopped at the
    "User:" / "System:" markers.

    Args:
        llm: Callable model object.
        prompt: Full prompt text.
        max_tokens: Forwarded to ``llm`` unchanged.
        temperature: Forwarded to ``llm`` unchanged.
        top_p: Forwarded to ``llm`` unchanged.

    Returns:
        The ``"text"`` field of the first entry in the result's ``"choices"``.
    """
    sampling_options = {
        "max_tokens": max_tokens,
        "temperature": temperature,
        "top_p": top_p,
        "stop": ["User:", "System:"],
        "stream": False,
    }
    completion = llm(prompt, **sampling_options)
    first_choice = completion["choices"][0]
    return first_choice["text"]
|
|
|
|
|
|
|
|
def build_prompt(system_message, history, user_message):
    """Assemble the plain-text chat prompt.

    Layout: a ``System:`` line, then one ``<Role>: <content>`` line per
    history turn (role capitalized), then the new ``User:`` line and a
    trailing ``Assistant:`` cue for the model to continue from.

    Args:
        system_message: Text placed on the ``System:`` line.
        history: Iterable of mappings with ``"role"`` and ``"content"`` keys.
        user_message: The latest user input.

    Returns:
        The complete prompt string.
    """
    segments = [f"System: {system_message}\n"]
    segments.extend(
        f"{turn['role'].capitalize()}: {turn['content']}\n" for turn in history
    )
    segments.append(f"User: {user_message}\nAssistant:")
    return "".join(segments)
|
|
|
|
|
|
|
|
def select_tools_with_llm(llm, user_message: str) -> list:
    """
    Ask the model which tools to call.
    Returns a list of (func_name, kwargs) from parse_tool_calls.

    The model is given a dedicated tool-selection system prompt containing
    the tool schema, followed by the user message only (no chat history).

    Args:
        llm: Callable model object, passed through to generate_non_stream.
        user_message: The user's latest message.

    Returns:
        list: Parsed ``(func_name, kwargs)`` tuples; empty when the model
        output contains no call expressions.
    """
    # Function-scope import keeps this block self-contained.
    import json

    # The prompt promises "functions in JSON format", so serialize the schema
    # as real JSON instead of interpolating the Python repr (single quotes).
    tools_json = json.dumps(TOOLS_SCHEMA)

    tool_selection_system = (
        "\n"
        "You are an expert in composing functions.\n"
        "You are given a user question and a set of possible functions (tools).\n"
        "\n"
        "Your job is to decide which tools to call and with what arguments.\n"
        "\n"
        "Rules:\n"
        "- If you decide to invoke any function(s), you MUST put them in the format:\n"
        "[func_name1(param1=value1, param2=value2), func_name2(param1=value1)]\n"
        "- If none of the functions are suitable, respond with: []\n"
        "- Do NOT include any explanation or extra text, only the list.\n"
        "- If the question lacks required parameters, respond with [].\n"
        "\n"
        "Here is a list of functions in JSON format that you can invoke:\n"
        f"{tools_json}\n"
    )

    prompt = (
        f"System: {tool_selection_system}\n"
        f"User: {user_message}\n"
        f"Assistant:"
    )

    raw = generate_non_stream(
        llm,
        prompt,
        max_tokens=256,
        temperature=0.2,
        top_p=0.95,
    )

    return parse_tool_calls(raw)
|
|
|
|
|
|
|
|
def call_tools(tool_calls):
    """Run each requested tool and collect the outcomes.

    Args:
        tool_calls: Iterable of ``(func_name, kwargs)`` pairs.

    Returns:
        A list with one dict per call, holding ``name``, ``args`` and
        ``result``. For a name missing from TOOL_REGISTRY the result is an
        "Unknown tool" message; if the tool raises, the result is an error
        message instead of the exception propagating.
    """
    outcomes = []
    for name, arguments in tool_calls:
        tool = TOOL_REGISTRY.get(name)
        if tool is not None:
            try:
                outcome = tool(**arguments)
            except Exception as exc:
                # Any tool failure is reported back as text, not raised.
                outcome = f"Error while calling {name}: {exc}"
        else:
            outcome = f"Unknown tool '{name}'."
        outcomes.append({"name": name, "args": arguments, "result": outcome})
    return outcomes
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def respond(message, history, system_message, llm):
    """Answer ``message``, running any tools the model selects first.

    Steps: ask the model which tools to call, execute them, append a summary
    of the executed tools to the system message, then stream the final
    completion.

    Args:
        message: The latest user message.
        history: Earlier turns, passed to build_prompt.
        system_message: Base system prompt.
        llm: Callable model object.

    Yields:
        The response text accumulated so far, once per streamed chunk.
    """
    requested_calls = select_tools_with_llm(llm, message)
    executed = call_tools(requested_calls) if requested_calls else []

    final_system_message = system_message
    if executed:
        # One "- name(args) -> result" line per executed tool.
        report_lines = [
            f"- {entry['name']}({entry['args']}) -> {entry['result']}\n"
            for entry in executed
        ]
        tool_report = (
            "\nYou have executed the following tools (name, args, result):\n"
            + "".join(report_lines)
        )
        final_system_message = system_message + tool_report

    full_prompt = build_prompt(final_system_message, history, message)

    generation_options = {
        "max_tokens": 256,
        "temperature": 0.7,
        "top_p": 0.9,
        "stop": ["User:", "System:"],
        "stream": True,
    }
    chunks = llm(full_prompt, **generation_options)

    accumulated = ""
    for chunk in chunks:
        accumulated += chunk["choices"][0]["text"]
        # Yield the running text, not just the newest piece.
        yield accumulated
|
|
|