from datetime import datetime, timezone
from pathlib import Path
import os
import re
import shutil

import gradio as gr

from agent import (
    analyze_github_repo,
    analyze_local_repo,
    build_experiment_from_report,
    build_repo_vector_store,
    fetch_youtube_transcript,
    generate_youtube_study_notes,
    rag_answer_from_store,
    summarize_youtube_chapters,
)
from bookmarks import (
    bookmark_repo_from_analysis,
    find_metadata_by_label,
    get_cache_dirs,
    get_dropdown_options,
)

# File extensions and folders to ignore for local uploads
IGNORE_PATTERNS = {
    'folders': {
        '__pycache__', '.git', '.svn', '.hg', 'node_modules',
        'venv', 'env', '.venv', '.env', 'dist', 'build',
        '.idea', '.vscode', '.pytest_cache', '.mypy_cache',
        'coverage', '.coverage', 'htmlcov', '.tox', 'eggs',
        '.eggs', '*.egg-info', '.DS_Store'
    },
    'extensions': {
        '.pyc', '.pyo', '.pyd', '.so', '.dll', '.dylib',
        '.class', '.o', '.obj', '.exe', '.bin', '.lock',
        '.log', '.tmp', '.temp', '.cache', '.bak', '.swp',
        '.swo', '.DS_Store', '.gitignore'
    }
}

def _should_ignore_path(path: Path) -> bool:
    """Check if a path should be ignored during local folder processing."""
    for part in path.parts:
        if part in IGNORE_PATTERNS['folders']:
            return True
        # '*.egg-info' is a glob pattern, so the plain set lookup above can
        # never match it; handle that entry explicitly.
        if part.endswith('.egg-info'):
            return True
        # Skip hidden files and folders. This also covers dotfiles such as
        # '.gitignore' and '.DS_Store', whose Path.suffix is empty.
        if part.startswith('.') and part not in {'.', '..'}:
            return True
    if path.suffix.lower() in IGNORE_PATTERNS['extensions']:
        return True
    return False
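
# Illustrative only (not wired into the UI): a tiny self-check showing how the
# filter above treats typical upload paths. The sample paths are hypothetical.
def _demo_ignore_filter() -> None:
    samples = {
        "repo/__pycache__/mod.cpython-311.pyc": True,   # ignored folder
        "repo/.venv/lib/site.py": True,                 # hidden folder
        "repo/build.log": True,                         # ignored extension
        "repo/src/app.py": False,                       # kept
    }
    for raw, expected in samples.items():
        assert _should_ignore_path(Path(raw)) is expected, raw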

KNOWLEDGE_TRANSFER_ROOT = Path("Knowledge Transfer")


def _is_gradio_v6_or_newer() -> bool:
    """Return True if the installed Gradio major version is >= 6."""
    version_str = getattr(gr, "__version__", "0")
    try:
        major = int(version_str.split(".")[0])
        return major >= 6
    except (ValueError, IndexError):
        return False


IS_GRADIO_V6 = _is_gradio_v6_or_newer()

def run_github_ingestion(repo_url: str):
    """Analyze GitHub repository without indexing."""
    repo_url = (repo_url or "").strip()
    if not repo_url:
        warning = "⚠️ Please paste a public GitHub repository URL to begin."
        source_info = "**Source:** Not selected\n**Status:** ⏳ Pending\n**Chunks:** 0 vectors"
        return warning, source_info, {
            "analysis": None,
            "vector_dir": "",
            "vector_chunks": 0,
            "summary_base": [],
        }
    result = analyze_github_repo(repo_url)
    if "error" in result:
        error_msg = f"❌ {result['error']}"
        source_info = f"**Source:** {repo_url}\n**Status:** ❌ Error\n**Chunks:** 0 vectors"
        return error_msg, source_info, {
            "analysis": None,
            "vector_dir": "",
            "vector_chunks": 0,
            "summary_base": [],
        }
    docs = result.get("documentation", [])
    repo_name = result.get("repo_name", repo_url)
    timestamp = datetime.now(timezone.utc).strftime("%d %b %Y, %H:%M UTC")
    # Source info for preview panel
    source_info = f"""**Source:** GitHub Repository
**Repository:** {repo_name}
**Status:** ✅ Analyzed
**Documents:** {len(docs)} files
**Analyzed:** {timestamp}"""
    # Document preview content
    preview_sections = []
    for doc in docs[:5]:
        content = (doc.get("content") or "").strip()
        if not content:
            continue
        snippet = content[:600]
        if len(content) > 600:
            snippet = snippet.rstrip() + " ..."
        preview_sections.append(
            f"### 📄 {doc.get('path', 'document')}\n\n{snippet}"
        )
    preview_content = (
        "\n\n---\n\n".join(preview_sections)
        if preview_sections
        else "*No textual documentation snippets were found.*"
    )
    state_payload = {
        "analysis": result,
        "vector_dir": "",
        "vector_chunks": 0,
        "summary_base": [repo_name, str(len(docs))],
        "processed_timestamp": timestamp,
        "indexed": False,
    }
    return preview_content, source_info, state_payload
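
# The three ingestion flows in this module (GitHub, local, YouTube) share a
# small state-payload contract that Gradio threads between callbacks. A sketch
# of its shape right after a successful analysis (values hypothetical):
#
#     {
#         "analysis": {...},        # raw analyzer result
#         "vector_dir": "",         # filled in by the index step
#         "vector_chunks": 0,
#         "summary_base": ["sample-repo", "3"],
#         "processed_timestamp": "01 Jan 2025, 00:00 UTC",
#         "indexed": False,
#     }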

def index_github_repo(state_payload: dict | None):
    """Index the analyzed GitHub repository for RAG."""
    analysis_data = (state_payload or {}).get("analysis") if state_payload else None
    if not analysis_data:
        return "⚠️ Run analysis before indexing.", state_payload
    if state_payload.get("indexed"):
        return "✅ Repository already indexed and ready for RAG queries.", state_payload
    docs = analysis_data.get("documentation", [])
    if not docs:
        # Bail out before touching the cache so an empty analysis cannot
        # wipe a previously built vector store.
        return "⚠️ No documentation found to index.", state_payload
    repo_name = analysis_data.get("repo_name", "repo")
    repo_url = analysis_data.get("repo_url", "")
    slug, cache_dir, cache_vector_dir = get_cache_dirs(repo_url, repo_name)
    if cache_dir.exists():
        shutil.rmtree(cache_dir)
    cache_dir.mkdir(parents=True, exist_ok=True)
    _, vector_chunks = build_repo_vector_store(docs, persist_path=cache_vector_dir)
    new_state = {
        **state_payload,
        "vector_dir": str(cache_vector_dir),
        "vector_chunks": vector_chunks,
        "indexed": True,
    }
    return f"✅ Indexed {vector_chunks} vector chunks. Ready for RAG queries!", new_state

def bookmark_github_repo(state_payload: dict | None):
    """Bookmark and index the GitHub repository permanently."""
    analysis_data = (state_payload or {}).get("analysis") if state_payload else None
    if not analysis_data:
        return "⚠️ Run analysis before bookmarking.", state_payload, gr.Dropdown()
    docs = analysis_data.get("documentation", [])
    if not docs:
        return "⚠️ No documentation to bookmark.", state_payload, gr.Dropdown()
    repo_url = analysis_data.get("repo_url") or analysis_data.get("repo_name")
    # Build the vector store if the index step has not run yet
    vector_dir = state_payload.get("vector_dir") if state_payload else ""
    if not vector_dir:
        repo_name = analysis_data.get("repo_name", "repo")
        slug, cache_dir, cache_vector_dir = get_cache_dirs(repo_url, repo_name)
        if cache_dir.exists():
            shutil.rmtree(cache_dir)
        cache_dir.mkdir(parents=True, exist_ok=True)
        _, chunk_count = build_repo_vector_store(docs, persist_path=cache_vector_dir)
        vector_dir = str(cache_vector_dir)
    else:
        chunk_count = state_payload.get("vector_chunks", 0)
    metadata = bookmark_repo_from_analysis(
        repo_url,
        analysis_data,
        prebuilt_vector_dir=Path(vector_dir) if vector_dir else None,
        prebuilt_chunks=chunk_count,
    )
    choices, _ = get_dropdown_options()
    dropdown_update = gr.Dropdown(
        choices=choices,
        value=metadata.dropdown_label,
        interactive=True,
    )
    new_state = {
        **state_payload,
        "vector_dir": vector_dir,
        "vector_chunks": chunk_count,
        "indexed": True,
    }
    return f"💾 Repository bookmarked on {metadata.last_pulled_display}. Access it in the Chat tab!", new_state, dropdown_update

def run_youtube_ingestion(youtube_url: str):
    """Analyze YouTube video without indexing."""
    youtube_url = (youtube_url or "").strip()
    if not youtube_url:
        warning = "⚠️ Paste a YouTube video URL to begin."
        source_info = "**Source:** Not selected\n**Status:** ⏳ Pending\n**Chunks:** 0 vectors"
        return warning, source_info, {
            "analysis": None,
            "vector_dir": "",
            "vector_chunks": 0,
        }
    result = fetch_youtube_transcript(youtube_url)
    if "error" in result:
        error_msg = f"❌ {result['error']}"
        source_info = "**Source:** YouTube\n**Status:** ❌ Error\n**Chunks:** 0 vectors"
        return error_msg, source_info, {
            "analysis": None,
            "vector_dir": "",
            "vector_chunks": 0,
        }
    transcript = (result.get("raw_transcript") or "").strip()
    if not transcript:
        source_info = "**Source:** YouTube\n**Status:** ⚠️ No transcript\n**Chunks:** 0 vectors"
        return "⚠️ No transcript text was returned.", source_info, {"analysis": None}
    timestamp = datetime.now(timezone.utc).strftime("%d %b %Y, %H:%M UTC")
    video_url = result.get("url", youtube_url)
    lang = result.get("lang", "en")
    # Source info for preview panel
    source_info = f"""**Source:** YouTube Video
**URL:** {video_url}
**Language:** {lang}
**Status:** ✅ Analyzed
**Analyzed:** {timestamp}"""
    # Generate chapter summaries
    chapters = summarize_youtube_chapters(transcript, url=video_url)
    # Preview content with chapters
    preview_content = f"""### 📺 Video Transcript Analysis

{chapters}

---

### 📝 Transcript Preview

{transcript[:2000]}{"..." if len(transcript) > 2000 else ""}
"""
    state_payload = {
        "analysis": {
            "transcript": transcript,
            "url": video_url,
            "lang": lang,
            "chapters": chapters,
        },
        "vector_dir": "",
        "vector_chunks": 0,
        "indexed": False,
    }
    return preview_content, source_info, state_payload

def index_youtube_video(state_payload: dict | None):
    """Index YouTube transcript for RAG."""
    analysis_data = (state_payload or {}).get("analysis") if state_payload else None
    if not analysis_data:
        return "⚠️ Run analysis before indexing.", state_payload
    if state_payload.get("indexed"):
        return "✅ Video already indexed and ready for RAG queries.", state_payload
    transcript = analysis_data.get("transcript", "")
    if not transcript:
        return "⚠️ No transcript found to index.", state_payload
    # Create a pseudo-document from the transcript
    docs = [{
        "path": "transcript.txt",
        "content": transcript,
        "type": "transcript",
    }]
    url = analysis_data.get("url", "youtube")
    slug, cache_dir, cache_vector_dir = get_cache_dirs(url, "youtube")
    if cache_dir.exists():
        shutil.rmtree(cache_dir)
    cache_dir.mkdir(parents=True, exist_ok=True)
    _, chunk_count = build_repo_vector_store(docs, persist_path=cache_vector_dir)
    new_state = {
        **state_payload,
        "vector_dir": str(cache_vector_dir),
        "vector_chunks": chunk_count,
        "indexed": True,
    }
    return f"✅ Indexed {chunk_count} transcript chunks. Ready for RAG queries!", new_state

def bookmark_youtube_video(state_payload: dict | None):
    """Bookmark YouTube video - persists transcript to bookmarks system."""
    analysis_data = (state_payload or {}).get("analysis") if state_payload else None
    if not analysis_data:
        return "⚠️ Run analysis before bookmarking.", state_payload, gr.Dropdown()
    transcript = analysis_data.get("transcript", "")
    if not transcript:
        return "⚠️ No transcript found to bookmark.", state_payload, gr.Dropdown()
    video_url = analysis_data.get("url", "youtube-video")
    chapters = analysis_data.get("chapters", "")
    # Create a pseudo-analysis structure compatible with bookmark_repo_from_analysis
    pseudo_analysis = {
        "repo_name": f"YouTube: {video_url[:50]}",
        "repo_url": video_url,
        "documentation": [
            {
                "path": "transcript.txt",
                "content": transcript,
            },
            {
                "path": "chapters.md",
                "content": chapters,
            }
        ],
    }
    # Use the prebuilt vector store if the video was already indexed
    prebuilt_dir = None
    prebuilt_chunks = None
    if state_payload.get("indexed") and state_payload.get("vector_dir"):
        prebuilt_dir = Path(state_payload["vector_dir"])
        prebuilt_chunks = state_payload.get("vector_chunks", 0)
    metadata = bookmark_repo_from_analysis(
        video_url,
        pseudo_analysis,
        prebuilt_vector_dir=prebuilt_dir,
        prebuilt_chunks=prebuilt_chunks,
    )
    choices, _ = get_dropdown_options()
    dropdown_update = gr.Dropdown(choices=choices, value=metadata.dropdown_label)
    new_state = {
        **state_payload,
        "vector_dir": metadata.vector_dir,
        "vector_chunks": metadata.vector_chunks,
        "indexed": True,
        "bookmarked": True,
    }
    return f"📌 YouTube video bookmarked! {metadata.vector_chunks} chunks indexed.", new_state, dropdown_update

def generate_youtube_transfer_report(youtube_url: str):
    youtube_url = (youtube_url or "").strip()
    if not youtube_url:
        return "⚠️ Paste a YouTube video URL before generating a report."
    result = fetch_youtube_transcript(youtube_url)
    if "error" in result:
        return f"❌ {result['error']}"
    transcript = (result.get("raw_transcript") or "").strip()
    if not transcript:
        return "No transcript text was returned by the youtube-transcript MCP server; report generation was skipped."
    chapters = summarize_youtube_chapters(transcript, url=result.get("url", youtube_url))
    study_notes = generate_youtube_study_notes(chapters, url=result.get("url", youtube_url))
    generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
    header_lines = [
        "# YouTube Knowledge Transfer Report",
        "",
        f"- Source: {result.get('url', youtube_url)}",
        f"- Language: {result.get('lang', 'en')}",
        f"- Generated at: {generated_at}",
    ]
    lines: list[str] = []
    lines.extend(header_lines)
    lines.append("")
    lines.append("## 1. Topic & Chapter Outline")
    lines.append("")
    lines.append(chapters)
    lines.append("")
    lines.append("## 2. Study & Interview Guidance")
    lines.append("")
    lines.append(study_notes)
    report_markdown = "\n".join(lines)
    root = _ensure_knowledge_root()
    youtube_root = root / "Youtube Video"
    youtube_root.mkdir(parents=True, exist_ok=True)
    slug = _slugify_name(result.get("url", youtube_url))
    ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    report_path = youtube_root / f"{slug}-youtube-knowledge-{ts}.md"
    report_path.write_text(report_markdown, encoding="utf-8")
    rel_path = report_path.relative_to(Path("."))
    return f"📄 YouTube knowledge transfer report written to `{rel_path}`."

def _ensure_knowledge_root() -> Path:
    KNOWLEDGE_TRANSFER_ROOT.mkdir(parents=True, exist_ok=True)
    return KNOWLEDGE_TRANSFER_ROOT


def _slugify_name(name: str) -> str:
    base = (name or "project").lower()
    safe = re.sub(r"[^a-z0-9-]+", "-", base).strip("-")
    return safe or "project"
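
# _slugify_name flattens URLs and titles into filesystem-safe slugs, e.g.:
#
#     _slugify_name("https://github.com/Example/My Repo")
#     # -> "https-github-com-example-my-repo"
#     _slugify_name("***")
#     # -> "project" (fallback when no usable characters remain)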

def generate_knowledge_transfer_report(state_payload: dict | None):
    analysis_data = (state_payload or {}).get("analysis") if state_payload else None
    if not analysis_data:
        return "⚠️ Run an analysis before generating a report."
    repo_name = analysis_data.get("repo_name") or "Project"
    repo_url = analysis_data.get("repo_url") or "local upload"
    docs = analysis_data.get("documentation") or []
    doc_count = len(docs)
    structure = analysis_data.get("structure") or []
    vector_dir = state_payload.get("vector_dir") if state_payload else ""
    vector_chunks = state_payload.get("vector_chunks", 0) if state_payload else 0
    summary_base = state_payload.get("summary_base", []) if state_payload else []
    processed_timestamp = state_payload.get("processed_timestamp") if state_payload else None
    generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
    header_lines = [
        f"# Knowledge Transfer Report – {repo_name}",
        "",
        f"- Source: {repo_url}",
        f"- Generated at: {generated_at}",
        f"- Documentation files: {doc_count}",
        f"- Vector chunks: {vector_chunks}",
    ]
    if processed_timestamp:
        header_lines.append(f"- Last analysis run: {processed_timestamp}")
    overview_section = "\n".join(summary_base) if summary_base else "No high-level summary was captured during analysis."
    llm_summary_section = ""
    if vector_dir and vector_chunks:
        try:
            question = (
                "Provide a detailed knowledge transfer summary of this repository. "
                "Explain its purpose, main components, architecture, key dependencies, "
                "and patterns that would be reusable in other projects. "
                "Focus on actionable insights and how to extend or adapt this codebase."
            )
            llm_summary_section = rag_answer_from_store(Path(vector_dir), question, repo_summary=overview_section)
        except Exception as err:
            llm_summary_section = f"LLM summary unavailable due to error: {err}"
    else:
        llm_summary_section = "Vector store not available; the RAG-based summary was skipped."
    max_items = 80
    structure_snippet = "\n".join(structure[:max_items]) if structure else "No repository structure information was captured."
    doc_paths = [d.get("path", "") for d in docs][:max_items]
    docs_list_section = "\n".join(f"- {p}" for p in doc_paths) if doc_paths else "No documentation files were detected."
    lines: list[str] = []
    lines.extend(header_lines)
    lines.append("")
    lines.append("## 1. High-level Overview")
    lines.append("")
    lines.append(overview_section)
    lines.append("")
    lines.append("## 2. Repository Layout (snapshot)")
    lines.append("")
    lines.append("```")
    lines.append(structure_snippet)
    lines.append("```")
    lines.append("")
    lines.append("## 3. Documentation Files")
    lines.append("")
    lines.append(docs_list_section)
    lines.append("")
    lines.append("## 4. LLM Knowledge Summary")
    lines.append("")
    lines.append(llm_summary_section)
    lines.append("")
    lines.append("## 5. Notes for Future Reuse")
    lines.append("")
    lines.append(
        "Use this report as a starting point when designing new projects. "
        "Focus on reusing architecture patterns, utility modules, and any "
        "documented best practices or workflows."
    )
    report_markdown = "\n".join(lines)
    root = _ensure_knowledge_root()
    slug = _slugify_name(repo_name)
    ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    report_path = root / f"{slug}-knowledge-transfer-{ts}.md"
    report_path.write_text(report_markdown, encoding="utf-8")
    rel_path = report_path.relative_to(Path("."))
    return f"📄 Knowledge transfer report written to `{rel_path}`."

def _list_knowledge_report_choices() -> list[str]:
    root = _ensure_knowledge_root()
    reports = sorted(root.glob("*.md"))
    return [report.name for report in reports]


def _refresh_lab_reports_dropdown():
    return gr.Dropdown(
        label="Knowledge Transfer report (optional)",
        choices=_list_knowledge_report_choices(),
        value=None,
        interactive=True,
    )

def load_study_deck_from_report(report_name: str | None) -> str:
    if not report_name:
        return "Select a Knowledge Transfer report above to view its study deck-style summary."
    root = _ensure_knowledge_root()
    report_path = root / report_name
    if not report_path.exists():
        return f"Report `{report_name}` was not found in the Knowledge Transfer folder."
    try:
        raw = report_path.read_text(encoding="utf-8", errors="ignore")
    except OSError as err:
        return f"Unable to read report `{report_name}`: {err}"
    max_chars = 6000
    snippet = raw[:max_chars]
    if len(raw) > max_chars:
        snippet = snippet.rstrip() + "\n\n... (truncated)"
    return (
        f"### Study Deck · {report_name}\n\n"
        "Scroll through this condensed report to refresh yourself on the key concepts, "
        "architecture, and reusable patterns for this project.\n\n"
        f"```markdown\n{snippet}\n```"
    )

def _derive_local_repo_root(uploaded: str | list[str] | None) -> Path | None:
    """Given a directory-style file upload, infer the repository root directory.

    Gradio's File component with ``file_count="directory"`` returns a list of
    filepaths under the uploaded folder (or a single filepath). We compute the
    common parent directory and treat that as the repo root.
    """
    if not uploaded:
        return None
    if isinstance(uploaded, str):
        paths = [uploaded]
    else:
        paths = [p for p in uploaded if p]
    if not paths:
        return None
    try:
        common = os.path.commonpath(paths)
    except ValueError:
        return None
    root = Path(common)
    return root if root.exists() and root.is_dir() else None
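
# Sketch of the common-parent inference above, with hypothetical upload paths:
#
#     _derive_local_repo_root([
#         "/tmp/upload/myproj/src/app.py",
#         "/tmp/upload/myproj/README.md",
#     ])
#     # -> Path("/tmp/upload/myproj"), provided that directory exists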

def run_local_repo_ingestion(uploaded_folder):
    """Analyze local repository folder, filtering irrelevant files."""
    repo_root = _derive_local_repo_root(uploaded_folder)
    if not repo_root:
        warning = "⚠️ Upload a project folder before running analysis."
        source_info = "**Source:** Not selected\n**Status:** ⏳ Pending\n**Chunks:** 0 vectors"
        return warning, source_info, {
            "analysis": None,
            "vector_dir": "",
            "vector_chunks": 0,
            "summary_base": [],
        }
    # Pre-check: make sure at least one relevant file survives the ignore
    # filter. analyze_local_repo receives the folder root and performs its
    # own traversal, so the filtered list itself is only used for this check.
    if isinstance(uploaded_folder, list):
        filtered_files = [f for f in uploaded_folder if not _should_ignore_path(Path(f))]
        if not filtered_files:
            warning = "⚠️ No relevant files found after filtering."
            source_info = "**Source:** Local\n**Status:** ⚠️ No files\n**Chunks:** 0 vectors"
            return warning, source_info, {
                "analysis": None,
                "vector_dir": "",
                "vector_chunks": 0,
                "summary_base": [],
            }
    result = analyze_local_repo(str(repo_root))
    if "error" in result:
        error_msg = f"❌ {result['error']}"
        source_info = "**Source:** Local\n**Status:** ❌ Error\n**Chunks:** 0 vectors"
        return error_msg, source_info, {
            "analysis": None,
            "vector_dir": "",
            "vector_chunks": 0,
            "summary_base": [],
        }
    docs = result.get("documentation", [])
    repo_name = result.get("repo_name", repo_root.name)
    timestamp = datetime.now(timezone.utc).strftime("%d %b %Y, %H:%M UTC")
    # Source info for preview panel
    source_info = f"""**Source:** Local Project
**Folder:** {repo_name}
**Status:** ✅ Analyzed
**Documents:** {len(docs)} files
**Analyzed:** {timestamp}"""
    # Document preview content
    preview_sections = []
    for doc in docs[:5]:
        content = (doc.get("content") or "").strip()
        if not content:
            continue
        snippet = content[:600]
        if len(content) > 600:
            snippet = snippet.rstrip() + " ..."
        preview_sections.append(
            f"### 📄 {doc.get('path', 'document')}\n\n{snippet}"
        )
    preview_content = (
        "\n\n---\n\n".join(preview_sections)
        if preview_sections
        else "*No textual documentation snippets were found.*"
    )
    state_payload = {
        "analysis": result,
        "vector_dir": "",
        "vector_chunks": 0,
        "summary_base": [repo_name, str(len(docs))],
        "processed_timestamp": timestamp,
        "indexed": False,
    }
    return preview_content, source_info, state_payload

def index_local_repo(state_payload: dict | None):
    """Index the analyzed local repository for RAG (same logic as GitHub)."""
    return index_github_repo(state_payload)


def bookmark_local_repo(state_payload: dict | None):
    """Bookmark and index the local repository permanently (same logic as GitHub)."""
    return bookmark_github_repo(state_payload)

def _format_bookmark_info(metadata: dict | None) -> str:
    if not metadata:
        return (
            "No bookmarks yet. Process a repository in the *Process New Repository* tab, then bookmark it to enable RAG chat."
        )
    base = (
        f"### {metadata.get('repo_name', 'Saved Repository')}\n"
        f"- URL: {metadata.get('repo_url', 'N/A')}\n"
        f"- Last pulled: {metadata.get('last_pulled_display', '--/--/----')}\n"
        f"- Documentation files: {metadata.get('docs_count', 0)}\n"
        f"- Vector chunks: {metadata.get('vector_chunks', 0)}"
    )
    preview = (metadata.get("summary_preview") or "").strip()
    if not preview:
        return base
    max_len = 600
    if len(preview) > max_len:
        preview_display = preview[:max_len].rstrip() + " ..."
    else:
        preview_display = preview
    return f"{base}\n\n**Preview:**\n\n{preview_display}"

def _refresh_bookmarks(preselect: str | None = None):
    choices, metadata_list = get_dropdown_options()
    value = preselect if preselect and preselect in choices else (choices[0] if choices else None)
    dropdown_update = gr.Dropdown(
        choices=choices,
        value=value,
        interactive=bool(choices),
        label="Bookmarked repositories",
        allow_custom_value=True,
    )
    info = _format_bookmark_info(
        find_metadata_by_label(value, metadata_list) if value else None
    )
    return dropdown_update, metadata_list, info


def load_bookmarks_on_start():
    dropdown_update, metadata_list, info = _refresh_bookmarks()
    status = "Bookmarks loaded." if metadata_list else "No bookmarks saved yet."
    return dropdown_update, metadata_list, info, status


def _build_summary_from_base(base_lines: list[str], final_message: str) -> str:
    if not base_lines:
        return final_message
    return "\n".join(base_lines + ["", final_message])

def bookmark_current_repo(state_payload: dict | None):
    analysis_data = (state_payload or {}).get("analysis") if state_payload else None
    if not analysis_data or not analysis_data.get("documentation"):
        return (
            "⚠️ Run an analysis before bookmarking a repository.",
            gr.Dropdown(choices=[], value=None, interactive=False, label="Bookmarked repositories"),
            [],
            _format_bookmark_info(None),
            _build_summary_from_base(
                (state_payload or {}).get("summary_base", []),
                "⚠️ Bookmark failed because no analysis data is available.",
            ),
            state_payload,
        )
    repo_url = analysis_data.get("repo_url") or analysis_data.get("repo_name")
    vector_dir = state_payload.get("vector_dir") if state_payload else ""
    metadata = bookmark_repo_from_analysis(
        repo_url,
        analysis_data,
        prebuilt_vector_dir=Path(vector_dir) if vector_dir else None,
        prebuilt_chunks=state_payload.get("vector_chunks") if state_payload else None,
    )
    dropdown_update, metadata_list, info = _refresh_bookmarks(preselect=metadata.dropdown_label)
    saved_msg = (
        f"💾 Repo saved on {metadata.last_pulled_display}. Access it via the Bookmarked tab for RAG chat."
    )
    updated_summary = _build_summary_from_base(
        state_payload.get("summary_base", []),
        saved_msg,
    )
    new_state = {
        **(state_payload or {}),
        "summary_base": state_payload.get("summary_base", []),
        "saved": True,
    }
    return saved_msg, dropdown_update, metadata_list, info, updated_summary, new_state

def update_selected_bookmark(label: str, metadata_list: list[dict]):
    metadata = find_metadata_by_label(label, metadata_list or []) if label else None
    return _format_bookmark_info(metadata)


def answer_bookmark_question(label: str, question: str, metadata_list: list[dict]):
    if not label:
        return "Select a bookmarked repository before asking a question."
    if not question.strip():
        return "Enter a question to query your bookmarked repository."
    metadata = find_metadata_by_label(label, metadata_list or [])
    if not metadata:
        return "Bookmark metadata not found. Try refreshing bookmarks."
    if metadata.get("vector_chunks", 0) == 0:
        return "This bookmark has no vector store yet. Re-bookmark the repo to rebuild embeddings."
    summary = (
        f"Repository: {metadata.get('repo_name', label)}\n"
        f"Docs: {metadata.get('docs_count', 0)} | Last pulled: {metadata.get('last_pulled_display', '--/--/----')}"
    )
    answer = rag_answer_from_store(Path(metadata["vector_dir"]), question, repo_summary=summary)
    return answer


def placeholder_action_message(label: str):
    if not label:
        return "Select a bookmarked repository to use this action."
    return f"Additional bookmark actions for **{label}** are coming soon."

def run_experimental_lab(intention: str, report_name: str | None):
    text = (intention or "").strip()
    if not text:
        return "Describe what you want to build in the Experimental Lab to get started."
    root = _ensure_knowledge_root()
    report_markdown = ""
    context_note = ""
    if report_name:
        report_path = root / report_name
        if report_path.exists():
            try:
                raw = report_path.read_text(encoding="utf-8", errors="ignore")
                report_markdown = raw
                snippet = raw[:3000]
                context_note = (
                    f"Using Knowledge Transfer report: `{report_name}` as reference.\n\n"
                    f"Snippet from report (truncated):\n\n```markdown\n{snippet}\n```\n\n"
                )
            except OSError:
                context_note = f"Unable to read Knowledge Transfer report `{report_name}`. Proceeding without embedded context.\n\n"
    build_result = build_experiment_from_report(text, report_markdown)
    code = build_result.get("code", "")
    stdout = build_result.get("stdout", "")
    error = build_result.get("error", "")
    base_intro = (
        "Experimental Lab is a sandbox where future versions of MonkeyMind will use "
        "Knowledge Transfer reports as context to plan and build small Gradio apps.\n\n"
    )
    code_section = "### Generated experiment code\n\n"
    if code:
        code_section += f"```python\n{code}\n```\n\n"
    else:
        code_section += "No code was generated.\n\n"
    results_section = "### Sandbox output\n\n"
    if stdout:
        results_section += f"**Stdout / logs:**\n\n```text\n{stdout}\n```\n\n"
    if error:
        results_section += f"**Error:**\n\n```text\n{error}\n```\n\n"
    if not stdout and not error:
        results_section += "No errors encountered during the sandbox test.\n\n"
    return (
        base_intro
        + context_note
        + f"You wrote:\n\n> {text}\n\n"
        + code_section
        + results_section
    )


def lab_fix_bugs(intention: str, report_name: str | None):
    base = run_experimental_lab(intention, report_name)
    return (
        base
        + "\n\n---\n\n_This Fix bugs action will eventually trigger another build iteration to resolve errors in the generated app. "
        "For now, it simply records another planning pass based on your intention and chosen report._"
    )


def lab_mark_happy(intention: str, report_name: str | None):
    text = (intention or "").strip()
    return (
        "Marking this experiment as complete.\n\n"
        f"Final intention:\n\n> {text or 'N/A'}\n\n"
        "You can now export or reuse this idea elsewhere. Future versions will attach concrete code artifacts here."
    )


def lab_export_project(intention: str, report_name: str | None):
    text = (intention or "").strip()
    return (
        "Export placeholder: a future version will bundle generated code, configuration, and a short README "
        "into a downloadable package.\n\n"
        f"Current experiment description:\n\n> {text or 'N/A'}\n\n"
        f"Reference report: `{report_name or 'none selected'}`."
    )

def answer_chat_question(question: str, github_state, local_state, youtube_state, selected_bookmark, metadata_list):
    """Answer questions using RAG from any indexed source or bookmark."""
    if not question.strip():
        return "Please enter a question."
    # Check if using a bookmark
    if selected_bookmark:
        metadata = find_metadata_by_label(selected_bookmark, metadata_list or [])
        if metadata and metadata.get("vector_chunks", 0) > 0:
            summary = (
                f"Repository: {metadata.get('repo_name', selected_bookmark)}\n"
                f"Last pulled: {metadata.get('last_pulled_display', '--/--/----')}"
            )
            answer = rag_answer_from_store(Path(metadata["vector_dir"]), question, repo_summary=summary)
            return f"**[Bookmark: {selected_bookmark}]**\n\n{answer}"
    # Check current session sources
    for state, label in [
        (github_state, "GitHub"),
        (local_state, "Local"),
        (youtube_state, "YouTube"),
    ]:
        if state and state.get("indexed") and state.get("vector_dir"):
            vector_dir = Path(state["vector_dir"])
            if vector_dir.exists():
                answer = rag_answer_from_store(vector_dir, question)
                return f"**[{label} Source]**\n\n{answer}"
    return "⚠️ No indexed sources available. Please index a repository or select a bookmark first."

def generate_and_download_report(state_payload: dict | None, source_type: str):
    """Generate a markdown report and return the file path for download."""
    analysis_data = (state_payload or {}).get("analysis") if state_payload else None
    if not analysis_data:
        return None
    if source_type == "youtube":
        chapters = analysis_data.get("chapters", "")
        url = analysis_data.get("url", "youtube")
        lang = analysis_data.get("lang", "en")
        study_notes = generate_youtube_study_notes(chapters, url=url)
        generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
        lines = [
            "# YouTube Knowledge Transfer Report",
            "",
            f"- Source: {url}",
            f"- Language: {lang}",
            f"- Generated at: {generated_at}",
            "",
            "## 1. Chapter Outline",
            "",
            chapters,
            "",
            "## 2. Study Notes",
            "",
            study_notes,
        ]
    else:
        # GitHub or local repo
        repo_name = analysis_data.get("repo_name", "Project")
        repo_url = analysis_data.get("repo_url", "local")
        docs = analysis_data.get("documentation", [])
        generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S %Z")
        lines = [
            f"# Knowledge Transfer Report – {repo_name}",
            "",
            f"- Source: {repo_url}",
            f"- Generated at: {generated_at}",
            f"- Documentation files: {len(docs)}",
            "",
            "## Documentation Files",
            "",
        ]
        for doc in docs[:50]:
            lines.append(f"- {doc.get('path', 'unknown')}")
    report_markdown = "\n".join(lines)
    root = _ensure_knowledge_root()
    # YouTube analyses carry no repo_name, so fall back to the video URL
    # before the generic "project" slug.
    slug = _slugify_name(analysis_data.get("repo_name") or analysis_data.get("url") or "project")
    ts = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    report_path = root / f"{slug}-report-{ts}.md"
    report_path.write_text(report_markdown, encoding="utf-8")
    return str(report_path)
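
# Illustrative only (not wired into the UI): producing a downloadable report
# from a hand-built state payload. All values below are hypothetical.
def _demo_generate_report() -> None:
    fake_state = {
        "analysis": {
            "repo_name": "sample-repo",
            "repo_url": "https://github.com/example/sample-repo",
            "documentation": [{"path": "README.md", "content": "# Sample"}],
        }
    }
    path = generate_and_download_report(fake_state, source_type="github")
    print(path)  # e.g. Knowledge Transfer/sample-repo-report-<UTC timestamp>.md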

def refresh_bookmarks_dropdown():
    """Refresh the bookmarks dropdown."""
    choices, metadata_list = get_dropdown_options()
    return gr.Dropdown(choices=choices, value=None, interactive=True), metadata_list

def build_interface() -> tuple[gr.Blocks, gr.Theme | None, str | None]:
    """Build the Gradio interface with improved UI/UX inspired by modern dashboard design."""
    custom_css = """
    @import url('https://fonts.googleapis.com/css2?family=Outfit:wght@300;400;500;600;700&family=JetBrains+Mono:wght@400;500&display=swap');

    :root {
        --primary: #34d399; /* Mint Green */
        --primary-glow: rgba(52, 211, 153, 0.4);
        --glass-bg: rgba(15, 23, 42, 0.6);
        --glass-border: rgba(255, 255, 255, 0.08);
        --text-main: #f8fafc;
        --text-muted: #94a3b8;
    }

    body {
        background-color: #0f172a;
        color: var(--text-main);
        font-family: 'Outfit', sans-serif !important;
    }

    /* Global container override */
    .gradio-container {
        max-width: 1400px !important;
        background: #0f172a !important;
        background-image:
            radial-gradient(circle at 0% 0%, rgba(52, 211, 153, 0.15) 0%, transparent 50%),
            radial-gradient(circle at 100% 100%, rgba(16, 185, 129, 0.1) 0%, transparent 50%) !important;
        border: none !important;
    }

    /* Header styling */
    .header-container {
        background: rgba(15, 23, 42, 0.8);
        backdrop-filter: blur(12px);
        border-bottom: 1px solid var(--glass-border);
        padding: 20px 24px;
        margin: -16px -16px 24px -16px;
        border-radius: 0;
    }

    /* Card/Panel styling */
    .source-card, .gradio-group, .tabs, .tabitem, .box-container {
        background: var(--glass-bg) !important;
        backdrop-filter: blur(12px);
        border: 1px solid var(--glass-border) !important;
        border-radius: 16px !important;
        box-shadow: 0 4px 20px rgba(0, 0, 0, 0.2);
        padding: 20px !important;
        margin-bottom: 20px !important;
    }

    /* Inputs and Textareas */
    input, textarea, .gr-input, .gr-box, .dropdown-wrap {
        background-color: rgba(30, 41, 59, 0.6) !important;
        border: 1px solid rgba(255, 255, 255, 0.1) !important;
        color: var(--text-main) !important;
        border-radius: 10px !important;
    }

    input:focus, textarea:focus {
        border-color: var(--primary) !important;
        box-shadow: 0 0 0 2px var(--primary-glow) !important;
    }

    /* Buttons */
    button.primary {
        background: linear-gradient(135deg, #34d399 0%, #10b981 100%) !important;
        color: #0f172a !important;
        font-weight: 600 !important;
        border: none !important;
        box-shadow: 0 4px 15px rgba(52, 211, 153, 0.3) !important;
    }

    button.secondary {
        background: rgba(30, 41, 59, 0.8) !important;
        border: 1px solid rgba(255, 255, 255, 0.1) !important;
        color: var(--text-muted) !important;
    }

    button.secondary:hover {
        color: var(--text-main) !important;
        border-color: var(--primary) !important;
    }

    button.stop {
        background: linear-gradient(135deg, #f87171 0%, #ef4444 100%) !important;
        color: white !important;
    }

    /* Status indicators */
    .status-ready {
        background: rgba(16, 185, 129, 0.1);
        color: #34d399;
        padding: 6px 12px;
        border-radius: 20px;
        font-size: 12px;
        border: 1px solid rgba(16, 185, 129, 0.2);
    }

    .status-pending {
        background: rgba(251, 191, 36, 0.1);
        color: #fbbf24;
        padding: 6px 12px;
        border-radius: 20px;
        font-size: 12px;
        border: 1px solid rgba(251, 191, 36, 0.2);
    }

    /* Keep copy/like buttons always visible and styled */
    .message-buttons {
        opacity: 1 !important;
        display: flex !important;
        gap: 4px !important;
    }

    .message-buttons button {
        color: #94a3b8 !important; /* text-muted */
        background: transparent !important;
        border: none !important;
        box-shadow: none !important;
    }

    .message-buttons button:hover {
        color: #34d399 !important; /* primary */
        background: rgba(52, 211, 153, 0.1) !important;
    }

    /* Chat bubbles */
    .chat-assistant {
        background: rgba(30, 41, 59, 0.8) !important;
        border: 1px solid var(--glass-border);
        border-radius: 18px 18px 18px 4px !important;
        color: var(--text-main) !important;
    }

    .chat-user {
        background: linear-gradient(135deg, #34d399 0%, #10b981 100%) !important;
        color: #022c22 !important;
        border-radius: 18px 18px 4px 18px !important;
        font-weight: 500;
    }

    /* Chat message text size and font */
    .message-wrap .message {
        font-size: 0.9rem !important;
        font-family: 'JetBrains Mono', monospace !important;
        line-height: 1.5 !important;
    }

    /* Typography overrides */
    .prose, .prose h1, .prose h2, .prose h3, .prose p, .prose strong {
        color: var(--text-main) !important;
    }

    /* Scrollbar */
    ::-webkit-scrollbar { width: 6px; height: 6px; }
    ::-webkit-scrollbar-track { background: transparent; }
    ::-webkit-scrollbar-thumb { background: #334155; border-radius: 10px; }
    ::-webkit-scrollbar-thumb:hover { background: #475569; }
    """
    app_theme = gr.themes.Soft(
        primary_hue="emerald",
        secondary_hue="slate",
        neutral_hue="slate",
    ).set(
        body_background_fill="#0f172a",
        block_background_fill="#1e293b",
        block_border_color="rgba(255,255,255,0.1)",
        input_background_fill="#0f172a",
        button_primary_background_fill="#34d399",
        button_primary_text_color="#0f172a",
    )

    blocks_kwargs = {"title": "🐒 MonkeyMind - Knowledge Transfer Agent"}
    # Theme and CSS kwargs are only passed on Gradio < 6 (see _is_gradio_v6_or_newer).
    if not IS_GRADIO_V6:
        blocks_kwargs.update(theme=app_theme, css=custom_css)
    with gr.Blocks(**blocks_kwargs) as demo:
        # State variables
        github_state = gr.State({})
        local_state = gr.State({})
        youtube_state = gr.State({})
        bookmarks_metadata = gr.State([])
        chat_history = gr.State([])
        notepad_content = gr.State("")  # For notepad feature
        copied_texts = gr.State(set())  # Track copied texts to prevent duplicates

        # ===== HEADER =====
        with gr.Row(elem_classes=["header-container"]):
            gr.HTML("""
            <div style="display: flex; align-items: center; gap: 16px;">
                <div style="width: 48px; height: 48px; border-radius: 12px; background: linear-gradient(135deg, #34d399 0%, #10b981 100%); display: flex; align-items: center; justify-content: center; box-shadow: 0 0 20px rgba(52, 211, 153, 0.3);">
                    <span style="font-size: 28px;">🐒</span>
                </div>
                <div>
                    <h1 style="margin: 0; font-size: 1.5rem; font-weight: 700; color: #f8fafc; letter-spacing: -0.5px;">
                        MonkeyMind
                    </h1>
                    <div style="display: flex; align-items: center; gap: 8px;">
                        <span style="width: 8px; height: 8px; border-radius: 50%; background: #34d399; box-shadow: 0 0 10px #34d399;"></span>
                        <p style="margin: 0; font-size: 0.8rem; color: #94a3b8; font-family: 'JetBrains Mono', monospace;">AGENT ACTIVE</p>
                    </div>
                </div>
            </div>
            """)

        # ===== APP DESCRIPTION =====
        gr.Markdown("""
        > **🐒 MonkeyMind** is your Knowledge Transfer & RAG Agent. Analyze GitHub repos, local projects, or YouTube videos
        > to build a personal knowledge base. Chat with your sources using AI-powered retrieval.
        >
        > **Quick Start:** Paste a GitHub URL → Click **Analyze** → Click **Index** → Start chatting!
        > Bookmark important sources to access them anytime.
        """)

        # ===== MAIN LAYOUT: 1/3 Left Panel + 2/3 Right Panel =====
        with gr.Row():
            # ===== LEFT PANEL: Data Sources + Bookmarks =====
            with gr.Column(scale=1, min_width=350):
                # Data Sources Section
                gr.Markdown("### 📡 Data Ingestion")
                # Source Type Selector (pill-style tabs)
                source_type = gr.Radio(
                    choices=["🐙 GitHub", "📁 Local", "🎥 YouTube"],
                    value="🐙 GitHub",
                    label="",
                    container=False,
                    interactive=True,
                )
                # Dynamic source input based on selection
                with gr.Group(visible=True) as github_group:
                    gr.Markdown("*Paste a GitHub repo URL and click **Analyze** to extract docs. Then **Index** to enable RAG chat. You can see retrieved contents in the **Preview** tab on the right.*")
                    github_url = gr.Textbox(
                        label="Repository URL",
                        placeholder="https://github.com/username/repo",
                        show_label=True,
                    )
                    github_analyze_btn = gr.Button("🔍 Analyze Repository", variant="secondary")
                    github_index_btn = gr.Button("⚡ Index for RAG", variant="primary")
                    github_status = gr.Markdown("Ready to analyze.", elem_classes=["status-pending"])
                    github_bookmark_btn = gr.Button("📌 Bookmark & Save to Knowledge Base", variant="secondary")
                    gr.Markdown("---")
                    gr.Markdown("**📝 Notepad** *(copy useful info here)*")
                    github_notepad = gr.Textbox(
                        label="",
                        placeholder="Paste or write notes here...",
                        lines=3,
                        container=False,
                        show_copy_button=True,
                    )
                    with gr.Row():
                        github_notepad_download = gr.DownloadButton("📥 Download as .md", variant="secondary", size="sm")
                with gr.Group(visible=False) as local_group:
                    gr.Markdown("*Upload a project folder. Irrelevant files are auto-filtered. You can see retrieved contents in the **Preview** tab on the right.*")
                    local_folder = gr.File(
                        label="Upload Project Folder",
                        file_count="directory",
                        type="filepath",
                    )
                    local_analyze_btn = gr.Button("🔍 Analyze Project", variant="secondary")
                    local_index_btn = gr.Button("⚡ Index for RAG", variant="primary")
                    local_status = gr.Markdown("Ready to analyze.", elem_classes=["status-pending"])
                    local_bookmark_btn = gr.Button("📌 Bookmark & Save to Knowledge Base", variant="secondary")
                    gr.Markdown("---")
                    gr.Markdown("**📝 Notepad** *(copy useful info here)*")
                    local_notepad = gr.Textbox(
                        label="",
                        placeholder="Paste or write notes here...",
                        lines=3,
                        container=False,
                        show_copy_button=True,
                    )
                    with gr.Row():
                        local_notepad_download = gr.DownloadButton("📥 Download as .md", variant="secondary", size="sm")
                with gr.Group(visible=False) as youtube_group:
                    gr.Markdown("*Paste a YouTube video URL to extract and analyze the transcript. You can see retrieved contents in the **Preview** tab on the right.*")
                    youtube_url = gr.Textbox(
                        label="Video URL",
                        placeholder="https://www.youtube.com/watch?v=...",
                    )
                    youtube_analyze_btn = gr.Button("🔍 Analyze Video", variant="secondary")
                    youtube_index_btn = gr.Button("⚡ Index for RAG", variant="primary")
                    youtube_status = gr.Markdown("Ready to analyze.", elem_classes=["status-pending"])
                    youtube_bookmark_btn = gr.Button("📌 Bookmark & Save to Knowledge Base", variant="secondary")
                    gr.Markdown("---")
                    gr.Markdown("**📝 Notepad** *(copy useful info here)*")
                    youtube_notepad = gr.Textbox(
                        label="",
                        placeholder="Paste or write notes here...",
                        lines=3,
                        container=False,
                        show_copy_button=True,
                    )
                    with gr.Row():
                        youtube_notepad_download = gr.DownloadButton("📥 Download as .md", variant="secondary", size="sm")

                # Source type switching logic
                def switch_source(choice):
                    return (
                        gr.Group(visible=("GitHub" in choice)),
                        gr.Group(visible=("Local" in choice)),
                        gr.Group(visible=("YouTube" in choice)),
                    )

                source_type.change(
                    fn=switch_source,
                    inputs=[source_type],
                    outputs=[github_group, local_group, youtube_group],
                )
                gr.Markdown("---")
                # Bookmarks Quick Access Section
                gr.Markdown("### 🧠 Knowledge Base")
                gr.Markdown("*Select sources to use for chat. Multiple selections allowed.*")
                # Use CheckboxGroup for multi-select
                bookmarks_checkboxes = gr.CheckboxGroup(
                    label="Active Sources",
                    choices=[],
                    value=[],
                    interactive=True,
                    info="Check sources to include in chat context",
                )
                # Keep dropdown for compatibility (hidden, used internally)
                bookmarks_dropdown = gr.Dropdown(
                    label="",
                    choices=[],
                    value=None,
                    interactive=True,
                    visible=False,
                )
                with gr.Row():
                    refresh_bookmarks_btn = gr.Button("🔄 Refresh", variant="secondary", size="sm", scale=1)
                    view_all_btn = gr.Button("📋 View All", variant="secondary", size="sm", scale=1)
                # Bookmark info display
                bookmark_info = gr.Markdown(
                    value="*No sources selected. Bookmark repos to add them here.*",
                    elem_classes=["info-box"],
                )

            # ===== RIGHT PANEL: Chat & Preview =====
            with gr.Column(scale=2, min_width=500):
                # Chat vs Preview Toggle
                right_panel_mode = gr.Radio(
                    choices=["💬 Chat & RAG", "📄 Preview"],
                    value="💬 Chat & RAG",
                    label="",
                    container=False,
                )
                # Chat Interface
                with gr.Group(visible=True) as chat_panel:
                    with gr.Group(elem_classes=["box-container"]):
                        gr.Markdown("### 🐒 Knowledge Assistant")
                        gr.Markdown("*Hover over messages to see the copy icon. Use the toggles below to augment with web/wiki search.*")
                        chatbot_kwargs = dict(
                            value=[],
                            height=450,
                            show_label=False,
                            avatar_images=["images/user.png", "images/monkey.png"],
                            elem_classes=["chat-container"],
                            type="messages",
                        )
                        chatbot = gr.Chatbot(**chatbot_kwargs)
                        # Toolbar row: Web Search and Wiki Search toggles (Clear Chat removed)
                        with gr.Row():
                            web_search_toggle = gr.Checkbox(
                                label="🌐 Web Search",
                                value=False,
                                interactive=True,
                            )
                            wiki_search_toggle = gr.Checkbox(
                                label="📖 Wikipedia",
                                value=False,
                                interactive=True,
                            )
                        with gr.Row():
                            question_box = gr.Textbox(
                                label="",
                                placeholder="Ask anything about your indexed sources...",
                                lines=1,
                                scale=5,
                                container=False,
                            )
                            send_btn = gr.Button("Send", variant="primary", scale=1)
                        gr.Examples(
                            examples=[
                                "Summarize the main architecture patterns",
                                "What are the key dependencies?",
                                "Explain the core functionality",
                                "What patterns can I reuse?",
                            ],
                            inputs=question_box,
                            label="Quick Questions",
                        )
                    # ===== EXPERIMENTAL LAB (Collapsible) =====
                    with gr.Group(elem_classes=["box-container"]):
                        gr.Markdown("### 🧪 Experimental Lab")
                        gr.Markdown("Use this lab to prototype small apps based on your knowledge base.")
                        with gr.Accordion("Open Lab", open=False):
                            gr.Markdown("*The agent generates Gradio code and tests it in a sandbox.*")
                            with gr.Row():
                                with gr.Column(scale=1):
                                    gr.Markdown("### 🎯 Build an Experiment")
                                    lab_intention = gr.Textbox(
                                        label="What do you want to build?",
                                        placeholder="e.g., A Gradio app that visualizes patterns from a Knowledge Transfer report.",
                                        lines=3,
                                    )
                                    lab_report_dropdown = gr.Dropdown(
                                        label="Reference Material (bookmarked report)",
                                        choices=_list_knowledge_report_choices(),
                                        value=None,
                                        interactive=True,
                                        info="Select a Knowledge Transfer report as context",
                                    )
                                    with gr.Row():
                                        lab_refresh_reports_btn = gr.Button("🔄 Refresh", variant="secondary", size="sm")
                                        lab_start_btn = gr.Button("▶️ Start Build", variant="primary", size="sm")
                                    gr.Markdown("---")
                                    gr.Markdown("**🔧 Fix Issues**")
                                    lab_fix_instruction = gr.Textbox(
                                        label="",
                                        placeholder="Describe what needs to be fixed (e.g., 'The button click handler is not working')...",
                                        lines=2,
                                        container=False,
                                    )
                                    with gr.Row():
                                        lab_fix_btn = gr.Button("🔧 Apply Fix", variant="secondary", size="sm")
                                        lab_happy_btn = gr.Button("✅ Done", variant="secondary", size="sm")
                                    lab_export_btn = gr.Button("📥 Export Code", variant="primary")
                                    lab_download = gr.File(label="Download", visible=False)
                                with gr.Column(scale=2):
                                    gr.Markdown("### 🔬 Experiment Output")
                                    lab_output = gr.Markdown(
                                        "Describe what you want to build, select a reference report from your bookmarks, then click **Start Build**.\n\n"
                                        "If there are errors, describe the issue in the fix box and click **Apply Fix**."
                                    )
                                    lab_code_display = gr.Code(
                                        label="Generated Code",
                                        language="python",
                                        visible=False,
                                    )

                # Preview Interface
                with gr.Group(visible=False) as preview_panel:
                    gr.Markdown("### 📄 Document Preview")
                    with gr.Row():
                        with gr.Column(scale=1):
                            preview_source_info = gr.Markdown("""
**Source:** Not selected
**Status:** ⏳ Pending
**Chunks:** 0 vectors
""", elem_classes=["info-box"])
                        with gr.Column(scale=1, min_width=100):
                            with gr.Row():
                                preview_copy_btn = gr.Button("📋 Copy", variant="secondary", size="sm", scale=1)
                                preview_download_btn = gr.DownloadButton("📥 Download", variant="secondary", size="sm", scale=1)
                    preview_download_file = gr.File(visible=False)
                    preview_content = gr.Markdown(
                        value="Select a source and analyze it to see the preview here.",
                        elem_classes=["preview-card"],
                    )

                # Panel switching logic
                def switch_panel(choice):
                    return (
                        gr.Group(visible=("Chat" in choice)),
                        gr.Group(visible=("Preview" in choice)),
                    )

                right_panel_mode.change(
                    fn=switch_panel,
                    inputs=[right_panel_mode],
                    outputs=[chat_panel, preview_panel],
                )
                gr.Markdown("---")

        # ===== HIDDEN STATE COMPONENTS FOR DOWNLOADS =====
        github_download = gr.File(label="Download", visible=False)
        local_download = gr.File(label="Download", visible=False)
        youtube_download = gr.File(label="Download", visible=False)

        # ===== EVENT HANDLERS =====
        # GitHub handlers
        github_analyze_btn.click(
            fn=lambda: "⏳ **Analyzing repository...** Please wait.",
            outputs=[github_status],
        ).then(
            fn=run_github_ingestion,
            inputs=[github_url],
            outputs=[preview_content, preview_source_info, github_state],
        ).then(
            fn=lambda: "✅ **Analysis complete!** Click **Index for RAG** to enable chat.",
            outputs=[github_status],
        )

        def index_github_with_status(state):
            status, new_state = index_github_repo(state)
            chunks = new_state.get("vector_chunks", 0) if new_state else 0
            if "✅" in status or chunks > 0:
                return f"✅ **Indexed {chunks} vector chunks.** Ready for RAG queries!", new_state
            return status, new_state

        def index_local_with_status(state):
            status, new_state = index_local_repo(state)
            chunks = new_state.get("vector_chunks", 0) if new_state else 0
            if "✅" in status or chunks > 0:
                return f"✅ **Indexed {chunks} vector chunks.** Ready for RAG queries!", new_state
            return status, new_state

        def index_youtube_with_status(state):
            status, new_state = index_youtube_video(state)
            chunks = new_state.get("vector_chunks", 0) if new_state else 0
            if "✅" in status or chunks > 0:
                return f"✅ **Indexed {chunks} transcript chunks.** Ready for RAG queries!", new_state
            return status, new_state

        github_index_btn.click(
            fn=lambda: "⏳ **Indexing...** Building vector embeddings.",
            outputs=[github_status],
        ).then(
            fn=index_github_with_status,
            inputs=[github_state],
            outputs=[github_status, github_state],
        )

        def bookmark_with_refresh(state):
            status, new_state, _dropdown = bookmark_github_repo(state)
            choices, meta = get_dropdown_options()
            # Propagate warnings from the bookmark step instead of masking them.
            message = status if status.startswith("⚠️") else (
                "📌 **Bookmarked!** Added to Knowledge Base for future chat sessions."
            )
            return (
                message,
                new_state,
                gr.Dropdown(choices=choices, value=None),
                gr.CheckboxGroup(choices=choices, value=[]),
                meta,
            )

        github_bookmark_btn.click(
            fn=bookmark_with_refresh,
            inputs=[github_state],
            outputs=[github_status, github_state, bookmarks_dropdown, bookmarks_checkboxes, bookmarks_metadata],
        )

        # Local handlers
        local_analyze_btn.click(
            fn=lambda: "⏳ **Analyzing project...** Please wait.",
            outputs=[local_status],
        ).then(
            fn=run_local_repo_ingestion,
            inputs=[local_folder],
            outputs=[preview_content, preview_source_info, local_state],
        ).then(
            fn=lambda: "✅ **Analysis complete!** Click **Index for RAG** to enable chat.",
            outputs=[local_status],
        )

        local_index_btn.click(
            fn=lambda: "⏳ **Indexing...** Building vector embeddings.",
            outputs=[local_status],
        ).then(
            fn=index_local_with_status,
            inputs=[local_state],
            outputs=[local_status, local_state],
        )

        def bookmark_local_with_refresh(state):
            status, new_state, _dropdown = bookmark_local_repo(state)
            choices, meta = get_dropdown_options()
            # Propagate warnings from the bookmark step instead of masking them.
            message = status if status.startswith("⚠️") else (
                "📌 **Bookmarked!** Added to Knowledge Base for future chat sessions."
            )
            return (
                message,
                new_state,
                gr.Dropdown(choices=choices, value=None),
                gr.CheckboxGroup(choices=choices, value=[]),
                meta,
            )

        local_bookmark_btn.click(
            fn=bookmark_local_with_refresh,
            inputs=[local_state],
            outputs=[local_status, local_state, bookmarks_dropdown, bookmarks_checkboxes, bookmarks_metadata],
        )
        # YouTube handlers
        youtube_analyze_btn.click(
            fn=lambda: "⏳ **Fetching transcript...** Please wait.",
            outputs=[youtube_status],
        ).then(
            fn=run_youtube_ingestion,
            inputs=[youtube_url],
            outputs=[preview_content, preview_source_info, youtube_state],
        ).then(
            fn=lambda: "✅ **Analysis complete!** Click **Index for RAG** to enable chat.",
            outputs=[youtube_status],
        )

        youtube_index_btn.click(
            fn=lambda: "⏳ **Indexing...** Building vector embeddings.",
            outputs=[youtube_status],
        ).then(
            fn=index_youtube_with_status,
            inputs=[youtube_state],
            outputs=[youtube_status, youtube_state],
        )
        def bookmark_youtube_with_refresh(state):
            _status, new_state, _dropdown = bookmark_youtube_video(state)
            choices, meta = get_dropdown_options()
            return (
                "🔖 **Bookmarked!** Added to Knowledge Base for future chat sessions.",
                new_state,
                gr.Dropdown(choices=choices, value=None),
                gr.CheckboxGroup(choices=choices, value=[]),
                meta,
            )

        youtube_bookmark_btn.click(
            fn=bookmark_youtube_with_refresh,
            inputs=[youtube_state],
            outputs=[youtube_status, youtube_state, bookmarks_dropdown, bookmarks_checkboxes, bookmarks_metadata],
        )
        # Chat handlers
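        # The chatbot is assumed to use Gradio's "messages" format, where
        # history is a list of {"role": ..., "content": ...} dicts rather than
        # (user, bot) tuples; the handler appends one dict per turn.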
        def handle_chat_with_history(question, history, github_s, local_s, youtube_s, selected_sources, meta):
            # Use the first selected source from the checkboxes, or fall back
            # to whatever sources are already indexed in this session.
            bookmark = selected_sources[0] if selected_sources else None
            answer = answer_chat_question(question, github_s, local_s, youtube_s, bookmark, meta)
            history = history or []
            history.append({"role": "user", "content": question})
            history.append({"role": "assistant", "content": answer})
            return history, ""

        send_btn.click(
            fn=handle_chat_with_history,
            inputs=[question_box, chatbot, github_state, local_state, youtube_state, bookmarks_checkboxes, bookmarks_metadata],
            outputs=[chatbot, question_box],
        )
        question_box.submit(
            fn=handle_chat_with_history,
            inputs=[question_box, chatbot, github_state, local_state, youtube_state, bookmarks_checkboxes, bookmarks_metadata],
            outputs=[chatbot, question_box],
        )
        # Chatbot like/dislike handler
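        # gr.LikeData exposes .liked (True for thumbs-up, False for
        # thumbs-down) and .index (the position of the rated message), which
        # is all a future feedback logger should need.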
        def handle_chatbot_like(data: gr.LikeData):
            # Placeholder for future feedback logging.
            print(f"User feedback: {'Liked' if data.liked else 'Disliked'} message index {data.index}")
            return None

        chatbot.like(
            fn=handle_chatbot_like,
            outputs=None,
        )
        # Bookmark handlers: refresh both the dropdown and the checkboxes
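        # Returning fresh gr.Dropdown / gr.CheckboxGroup instances from a
        # handler is Gradio's post-4.x way of updating component properties
        # (here: choices) without replacing the components themselves.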
        def refresh_all_bookmarks():
            choices, meta = get_dropdown_options()
            return (
                gr.Dropdown(choices=choices, value=None),
                gr.CheckboxGroup(choices=choices, value=[]),
                meta,
            )

        refresh_bookmarks_btn.click(
            fn=refresh_all_bookmarks,
            outputs=[bookmarks_dropdown, bookmarks_checkboxes, bookmarks_metadata],
        )
        # View All button: show bookmark details
        def view_all_bookmarks(meta):
            if not meta:
                return "*No bookmarks found. Analyze and bookmark repositories to see them here.*"
            lines = ["**🔖 All Bookmarked Sources:**\n"]
            for m in meta:
                name = m.get("repo_name", "Unknown")
                date = m.get("last_pulled_display", "--")
                chunks = m.get("vector_chunks", 0)
                lines.append(f"- **{name}** · {date} · {chunks} chunks")
            return "\n".join(lines)

        view_all_btn.click(
            fn=view_all_bookmarks,
            inputs=[bookmarks_metadata],
            outputs=[bookmark_info],
        )
        # Update bookmark info when the checkbox selection changes
        def update_checkbox_info(selected, meta):
            if not selected:
                return "*No sources selected. Check sources above to include in chat.*"
            lines = [f"**{len(selected)} source(s) selected:**\n"]
            for label in selected:
                m = find_metadata_by_label(label, meta or [])
                if m:
                    lines.append(f"- {m.get('repo_name', label)} ({m.get('vector_chunks', 0)} chunks)")
            return "\n".join(lines)

        bookmarks_checkboxes.change(
            fn=update_checkbox_info,
            inputs=[bookmarks_checkboxes, bookmarks_metadata],
            outputs=[bookmark_info],
        )
        # Notepad download handlers
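        # Assuming the *_notepad_download components are gr.DownloadButton
        # instances, the click handler writes the notepad text to a temp file
        # and returns its path, which becomes the button's downloadable value.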
        def download_notepad_as_md(content):
            if not content or not content.strip():
                return None
            import tempfile
            with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False, encoding='utf-8') as f:
                f.write(content)
            return f.name

        github_notepad_download.click(
            fn=download_notepad_as_md,
            inputs=[github_notepad],
            outputs=[github_notepad_download],
        )
        local_notepad_download.click(
            fn=download_notepad_as_md,
            inputs=[local_notepad],
            outputs=[local_notepad_download],
        )
        youtube_notepad_download.click(
            fn=download_notepad_as_md,
            inputs=[youtube_notepad],
            outputs=[youtube_notepad_download],
        )
        # Preview copy handler (client-side JS)
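        # Note: navigator.clipboard is only available in secure contexts
        # (HTTPS or localhost); on plain HTTP the call fails in the browser
        # rather than copying.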
        preview_copy_btn.click(
            fn=None,
            inputs=[preview_content],
            js="(content) => { navigator.clipboard.writeText(content); return 'Copied!'; }",
        )

        # Preview download handler (reuses the markdown download helper)
        preview_download_btn.click(
            fn=download_notepad_as_md,
            inputs=[preview_content],
            outputs=[preview_download_btn],
        )
        # Lab handlers with improved output
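        # run_lab_with_code scans the lab result for the first fenced Python
        # block; if one is found, it is surfaced in a dedicated gr.Code panel
        # so the snippet can be copied or exported separately.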
        def run_lab_with_code(intention, report):
            result = run_experimental_lab(intention, report)
            # Extract the first fenced Python block, if present.
            fence = "```python"
            if fence in result:
                code_start = result.find(fence) + len(fence)
                code_end = result.find("```", code_start)
                code = result[code_start:code_end].strip() if code_end > code_start else ""
                return result, gr.Code(value=code, visible=True)
            return result, gr.Code(visible=False)

        lab_start_btn.click(
            fn=run_lab_with_code,
            inputs=[lab_intention, lab_report_dropdown],
            outputs=[lab_output, lab_code_display],
        )

        def fix_lab_with_instruction(intention, report, fix_instruction):
            # Fold the user's fix request into the intention passed downstream.
            combined = f"{intention}\n\nFIX REQUEST: {fix_instruction}" if fix_instruction else intention
            return lab_fix_bugs(combined, report)

        lab_fix_btn.click(
            fn=fix_lab_with_instruction,
            inputs=[lab_intention, lab_report_dropdown, lab_fix_instruction],
            outputs=[lab_output],
        )

        lab_happy_btn.click(
            fn=lab_mark_happy,
            inputs=[lab_intention, lab_report_dropdown],
            outputs=[lab_output],
        )
        # Export lab code to file
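        # The chained .then() below reveals the hidden gr.File component once
        # a path has been written, so the download link only appears after a
        # successful export.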
        def export_lab_code(code_content):
            if not code_content:
                return None
            import tempfile
            with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as f:
                f.write(code_content)
            return f.name

        lab_export_btn.click(
            fn=export_lab_code,
            inputs=[lab_code_display],
            outputs=[lab_download],
        ).then(
            fn=lambda: gr.File(visible=True),
            outputs=[lab_download],
        )
        lab_refresh_reports_btn.click(
            fn=_refresh_lab_reports_dropdown,
            outputs=[lab_report_dropdown],
        )
        # Load bookmarks on startup (refresh both the dropdown and the checkboxes)
        demo.load(
            fn=refresh_all_bookmarks,
            outputs=[bookmarks_dropdown, bookmarks_checkboxes, bookmarks_metadata],
        )
    return demo, (app_theme if IS_GRADIO_V6 else None), (custom_css if IS_GRADIO_V6 else None)
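
# The version gate below assumes Gradio 6 accepts theme/css as launch()
# keyword arguments; on older versions they are presumably applied inside
# build_interface() when the Blocks object is constructed.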
if __name__ == "__main__":
    demo, app_theme, custom_css = build_interface()
    launch_kwargs = {"share": False}
    if IS_GRADIO_V6:
        launch_kwargs.update(theme=app_theme, css=custom_css)
    demo.launch(**launch_kwargs)