"""Search-and-retrieval pipeline for academic paper review.

Finds state-of-the-art (SoTA) research related to a paper, merges reviewer
critique points by category, and gathers web/academic evidence for those
critiques, using an OpenRouter-hosted LLM to synthesize raw search output.
"""

import asyncio
import os
from typing import Dict, List

from dotenv import load_dotenv
from langchain_community.tools.semanticscholar.tool import SemanticScholarQueryRun
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.utilities import ArxivAPIWrapper, SerpAPIWrapper
from openai import OpenAI

load_dotenv()

# OpenRouter-backed client used for all LLM synthesis calls.
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

# Model for search/retrieval synthesis tasks.
SEARCH_MODEL = "google/gemini-2.5-flash-lite"

# Search tool instances; each wraps an external search API.
semantic_scholar = SemanticScholarQueryRun()
google_scholar = SerpAPIWrapper(params={"engine": "google_scholar"})
arxiv_search = ArxivAPIWrapper()
tavily_search = TavilySearchResults(max_results=5)


def combine_critiques(critique_points: List[Dict]) -> Dict[str, str]:
    """Combine critique points from multiple reviews into categories.

    Args:
        critique_points: List of critique dictionaries; each may map a
            category name to a list of critique strings.

    Returns:
        Dictionary mapping each category to its critiques joined with
        " | ", or the literal string "No critiques" when none were found.
    """
    categories = ["Methodology", "Clarity", "Experiments", "Significance", "Novelty"]
    combined: Dict[str, list] = {cat: [] for cat in categories}

    for review in critique_points:
        for category in categories:
            # Skip categories that are absent or empty in this review.
            if category in review and review[category]:
                combined[category].extend(review[category])

    # Collapse each category's list into a single display string.
    return {
        category: " | ".join(items) if items else "No critiques"
        for category, items in combined.items()
    }


async def run_search_tool(tool_name: str, tool_func, query: str) -> str:
    """Run a (blocking) search tool in a worker thread with error handling.

    Args:
        tool_name: Human-readable tool name, used only in error output.
        tool_func: Synchronous callable taking the query string.
        query: Search query text.

    Returns:
        The stringified tool result, or "" on failure/empty result.
    """
    try:
        result = await asyncio.to_thread(tool_func, query)
        return str(result) if result else ""
    except Exception as e:
        # Best-effort: a failed tool contributes nothing rather than crashing.
        print(f"{tool_name} search failed: {e}")
        return ""


async def search_sota(paper_title: str, paper_abstract: str, retries: int = 3) -> str:
    """Search for state-of-the-art research related to the paper.

    Runs Tavily, ArXiv, and Semantic Scholar searches in parallel, then asks
    the LLM to synthesize the combined results.

    Args:
        paper_title: Paper title.
        paper_abstract: Paper abstract.
        retries: Maximum retry attempts for the LLM synthesis call.

    Returns:
        Summary of SoTA findings, or the raw (truncated) search results if
        synthesis fails or no retries are allowed.
    """
    search_query = f"{paper_title} recent advances methodology"

    # Fan out to all three sources concurrently; each task is best-effort.
    search_tasks = [
        run_search_tool("Tavily", tavily_search.run, search_query),
        run_search_tool("ArXiv", arxiv_search.run, search_query[:300]),
        run_search_tool("SemanticScholar", semantic_scholar.run, paper_title),
    ]
    search_results = await asyncio.gather(*search_tasks)

    # Only include sections for sources that actually returned something,
    # so empty sources don't leave stray blank separators in the text.
    headers = ["=== Tavily Results ===", "=== ArXiv Results ===",
               "=== Semantic Scholar Results ==="]
    sections = [
        f"{header}\n{result}"
        for header, result in zip(headers, search_results)
        if result
    ]
    combined_results = "\n\n".join(sections)

    if not combined_results.strip():
        return "No SoTA research found from available sources."

    system_prompt = """
    You are an expert at synthesizing academic research findings.
    Summarize the search results to identify state-of-the-art approaches and recent advances.
    Focus on methodologies, key findings, and how they relate to the paper being reviewed.
    """

    user_prompt = f"""
    Paper Title: {paper_title}

    Paper Abstract: {paper_abstract[:500]}

    Search Results:
    {combined_results[:4000]}

    Provide a concise summary of the state-of-the-art research relevant to this paper.
    """

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    for attempt in range(retries):
        try:
            # The OpenAI client is synchronous; run it off the event loop.
            response = await asyncio.to_thread(
                client.chat.completions.create,
                model=SEARCH_MODEL,
                messages=messages,
                max_tokens=2048,
            )
            if not response.choices or not response.choices[0].message.content.strip():
                raise ValueError("Empty response from API")
            return response.choices[0].message.content.strip()
        except Exception as e:
            wait_time = 2 ** attempt  # exponential backoff
            print(f"SoTA synthesis attempt {attempt + 1} failed: {e}")
            if attempt < retries - 1:
                await asyncio.sleep(wait_time)

    # Reached when all attempts failed, or when retries <= 0 — never
    # return None implicitly; callers expect a string.
    return f"Raw search results (synthesis failed):\n{combined_results[:2000]}"


async def retrieve_evidence_for_category(
    category: str, critiques: str, retries: int = 3
) -> str:
    """Retrieve evidence for critiques in a specific category.

    Args:
        category: Category name (e.g., "Methodology").
        critiques: Combined critique text for that category.
        retries: Maximum retry attempts for the LLM analysis call.

    Returns:
        Evidence findings, or a descriptive fallback message.
    """
    if critiques == "No critiques" or not critiques.strip():
        return f"No critiques to validate for {category}"

    # Targeted query: category keyword plus a truncated slice of critiques.
    search_query = f"{category} research validation {critiques[:200]}"

    try:
        tavily_result = await run_search_tool("Tavily", tavily_search.run, search_query)
        arxiv_result = await run_search_tool("ArXiv", arxiv_search.run, search_query[:200])

        combined = f"{tavily_result}\n{arxiv_result}".strip()
        if not combined:
            return f"No evidence found for {category} critiques"

        system_prompt = f"""
        You are an expert at evaluating academic critiques.
        Analyze the search results to find evidence that supports or contradicts the critiques.
        Focus on the {category} aspect.
        """

        user_prompt = f"""
        Critiques for {category}:
        {critiques}

        Search Results:
        {combined[:2000]}

        Summarize the evidence found that relates to these critiques.
        """

        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]

        for attempt in range(retries):
            try:
                response = await asyncio.to_thread(
                    client.chat.completions.create,
                    model=SEARCH_MODEL,
                    messages=messages,
                    max_tokens=1024,
                )
                if response.choices and response.choices[0].message.content.strip():
                    return response.choices[0].message.content.strip()
            except Exception as e:
                # Log instead of silently dropping the error before retrying.
                print(f"Evidence analysis attempt {attempt + 1} for {category} failed: {e}")
                if attempt < retries - 1:
                    await asyncio.sleep(2 ** attempt)

        # All synthesis attempts exhausted; report graceful completion.
        return f"Evidence retrieval completed for {category}"

    except Exception as e:
        return f"Error retrieving evidence for {category}: {str(e)}"


async def retrieve_evidence(combined_critiques: Dict[str, str]) -> Dict[str, str]:
    """Retrieve evidence for all critique categories.

    Args:
        combined_critiques: Dictionary of combined critiques per category.

    Returns:
        Dictionary of evidence per category.
    """
    evidence_results: Dict[str, str] = {}

    # Sequential on purpose: the 1s pause between categories rate-limits
    # the downstream search/LLM APIs.
    for category, critiques in combined_critiques.items():
        evidence_results[category] = await retrieve_evidence_for_category(
            category, critiques
        )
        await asyncio.sleep(1)

    return evidence_results


async def search_and_retrieve(
    paper_title: str, paper_abstract: str, critique_points: List[Dict]
) -> Dict:
    """Complete search and retrieval pipeline.

    Args:
        paper_title: Paper title.
        paper_abstract: Paper abstract.
        critique_points: List of critique point dictionaries.

    Returns:
        Dictionary with SoTA results, combined critiques, and evidence.
        On failure, an "error" key is included and the other values are
        empty placeholders.
    """
    try:
        # Step 1: Search for SoTA research.
        sota_results = await search_sota(paper_title, paper_abstract)

        # Step 2: Combine critique points by category.
        combined_critiques = combine_critiques(critique_points)

        # Step 3: Retrieve evidence for the combined critiques.
        evidence = await retrieve_evidence(combined_critiques)

        return {
            "SoTA_Results": sota_results,
            "Combined_Critiques": combined_critiques,
            "Retrieved_Evidence": evidence,
        }
    except Exception as e:
        return {
            "error": str(e),
            "SoTA_Results": "",
            "Combined_Critiques": {},
            "Retrieved_Evidence": {},
        }