from datetime import datetime, timezone
from pathlib import Path
import os
import re
import shutil
import gradio as gr
from agent import (
analyze_github_repo,
analyze_local_repo,
build_experiment_from_report,
build_repo_vector_store,
fetch_youtube_transcript,
generate_youtube_study_notes,
rag_answer_from_store,
summarize_youtube_chapters,
)
from bookmarks import (
bookmark_repo_from_analysis,
find_metadata_by_label,
get_cache_dirs,
get_dropdown_options,
)
# File extensions and folders to ignore for local uploads
IGNORE_PATTERNS = {
'folders': {
'__pycache__', '.git', '.svn', '.hg', 'node_modules',
'venv', 'env', '.venv', '.env', 'dist', 'build',
'.idea', '.vscode', '.pytest_cache', '.mypy_cache',
'coverage', '.coverage', 'htmlcov', '.tox', 'eggs',
'.eggs', '*.egg-info', '.DS_Store'
},
'extensions': {
'.pyc', '.pyo', '.pyd', '.so', '.dll', '.dylib',
'.class', '.o', '.obj', '.exe', '.bin', '.lock',
'.log', '.tmp', '.temp', '.cache', '.bak', '.swp',
'.swo', '.DS_Store', '.gitignore'
}
}
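# Note: folder names are matched exactly against individual path components, and
# extensions are matched against Path.suffix (lowercased) in _should_ignore_path below.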
def _should_ignore_path(path: Path) -> bool:
"""Check if a path should be ignored during local folder processing."""
for part in path.parts:
if part in IGNORE_PATTERNS['folders']:
return True
if part.startswith('.') and part not in {'.', '..'}:
return True
if path.suffix.lower() in IGNORE_PATTERNS['extensions']:
return True
return False
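# Illustrative checks (hypothetical paths):
#   _should_ignore_path(Path("repo/node_modules/lodash/index.js"))  -> True  (ignored folder)
#   _should_ignore_path(Path("repo/src/module.pyc"))                -> True  (ignored extension)
#   _should_ignore_path(Path("repo/src/module.py"))                 -> False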
KNOWLEDGE_TRANSFER_ROOT = Path("Knowledge Transfer")
def _is_gradio_v6_or_newer() -> bool:
"""Return True if the installed Gradio major version is >= 6."""
version_str = getattr(gr, "__version__", "0")
try:
major = int(version_str.split(".")[0])
return major >= 6
except (ValueError, IndexError):
return False
IS_GRADIO_V6 = _is_gradio_v6_or_newer()
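# IS_GRADIO_V6 controls where theming is applied: theme/CSS go to gr.Blocks() for
# Gradio < 6 (see build_interface) and to demo.launch() otherwise (see __main__).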
def run_github_ingestion(repo_url: str):
"""Analyze GitHub repository without indexing."""
repo_url = (repo_url or "").strip()
if not repo_url:
warning = "⚠️ Please paste a public GitHub repository URL to begin."
source_info = "**Source:** Not selected\n**Status:** ⏳ Pending\n**Chunks:** 0 vectors"
return warning, source_info, {
"analysis": None,
"vector_dir": "",
"vector_chunks": 0,
"summary_base": [],
}
result = analyze_github_repo(repo_url)
if "error" in result:
error_msg = f"❌ {result['error']}"
source_info = f"**Source:** {repo_url}\n**Status:** ❌ Error\n**Chunks:** 0 vectors"
return error_msg, source_info, {
"analysis": None,
"vector_dir": "",
"vector_chunks": 0,
"summary_base": [],
}
docs = result.get("documentation", [])
repo_name = result.get("repo_name", repo_url)
timestamp = datetime.now(timezone.utc).strftime("%d %b %Y, %H:%M UTC")
# Source info for preview panel
source_info = f"""**Source:** GitHub Repository
**Repository:** {repo_name}
**Status:** ✅ Analyzed
**Documents:** {len(docs)} files
**Analyzed:** {timestamp}"""
# Document preview content
preview_sections = []
for doc in docs[:5]:
content = (doc.get("content") or "").strip()
if not content:
continue
snippet = content[:600]
if len(content) > 600:
snippet = snippet.rstrip() + " ..."
preview_sections.append(
f"### πŸ“„ {doc.get('path', 'document')}\n\n{snippet}"
)
preview_content = (
"\n\n---\n\n".join(preview_sections)
if preview_sections
else "*No textual documentation snippets were found.*"
)
state_payload = {
"analysis": result,
"vector_dir": "",
"vector_chunks": 0,
"summary_base": [repo_name, str(len(docs))],
"processed_timestamp": timestamp,
"indexed": False,
}
return preview_content, source_info, state_payload
def index_github_repo(state_payload: dict | None):
"""Index the analyzed GitHub repository for RAG."""
analysis_data = (state_payload or {}).get("analysis") if state_payload else None
if not analysis_data:
return "⚠️ Run analysis before indexing.", state_payload
if state_payload.get("indexed"):
return "βœ… Repository already indexed and ready for RAG queries.", state_payload
docs = analysis_data.get("documentation", [])
repo_name = analysis_data.get("repo_name", "repo")
repo_url = analysis_data.get("repo_url", "")
slug, cache_dir, cache_vector_dir = get_cache_dirs(repo_url, repo_name)
if cache_dir.exists():
shutil.rmtree(cache_dir)
cache_dir.mkdir(parents=True, exist_ok=True)
vector_chunks = 0
if docs:
_, chunk_count = build_repo_vector_store(docs, persist_path=cache_vector_dir)
vector_chunks = chunk_count
else:
return "⚠️ No documentation found to index.", state_payload
new_state = {
**state_payload,
"vector_dir": str(cache_vector_dir),
"vector_chunks": vector_chunks,
"indexed": True,
}
return f"βœ… Indexed {vector_chunks} vector chunks. Ready for RAG queries!", new_state
def bookmark_github_repo(state_payload: dict | None):
"""Bookmark and index the GitHub repository permanently."""
analysis_data = (state_payload or {}).get("analysis") if state_payload else None
if not analysis_data:
return "⚠️ Run analysis before bookmarking.", state_payload, gr.Dropdown()
docs = analysis_data.get("documentation", [])
if not docs:
return "⚠️ No documentation to bookmark.", state_payload, gr.Dropdown()
repo_url = analysis_data.get("repo_url") or analysis_data.get("repo_name")
# Build vector store if not already done
vector_dir = state_payload.get("vector_dir") if state_payload else ""
if not vector_dir:
repo_name = analysis_data.get("repo_name", "repo")
slug, cache_dir, cache_vector_dir = get_cache_dirs(repo_url, repo_name)
if cache_dir.exists():
shutil.rmtree(cache_dir)
cache_dir.mkdir(parents=True, exist_ok=True)
if docs:
_, chunk_count = build_repo_vector_store(docs, persist_path=cache_vector_dir)
vector_dir = str(cache_vector_dir)
else:
chunk_count = 0
else:
chunk_count = state_payload.get("vector_chunks", 0)
metadata = bookmark_repo_from_analysis(
repo_url,
analysis_data,
prebuilt_vector_dir=Path(vector_dir) if vector_dir else None,
prebuilt_chunks=chunk_count,
)
choices, metadata_list = get_dropdown_options()
dropdown_update = gr.Dropdown(
choices=choices,
value=metadata.dropdown_label,
interactive=True,
)
new_state = {
**state_payload,
"vector_dir": vector_dir,
"vector_chunks": chunk_count,
"indexed": True,
}
return f"πŸ’Ύ Repository bookmarked on {metadata.last_pulled_display}. Access it in the Chat tab!", new_state, dropdown_update
def run_youtube_ingestion(youtube_url: str):
"""Analyze YouTube video without indexing."""
youtube_url = (youtube_url or "").strip()
if not youtube_url:
warning = "⚠️ Paste a YouTube video URL to begin."
source_info = "**Source:** Not selected\n**Status:** ⏳ Pending\n**Chunks:** 0 vectors"
return warning, source_info, {
"analysis": None,
"vector_dir": "",
"vector_chunks": 0,
}
result = fetch_youtube_transcript(youtube_url)
if "error" in result:
error_msg = f"❌ {result['error']}"
source_info = f"**Source:** YouTube\n**Status:** ❌ Error\n**Chunks:** 0 vectors"
return error_msg, source_info, {
"analysis": None,
"vector_dir": "",
"vector_chunks": 0,
}
transcript = (result.get("raw_transcript") or "").strip()
if not transcript:
source_info = "**Source:** YouTube\n**Status:** ⚠️ No transcript\n**Chunks:** 0 vectors"
return "⚠️ No transcript text was returned.", source_info, {"analysis": None}
timestamp = datetime.now(timezone.utc).strftime("%d %b %Y, %H:%M UTC")
video_url = result.get("url", youtube_url)
lang = result.get("lang", "en")
# Source info for preview panel
source_info = f"""**Source:** YouTube Video
**URL:** {video_url}
**Language:** {lang}
**Status:** ✅ Analyzed
**Analyzed:** {timestamp}"""
# Generate chapter summaries
chapters = summarize_youtube_chapters(transcript, url=video_url)
# Preview content with chapters
preview_content = f"""### πŸ“Ί Video Transcript Analysis
{chapters}
---
### 📝 Transcript Preview
{transcript[:2000]}{"..." if len(transcript) > 2000 else ""}
"""
state_payload = {
"analysis": {
"transcript": transcript,
"url": video_url,
"lang": lang,
"chapters": chapters,
},
"vector_dir": "",
"vector_chunks": 0,
"indexed": False,
}
return preview_content, source_info, state_payload
def index_youtube_video(state_payload: dict | None):
"""Index YouTube transcript for RAG."""
analysis_data = (state_payload or {}).get("analysis") if state_payload else None
if not analysis_data:
return "⚠️ Run analysis before indexing.", state_payload
if state_payload.get("indexed"):
return "βœ… Video already indexed and ready for RAG queries.", state_payload
transcript = analysis_data.get("transcript", "")
if not transcript:
return "⚠️ No transcript found to index.", state_payload
# Create pseudo-documents from transcript
docs = [{
"path": "transcript.txt",
"content": transcript,
"type": "transcript",
}]
url = analysis_data.get("url", "youtube")
slug, cache_dir, cache_vector_dir = get_cache_dirs(url, "youtube")
if cache_dir.exists():
shutil.rmtree(cache_dir)
cache_dir.mkdir(parents=True, exist_ok=True)
_, chunk_count = build_repo_vector_store(docs, persist_path=cache_vector_dir)
new_state = {
**state_payload,
"vector_dir": str(cache_vector_dir),
"vector_chunks": chunk_count,
"indexed": True,
}
return f"βœ… Indexed {chunk_count} transcript chunks. Ready for RAG queries!", new_state
def bookmark_youtube_video(state_payload: dict | None):
"""Bookmark YouTube video - persists transcript to bookmarks system."""
analysis_data = (state_payload or {}).get("analysis") if state_payload else None
if not analysis_data:
return "⚠️ Run analysis before bookmarking.", state_payload, gr.Dropdown()
transcript = analysis_data.get("transcript", "")
if not transcript:
return "⚠️ No transcript found to bookmark.", state_payload, gr.Dropdown()
video_url = analysis_data.get("url", "youtube-video")
chapters = analysis_data.get("chapters", "")
# Create pseudo-analysis structure compatible with bookmark_repo_from_analysis
pseudo_analysis = {
"repo_name": f"YouTube: {video_url[:50]}",
"repo_url": video_url,
"documentation": [
{
"path": "transcript.txt",
"content": transcript,
},
{
"path": "chapters.md",
"content": chapters,
}
],
}
# Use prebuilt vector store if already indexed
prebuilt_dir = None
prebuilt_chunks = None
if state_payload.get("indexed") and state_payload.get("vector_dir"):
prebuilt_dir = Path(state_payload["vector_dir"])
prebuilt_chunks = state_payload.get("vector_chunks", 0)
metadata = bookmark_repo_from_analysis(
video_url,
pseudo_analysis,
prebuilt_vector_dir=prebuilt_dir,
prebuilt_chunks=prebuilt_chunks,
)
choices, _ = get_dropdown_options()
dropdown_update = gr.Dropdown(choices=choices, value=metadata.dropdown_label)
new_state = {
**state_payload,
"vector_dir": metadata.vector_dir,
"vector_chunks": metadata.vector_chunks,
"indexed": True,
"bookmarked": True,
}
return f"πŸ”– YouTube video bookmarked! {metadata.vector_chunks} chunks indexed.", new_state, dropdown_update
def generate_youtube_transfer_report(youtube_url: str):
youtube_url = (youtube_url or "").strip()
if not youtube_url:
return "⚠️ Paste a YouTube video URL before generating a report."
result = fetch_youtube_transcript(youtube_url)
if "error" in result:
return f"❌ {result['error']}"
transcript = (result.get("raw_transcript") or "").strip()
if not transcript:
return "No transcript text was returned by the youtube-transcript MCP server; report generation was skipped."
chapters = summarize_youtube_chapters(transcript, url=result.get("url", youtube_url))
study_notes = generate_youtube_study_notes(chapters, url=result.get("url", youtube_url))
generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
header_lines = [
"# YouTube Knowledge Transfer Report",
"",
f"- Source: {result.get('url', youtube_url)}",
f"- Language: {result.get('lang', 'en')}",
f"- Generated at: {generated_at}",
]
lines: list[str] = []
lines.extend(header_lines)
lines.append("")
lines.append("## 1. Topic & Chapter Outline")
lines.append("")
lines.append(chapters)
lines.append("")
lines.append("## 2. Study & Interview Guidance")
lines.append("")
lines.append(study_notes)
report_markdown = "\n".join(lines)
root = _ensure_knowledge_root()
youtube_root = root / "Youtube Video"
youtube_root.mkdir(parents=True, exist_ok=True)
slug = _slugify_name(result.get("url", youtube_url))
ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
report_path = youtube_root / f"{slug}-youtube-knowledge-{ts}.md"
report_path.write_text(report_markdown, encoding="utf-8")
rel_path = report_path.relative_to(Path("."))
return f"πŸ“„ YouTube knowledge transfer report written to `{rel_path}`."
def _ensure_knowledge_root() -> Path:
KNOWLEDGE_TRANSFER_ROOT.mkdir(parents=True, exist_ok=True)
return KNOWLEDGE_TRANSFER_ROOT
def _slugify_name(name: str) -> str:
base = (name or "project").lower()
safe = re.sub(r"[^a-z0-9-]+", "-", base).strip("-")
return safe or "project"
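# e.g. _slugify_name("My Repo (v2)!") -> "my-repo-v2"; _slugify_name("") -> "project"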
def generate_knowledge_transfer_report(state_payload: dict | None):
analysis_data = (state_payload or {}).get("analysis") if state_payload else None
if not analysis_data:
return "⚠️ Run an analysis before generating a report."
repo_name = analysis_data.get("repo_name") or "Project"
repo_url = analysis_data.get("repo_url") or "local upload"
docs = analysis_data.get("documentation") or []
doc_count = len(docs)
structure = analysis_data.get("structure") or []
vector_dir = state_payload.get("vector_dir") if state_payload else ""
vector_chunks = state_payload.get("vector_chunks", 0) if state_payload else 0
summary_base = state_payload.get("summary_base", []) if state_payload else []
processed_timestamp = state_payload.get("processed_timestamp") if state_payload else None
generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
header_lines = [
f"# Knowledge Transfer Report – {repo_name}",
"",
f"- Source: {repo_url}",
f"- Generated at: {generated_at}",
f"- Documentation files: {doc_count}",
f"- Vector chunks: {vector_chunks}",
]
if processed_timestamp:
header_lines.append(f"- Last analysis run: {processed_timestamp}")
overview_section = "\n".join(summary_base) if summary_base else "No high-level summary was captured during analysis."
llm_summary_section = ""
if vector_dir and vector_chunks:
try:
question = (
"Provide a detailed knowledge transfer summary of this repository. "
"Explain its purpose, main components, architecture, key dependencies, "
"and patterns that would be reusable in other projects. "
"Focus on actionable insights and how to extend or adapt this codebase."
)
llm_summary_section = rag_answer_from_store(Path(vector_dir), question, repo_summary=overview_section)
except Exception as err:
llm_summary_section = f"LLM summary unavailable due to error: {err}"
else:
llm_summary_section = "Vector store not available; running RAG-based summary was skipped."
max_items = 80
structure_snippet = "\n".join(structure[:max_items]) if structure else "No repository structure information was captured."
doc_paths = [d.get("path", "") for d in docs][:max_items]
docs_list_section = "\n".join(f"- {p}" for p in doc_paths) if doc_paths else "No documentation files were detected."
lines: list[str] = []
lines.extend(header_lines)
lines.append("")
lines.append("## 1. High-level Overview")
lines.append("")
lines.append(overview_section)
lines.append("")
lines.append("## 2. Repository Layout (snapshot)")
lines.append("")
lines.append("```")
lines.append(structure_snippet)
lines.append("```")
lines.append("")
lines.append("## 3. Documentation Files")
lines.append("")
lines.append(docs_list_section)
lines.append("")
lines.append("## 4. LLM Knowledge Summary")
lines.append("")
lines.append(llm_summary_section)
lines.append("")
lines.append("## 5. Notes for Future Reuse")
lines.append("")
lines.append(
"Use this report as a starting point when designing new projects. "
"Focus on reusing architecture patterns, utility modules, and any "
"documented best practices or workflows."
)
report_markdown = "\n".join(lines)
root = _ensure_knowledge_root()
slug = _slugify_name(repo_name)
ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
report_path = root / f"{slug}-knowledge-transfer-{ts}.md"
report_path.write_text(report_markdown, encoding="utf-8")
rel_path = report_path.relative_to(Path("."))
return f"πŸ“„ Knowledge transfer report written to `{rel_path}`."
def _list_knowledge_report_choices() -> list[str]:
root = _ensure_knowledge_root()
reports = sorted(root.glob("*.md"))
return [report.name for report in reports]
def _refresh_lab_reports_dropdown():
return gr.Dropdown(
label="Knowledge Transfer report (optional)",
choices=_list_knowledge_report_choices(),
value=None,
interactive=True,
)
def load_study_deck_from_report(report_name: str | None) -> str:
if not report_name:
return "Select a Knowledge Transfer report above to view its study deck-style summary."
root = _ensure_knowledge_root()
report_path = root / report_name
if not report_path.exists():
return f"Report `{report_name}` was not found in the Knowledge Transfer folder."
try:
raw = report_path.read_text(encoding="utf-8", errors="ignore")
except OSError as err:
return f"Unable to read report `{report_name}`: {err}"
max_chars = 6000
snippet = raw[:max_chars]
if len(raw) > max_chars:
snippet = snippet.rstrip() + "\n\n... (truncated)"
return (
f"### Study Deck Β· {report_name}\n\n"
"Scroll through this condensed report to refresh yourself on the key concepts, "
"architecture, and reusable patterns for this project.\n\n"
f"```markdown\n{snippet}\n```"
)
def _derive_local_repo_root(uploaded: str | list[str] | None) -> Path | None:
"""Given a directory-style file upload, infer the repository root directory.
Gradio's File component with ``file_count="directory"`` returns a list of
filepaths under the uploaded folder (or a single filepath). We compute the
common parent directory and treat that as the repo root.
"""
if not uploaded:
return None
if isinstance(uploaded, str):
paths = [uploaded]
else:
paths = [p for p in uploaded if p]
if not paths:
return None
try:
common = os.path.commonpath(paths)
except ValueError:
return None
root = Path(common)
return root if root.exists() and root.is_dir() else None
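# Illustrative example (hypothetical upload): a directory upload yielding
#   ["/tmp/upload/myrepo/README.md", "/tmp/upload/myrepo/src/app.py"]
# resolves to Path("/tmp/upload/myrepo"), provided that directory exists on disk.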
def run_local_repo_ingestion(uploaded_folder):
"""Analyze local repository folder, filtering irrelevant files."""
repo_root = _derive_local_repo_root(uploaded_folder)
if not repo_root:
warning = "⚠️ Upload a project folder before running analysis."
source_info = "**Source:** Not selected\n**Status:** ⏳ Pending\n**Chunks:** 0 vectors"
return warning, source_info, {
"analysis": None,
"vector_dir": "",
"vector_chunks": 0,
"summary_base": [],
}
# Confirm that relevant files remain after filtering ignored paths (analyze_local_repo walks the folder itself)
if isinstance(uploaded_folder, list):
filtered_files = [f for f in uploaded_folder if not _should_ignore_path(Path(f))]
if not filtered_files:
warning = "⚠️ No relevant files found after filtering."
source_info = "**Source:** Local\n**Status:** ⚠️ No files\n**Chunks:** 0 vectors"
return warning, source_info, {
"analysis": None,
"vector_dir": "",
"vector_chunks": 0,
"summary_base": [],
}
result = analyze_local_repo(str(repo_root))
if "error" in result:
error_msg = f"❌ {result['error']}"
source_info = f"**Source:** Local\n**Status:** ❌ Error\n**Chunks:** 0 vectors"
return error_msg, source_info, {
"analysis": None,
"vector_dir": "",
"vector_chunks": 0,
"summary_base": [],
}
docs = result.get("documentation", [])
repo_name = result.get("repo_name", repo_root.name)
timestamp = datetime.now(timezone.utc).strftime("%d %b %Y, %H:%M UTC")
# Source info for preview panel
source_info = f"""**Source:** Local Project
**Folder:** {repo_name}
**Status:** ✅ Analyzed
**Documents:** {len(docs)} files
**Analyzed:** {timestamp}"""
# Document preview content
preview_sections = []
for doc in docs[:5]:
content = (doc.get("content") or "").strip()
if not content:
continue
snippet = content[:600]
if len(content) > 600:
snippet = snippet.rstrip() + " ..."
preview_sections.append(
f"### πŸ“„ {doc.get('path', 'document')}\n\n{snippet}"
)
preview_content = (
"\n\n---\n\n".join(preview_sections)
if preview_sections
else "*No textual documentation snippets were found.*"
)
state_payload = {
"analysis": result,
"vector_dir": "",
"vector_chunks": 0,
"summary_base": [repo_name, str(len(docs))],
"processed_timestamp": timestamp,
"indexed": False,
}
return preview_content, source_info, state_payload
def index_local_repo(state_payload: dict | None):
"""Index the analyzed local repository for RAG."""
return index_github_repo(state_payload) # Same logic
def bookmark_local_repo(state_payload: dict | None):
"""Bookmark and index the local repository permanently."""
return bookmark_github_repo(state_payload) # Same logic
def _format_bookmark_info(metadata: dict | None) -> str:
if not metadata:
return (
"No bookmarks yet. Process a repository in the *Process New Repository* tab, then bookmark it to enable RAG chat."
)
preview = (metadata.get("summary_preview") or "").strip()
if preview:
max_len = 600
if len(preview) > max_len:
preview_display = preview[:max_len].rstrip() + " ..."
else:
preview_display = preview
return (
f"### {metadata.get('repo_name', 'Saved Repository')}\n"
f"- URL: {metadata.get('repo_url', 'N/A')}\n"
f"- Last pulled: {metadata.get('last_pulled_display', '--/--/----')}\n"
f"- Documentation files: {metadata.get('docs_count', 0)}\n"
f"- Vector chunks: {metadata.get('vector_chunks', 0)}\n\n"
f"**Preview:**\n\n{preview_display}"
)
return (
f"### {metadata.get('repo_name', 'Saved Repository')}\n"
f"- URL: {metadata.get('repo_url', 'N/A')}\n"
f"- Last pulled: {metadata.get('last_pulled_display', '--/--/----')}\n"
f"- Documentation files: {metadata.get('docs_count', 0)}\n"
f"- Vector chunks: {metadata.get('vector_chunks', 0)}"
)
def _refresh_bookmarks(preselect: str | None = None):
choices, metadata_list = get_dropdown_options()
value = preselect if preselect and preselect in choices else (choices[0] if choices else None)
dropdown_update = gr.Dropdown(
choices=choices,
value=value,
interactive=bool(choices),
label="Bookmarked repositories",
allow_custom_value=True,
)
info = _format_bookmark_info(
find_metadata_by_label(value, metadata_list) if value else None
)
return dropdown_update, metadata_list, info
def load_bookmarks_on_start():
dropdown_update, metadata_list, info = _refresh_bookmarks()
status = "Bookmarks loaded." if metadata_list else "No bookmarks saved yet."
return dropdown_update, metadata_list, info, status
def _build_summary_from_base(base_lines: list[str], final_message: str) -> str:
if not base_lines:
return final_message
return "\n".join(base_lines + ["", final_message])
def bookmark_current_repo(state_payload: dict | None):
analysis_data = (state_payload or {}).get("analysis") if state_payload else None
if not analysis_data or not analysis_data.get("documentation"):
return (
"⚠️ Run an analysis before bookmarking a repository.",
gr.Dropdown(choices=[], value=None, interactive=False, label="Bookmarked repositories"),
[],
_format_bookmark_info(None),
_build_summary_from_base(
(state_payload or {}).get("summary_base", []),
"⚠️ Bookmark failed because no analysis data is available.",
),
state_payload,
)
repo_url = analysis_data.get("repo_url") or analysis_data.get("repo_name")
vector_dir = state_payload.get("vector_dir") if state_payload else ""
metadata = bookmark_repo_from_analysis(
repo_url,
analysis_data,
prebuilt_vector_dir=Path(vector_dir) if vector_dir else None,
prebuilt_chunks=state_payload.get("vector_chunks") if state_payload else None,
)
dropdown_update, metadata_list, info = _refresh_bookmarks(preselect=metadata.dropdown_label)
saved_msg = (
f"πŸ’Ύ Repo saved on {metadata.last_pulled_display}. Access it via the Bookmarked tab for RAG chat."
)
updated_summary = _build_summary_from_base(
state_payload.get("summary_base", []),
saved_msg,
)
new_state = {
**(state_payload or {}),
"summary_base": state_payload.get("summary_base", []),
"saved": True,
}
return saved_msg, dropdown_update, metadata_list, info, updated_summary, new_state
def update_selected_bookmark(label: str, metadata_list: list[dict]):
metadata = find_metadata_by_label(label, metadata_list or []) if label else None
return _format_bookmark_info(metadata)
def answer_bookmark_question(label: str, question: str, metadata_list: list[dict]):
if not label:
return "Select a bookmarked repository before asking a question."
if not question.strip():
return "Enter a question to query your bookmarked repository."
metadata = find_metadata_by_label(label, metadata_list or [])
if not metadata:
return "Bookmark metadata not found. Try refreshing bookmarks."
if metadata.get("vector_chunks", 0) == 0:
return "This bookmark has no vector store yet. Re-bookmark the repo to rebuild embeddings."
summary = (
f"Repository: {metadata.get('repo_name', label)}\n"
f"Docs: {metadata.get('docs_count', 0)} | Last pulled: {metadata.get('last_pulled_display', '--/--/----')}"
)
answer = rag_answer_from_store(Path(metadata["vector_dir"]), question, repo_summary=summary)
return answer
def placeholder_action_message(label: str):
if not label:
return "Select a bookmarked repository to use this action."
return f"Additional bookmark actions for **{label}** are coming soon."
def run_experimental_lab(intention: str, report_name: str | None):
text = (intention or "").strip()
if not text:
return "Describe what you want to build in the Experimental Lab to get started."
root = _ensure_knowledge_root()
report_markdown = ""
context_note = ""
if report_name:
report_path = root / report_name
if report_path.exists():
try:
raw = report_path.read_text(encoding="utf-8", errors="ignore")
report_markdown = raw
snippet = raw[:3000]
context_note = (
f"Using Knowledge Transfer report: `{report_name}` as reference.\n\n"
f"Snippet from report (truncated):\n\n```markdown\n{snippet}\n```\n\n"
)
except OSError:
context_note = f"Unable to read Knowledge Transfer report `{report_name}`. Proceeding without embedded context.\n\n"
build_result = build_experiment_from_report(text, report_markdown)
code = build_result.get("code", "")
stdout = build_result.get("stdout", "")
error = build_result.get("error", "")
base_intro = (
"Experimental Lab is a sandbox where future versions of MonkeyMind will use "
"Knowledge Transfer reports as context to plan and build small Gradio apps.\n\n"
)
code_section = "### Generated experiment code\n\n"
if code:
code_section += f"```python\n{code}\n```\n\n"
else:
code_section += "No code was generated.\n\n"
results_section = "### Sandbox output\n\n"
if stdout:
results_section += f"**Stdout / logs:**\n\n```text\n{stdout}\n```\n\n"
if error:
results_section += f"**Error:**\n\n```text\n{error}\n```\n\n"
if not stdout and not error:
results_section += "No errors encountered during sandbox test.\n\n"
return (
base_intro
+ context_note
+ f"You wrote:\n\n> {text}\n\n"
+ code_section
+ results_section
)
def lab_fix_bugs(intention: str, report_name: str | None):
base = run_experimental_lab(intention, report_name)
return (
base
+ "\n\n---\n\n_This Fix bugs action will eventually trigger another build iteration to resolve errors in the generated app. "
"For now, it simply records another planning pass based on your intention and chosen report._"
)
def lab_mark_happy(intention: str, report_name: str | None):
text = (intention or "").strip()
return (
"Marking this experiment as complete.\n\n"
f"Final intention:\n\n> {text or 'N/A'}\n\n"
"You can now export or reuse this idea elsewhere. Future versions will attach concrete code artifacts here."
)
def lab_export_project(intention: str, report_name: str | None):
text = (intention or "").strip()
return (
"Export placeholder: a future version will bundle generated code, configuration, and a short README "
"into a downloadable package.\n\n"
f"Current experiment description:\n\n> {text or 'N/A'}\n\n"
f"Reference report: `{report_name or 'none selected'}`."
)
def answer_chat_question(question: str, github_state, local_state, youtube_state, selected_bookmark, metadata_list):
"""Answer questions using RAG from any indexed source or bookmark."""
if not question.strip():
return "Please enter a question."
# Check if using a bookmark
if selected_bookmark:
metadata = find_metadata_by_label(selected_bookmark, metadata_list or [])
if metadata and metadata.get("vector_chunks", 0) > 0:
summary = (
f"Repository: {metadata.get('repo_name', selected_bookmark)}\n"
f"Last pulled: {metadata.get('last_pulled_display', '--/--/----')}"
)
answer = rag_answer_from_store(Path(metadata["vector_dir"]), question, repo_summary=summary)
return f"**[Bookmark: {selected_bookmark}]**\n\n{answer}"
# Check current session sources
for state, label in [
(github_state, "GitHub"),
(local_state, "Local"),
(youtube_state, "YouTube"),
]:
if state and state.get("indexed") and state.get("vector_dir"):
vector_dir = Path(state["vector_dir"])
if vector_dir.exists():
answer = rag_answer_from_store(vector_dir, question)
return f"**[{label} Source]**\n\n{answer}"
return "⚠️ No indexed sources available. Please index a repository or select a bookmark first."
def generate_and_download_report(state_payload: dict | None, source_type: str):
"""Generate markdown report and return file path for download."""
analysis_data = (state_payload or {}).get("analysis") if state_payload else None
if not analysis_data:
return None
if source_type == "youtube":
transcript = analysis_data.get("transcript", "")
chapters = analysis_data.get("chapters", "")
url = analysis_data.get("url", "youtube")
lang = analysis_data.get("lang", "en")
study_notes = generate_youtube_study_notes(chapters, url=url)
generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
lines = [
"# YouTube Knowledge Transfer Report",
"",
f"- Source: {url}",
f"- Language: {lang}",
f"- Generated at: {generated_at}",
"",
"## 1. Chapter Outline",
"",
chapters,
"",
"## 2. Study Notes",
"",
study_notes,
]
else:
# GitHub or Local repo
repo_name = analysis_data.get("repo_name", "Project")
repo_url = analysis_data.get("repo_url", "local")
docs = analysis_data.get("documentation", [])
generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
lines = [
f"# Knowledge Transfer Report – {repo_name}",
"",
f"- Source: {repo_url}",
f"- Generated at: {generated_at}",
f"- Documentation files: {len(docs)}",
"",
"## Documentation Files",
"",
]
for doc in docs[:50]:
lines.append(f"- {doc.get('path', 'unknown')}")
report_markdown = "\n".join(lines)
root = _ensure_knowledge_root()
slug = _slugify_name(analysis_data.get("repo_name", "project"))
ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
report_path = root / f"{slug}-report-{ts}.md"
report_path.write_text(report_markdown, encoding="utf-8")
return str(report_path)
def refresh_bookmarks_dropdown():
"""Refresh the bookmarks dropdown."""
choices, metadata_list = get_dropdown_options()
return gr.Dropdown(choices=choices, value=None, interactive=True), metadata_list
def build_interface() -> tuple[gr.Blocks, gr.Theme | None, str | None]:
"""Build the Gradio interface with improved UI/UX inspired by modern dashboard design."""
custom_css = """
@import url('https://fonts.googleapis.com/css2?family=Outfit:wght@300;400;500;600;700&family=JetBrains+Mono:wght@400;500&display=swap');
:root {
--primary: #34d399; /* Mint Green */
--primary-glow: rgba(52, 211, 153, 0.4);
--glass-bg: rgba(15, 23, 42, 0.6);
--glass-border: rgba(255, 255, 255, 0.08);
--text-main: #f8fafc;
--text-muted: #94a3b8;
}
body {
background-color: #0f172a;
color: var(--text-main);
font-family: 'Outfit', sans-serif !important;
}
/* Global container override */
.gradio-container {
max-width: 1400px !important;
background: #0f172a !important;
background-image:
radial-gradient(circle at 0% 0%, rgba(52, 211, 153, 0.15) 0%, transparent 50%),
radial-gradient(circle at 100% 100%, rgba(16, 185, 129, 0.1) 0%, transparent 50%) !important;
border: none !important;
}
/* Header styling */
.header-container {
background: rgba(15, 23, 42, 0.8);
backdrop-filter: blur(12px);
border-bottom: 1px solid var(--glass-border);
padding: 20px 24px;
margin: -16px -16px 24px -16px;
border-radius: 0;
}
/* Card/Panel styling */
.source-card, .gradio-group, .tabs, .tabitem, .box-container {
background: var(--glass-bg) !important;
backdrop-filter: blur(12px);
border: 1px solid var(--glass-border) !important;
border-radius: 16px !important;
box-shadow: 0 4px 20px rgba(0, 0, 0, 0.2);
padding: 20px !important;
margin-bottom: 20px !important;
}
/* Inputs and Textareas */
input, textarea, .gr-input, .gr-box, .dropdown-wrap {
background-color: rgba(30, 41, 59, 0.6) !important;
border: 1px solid rgba(255, 255, 255, 0.1) !important;
color: var(--text-main) !important;
border-radius: 10px !important;
}
input:focus, textarea:focus {
border-color: var(--primary) !important;
box-shadow: 0 0 0 2px var(--primary-glow) !important;
}
/* Buttons */
button.primary {
background: linear-gradient(135deg, #34d399 0%, #10b981 100%) !important;
color: #0f172a !important;
font-weight: 600 !important;
border: none !important;
box-shadow: 0 4px 15px rgba(52, 211, 153, 0.3) !important;
}
button.secondary {
background: rgba(30, 41, 59, 0.8) !important;
border: 1px solid rgba(255, 255, 255, 0.1) !important;
color: var(--text-muted) !important;
}
button.secondary:hover {
color: var(--text-main) !important;
border-color: var(--primary) !important;
}
button.stop {
background: linear-gradient(135deg, #f87171 0%, #ef4444 100%) !important;
color: white !important;
}
/* Status indicators */
.status-ready {
background: rgba(16, 185, 129, 0.1);
color: #34d399;
padding: 6px 12px;
border-radius: 20px;
font-size: 12px;
border: 1px solid rgba(16, 185, 129, 0.2);
}
.status-pending {
background: rgba(251, 191, 36, 0.1);
color: #fbbf24;
padding: 6px 12px;
border-radius: 20px;
font-size: 12px;
border: 1px solid rgba(251, 191, 36, 0.2);
}
/* Keep copy/like buttons always visible and styled */
.message-buttons {
opacity: 1 !important;
display: flex !important;
gap: 4px !important;
}
.message-buttons button {
color: #94a3b8 !important; /* text-muted */
background: transparent !important;
border: none !important;
box-shadow: none !important;
}
.message-buttons button:hover {
color: #34d399 !important; /* primary */
background: rgba(52, 211, 153, 0.1) !important;
}
/* Chat bubbles */
.chat-assistant {
background: rgba(30, 41, 59, 0.8) !important;
border: 1px solid var(--glass-border);
border-radius: 18px 18px 18px 4px !important;
color: var(--text-main) !important;
}
.chat-user {
background: linear-gradient(135deg, #34d399 0%, #10b981 100%) !important;
color: #022c22 !important;
border-radius: 18px 18px 4px 18px !important;
font-weight: 500;
}
/* Chat message text size and font */
.message-wrap .message {
font-size: 0.9rem !important;
font-family: 'JetBrains Mono', monospace !important;
line-height: 1.5 !important;
}
/* Typography overrides */
.prose, .prose h1, .prose h2, .prose h3, .prose p, .prose strong {
color: var(--text-main) !important;
}
/* Scrollbar */
::-webkit-scrollbar { width: 6px; height: 6px; }
::-webkit-scrollbar-track { background: transparent; }
::-webkit-scrollbar-thumb { background: #334155; border-radius: 10px; }
::-webkit-scrollbar-thumb:hover { background: #475569; }
"""
app_theme = gr.themes.Soft(
primary_hue="emerald",
secondary_hue="slate",
neutral_hue="slate",
).set(
body_background_fill="#0f172a",
block_background_fill="#1e293b",
block_border_color="rgba(255,255,255,0.1)",
input_background_fill="#0f172a",
button_primary_background_fill="#34d399",
button_primary_text_color="#0f172a",
)
blocks_kwargs = {"title": "πŸ’ MonkeyMind - Knowledge Transfer Agent"}
if not IS_GRADIO_V6:
blocks_kwargs.update(theme=app_theme, css=custom_css)
with gr.Blocks(**blocks_kwargs) as demo:
# State variables
github_state = gr.State({})
local_state = gr.State({})
youtube_state = gr.State({})
bookmarks_metadata = gr.State([])
chat_history = gr.State([])
notepad_content = gr.State("") # For notepad feature
copied_texts = gr.State(set()) # Track copied texts to prevent duplicates
# ===== HEADER =====
with gr.Row(elem_classes=["header-container"]):
gr.HTML("""
<div style="display: flex; align-items: center; gap: 16px;">
<div style="width: 48px; height: 48px; border-radius: 12px; background: linear-gradient(135deg, #34d399 0%, #10b981 100%); display: flex; align-items: center; justify-content: center; box-shadow: 0 0 20px rgba(52, 211, 153, 0.3);">
<span style="font-size: 28px;">πŸ’</span>
</div>
<div>
<h1 style="margin: 0; font-size: 1.5rem; font-weight: 700; color: #f8fafc; letter-spacing: -0.5px;">
MonkeyMind
</h1>
<div style="display: flex; align-items: center; gap: 8px;">
<span style="width: 8px; height: 8px; border-radius: 50%; background: #34d399; box-shadow: 0 0 10px #34d399;"></span>
<p style="margin: 0; font-size: 0.8rem; color: #94a3b8; font-family: 'JetBrains Mono', monospace;">AGENT ACTIVE</p>
</div>
</div>
</div>
""")
# ===== APP DESCRIPTION =====
gr.Markdown("""
> **🐒 MonkeyMind** is your Knowledge Transfer & RAG Agent. Analyze GitHub repos, local projects, or YouTube videos
> to build a personal knowledge base. Chat with your sources using AI-powered retrieval.
>
> **Quick Start:** Paste a GitHub URL → Click **Analyze** → Click **Index** → Start chatting!
> Bookmark important sources to access them anytime.
""")
# ===== MAIN LAYOUT: 1/3 Left Panel + 2/3 Right Panel =====
with gr.Row():
# ===== LEFT PANEL: Data Sources + Bookmarks =====
with gr.Column(scale=1, min_width=350):
# Data Sources Section
gr.Markdown("### πŸ“‘ Data Ingestion")
# Source Type Selector (pill-style tabs)
source_type = gr.Radio(
choices=["πŸ”— GitHub", "πŸ“ Local", "πŸŽ₯ YouTube"],
value="πŸ”— GitHub",
label="",
container=False,
interactive=True
)
# Dynamic source input based on selection
with gr.Group(visible=True) as github_group:
gr.Markdown("*Paste a GitHub repo URL and click **Analyze** to extract docs. Then **Index** to enable RAG chat. You can see retrieved contents in the **Preview** tab on the right.*")
github_url = gr.Textbox(
label="Repository URL",
placeholder="https://github.com/username/repo",
show_label=True,
)
github_analyze_btn = gr.Button("πŸ” Analyze Repository", variant="secondary")
github_index_btn = gr.Button("⚑ Index for RAG", variant="primary")
github_status = gr.Markdown("Ready to analyze.", elem_classes=["status-pending"])
github_bookmark_btn = gr.Button("πŸ”– Bookmark & Save to Knowledge Base", variant="secondary")
gr.Markdown("---")
gr.Markdown("**πŸ“ Notepad** *(copy useful info here)*")
github_notepad = gr.Textbox(
label="",
placeholder="Paste or write notes here...",
lines=3,
container=False,
show_copy_button=True,
)
with gr.Row():
github_notepad_download = gr.DownloadButton("πŸ“₯ Download as .md", variant="secondary", size="sm")
with gr.Group(visible=False) as local_group:
gr.Markdown("*Upload a project folder. Irrelevant files are auto-filtered. You can see retrieved contents in the **Preview** tab on the right.*")
local_folder = gr.File(
label="Upload Project Folder",
file_count="directory",
type="filepath",
)
local_analyze_btn = gr.Button("πŸ” Analyze Project", variant="secondary")
local_index_btn = gr.Button("⚑ Index for RAG", variant="primary")
local_status = gr.Markdown("Ready to analyze.", elem_classes=["status-pending"])
local_bookmark_btn = gr.Button("πŸ”– Bookmark & Save to Knowledge Base", variant="secondary")
gr.Markdown("---")
gr.Markdown("**πŸ“ Notepad** *(copy useful info here)*")
local_notepad = gr.Textbox(
label="",
placeholder="Paste or write notes here...",
lines=3,
container=False,
show_copy_button=True,
)
with gr.Row():
local_notepad_download = gr.DownloadButton("πŸ“₯ Download as .md", variant="secondary", size="sm")
with gr.Group(visible=False) as youtube_group:
gr.Markdown("*Paste a YouTube video URL to extract and analyze the transcript. You can see retrieved contents in the **Preview** tab on the right.*")
youtube_url = gr.Textbox(
label="Video URL",
placeholder="https://www.youtube.com/watch?v=...",
)
youtube_analyze_btn = gr.Button("πŸ” Analyze Video", variant="secondary")
youtube_index_btn = gr.Button("⚑ Index for RAG", variant="primary")
youtube_status = gr.Markdown("Ready to analyze.", elem_classes=["status-pending"])
youtube_bookmark_btn = gr.Button("πŸ”– Bookmark & Save to Knowledge Base", variant="secondary")
gr.Markdown("---")
gr.Markdown("**πŸ“ Notepad** *(copy useful info here)*")
youtube_notepad = gr.Textbox(
label="",
placeholder="Paste or write notes here...",
lines=3,
container=False,
show_copy_button=True,
)
with gr.Row():
youtube_notepad_download = gr.DownloadButton("πŸ“₯ Download as .md", variant="secondary", size="sm")
# Source type switching logic
def switch_source(choice):
return (
gr.Group(visible=("GitHub" in choice)),
gr.Group(visible=("Local" in choice)),
gr.Group(visible=("YouTube" in choice)),
)
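# Returning gr.Group(visible=...) objects acts as a property update, toggling the
# visibility of the three source groups without recreating them.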
source_type.change(
fn=switch_source,
inputs=[source_type],
outputs=[github_group, local_group, youtube_group],
)
gr.Markdown("---")
# Bookmarks Quick Access Section
gr.Markdown("### 🧠 Knowledge Base")
gr.Markdown("*Select sources to use for chat. Multiple selections allowed.*")
# Use CheckboxGroup for multi-select
bookmarks_checkboxes = gr.CheckboxGroup(
label="Active Sources",
choices=[],
value=[],
interactive=True,
info="Check sources to include in chat context"
)
# Keep dropdown for compatibility (hidden, used internally)
bookmarks_dropdown = gr.Dropdown(
label="",
choices=[],
value=None,
interactive=True,
visible=False,
)
with gr.Row():
refresh_bookmarks_btn = gr.Button("πŸ”„ Refresh", variant="secondary", size="sm", scale=1)
view_all_btn = gr.Button("πŸ“‹ View All", variant="secondary", size="sm", scale=1)
# Bookmark info display
bookmark_info = gr.Markdown(
value="*No sources selected. Bookmark repos to add them here.*",
elem_classes=["info-box"]
)
# ===== RIGHT PANEL: Chat & Preview =====
with gr.Column(scale=2, min_width=500):
# Chat vs Preview Toggle
right_panel_mode = gr.Radio(
choices=["πŸ’¬ Chat & RAG", "πŸ“„ Preview"],
value="πŸ’¬ Chat & RAG",
label="",
container=False,
)
# Chat Interface
with gr.Group(visible=True) as chat_panel:
with gr.Group(elem_classes=["box-container"]):
gr.Markdown("### πŸ’ Knowledge Assistant")
gr.Markdown("*Hover over messages to see copy icon. Use toggles below to augment with web/wiki search.*")
chatbot_kwargs = dict(
value=[],
height=450,
show_label=False,
avatar_images=["images/user.png", "images/monkey.png"],
elem_classes=["chat-container"],
type="messages",
)
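# type="messages" stores history as a list of {"role", "content"} dicts, the same
# shape that handle_chat_with_history appends to below.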
chatbot = gr.Chatbot(**chatbot_kwargs)
# Toolbar row: Web Search and Wikipedia toggles
with gr.Row():
web_search_toggle = gr.Checkbox(
label="🌐 Web Search",
value=False,
interactive=True,
)
wiki_search_toggle = gr.Checkbox(
label="πŸ“š Wikipedia",
value=False,
interactive=True,
)
with gr.Row():
question_box = gr.Textbox(
label="",
placeholder="Ask anything about your indexed sources...",
lines=1,
scale=5,
container=False,
)
send_btn = gr.Button("Send", variant="primary", scale=1)
gr.Examples(
examples=[
"Summarize the main architecture patterns",
"What are the key dependencies?",
"Explain the core functionality",
"What patterns can I reuse?",
],
inputs=question_box,
label="Quick Questions"
)
# ===== EXPERIMENTAL LAB (Collapsible) - Moved here =====
with gr.Group(elem_classes=["box-container"]):
gr.Markdown("### πŸ§ͺ Experimental Lab")
gr.Markdown("Use this lab to prototype small apps based on your knowledge base.")
with gr.Accordion("Open Lab", open=False):
gr.Markdown("*The agent generates Gradio code and tests it in a sandbox.*")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### 🎯 Build an Experiment")
lab_intention = gr.Textbox(
label="What do you want to build?",
placeholder="e.g., A Gradio app that visualizes patterns from a Knowledge Transfer report.",
lines=3,
)
lab_report_dropdown = gr.Dropdown(
label="Reference Material (bookmarked report)",
choices=_list_knowledge_report_choices(),
value=None,
interactive=True,
info="Select a Knowledge Transfer report as context"
)
with gr.Row():
lab_refresh_reports_btn = gr.Button("🔄 Refresh", variant="secondary", size="sm")
lab_start_btn = gr.Button("▢️ Start Build", variant="primary", size="sm")
gr.Markdown("---")
gr.Markdown("**πŸ”§ Fix Issues**")
lab_fix_instruction = gr.Textbox(
label="",
placeholder="Describe what needs to be fixed (e.g., 'The button click handler is not working')...",
lines=2,
container=False,
)
with gr.Row():
lab_fix_btn = gr.Button("πŸ”§ Apply Fix", variant="secondary", size="sm")
lab_happy_btn = gr.Button("βœ… Done", variant="secondary", size="sm")
lab_export_btn = gr.Button("πŸ“₯ Export Code", variant="primary")
lab_download = gr.File(label="Download", visible=False)
with gr.Column(scale=2):
gr.Markdown("### πŸ”¬ Experiment Output")
lab_output = gr.Markdown(
"Describe what you want to build, select a reference report from your bookmarks, then click **Start Build**.\n\n"
"If there are errors, describe the issue in the fix box and click **Apply Fix**."
)
lab_code_display = gr.Code(
label="Generated Code",
language="python",
visible=False,
)
# Preview Interface
with gr.Group(visible=False) as preview_panel:
gr.Markdown("### πŸ“„ Document Preview")
with gr.Row():
with gr.Column(scale=1):
preview_source_info = gr.Markdown("""
**Source:** Not selected
**Status:** ⏳ Pending
**Chunks:** 0 vectors
""", elem_classes=["info-box"])
with gr.Column(scale=1, min_width=100):
with gr.Row():
preview_copy_btn = gr.Button("πŸ“‹ Copy", variant="secondary", size="sm", scale=1)
preview_download_btn = gr.DownloadButton("πŸ“₯ Download", variant="secondary", size="sm", scale=1)
preview_download_file = gr.File(visible=False)
preview_content = gr.Markdown(
value="Select a source and analyze it to see the preview here.",
elem_classes=["preview-card"]
)
# Panel switching logic
def switch_panel(choice):
return (
gr.Group(visible=("Chat" in choice)),
gr.Group(visible=("Preview" in choice)),
)
right_panel_mode.change(
fn=switch_panel,
inputs=[right_panel_mode],
outputs=[chat_panel, preview_panel],
)
gr.Markdown("---")
# ===== HIDDEN STATE COMPONENTS FOR DOWNLOADS =====
github_download = gr.File(label="Download", visible=False)
local_download = gr.File(label="Download", visible=False)
youtube_download = gr.File(label="Download", visible=False)
# ===== EVENT HANDLERS =====
# GitHub handlers
github_analyze_btn.click(
fn=lambda: "⏳ **Analyzing repository...** Please wait.",
outputs=[github_status],
).then(
fn=run_github_ingestion,
inputs=[github_url],
outputs=[preview_content, preview_source_info, github_state],
).then(
fn=lambda: "βœ… **Analysis complete!** Click **Index for RAG** to enable chat.",
outputs=[github_status],
)
def index_github_with_status(state):
status, new_state = index_github_repo(state)
chunks = new_state.get("vector_chunks", 0) if new_state else 0
if "βœ…" in status or chunks > 0:
return f"βœ… **Indexed {chunks} vector chunks.** Ready for RAG queries!", new_state
return status, new_state
def index_local_with_status(state):
status, new_state = index_local_repo(state)
chunks = new_state.get("vector_chunks", 0) if new_state else 0
if "βœ…" in status or chunks > 0:
return f"βœ… **Indexed {chunks} vector chunks.** Ready for RAG queries!", new_state
return status, new_state
def index_youtube_with_status(state):
status, new_state = index_youtube_video(state)
chunks = new_state.get("vector_chunks", 0) if new_state else 0
if "βœ…" in status or chunks > 0:
return f"βœ… **Indexed {chunks} transcript chunks.** Ready for RAG queries!", new_state
return status, new_state
github_index_btn.click(
fn=lambda: "⏳ **Indexing...** Building vector embeddings.",
outputs=[github_status],
).then(
fn=index_github_with_status,
inputs=[github_state],
outputs=[github_status, github_state],
)
def bookmark_with_refresh(state):
status, new_state, dropdown = bookmark_github_repo(state)
choices, meta = get_dropdown_options()
return (
"πŸ”– **Bookmarked!** Added to Knowledge Base for future chat sessions.",
new_state,
gr.Dropdown(choices=choices, value=None),
gr.CheckboxGroup(choices=choices, value=[]),
meta
)
github_bookmark_btn.click(
fn=bookmark_with_refresh,
inputs=[github_state],
outputs=[github_status, github_state, bookmarks_dropdown, bookmarks_checkboxes, bookmarks_metadata],
)
# Local handlers
local_analyze_btn.click(
fn=lambda: "⏳ **Analyzing project...** Please wait.",
outputs=[local_status],
).then(
fn=run_local_repo_ingestion,
inputs=[local_folder],
outputs=[preview_content, preview_source_info, local_state],
).then(
fn=lambda: "βœ… **Analysis complete!** Click **Index for RAG** to enable chat.",
outputs=[local_status],
)
local_index_btn.click(
fn=lambda: "⏳ **Indexing...** Building vector embeddings.",
outputs=[local_status],
).then(
fn=index_local_with_status,
inputs=[local_state],
outputs=[local_status, local_state],
)
def bookmark_local_with_refresh(state):
status, new_state, dropdown = bookmark_local_repo(state)
choices, meta = get_dropdown_options()
return (
"πŸ”– **Bookmarked!** Added to Knowledge Base for future chat sessions.",
new_state,
gr.Dropdown(choices=choices, value=None),
gr.CheckboxGroup(choices=choices, value=[]),
meta
)
local_bookmark_btn.click(
fn=bookmark_local_with_refresh,
inputs=[local_state],
outputs=[local_status, local_state, bookmarks_dropdown, bookmarks_checkboxes, bookmarks_metadata],
)
# YouTube handlers
youtube_analyze_btn.click(
fn=lambda: "⏳ **Fetching transcript...** Please wait.",
outputs=[youtube_status],
).then(
fn=run_youtube_ingestion,
inputs=[youtube_url],
outputs=[preview_content, preview_source_info, youtube_state],
).then(
fn=lambda: "βœ… **Analysis complete!** Click **Index for RAG** to enable chat.",
outputs=[youtube_status],
)
youtube_index_btn.click(
fn=lambda: "⏳ **Indexing...** Building vector embeddings.",
outputs=[youtube_status],
).then(
fn=index_youtube_with_status,
inputs=[youtube_state],
outputs=[youtube_status, youtube_state],
)
def bookmark_youtube_with_refresh(state):
status, new_state, dropdown = bookmark_youtube_video(state)
choices, meta = get_dropdown_options()
return (
"πŸ”– **Bookmarked!** Added to Knowledge Base for future chat sessions.",
new_state,
gr.Dropdown(choices=choices, value=None),
gr.CheckboxGroup(choices=choices, value=[]),
meta
)
youtube_bookmark_btn.click(
fn=bookmark_youtube_with_refresh,
inputs=[youtube_state],
outputs=[youtube_status, youtube_state, bookmarks_dropdown, bookmarks_checkboxes, bookmarks_metadata],
)
# Chat handlers
def handle_chat_with_history(question, history, github_s, local_s, youtube_s, selected_sources, meta):
# Use first selected source from checkboxes, or fall back to indexed sources
bookmark = selected_sources[0] if selected_sources else None
answer = answer_chat_question(question, github_s, local_s, youtube_s, bookmark, meta)
history = history or []
history.append({"role": "user", "content": question})
history.append({"role": "assistant", "content": answer})
return history, ""
send_btn.click(
fn=handle_chat_with_history,
inputs=[question_box, chatbot, github_state, local_state, youtube_state, bookmarks_checkboxes, bookmarks_metadata],
outputs=[chatbot, question_box],
)
question_box.submit(
fn=handle_chat_with_history,
inputs=[question_box, chatbot, github_state, local_state, youtube_state, bookmarks_checkboxes, bookmarks_metadata],
outputs=[chatbot, question_box],
)
# Chatbot Like/Dislike Handler
def handle_chatbot_like(data: gr.LikeData):
# Placeholder for future feedback logging
print(f"User feedback: {'Liked' if data.liked else 'Disliked'} message index {data.index}")
return None
chatbot.like(
fn=handle_chatbot_like,
outputs=None
)
# Bookmark handlers - refresh both dropdown and checkboxes
def refresh_all_bookmarks():
choices, meta = get_dropdown_options()
return (
gr.Dropdown(choices=choices, value=None),
gr.CheckboxGroup(choices=choices, value=[]),
meta
)
refresh_bookmarks_btn.click(
fn=refresh_all_bookmarks,
outputs=[bookmarks_dropdown, bookmarks_checkboxes, bookmarks_metadata],
)
# View All button - show bookmark details
def view_all_bookmarks(meta):
if not meta:
return "*No bookmarks found. Analyze and bookmark repositories to see them here.*"
lines = ["**πŸ“š All Bookmarked Sources:**\n"]
for m in meta:
name = m.get("repo_name", "Unknown")
date = m.get("last_pulled_display", "--")
chunks = m.get("vector_chunks", 0)
lines.append(f"- **{name}** β€” {date} β€” {chunks} chunks")
return "\n".join(lines)
view_all_btn.click(
fn=view_all_bookmarks,
inputs=[bookmarks_metadata],
outputs=[bookmark_info],
)
# Update bookmark info when checkboxes change
def update_checkbox_info(selected, meta):
if not selected:
return "*No sources selected. Check sources above to include in chat.*"
lines = [f"**{len(selected)} source(s) selected:**\n"]
for label in selected:
m = find_metadata_by_label(label, meta or [])
if m:
lines.append(f"- {m.get('repo_name', label)} ({m.get('vector_chunks', 0)} chunks)")
return "\n".join(lines)
bookmarks_checkboxes.change(
fn=update_checkbox_info,
inputs=[bookmarks_checkboxes, bookmarks_metadata],
outputs=[bookmark_info],
)
# Notepad download handlers
def download_notepad_as_md(content):
if not content or not content.strip():
return None
import tempfile
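# delete=False keeps the temp file on disk so Gradio can serve it as the download.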
with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding='utf-8') as f:
f.write(content)
return f.name
github_notepad_download.click(
fn=download_notepad_as_md,
inputs=[github_notepad],
outputs=[github_notepad_download],
)
local_notepad_download.click(
fn=download_notepad_as_md,
inputs=[local_notepad],
outputs=[local_notepad_download],
)
youtube_notepad_download.click(
fn=download_notepad_as_md,
inputs=[youtube_notepad],
outputs=[youtube_notepad_download],
)
# Preview Copy Handler (JS)
preview_copy_btn.click(
fn=None,
inputs=[preview_content],
js="(content) => { navigator.clipboard.writeText(content); return 'Copied!'; }",
)
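# With fn=None, the js callback runs the copy entirely client-side in the browser.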
# Preview Download Handler
preview_download_btn.click(
fn=download_notepad_as_md, # Reusing the md download function
inputs=[preview_content],
outputs=[preview_download_btn],
)
# Lab handlers with improved output
def run_lab_with_code(intention, report):
result = run_experimental_lab(intention, report)
# Extract code if present in result
if "```python" in result:
code_start = result.find("```python") + 9
code_end = result.find("```", code_start)
code = result[code_start:code_end].strip() if code_end > code_start else ""
return result, gr.Code(value=code, visible=True)
return result, gr.Code(visible=False)
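# Note: the code panel is filled by scanning the returned Markdown for the first
# fenced python block; only that first block is displayed and exported.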
lab_start_btn.click(
fn=run_lab_with_code,
inputs=[lab_intention, lab_report_dropdown],
outputs=[lab_output, lab_code_display],
)
def fix_lab_with_instruction(intention, report, fix_instruction):
# Pass the fix instruction to the fix function
combined = f"{intention}\n\nFIX REQUEST: {fix_instruction}" if fix_instruction else intention
return lab_fix_bugs(combined, report)
lab_fix_btn.click(
fn=fix_lab_with_instruction,
inputs=[lab_intention, lab_report_dropdown, lab_fix_instruction],
outputs=[lab_output],
)
lab_happy_btn.click(
fn=lab_mark_happy,
inputs=[lab_intention, lab_report_dropdown],
outputs=[lab_output],
)
# Export lab code to file
def export_lab_code(code_content):
if not code_content:
return None
import tempfile
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as f:
f.write(code_content)
return f.name
lab_export_btn.click(
fn=export_lab_code,
inputs=[lab_code_display],
outputs=[lab_download],
).then(
fn=lambda: gr.File(visible=True),
outputs=[lab_download],
)
lab_refresh_reports_btn.click(
fn=_refresh_lab_reports_dropdown,
outputs=[lab_report_dropdown],
)
# Load bookmarks on startup (refresh both dropdown and checkboxes)
demo.load(
fn=refresh_all_bookmarks,
outputs=[bookmarks_dropdown, bookmarks_checkboxes, bookmarks_metadata],
)
return demo, (app_theme if IS_GRADIO_V6 else None), (custom_css if IS_GRADIO_V6 else None)
if __name__ == "__main__":
demo, app_theme, custom_css = build_interface()
launch_kwargs = {"share": False}
if IS_GRADIO_V6:
launch_kwargs.update(theme=app_theme, css=custom_css)
demo.launch(**launch_kwargs)