# MetaSearch — pipeline/search_retrieval.py
# Commit 08a5a31 (Tirath5504): use OpenRouter only instead of google-genai.
import os
from typing import Dict, List
import asyncio
from openai import OpenAI
from langchain_community.utilities import ArxivAPIWrapper, SerpAPIWrapper
from langchain_community.tools.semanticscholar.tool import SemanticScholarQueryRun
from langchain_community.tools.tavily_search import TavilySearchResults
from dotenv import load_dotenv
load_dotenv()
# Initialize OpenRouter client for LLM calls (OpenAI-compatible endpoint).
# NOTE(review): os.getenv returns None if OPENROUTER_API_KEY is unset; the
# client is still constructed and only fails at request time — confirm the
# .env is present in deployment.
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)
# Model for search/retrieval tasks
SEARCH_MODEL = "google/gemini-2.5-flash-lite"
# Initialize search tools (each wraps an external search API and may need
# its own API key in the environment, e.g. SERPAPI_API_KEY, TAVILY_API_KEY)
semantic_scholar = SemanticScholarQueryRun()
# NOTE(review): google_scholar is constructed but not referenced in the
# visible portion of this module — verify it is used elsewhere or remove.
google_scholar = SerpAPIWrapper(params={"engine": "google_scholar"})
arxiv_search = ArxivAPIWrapper()
tavily_search = TavilySearchResults(max_results=5)  # cap web results per query
def combine_critiques(critique_points: List[Dict]) -> Dict[str, str]:
    """
    Merge critique points from multiple reviews into per-category strings.

    Args:
        critique_points: List of critique dictionaries, each optionally
            mapping a category name to a list of critique strings.

    Returns:
        Dictionary mapping every category to a " | "-joined string of its
        critiques, or "No critiques" when a category collected nothing.
    """
    categories = ("Methodology", "Clarity", "Experiments", "Significance", "Novelty")
    collected: Dict[str, List[str]] = {name: [] for name in categories}

    # Gather every non-empty critique list under its category.
    for review in critique_points:
        for name in categories:
            points = review.get(name)
            if points:
                collected[name].extend(points)

    # Flatten each category's list into a single display string.
    return {
        name: " | ".join(points) if points else "No critiques"
        for name, points in collected.items()
    }
async def run_search_tool(tool_name: str, tool_func, query: str) -> str:
    """
    Execute a blocking search callable off the event loop, swallowing errors.

    Args:
        tool_name: Human-readable tool name, used only in failure logs.
        tool_func: Synchronous callable invoked with the query string.
        query: Search query to pass to the tool.

    Returns:
        The stringified result, or "" when the tool fails or returns nothing.
    """
    try:
        outcome = await asyncio.to_thread(tool_func, query)
        return str(outcome) if outcome else ""
    except Exception as err:
        # Best-effort: a failed provider must not break the whole pipeline.
        print(f"{tool_name} search failed: {err}")
        return ""
async def search_sota(paper_title: str, paper_abstract: str, retries: int = 3) -> str:
    """
    Search for state-of-the-art research related to the paper.

    Args:
        paper_title: Paper title
        paper_abstract: Paper abstract (only the first 500 chars are sent to the LLM)
        retries: Maximum retry attempts for the LLM synthesis call

    Returns:
        LLM-synthesized summary of SoTA findings; a "no results" message when
        every search source came back empty; or the raw search results when
        synthesis fails on all attempts.
    """
    # Create search query
    search_query = f"{paper_title} recent advances methodology"

    # Run multiple searches in parallel; each wrapper returns "" on failure.
    search_results = await asyncio.gather(
        run_search_tool("Tavily", tavily_search.run, search_query),
        run_search_tool("ArXiv", arxiv_search.run, search_query[:300]),  # keep arXiv query short
        run_search_tool("SemanticScholar", semantic_scholar.run, paper_title),
    )

    # Keep only non-empty sections so the prompt contains no stray
    # separators or headerless gaps (the old join produced "\n\n" runs
    # for every empty source).
    labels = ("Tavily", "ArXiv", "Semantic Scholar")
    sections = [
        f"=== {label} Results ===\n{result}"
        for label, result in zip(labels, search_results)
        if result
    ]
    combined_results = "\n\n".join(sections)

    if not combined_results.strip():
        return "No SoTA research found from available sources."

    # Use LLM to synthesize the results
    system_prompt = """
You are an expert at synthesizing academic research findings.
Summarize the search results to identify state-of-the-art approaches and recent advances.
Focus on methodologies, key findings, and how they relate to the paper being reviewed.
"""
    user_prompt = f"""
Paper Title: {paper_title}
Paper Abstract: {paper_abstract[:500]}
Search Results:
{combined_results[:4000]}
Provide a concise summary of the state-of-the-art research relevant to this paper.
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    # Retry with exponential backoff; the SDK call is blocking, so run it
    # in a worker thread to keep the event loop free.
    for attempt in range(retries):
        try:
            response = await asyncio.to_thread(
                client.chat.completions.create,
                model=SEARCH_MODEL,
                messages=messages,
                max_tokens=2048,
            )
            if not response.choices or not response.choices[0].message.content.strip():
                raise ValueError("Empty response from API")
            return response.choices[0].message.content.strip()
        except Exception as e:
            print(f"SoTA synthesis attempt {attempt + 1} failed: {e}")
            if attempt < retries - 1:
                await asyncio.sleep(2 ** attempt)

    # Reached when every attempt failed — or when retries < 1, which
    # previously fell off the end of the function and returned None.
    return f"Raw search results (synthesis failed):\n{combined_results[:2000]}"
async def retrieve_evidence_for_category(
    category: str,
    critiques: str,
    retries: int = 3
) -> str:
    """
    Retrieve evidence for critiques in a specific category.

    Args:
        category: Category name (e.g., "Methodology")
        critiques: Combined critique text ("No critiques" marks an empty category)
        retries: Maximum retry attempts for the LLM analysis call

    Returns:
        Evidence findings from the LLM, or a descriptive fallback/error string.
    """
    # Nothing to validate for this category.
    if critiques == "No critiques" or not critiques.strip():
        return f"No critiques to validate for {category}"

    # Create targeted search query
    search_query = f"{category} research validation {critiques[:200]}"

    try:
        # Run both searches concurrently (matches the gather pattern in
        # search_sota; each wrapper returns "" on failure).
        tavily_result, arxiv_result = await asyncio.gather(
            run_search_tool("Tavily", tavily_search.run, search_query),
            run_search_tool("ArXiv", arxiv_search.run, search_query[:200]),
        )
        combined = f"{tavily_result}\n{arxiv_result}".strip()
        if not combined:
            return f"No evidence found for {category} critiques"

        # Use LLM to analyze relevance
        system_prompt = f"""
You are an expert at evaluating academic critiques.
Analyze the search results to find evidence that supports or contradicts the critiques.
Focus on the {category} aspect.
"""
        user_prompt = f"""
Critiques for {category}: {critiques}
Search Results:
{combined[:2000]}
Summarize the evidence found that relates to these critiques.
"""
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]

        for attempt in range(retries):
            try:
                response = await asyncio.to_thread(
                    client.chat.completions.create,
                    model=SEARCH_MODEL,
                    messages=messages,
                    max_tokens=1024,
                )
                if response.choices and response.choices[0].message.content.strip():
                    return response.choices[0].message.content.strip()
            except Exception as e:
                # Log instead of silently discarding the error (previously
                # `e` was bound and never used) — consistent with search_sota.
                print(f"Evidence retrieval attempt {attempt + 1} failed: {e}")
                if attempt < retries - 1:
                    await asyncio.sleep(2 ** attempt)
        # All attempts exhausted without a usable LLM answer.
        return f"Evidence retrieval completed for {category}"
    except Exception as e:
        return f"Error retrieving evidence for {category}: {str(e)}"
async def retrieve_evidence(combined_critiques: Dict[str, str]) -> Dict[str, str]:
    """
    Retrieve evidence for every critique category, one category at a time.

    Args:
        combined_critiques: Dictionary of combined critiques per category.

    Returns:
        Dictionary mapping each category to its retrieved evidence text.
    """
    evidence: Dict[str, str] = {}
    # Categories are processed sequentially, with a one-second pause after
    # each, to stay under upstream API rate limits.
    for name, critique_text in combined_critiques.items():
        evidence[name] = await retrieve_evidence_for_category(name, critique_text)
        await asyncio.sleep(1)
    return evidence
async def search_and_retrieve(
    paper_title: str,
    paper_abstract: str,
    critique_points: List[Dict]
) -> Dict:
    """
    Complete search and retrieval pipeline.

    Args:
        paper_title: Paper title.
        paper_abstract: Paper abstract.
        critique_points: List of critique point dictionaries.

    Returns:
        Dictionary with SoTA results, combined critiques, and evidence; on
        any failure, a dictionary carrying an "error" key and empty fields.
    """
    try:
        # 1) Survey the state-of-the-art landscape around the paper.
        sota = await search_sota(paper_title, paper_abstract)
        # 2) Merge critiques from all reviews into per-category strings.
        critiques_by_category = combine_critiques(critique_points)
        # 3) Look up supporting/contradicting evidence for each category.
        evidence = await retrieve_evidence(critiques_by_category)
    except Exception as exc:
        return {
            "error": str(exc),
            "SoTA_Results": "",
            "Combined_Critiques": {},
            "Retrieved_Evidence": {}
        }
    return {
        "SoTA_Results": sota,
        "Combined_Critiques": critiques_by_category,
        "Retrieved_Evidence": evidence
    }