| """Core repository analysis helpers used by both the Gradio UI and MCP server.""" | |
| from __future__ import annotations | |
| from typing import Any, Dict, List, Optional, Tuple | |
| import asyncio | |
| import json | |
| import logging | |
| import os | |
| import tempfile | |
| from pathlib import Path | |
| import contextlib | |
| import io | |
| import re | |
| import traceback | |
| import requests | |
| from git import GitCommandError, Repo | |
| from langchain_community.embeddings import HuggingFaceEmbeddings | |
| from langchain_community.vectorstores import Chroma | |
| from langchain_openai import ChatOpenAI | |
| from langchain_text_splitters import RecursiveCharacterTextSplitter | |
| from dotenv import load_dotenv, dotenv_values | |
| from mcp import ClientSession, StdioServerParameters | |
| from mcp.client.streamable_http import streamablehttp_client | |
| from mcp.client.stdio import stdio_client | |
| logger = logging.getLogger(__name__) | |
| PROJECT_ROOT = Path(__file__).resolve().parent | |
| DOTENV_PATH = PROJECT_ROOT / ".env" | |
| load_dotenv(DOTENV_PATH, override=False) | |
| _ENV_CACHE = dotenv_values(DOTENV_PATH) if DOTENV_PATH.exists() else {} | |
| DOC_DIRECTORIES = ("docs", "documentation", "doc") | |
| DOC_EXTENSIONS = (".md", ".rst", ".txt") | |
| DOC_FILENAMES = {"readme", "readme.md", "readme.rst", "changelog", "contributing"} | |
| IGNORE_DIRS = {".git", "__pycache__", "node_modules", "dist", "build", ".venv", "venv", ".tox"} | |
| DEFAULT_EMBEDDING_MODEL = os.getenv("SENTENCE_EMBEDDER", "sentence-transformers/all-MiniLM-L6-v2") | |


class RepoAnalyzer:
    """Utility that clones a repo and extracts any documentation-esque files."""

    def __init__(self, repo_url: str, working_dir: str):
        self.repo_url = repo_url.strip()
        self.working_dir = Path(working_dir)
        self.repo_name = self._derive_repo_name()
        self.repo_path = self.working_dir / self.repo_name
        self.docs_path: Path | None = None

    def _derive_repo_name(self) -> str:
        base = self.repo_url.rstrip("/").split("/")[-1]
        if base.endswith(".git"):
            base = base[:-4]
        return base or "repository"

    def clone_repo(self) -> bool:
        try:
            Repo.clone_from(self.repo_url, self.repo_path)
            logger.info("Cloned %s", self.repo_url)
            return True
        except GitCommandError as err:
            logger.error("Failed to clone repo: %s", err)
            return False

    def resolve_docs_directory(self) -> bool:
        for doc_dir in DOC_DIRECTORIES:
            candidate = self.repo_path / doc_dir
            if candidate.exists() and candidate.is_dir():
                self.docs_path = candidate
                return True
        return False

    def find_documentation_files(self) -> List[Path]:
        doc_files: List[Path] = []
        for root, dirs, files in os.walk(self.repo_path):
            dirs[:] = [d for d in dirs if d not in IGNORE_DIRS and not d.startswith(".")]
            for filename in files:
                lower = filename.lower()
                if lower.endswith(DOC_EXTENSIONS) or lower in DOC_FILENAMES:
                    doc_files.append(Path(root) / filename)
        return doc_files

    def read_documentation_files(self, doc_files: List[Path]) -> List[Dict[str, Any]]:
        docs: List[Dict[str, Any]] = []
        for path in doc_files:
            try:
                content = path.read_text(encoding="utf-8", errors="ignore")
            except Exception as err:  # pragma: no cover - best effort read
                logger.warning("Unable to read %s: %s", path, err)
                continue
            docs.append({
                "path": str(path.relative_to(self.repo_path)),
                "content": content,
            })
        return docs

    def get_repo_structure(self) -> List[str]:
        structure: List[str] = []
        for root, dirs, files in os.walk(self.repo_path):
            dirs[:] = [d for d in dirs if not d.startswith(".") and d not in IGNORE_DIRS]
            files = [f for f in files if not f.startswith(".")]
            rel_root = Path(root)
            level = len(rel_root.relative_to(self.repo_path).parts)
            indent = " " * 4 * level
            structure.append(f"{indent}{rel_root.name}/")
            subindent = " " * 4 * (level + 1)
            for file_name in files:
                structure.append(f"{subindent}{file_name}")
        return structure

    def analyze_repo(self, persist_path: Path | None = None) -> Dict[str, Any]:
        if not self.clone_repo():
            return {"error": "Failed to clone repository. Confirm the URL is reachable."}
        has_docs = self.resolve_docs_directory()
        doc_files = self.find_documentation_files()
        documentation = self.read_documentation_files(doc_files) if doc_files else []
        if persist_path:
            persist_path.mkdir(parents=True, exist_ok=True)
            for doc in documentation:
                rel_path = Path(doc["path"]).with_suffix(".txt")
                target_file = persist_path / rel_path
                target_file.parent.mkdir(parents=True, exist_ok=True)
                # Write explicitly as UTF-8 so persistence does not depend on the
                # platform's default encoding.
                target_file.write_text(doc.get("content", ""), encoding="utf-8")
            metadata_file = persist_path / "metadata.json"
            metadata_file.write_text(json.dumps({
                "repo_url": self.repo_url,
                "repo_name": self.repo_name,
                "documentation_files": [doc["path"] for doc in documentation],
            }, indent=2), encoding="utf-8")
        return {
            "repo_url": self.repo_url,
            "repo_name": self.repo_name,
            "has_documentation": has_docs,
            "documentation_count": len(documentation),
            "documentation_files": [doc["path"] for doc in documentation],
            "documentation": documentation,
            "structure": self.get_repo_structure(),
        }


def analyze_github_repo(repo_url: str, *, persist_dir: Optional[Path] = None) -> Dict[str, Any]:
    repo_url = (repo_url or "").strip()
    if not repo_url:
        return {"error": "Please provide a GitHub repository URL."}
    with tempfile.TemporaryDirectory() as tmp_dir:
        analyzer = RepoAnalyzer(repo_url, tmp_dir)
        return analyzer.analyze_repo(persist_dir)
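

# Illustrative usage sketch (not called anywhere in this module): analyze a
# public repository and persist its documentation. The URL and output path are
# placeholders; any clonable git URL should work, assuming git and network access.
def _demo_analyze_github_repo() -> None:
    result = analyze_github_repo(
        "https://github.com/octocat/Hello-World",
        persist_dir=Path("./analysis_output"),
    )
    if "error" in result:
        print("Analysis failed:", result["error"])
    else:
        print(f"{result['repo_name']}: {result['documentation_count']} doc files")
        for rel_path in result["documentation_files"]:
            print(" -", rel_path)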


def analyze_local_repo(root_dir: str) -> Dict[str, Any]:
    """Analyze a local repository directory without performing a git clone.

    This mirrors the payload shape of ``analyze_github_repo`` so the UI
    and bookmarking logic can treat remote and local repos uniformly.
    """
    root = Path(root_dir)
    if not root.exists() or not root.is_dir():
        return {"error": "Uploaded folder was not found on the server."}
    repo_name = root.name or "local-repository"
    doc_files: List[Path] = []
    for r, dirs, files in os.walk(root):
        dirs[:] = [d for d in dirs if d not in IGNORE_DIRS and not d.startswith(".")]
        for filename in files:
            lower = filename.lower()
            if lower.endswith(DOC_EXTENSIONS) or lower in DOC_FILENAMES:
                doc_files.append(Path(r) / filename)
    documentation: List[Dict[str, Any]] = []
    for path in doc_files:
        try:
            content = path.read_text(encoding="utf-8", errors="ignore")
        except Exception as err:  # pragma: no cover - best effort read
            logger.warning("Unable to read %s: %s", path, err)
            continue
        documentation.append(
            {
                "path": str(path.relative_to(root)),
                "content": content,
            }
        )
    structure: List[str] = []
    for r, dirs, files in os.walk(root):
        dirs[:] = [d for d in dirs if not d.startswith(".") and d not in IGNORE_DIRS]
        files = [f for f in files if not f.startswith(".")]
        rel_root = Path(r)
        level = len(rel_root.relative_to(root).parts)
        indent = " " * 4 * level
        structure.append(f"{indent}{rel_root.name}/")
        subindent = " " * 4 * (level + 1)
        for file_name in files:
            structure.append(f"{subindent}{file_name}")
    return {
        "repo_url": f"local://{repo_name}",
        "repo_name": repo_name,
        "has_documentation": bool(doc_files),
        "documentation_count": len(documentation),
        "documentation_files": [str(p.relative_to(root)) for p in doc_files],
        "documentation": documentation,
        "structure": structure,
    }
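

# Illustrative usage sketch (not called anywhere in this module): the local
# variant mirrors the remote payload shape, so callers can branch on the same
# keys. The directory path below is a hypothetical placeholder.
def _demo_analyze_local_repo() -> None:
    payload = analyze_local_repo("/tmp/uploaded-repo")
    if "error" in payload:
        print(payload["error"])
    else:
        print(payload["repo_url"])  # e.g. "local://uploaded-repo"
        print("\n".join(payload["structure"][:10]))  # first lines of the tree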


def _get_embeddings() -> HuggingFaceEmbeddings:
    return HuggingFaceEmbeddings(model_name=DEFAULT_EMBEDDING_MODEL)


def build_repo_vector_store(documents: List[Dict[str, Any]], *, persist_path: Path) -> Tuple[Optional[Chroma], int]:
    if not documents:
        return None, 0
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150)
    texts: List[str] = []
    metadatas: List[Dict[str, str]] = []
    for doc in documents:
        content = doc.get("content") or ""
        if not content:
            continue
        chunks = text_splitter.split_text(content)
        texts.extend(chunks)
        metadatas.extend({"path": doc.get("path", "") or ""} for _ in chunks)
    if not texts:
        return None, 0
    embeddings = _get_embeddings()
    persist_path.mkdir(parents=True, exist_ok=True)
    vectorstore = Chroma.from_texts(
        texts=texts,
        metadatas=metadatas,
        embedding=embeddings,
        persist_directory=str(persist_path),
    )
    return vectorstore, len(texts)
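

# Illustrative sketch (not called anywhere in this module): index a repo's
# documentation into a persistent Chroma store. Assumes the embedding model can
# be downloaded on first use; the URL and directory are placeholders.
def _demo_build_vector_store() -> None:
    analysis = analyze_github_repo("https://github.com/octocat/Hello-World")
    store, chunk_count = build_repo_vector_store(
        analysis.get("documentation", []),
        persist_path=Path("./vector_store"),
    )
    if store is None:
        print("Nothing to index (no documentation found).")
    else:
        print(f"Indexed {chunk_count} chunks")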


def load_vector_store(vector_dir: Path) -> Optional[Chroma]:
    if not vector_dir.exists():
        return None
    embeddings = _get_embeddings()
    return Chroma(
        persist_directory=str(vector_dir),
        embedding_function=embeddings,
    )


def _get_env_var(*names: str) -> str:
    for name in names:
        value = os.getenv(name)
        if value:
            value = value.strip().strip('"').strip("'")
            if value:
                return value
        cache_val = (_ENV_CACHE or {}).get(name)
        if cache_val:
            cache_val = cache_val.strip().strip('"').strip("'")
            if cache_val:
                return cache_val
    return ""


def get_llm_provider() -> str:
    # The misspelled "openrounter" value and OPENROUNTER_API_KEY variable are
    # tolerated deliberately so a typo in a user's .env still resolves correctly.
    provider = os.getenv("LLM_PROVIDER", "").strip().lower()
    if provider in ("openrouter", "openrounter"):
        return "openrouter"
    if provider == "openai":
        return "openai"
    if _get_env_var("OPENAI_API_KEY"):
        return "openai"
    if _get_env_var("OPENROUTER_API_KEY", "OPENROUNTER_API_KEY"):
        return "openrouter"
    return "openai"


def _build_openrouter_chat_model(default_model: str | None = None) -> ChatOpenAI:
    api_key = _get_env_var("OPENROUTER_API_KEY", "OPENROUNTER_API_KEY")
    if not api_key:
        raise ValueError(
            "OpenRouter API key is not set. Provide OPENROUTER_API_KEY to enable the fallback provider."
        )
    base_url = os.getenv("OPENROUTER_BASE_URL", "https://openrouter.ai/api/v1")
    model = default_model or os.getenv("OPENROUTER_MODEL", "openrouter/sherlock-think-alpha")
    return ChatOpenAI(model=model, api_key=api_key, base_url=base_url, temperature=0)


def get_chat_model() -> ChatOpenAI:
    provider = get_llm_provider()
    if provider == "openrouter":
        logger.info("Using OpenRouter provider (model=%s)", os.getenv("OPENROUTER_MODEL", "openrouter/sherlock-think-alpha"))
        return _build_openrouter_chat_model()
    api_key = _get_env_var("OPENAI_API_KEY")
    if api_key:
        model = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
        logger.info("Using OpenAI provider (model=%s)", model)
        return ChatOpenAI(model=model, api_key=api_key, temperature=0)
    # Fallback: use OpenRouter's default model when the OpenAI key is missing.
    logger.warning("OPENAI_API_KEY not found; falling back to OpenRouter (openrouter/sherlock-think-alpha)")
    return _build_openrouter_chat_model(default_model="openrouter/sherlock-think-alpha")
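

# Illustrative sketch (not called anywhere in this module): provider selection
# is driven entirely by environment variables, so switching backends requires
# no code change. The key below is a placeholder, not a real credential.
def _demo_provider_selection() -> None:
    os.environ["LLM_PROVIDER"] = "openrouter"
    os.environ["OPENROUTER_API_KEY"] = "sk-or-placeholder"
    print(get_llm_provider())  # -> "openrouter"
    llm = get_chat_model()  # ChatOpenAI pointed at the OpenRouter base URL
    print(type(llm).__name__)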


def rag_answer_from_store(vector_dir: Path, question: str, repo_summary: str = "") -> str:
    if not question.strip():
        return "Please enter a question to search your bookmarked repository."
    vectorstore = load_vector_store(vector_dir)
    if vectorstore is None:
        return "Vector store not found. Bookmark the repository first to build embeddings."
    chunk_count = None
    collection = getattr(vectorstore, "_collection", None)
    if collection is not None:
        try:
            chunk_count = collection.count()
        except Exception as err:  # pragma: no cover - debug helper
            logger.debug("Unable to count Chroma collection: %s", err)
    logger.info(
        "RAG query: dir=%s | chunks=%s | question=%.60s",
        vector_dir,
        chunk_count,
        question.strip(),
    )
    retriever = vectorstore.as_retriever(search_kwargs={"k": 4})
    docs = retriever.invoke(question)
    logger.info("Retriever returned %s documents", len(docs) if hasattr(docs, "__len__") else "unknown")
    if not docs:
        return "No relevant context found in the selected repository."
    context = "\n\n".join(
        f"Source: {doc.metadata.get('path', 'unknown')}\n{doc.page_content}"
        for doc in docs
    )
    llm = get_chat_model()
    prompt = (
        "You are a helpful study assistant for a GitHub repository.\n"
        "Use the repository context below as your primary source of truth.\n"
        "Prefer concise, direct answers.\n"
        "If the context is incomplete, you may draw on general knowledge, but make clear when you are doing so.\n"
        "Only say \"I don't know based on this repository\" if the question truly cannot be answered, even approximately, from the context and your general knowledge.\n\n"
        f"Repository summary:\n{repo_summary}\n\nContext:\n{context}\n\nQuestion: {question}"
    )
    response = llm.invoke(prompt)
    logger.info("RAG LLM raw response: %.120s", getattr(response, "content", str(response)).replace("\n", " "))
    return getattr(response, "content", str(response))
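

# Illustrative sketch (not called anywhere in this module) of the end-to-end
# flow: analyze, index, then ask. Assumes a chat-model API key is configured;
# the URL and directory are placeholders.
def _demo_rag_roundtrip() -> None:
    vector_dir = Path("./vector_store")
    analysis = analyze_github_repo("https://github.com/octocat/Hello-World")
    build_repo_vector_store(analysis.get("documentation", []), persist_path=vector_dir)
    print(rag_answer_from_store(vector_dir, "What does this project do?"))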


def qa_on_repo(repo_url: str, question: str) -> str:
    del repo_url, question
    return "Repository Q&A is now handled via bookmarked vector stores."


def _strip_markdown_code_fences(code: str) -> str:
    """Remove markdown code fences (```python ... ```) from LLM output."""
    code = code.strip()
    # Remove opening fence with optional language specifier
    if code.startswith("```"):
        first_newline = code.find("\n")
        if first_newline != -1:
            code = code[first_newline + 1:]
    # Remove closing fence
    if code.rstrip().endswith("```"):
        code = code.rstrip()[:-3].rstrip()
    return code
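

# Illustrative sketch (not called anywhere in this module): the helper
# tolerates both fenced and bare LLM output.
def _demo_strip_fences() -> None:
    fenced = "```python\nprint('hi')\n```"
    assert _strip_markdown_code_fences(fenced) == "print('hi')"
    assert _strip_markdown_code_fences("print('hi')") == "print('hi')"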


def _sandbox_test_experiment_code(code: str) -> Tuple[str, str]:
    """Smoke-test generated code and return (captured stdout, traceback or "").

    Despite the name, ``exec`` here is not an isolated sandbox: the generated
    code runs in-process with full privileges. Only run trusted output.
    """
    # Strip markdown fences if LLM included them
    code = _strip_markdown_code_fences(code)
    buf = io.StringIO()
    ns: Dict[str, Any] = {}
    with contextlib.redirect_stdout(buf), contextlib.redirect_stderr(buf):
        try:
            exec(code, ns, ns)
        except Exception:
            return buf.getvalue(), traceback.format_exc()
    return buf.getvalue(), ""


def build_experiment_from_report(intention: str, report_markdown: str) -> Dict[str, Any]:
    intention = (intention or "").strip()
    if not intention:
        return {"code": "", "stdout": "", "error": "No intention was provided."}
    llm = get_chat_model()
    snippet = report_markdown[:8000] if report_markdown else ""
    prompt = (
        "You are a senior Python engineer. Given the following project knowledge transfer report "
        "and a user intention, write a minimal, self-contained Gradio app in Python.\n\n"
        "Constraints:\n"
        "- Use the 'gradio' library.\n"
        "- Define a function `build_experiment()` that returns a `gr.Blocks` instance.\n"
        "- Do NOT call `launch()` anywhere. The caller will handle running the app.\n"
        "- Keep the app small and focused on the intention.\n"
        "- Return only Python code, with no explanations or Markdown fences.\n\n"
        f"KNOWLEDGE TRANSFER REPORT (truncated):\n{snippet}\n\n"
        f"USER INTENTION:\n{intention}\n"
    )
    response = llm.invoke(prompt)
    code = getattr(response, "content", str(response))
    stdout, error = _sandbox_test_experiment_code(code)
    return {"code": code, "stdout": stdout, "error": error}


_YOUTUBE_TRANSCRIPT_API = "https://youtube-captions-transcript-subtitles-video-combiner.p.rapidapi.com/download-all/{video_id}"
_YOUTUBE_LANGUAGES_API = "https://youtube-captions-transcript-subtitles-video-combiner.p.rapidapi.com/languages/{video_id}"
_YOUTUBE_TRANSCRIPT_FALLBACK_API = "https://youtube-video-summarizer-gpt-ai.p.rapidapi.com/api/v1/get-transcript-v2"


def _extract_video_id(url_or_id: str) -> str:
    pattern = re.compile(r"(?:v=|/)([0-9A-Za-z_-]{11})")
    match = pattern.search(url_or_id)
    if match:
        return match.group(1)
    # Maybe the user already passed the ID.
    return url_or_id.strip()
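

# Illustrative sketch (not called anywhere in this module): the regex accepts
# the common URL shapes as well as a bare 11-character video ID.
def _demo_extract_video_id() -> None:
    for candidate in (
        "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
        "https://youtu.be/dQw4w9WgXcQ",
        "dQw4w9WgXcQ",
    ):
        print(_extract_video_id(candidate))  # -> "dQw4w9WgXcQ" each time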


def _parse_transcript_payload(data: Any) -> str:
    """Extract transcript text from various API response shapes."""
    transcript_text = ""
    if isinstance(data, dict):
        candidates = []
        for key in ("transcript", "subtitle", "subtitles", "caption", "captions"):
            if key in data:
                candidates.append(data[key])
        payload = data.get("data")
        if payload and isinstance(payload, dict):
            for key in ("transcript", "subtitle", "subtitles"):
                if key in payload:
                    candidates.append(payload[key])
        for candidate in candidates:
            if isinstance(candidate, str) and candidate.strip():
                transcript_text = candidate.strip()
                break
            if isinstance(candidate, list):
                joined = " ".join(str(item).strip() for item in candidate if str(item).strip())
                if joined.strip():
                    transcript_text = joined.strip()
                    break
    return transcript_text.strip()


def _call_primary_transcript_api(video_id: str, language: str, rapidapi_key: str) -> Dict[str, Any]:
    headers = {
        "x-rapidapi-key": rapidapi_key,
        "x-rapidapi-host": "youtube-captions-transcript-subtitles-video-combiner.p.rapidapi.com",
    }
    params = {"format_subtitle": "srt", "format_answer": "json", "lang": language}
    api_url = _YOUTUBE_TRANSCRIPT_API.format(video_id=video_id)
    response = requests.get(api_url, headers=headers, params=params, timeout=20)
    try:
        data = response.json() if response.content else {}
    except ValueError:  # non-JSON body, e.g. an HTML error page
        data = {}
    return {"status": response.status_code, "data": data, "text": response.text}


def _call_fallback_transcript_api(video_id: str, rapidapi_key: str) -> Dict[str, Any]:
    headers = {
        "x-rapidapi-key": rapidapi_key,
        "x-rapidapi-host": "youtube-video-summarizer-gpt-ai.p.rapidapi.com",
    }
    params = {"video_id": video_id, "platform": "youtube"}
    response = requests.get(_YOUTUBE_TRANSCRIPT_FALLBACK_API, headers=headers, params=params, timeout=20)
    try:
        payload = response.json() if response.content else {}
    except ValueError:  # non-JSON body, e.g. an HTML error page
        payload = {}
    return {"status": response.status_code, "data": payload, "text": response.text}


def fetch_youtube_transcript(url: str, lang: str = "en") -> Dict[str, Any]:
    """Fetch a YouTube transcript using RapidAPI endpoints with fallback."""
    url = (url or "").strip()
    if not url:
        return {"error": "Please provide a YouTube video URL."}
    video_id = _extract_video_id(url)
    if not video_id:
        return {"error": "Unable to determine the YouTube video ID."}
    rapidapi_key = (os.getenv("RAPID_API_KEY") or "").strip()
    if not rapidapi_key:
        return {"error": "RAPID_API_KEY is not configured. Set it in your .env file."}
    language = (lang or "en").strip() or "en"
    try:
        primary_result = _call_primary_transcript_api(video_id, language, rapidapi_key)
    except requests.RequestException as err:
        logger.error("Error calling RapidAPI transcript endpoint: %s", err)
        primary_result = {"status": 503, "data": {}, "text": str(err)}
    transcript_text = ""
    if primary_result["status"] == 200:
        transcript_text = _parse_transcript_payload(primary_result["data"]) or primary_result.get("text", "").strip()
    if primary_result["status"] == 403 or not transcript_text:
        logger.warning(
            "Primary transcript API failed (status=%s). Falling back to youtube-video-summarizer endpoint.",
            primary_result["status"],
        )
        try:
            fallback_result = _call_fallback_transcript_api(video_id, rapidapi_key)
        except requests.RequestException as err:
            logger.error("Fallback transcript endpoint error: %s", err)
            return {"error": f"Fallback transcript service failed: {err}"}
        if fallback_result["status"] != 200:
            logger.error(
                "Fallback transcript endpoint returned %s: %s",
                fallback_result["status"],
                fallback_result.get("text", ""),
            )
            return {
                "error": f"Transcript APIs failed (primary status {primary_result['status']}, fallback status {fallback_result['status']}).",
            }
        transcript_text = _parse_transcript_payload(fallback_result["data"]) or fallback_result.get("text", "").strip()
    if not transcript_text:
        return {"error": "Transcript APIs returned no textual content."}
    return {
        "url": url,
        "video_id": video_id,
        "lang": language,
        "raw_transcript": transcript_text,
    }
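

# Illustrative sketch (not called anywhere in this module): fetch a transcript
# and inspect the payload. Requires RAPID_API_KEY in the environment; the URL
# is a placeholder.
def _demo_fetch_transcript() -> None:
    payload = fetch_youtube_transcript("https://www.youtube.com/watch?v=dQw4w9WgXcQ")
    if "error" in payload:
        print(payload["error"])
    else:
        print(f"{payload['video_id']} ({payload['lang']}): "
              f"{len(payload['raw_transcript'])} transcript characters")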


def summarize_youtube_chapters(transcript: str, url: str = "") -> str:
    """Summarize a YouTube transcript into chapter-style groups.

    This uses the main chat model to infer chapter titles and rough time ranges
    from the transcript text. It is intentionally lightweight and returns
    Markdown directly for display in the UI.
    """
    text = (transcript or "").strip()
    if not text:
        return "No transcript text available to generate chapter groups."
    llm = get_chat_model()
    prompt = (
        "You are a helpful study assistant who teaches core concepts and ideas. "
        "Given the following YouTube transcript, divide it into a small number of "
        "high-level chapters, keeping the important details a student would need "
        "to prepare for exams and interviews.\n\n"
        "Requirements:\n"
        "- Return Markdown only, no code fences.\n"
        "- For each chapter, provide a short time range (approximate is fine), a title, "
        "and 3-6 bullet points summarizing the key ideas and concepts.\n"
        "- Prefer 5-12 chapters for a long transcript.\n"
        f"Source URL (optional): {url or 'N/A'}\n\n"
        f"TRANSCRIPT:\n{text}\n"
    )
    response = llm.invoke(prompt)
    return getattr(response, "content", str(response))


def generate_youtube_study_notes(chapters_markdown: str, url: str = "") -> str:
    """Generate extended study and interview-oriented notes from chapter groups."""
    text = (chapters_markdown or "").strip()
    if not text:
        return "No chapter outline is available to derive study notes."
    llm = get_chat_model()
    prompt = (
        "You are an expert instructor preparing a study and interview guide based on a YouTube lecture.\n\n"
        "You are given a chapter-style outline (with headings and bullet points).\n\n"
        "Produce Markdown (no code fences) with these sections:\n"
        "1. **Key Concepts & Skills to Master** - group related ideas, describe why they matter, and point to where in the video they appear.\n"
        "2. **How to Study This Video** - concrete tips on how a learner should watch, pause, and practice to internalize the material.\n"
        "3. **Interview Preparation Checklist** - a list of specific topics, sub-skills, and example questions a candidate should be ready to answer, based only on this video.\n\n"
        "Keep the tone concise but rich in information. Do not repeat the entire outline verbatim; instead, synthesize and reorganize it for learning.\n\n"
        f"Source URL (optional): {url or 'N/A'}\n\n"
        f"CHAPTER OUTLINE:\n{text}\n"
    )
    response = llm.invoke(prompt)
    return getattr(response, "content", str(response))
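

# Illustrative sketch (not called anywhere in this module) chaining the YouTube
# helpers: transcript -> chapter outline -> study notes. Requires both
# RAPID_API_KEY and a chat-model key to be configured.
def _demo_youtube_pipeline(url: str) -> str:
    payload = fetch_youtube_transcript(url)
    if "error" in payload:
        return payload["error"]
    chapters = summarize_youtube_chapters(payload["raw_transcript"], url=url)
    return generate_youtube_study_notes(chapters, url=url)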