# Source: castlebbs — commit "Add video" (80bb04a)
from gradio import ChatMessage
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_agent
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.tools import tool
import pandas as pd
from youtube_search import YoutubeSearch
import gradio as gr
import asyncio
import re
import uuid
import json
from pathlib import Path
from dtc_display import DTCDisplay
from tools import search_youtube_video, decode_vin, hex_to_decimal, calculate_obd_value, combine_bytes
from prompts import system_prompt
from hackathon_detail import create_hackathon_detail_tab
from demo_video import create_demo_video_tab
from mcp_server_detail import create_mcp_server_tab
# from langchain_anthropic import ChatAnthropic
# model = ChatAnthropic(model="claude-haiku-4-5-20251001", temperature=0, streaming=True) # streaming=True is required for token streaming
from langchain_nebius import ChatNebius
# Chat model: Nebius-hosted Qwen coder model (alternatives kept below for reference).
model = ChatNebius(
    model="Qwen/Qwen3-Coder-480B-A35B-Instruct",
    # model = "Qwen/Qwen3-235B-A22B-Instruct-2507",
    # model="meta-llama/Llama-3.3-70B-Instruct", # Choose from available models
    # model="deepseek-ai/DeepSeek-R1-0528", # Choose from available models
    temperature=0,  # deterministic answers for diagnostics
    top_p=0.95,
    streaming=True,  # required for token-by-token streaming in the chat UI
)
# Global variables (populated during startup / UI construction)
agent = None  # LangChain agent built by setup_agent(); stays None if MCP connect fails
mcp_error = None  # last MCP connection error message, surfaced to the user in chat
checkpointer = None  # InMemorySaver holding per-thread conversation state
historic_data = None  # not referenced elsewhere in this file — presumably legacy; TODO confirm
dtc_component = None  # DTCDisplay component handle, assigned inside the UI block below
async def setup_agent():
    """Connect to the MCP server and build the LangChain agent.

    Returns:
        The created agent on success, or None when the MCP server is
        unreachable. On failure the app still starts; `mcp_error` records
        the reason so the chat handler can show it to the user.
    """
    global agent, mcp_error, checkpointer
    try:
        client = MultiServerMCPClient(
            {
                "diagnostic": {
                    "transport": "streamable_http",
                    # "url": "http://192.168.10.69/mcp", # Actual physical dongle MCP server
                    "url": "https://castlebbs-elm327-simulator.hf.space/gradio_api/mcp/", # Simulator MCP server on gradio
                    "timeout": 20.0, # Increased timeout for multiple calls
                }
            }
        )
        tools = await client.get_tools()
        # Loop variable renamed so it no longer shadows the `tool` decorator
        # imported from langchain_core.tools.
        print("Available MCP tools:", [mcp_tool.name for mcp_tool in tools])
        # Add local helper tools (YouTube search, VIN decode, OBD math) to the MCP tools
        all_tools = list(tools) + [search_youtube_video, decode_vin, hex_to_decimal, calculate_obd_value, combine_bytes]
        # Create the checkpointer (per-thread conversation memory)
        checkpointer = InMemorySaver()
        # Create the agent with the retrieved tools
        agent = create_agent(
            model=model,
            system_prompt=system_prompt,
            tools=all_tools,
            checkpointer=checkpointer,
        )
        print("βœ… MCP server connected successfully")
        return agent
    except Exception as e:
        # Best-effort startup: record the error for later display but let the
        # UI come up without tools instead of crashing.
        mcp_error = str(e)
        print(f"⚠️ Warning: Could not connect to MCP server: {e}")
        print("The app will start but MCP tools will not be available.")
        return None
# Try to setup agent, but don't fail if it doesn't work
try:
    # Startup is synchronous: block once on the async setup coroutine.
    agent = asyncio.run(setup_agent())
except Exception as e:
    print(f"Failed to initialize agent: {e}")
async def interact_with_langchain_agent(user_prompt, messages, session_id, historic_data_state, vin_data_state, dtc_data_state):
    """Stream one chat turn through the LangChain agent into the Gradio UI.

    Async generator used as a Gradio event handler. It yields the tuple
    (messages, historic_data_state, vin_data_state, dtc_data_state) after
    every streaming event so the chatbot refreshes token by token. Along
    the way it captures side-panel data: telemetry history from the
    `*get_elm327_history` tool output, VIN details from `decode_vin`, and
    DTC codes from <DTC_DATA> blocks embedded in the model's text.
    """
    global agent, mcp_error
    # Skip if input is empty or only whitespace
    if not user_prompt or not user_prompt.strip():
        yield messages, historic_data_state, vin_data_state, dtc_data_state
        return
    print("> ", user_prompt)
    messages.append(ChatMessage(role="user", content=user_prompt))
    yield messages, historic_data_state, vin_data_state, dtc_data_state
    # Check if agent is available
    if agent is None:
        error_msg = f"❌ MCP server is not available. {mcp_error or 'Connection failed.'}\n\nPlease ensure MCP server is reachable and try again."
        messages.append(ChatMessage(role="assistant", content=error_msg))
        yield messages, historic_data_state, vin_data_state, dtc_data_state
        return
    # print("Agent is available, processing the request...")
    try:
        assistant_response = ""  # Collect assistant's full response
        current_message_index = None  # Track which message we're streaming to
        tool_call_tracker = {}  # Track tool calls: {tool_call_id: message_index}
        tool_args_accumulator = {}  # NEW: Accumulate streaming tool arguments
        current_tool_call_id = None  # Track the active tool call being streamed
        dtc_extracted = False  # Track if we've already extracted DTC data
        # Stream with messages mode for token-by-token streaming
        async for message_chunk, metadata in agent.astream(
            {"messages": [{"role": "user", "content": user_prompt}]},
            {"configurable": {"thread_id": session_id}},
            stream_mode="messages"
        ):
            node = metadata.get("langgraph_node", "unknown")
            # print(f"Token from node {node}: {message_chunk}")
            # Only process chunks from the model node (AI responses)
            if node == "model":
                # Check if this is the last chunk in a tool call sequence
                is_last_chunk = hasattr(message_chunk, "response_metadata") and message_chunk.response_metadata.get("finish_reason") == "tool_calls"
                # Check if this is a tool call or text content
                if hasattr(message_chunk, "tool_call_chunks") and message_chunk.tool_call_chunks:
                    # Handle tool call chunks
                    for tool_chunk in message_chunk.tool_call_chunks:
                        tool_call_id = tool_chunk.get("id")
                        tool_name = tool_chunk.get("name")
                        tool_args = tool_chunk.get("args")
                        # Determine active ID using pending key for chunks without ID
                        if not tool_call_id:
                            # No ID yet, use default key
                            active_id = "pending_tool_0"
                        else:
                            # We have an ID now - this is a new tool call
                            active_id = tool_call_id
                            # If this is a brand new tool call with a name, reset for fresh start
                            if tool_name and tool_call_id not in tool_call_tracker:
                                # Clear any stale pending data
                                if "pending_tool_0" in tool_args_accumulator:
                                    del tool_args_accumulator["pending_tool_0"]
                                # Initialize fresh accumulator for this tool call
                                tool_args_accumulator[tool_call_id] = ""
                                current_tool_call_id = tool_call_id
                            # Migrate any pending args to the real ID
                            elif "pending_tool_0" in tool_args_accumulator:
                                if tool_call_id not in tool_args_accumulator:
                                    tool_args_accumulator[tool_call_id] = ""
                                tool_args_accumulator[tool_call_id] = tool_args_accumulator["pending_tool_0"] + tool_args_accumulator.get(tool_call_id, "")
                                del tool_args_accumulator["pending_tool_0"]
                                current_tool_call_id = tool_call_id
                        # Initialize accumulator if needed
                        if active_id not in tool_args_accumulator:
                            tool_args_accumulator[active_id] = ""
                        # Accumulate arguments
                        if tool_args:
                            tool_args_accumulator[active_id] += tool_args
                        # Create message when we first see a tool call with name and ID
                        if tool_call_id and tool_name:
                            # Check if this is a reused ID (same ID appearing again)
                            if tool_call_id in tool_call_tracker:
                                # This ID was used before - check if previous call is complete
                                prev_msg_idx = tool_call_tracker[tool_call_id]
                                if prev_msg_idx < len(messages) and messages[prev_msg_idx].content:
                                    # Previous call has results, this is a NEW call with reused ID
                                    # Reset the accumulator and create a new message
                                    tool_args_accumulator[tool_call_id] = ""
                                    messages.append(ChatMessage(
                                        role="assistant",
                                        content="",
                                        metadata={"title": f"βš™οΈ Calling {tool_name}...", "tool_id": tool_call_id, "tool_name": tool_name},
                                    ))
                                    tool_call_tracker[tool_call_id] = len(messages) - 1
                                    yield messages, historic_data_state, vin_data_state, dtc_data_state
                            else:
                                # Truly new tool call ID
                                messages.append(ChatMessage(
                                    role="assistant",
                                    content="",
                                    metadata={"title": f"βš™οΈ Calling {tool_name}...", "tool_id": tool_call_id, "tool_name": tool_name},
                                ))
                                tool_call_tracker[tool_call_id] = len(messages) - 1
                                yield messages, historic_data_state, vin_data_state, dtc_data_state
                        # Update the message as args stream in OR on the last chunk
                        # Check both active_id and current_tool_call_id in case pending args exist
                        target_id = active_id if active_id in tool_call_tracker else current_tool_call_id
                        if target_id and target_id in tool_call_tracker:
                            # Try to get args from either active_id or pending
                            accumulated_args = tool_args_accumulator.get(active_id, "") or tool_args_accumulator.get("pending_tool_0", "")
                            if accumulated_args:
                                try:
                                    # Try to parse accumulated args
                                    args_dict = json.loads(accumulated_args)
                                    # Get the first parameter value (works for any parameter name)
                                    param_value = "..."
                                    if args_dict:
                                        # Get the first non-empty value from the dict
                                        param_value = next((str(v) for v in args_dict.values() if v), "...")
                                    # Update the message title with the actual parameter value
                                    msg_index = tool_call_tracker[target_id]
                                    tool_name = messages[msg_index].metadata.get("tool_name", "")
                                    messages[msg_index].metadata["title"] = f"βš™οΈ Calling {tool_name}: <b>{param_value}</b>"
                                    yield messages, historic_data_state, vin_data_state, dtc_data_state
                                except json.JSONDecodeError:
                                    # Args still streaming, don't update yet
                                    pass
                    # Force update on the last chunk to ensure we catch the final args
                    if is_last_chunk and current_tool_call_id and current_tool_call_id in tool_call_tracker:
                        # Check both the tool ID accumulator and pending
                        accumulated_args = tool_args_accumulator.get(current_tool_call_id, "") or tool_args_accumulator.get("pending_tool_0", "")
                        if accumulated_args:
                            try:
                                args_dict = json.loads(accumulated_args)
                                param_value = "..."
                                if args_dict:
                                    param_value = next((str(v) for v in args_dict.values() if v), "...")
                                msg_index = tool_call_tracker[current_tool_call_id]
                                tool_name = messages[msg_index].metadata.get("tool_name", "")
                                messages[msg_index].metadata["title"] = f"βš™οΈ Calling {tool_name}: <b>{param_value}</b>"
                                yield messages, historic_data_state, vin_data_state, dtc_data_state
                            except json.JSONDecodeError:
                                pass
                elif hasattr(message_chunk, "content") and message_chunk.content:
                    # Handle text streaming token by token
                    text_content = message_chunk.content
                    if isinstance(text_content, str) and text_content:
                        # Accumulate the full response for DTC extraction
                        if current_message_index is None:
                            assistant_response = text_content
                        else:
                            assistant_response += text_content
                        # Check for complete DTC data block and extract it (only once)
                        if not dtc_extracted:
                            dtc_match = re.search(r'<DTC_DATA>\s*(\[.*?\])\s*</DTC_DATA>',
                                assistant_response, re.DOTALL)
                            if dtc_match:
                                try:
                                    # Extract and parse the DTC JSON
                                    dtc_json = json.loads(dtc_match.group(1))
                                    dtc_data_state = dtc_json
                                    dtc_extracted = True  # Mark as extracted
                                    print(f"βœ… Extracted DTC data from LLM response: {dtc_json}")
                                except json.JSONDecodeError as e:
                                    print(f"⚠️ Could not parse DTC data from LLM response: {e}")
                        # Remove the DTC_DATA block from display text (only from accumulated response)
                        display_response = re.sub(r'<DTC_DATA>.*?</DTC_DATA>', '',
                            assistant_response, flags=re.DOTALL)
                        # Stream the token (calculate what to add by comparing with previous content)
                        if current_message_index is None:
                            # Start a new assistant message with clean content
                            messages.append(ChatMessage(role="assistant", content=display_response))
                            current_message_index = len(messages) - 1
                        else:
                            # Update the message with the clean accumulated content
                            messages[current_message_index].content = display_response
                        yield messages, historic_data_state, vin_data_state, dtc_data_state
                    elif isinstance(text_content, list):
                        # Handle content blocks
                        for block in text_content:
                            if isinstance(block, dict) and block.get("type") == "text":
                                text = block.get("text", "")
                                if text:
                                    # Accumulate the full response for DTC extraction
                                    assistant_response += text
                                    # Check for complete DTC data block and extract it (only once)
                                    if not dtc_extracted:
                                        dtc_match = re.search(r'<DTC_DATA>\s*(\[.*?\])\s*</DTC_DATA>',
                                            assistant_response, re.DOTALL)
                                        if dtc_match:
                                            try:
                                                # Extract and parse the DTC JSON
                                                dtc_json = json.loads(dtc_match.group(1))
                                                dtc_data_state = dtc_json
                                                dtc_extracted = True  # Mark as extracted
                                                print(f"βœ… Extracted DTC data from LLM response: {dtc_json}")
                                            except json.JSONDecodeError as e:
                                                print(f"⚠️ Could not parse DTC data from LLM response: {e}")
                                    # Remove the DTC_DATA block from display (only from accumulated response)
                                    display_response = re.sub(r'<DTC_DATA>.*?</DTC_DATA>', '',
                                        assistant_response, flags=re.DOTALL)
                                    if current_message_index is None:
                                        messages.append(ChatMessage(role="assistant", content=display_response))
                                        current_message_index = len(messages) - 1
                                    else:
                                        messages[current_message_index].content = display_response
                                    yield messages, historic_data_state, vin_data_state, dtc_data_state
            elif node == "tools":
                # Reset current message index when we move to tool execution
                current_message_index = None
                # Handle tool outputs
                if hasattr(message_chunk, "content"):
                    tool_output = message_chunk.content
                    tool_call_id = message_chunk.tool_call_id if hasattr(message_chunk, "tool_call_id") else None
                    # Decode escaped newlines for proper display
                    if isinstance(tool_output, str):
                        tool_output = tool_output.replace('\\n', '\n')
                        # Replace "?" with "NO DATA"
                        if tool_output.strip() == "?":
                            tool_output = "NO DATA"
                    # Update the corresponding tool call message
                    if tool_call_id and tool_call_id in tool_call_tracker:
                        msg_index = tool_call_tracker[tool_call_id]
                        # Check if this is historic data tool
                        tool_metadata = messages[msg_index].metadata or {}
                        tool_name = tool_metadata.get("tool_name", "")
                        # Check if toolname ends with "get_elm327_history"
                        if tool_name.endswith("get_elm327_history") and isinstance(tool_output, str):
                            try:
                                # Parse the JSON data
                                historic_data_state = json.loads(tool_output)
                                print(f"Captured historic data: {historic_data_state}")
                            except json.JSONDecodeError:
                                print(f"Could not parse historic data as JSON")
                        # Check if this is VIN decode tool
                        if tool_name == "decode_vin" and isinstance(tool_output, str):
                            # Store the VIN decode output
                            vin_data_state = tool_output
                            print(f"Captured VIN data: {vin_data_state}")
                        messages[msg_index].content = f"{tool_output}\n"
                    else:
                        # Fallback: add as new message
                        messages.append(ChatMessage(
                            role="assistant",
                            content=f"{tool_output}\n",
                        ))
                    yield messages, historic_data_state, vin_data_state, dtc_data_state
    except ExceptionGroup as eg:
        # Handle Python 3.11+ ExceptionGroup from TaskGroup
        error_details = []
        for e in eg.exceptions:
            error_details.append(f" - {type(e).__name__}: {str(e)}")
        error_message = f"❌ Multiple errors occurred:\n" + "\n".join(error_details[:3]) # Show first 3
        if len(eg.exceptions) > 3:
            error_message += f"\n ... and {len(eg.exceptions) - 3} more"
        error_message += "\n\nπŸ’‘ This usually means the elm327 device is unreachable or timing out."
        messages.append(ChatMessage(role="assistant", content=error_message))
        yield messages, historic_data_state, vin_data_state, dtc_data_state
    except Exception as e:
        error_message = f"❌ Error: {type(e).__name__}: {str(e)}\n\nπŸ’‘ Make sure MCP server is online and responding."
        messages.append(ChatMessage(role="assistant", content=error_message))
        print(f"Error during agent interaction: {e}")
        yield messages, historic_data_state, vin_data_state, dtc_data_state
def clear_history():
    """Reset the conversation and every dashboard panel.

    Empties the agent's checkpointer memory (when one exists) and returns
    blank values for, in order: chatbot, historic_data, vin_data, dtc_data,
    plot, vin_dataframe and dtc_component.
    """
    global checkpointer
    if checkpointer is not None:
        # InMemorySaver keeps its checkpoints in the `storage` dict
        checkpointer.storage.clear()
        print("βœ… Checkpointer memory cleared")
    placeholder_vin = pd.DataFrame({
        'Property': ["NO DATA"],
        'Value': ['']
    })
    return [], None, None, None, None, placeholder_vin, []
def update_vin_dataframe(vin_data_state):
    """Build the two-column VIN details DataFrame from the decode_vin output.

    Args:
        vin_data_state: Multi-line string of "Key: Value" pairs produced by
            the decode_vin tool, or None/empty when no VIN has been decoded.

    Returns:
        pandas.DataFrame with 'Property' and 'Value' columns. Properties not
        present in the input keep an empty value; with no input a single
        "NO DATA" placeholder row is returned.
    """
    if not vin_data_state:
        # Return empty dataframe with default structure
        return pd.DataFrame({
            'Property': ["NO DATA"],
            'Value': ['']
        })
    # Fixed set of properties shown in the dashboard, in display order.
    properties = [
        'Make',
        'Model',
        'Model Year',
        'Vehicle Type',
        'Body Class',
        'Manufacturer Name',
        'Plant City',
        'Plant State',
        'Plant Country',
        'Trim',
        'Engine Number of Cylinders',
        'Displacement (L)',
        'Engine Model',
        'Fuel Type - Primary',
        'Fuel Type - Secondary',
        'Electrification Level',
        'Transmission Style',
        'Drive Type',
        'Number of Doors',
        'Number of Seats',
        'Gross Vehicle Weight Rating From',
        'Error Code',
        'Error Text'
    ]
    # Parse "Key: Value" lines once into a dict. Splitting on the first ':'
    # only lets values themselves contain colons; the last occurrence of a
    # key wins, matching the previous behavior.
    parsed = {}
    for line in vin_data_state.split('\n'):
        if ':' in line:
            key, value = line.split(':', 1)
            parsed[key.strip()] = value.strip()
    # Value column length is derived from the property list instead of the
    # previous hard-coded 23, so editing the list cannot desynchronize it.
    return pd.DataFrame({
        'Property': properties,
        'Value': [parsed.get(prop, '') for prop in properties]
    })
# Reshape captured telemetry into the long format the LinePlot expects.
def create_plot(data):
    """Convert historic telemetry records into a long-format DataFrame.

    `data` is a list of records carrying `time`, `rpm`, `speed` and
    `coolant_temp` fields. Returns a DataFrame with `time`, `value` and
    `metric` columns (one row per metric sample), or None when there is
    nothing to plot.
    """
    if data is None or len(data) == 0:
        return None
    print("Creating plot with data:", data)
    frame = pd.DataFrame(data)
    # Melt the three metric columns into (time, metric, value) rows;
    # value_vars order keeps RPM rows first, then Speed, then Coolant Temp.
    long_form = frame.melt(
        id_vars=["time"],
        value_vars=["rpm", "speed", "coolant_temp"],
        var_name="metric",
        value_name="value",
    )
    long_form["metric"] = long_form["metric"].map(
        {"rpm": "RPM", "speed": "Speed", "coolant_temp": "Coolant Temp"}
    )
    # Keep the column order the chart component originally received.
    return long_form[["time", "value", "metric"]]
def update_dtc(dtc_data):
    """Forward DTC data to the display component, or clear it when empty."""
    has_codes = dtc_data is not None and len(dtc_data) > 0
    if not has_codes:
        return []
    print("Updating DTC component with data:", dtc_data)
    return dtc_data
# Gradio UI setup
with gr.Blocks() as demo:
    # Component handles are pre-declared so the event wiring at the bottom
    # can reference them regardless of layout nesting.
    input = None  # NOTE(review): shadows the builtin `input`; consider renaming
    chatbot = None
    submit_btn = None
    clear_btn = None
    examples = None
    plot_output = None
    vin_dataframe = None
    # Create a unique session ID for each user session
    session_id = gr.State(lambda: str(uuid.uuid4()))
    historic_data_state = gr.State(None) # State to hold historic data
    dtc_data_state = gr.State(None) # State to hold DTC data
    vin_data_state = gr.State(None) # State to hold VIN decode data
    # Header row: logo + title
    with gr.Row(equal_height=True, elem_classes="vertical-center"):
        gr.Image("mechanic_s.png", height=60, width=60, show_label=False, container=False, buttons=[""], scale=0, min_width=0, elem_classes="logo-image")
        with gr.Column(scale=10, elem_classes="big-title"):
            gr.Markdown("## Vehicle Diagnostic Assistant πŸš— πŸ› οΈ")
    with gr.Row( elem_classes="ask-me"):
        gr.Markdown("AI agent that connects to your car via an embedded MCP server to provide real-time diagnostics and insights.")
    with gr.Tabs():
        with gr.Tab("Main Dashboard"):
            with gr.Row():
                # Left column: chat interface
                with gr.Column(scale=6, min_width=600):
                    with gr.Row():
                        chatbot = gr.Chatbot(
                            label="Diagnostic Agent",
                            avatar_images=(
                                None,
                                "mechanic_s.png",
                            ),
                            height=400,
                            group_consecutive_messages=False,
                            buttons=[""],
                        )
                    with gr.Row():
                        input = gr.Textbox(
                            lines=1,
                            label="Ask a question",
                            placeholder="e.g., Can you check for any faults? What's the engine RPM?",
                            show_label=True,
                            buttons=[""],
                        )
                    with gr.Row():
                        submit_btn = gr.Button("Run", size="lg", variant="primary", elem_classes="button_color")
                        clear_btn = gr.Button("Clear History", size="lg", variant="secondary")
                    # Example questions
                    gr.Examples(
                        examples=[
                            ["Can you see if there are any faults using OBD-II?"],
                            ["What's the vehicle VIN?"],
                            ["Check the engine RPM and speed"],
                            ["Read the temperature sensor"],
                            ["Get system information"],
                            ["Get the last 10 historic data"],
                        ],
                        inputs=input
                    )
                # Right column: telemetry plot, DTC list and VIN table
                with gr.Column(scale=4, min_width=400, elem_classes="scroll-column"):
                    with gr.Column(min_width=400): # Added extra Column for scroll
                        with gr.Row():
                            with gr.Column():
                                gr.Markdown("### πŸ“Š Historic Vehicle Data", elem_classes="section-title")
                                plot_output = gr.LinePlot(
                                    value=None,
                                    x="time",
                                    y="value",
                                    color="metric",
                                    x_title="Time (ms)",
                                    y_title="Value",
                                    title="Vehicle Telemetry",
                                    colors_in_legend=["RPM", "Speed", "Coolant Temp"],
                                    color_map={"RPM": "red", "Speed": "blue", "Coolant Temp": "green"},
                                    label="Vehicle Telemetry"
                                )
                        with gr.Row():
                            with gr.Column():
                                # Custom component from dtc_display.py
                                dtc_component = DTCDisplay(
                                    label="Diagnostic Trouble Codes",
                                    value=[],
                                    scale=1
                                )
                        with gr.Row():
                            with gr.Column():
                                gr.Markdown("### 🚘 Vehicle Information (VIN Decoded)", elem_classes="section-title")
                                vin_dataframe = gr.DataFrame(
                                    value=pd.DataFrame({
                                        'Property': [ "NO DATA" ],
                                        'Value': [ '' ]
                                    }),
                                    headers=["Property", "Value"],
                                    datatype=["str", "str"],
                                    column_count=2,
                                    interactive=False,
                                    column_widths=["30%", "60%"],
                                    max_height="100%",
                                    wrap=True
                                )
        with gr.Tab("Demo Video"):
            create_demo_video_tab()
        with gr.Tab("Hackathon project detail"):
            create_hackathon_detail_tab()
        with gr.Tab("MCP Server on embedded device"):
            create_mcp_server_tab()
    # Event handlers (must be outside the Row context but can reference components)
    # Each submit/click streams the agent turn, then clears the textbox and
    # refreshes the plot, VIN table and DTC panel from the updated states.
    input.submit(
        interact_with_langchain_agent,
        [input, chatbot, session_id, historic_data_state, vin_data_state, dtc_data_state],
        [chatbot, historic_data_state, vin_data_state, dtc_data_state],
        trigger_mode="always_last"
    ).then(
        lambda:"", None, input
    ).then(
        create_plot, historic_data_state, plot_output
    ).then(
        update_vin_dataframe, vin_data_state, vin_dataframe
    ).then(
        update_dtc, dtc_data_state, dtc_component)
    submit_btn.click(
        interact_with_langchain_agent,
        [input, chatbot, session_id, historic_data_state, vin_data_state, dtc_data_state],
        [chatbot, historic_data_state, vin_data_state, dtc_data_state],
        trigger_mode="always_last"
    ).then(
        lambda:"", None, input
    ).then(
        create_plot, historic_data_state, plot_output
    ).then(
        update_vin_dataframe, vin_data_state, vin_dataframe
    ).then(
        update_dtc, dtc_data_state, dtc_component)
    # Clearing resets chat, all states and every side panel in one call.
    clear_btn.click(clear_history, None, [chatbot, historic_data_state, vin_data_state, dtc_data_state, plot_output, vin_dataframe, dtc_component])

# Serve bundled static assets and launch with the project stylesheet.
gr.set_static_paths(paths=[Path.cwd().absolute()/"assets"])
demo.launch(css_paths=["styles.css"])