# MetaSearch/pipeline/critique_extraction.py
# Author: Tirath5504 — commit 08a5a31 ("use openrouter only instead of google-genai")
import json
import os
from typing import List, Dict
from openai import OpenAI
from pydantic import BaseModel
import asyncio
from dotenv import load_dotenv
# Load environment variables (e.g. OPENROUTER_API_KEY) from a local .env file.
load_dotenv()

# OpenRouter exposes an OpenAI-compatible API, so the stock OpenAI client is
# pointed at the OpenRouter base URL.
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

# Model used for critique extraction (served via OpenRouter).
CRITIQUE_MODEL = "google/gemini-2.5-flash-lite"
class CritiquePoint(BaseModel):
    """Schema for the categorized critique points extracted from one review.

    Each field holds the list of critique bullet points for that aspect of
    the paper. NOTE: pydantic copies these list defaults per instance, so the
    usual mutable-default pitfall of plain Python classes does not apply here.
    """

    Methodology: List[str] = []
    Experiments: List[str] = []
    Clarity: List[str] = []
    Significance: List[str] = []
    Novelty: List[str] = []
def _empty_critique(error=None) -> Dict:
    """Return an all-empty critique dict, optionally tagged with an error message."""
    result: Dict = {
        "Methodology": [],
        "Experiments": [],
        "Clarity": [],
        "Significance": [],
        "Novelty": [],
    }
    if error is not None:
        result["error"] = error
    return result


async def extract_single_critique(review_text: str, retries: int = 5) -> Dict:
    """
    Extract critique points from a single review using OpenRouter (Gemini).

    Args:
        review_text: The review text to analyze.
        retries: Maximum number of attempts before giving up.

    Returns:
        Dictionary with categorized critique points. On failure (safety block
        or exhausted retries) an all-empty dict carrying an "error" key.
    """
    system_prompt = """
You are an expert at analyzing academic peer reviews.
Extract key critique points from the review and categorize them.
Respond with ONLY valid JSON in this format:
{
"Methodology": ["point1", "point2"],
"Experiments": ["point1"],
"Clarity": ["point1", "point2"],
"Significance": ["point1"],
"Novelty": ["point1"]
}
"""
    user_prompt = f"""
Extract key critique points from the following research paper review.
Categorize them into aspects: Methodology, Experiments, Clarity, Significance, Novelty.
Review:
{review_text}
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    for attempt in range(retries):
        try:
            # The OpenAI client call is blocking; run it off the event loop.
            response = await asyncio.to_thread(
                client.chat.completions.create,
                model=CRITIQUE_MODEL,
                messages=messages,
                max_tokens=2048,
                response_format={"type": "json_object"},
            )
            if not response.choices or not response.choices[0].message.content.strip():
                raise ValueError("Empty response from API")
            result = json.loads(response.choices[0].message.content.strip())
            # Validate/normalize the parsed JSON against the expected schema.
            critique = CritiquePoint(**result)
            return critique.model_dump()
        except Exception as e:
            error_msg = str(e)
            # Safety blocks are permanent for this input — retrying is futile.
            if "safety" in error_msg.lower() or "blocked" in error_msg.lower():
                print(f"Content blocked by safety filters: {e}")
                return _empty_critique("Content blocked by safety filters")
            if attempt < retries - 1:
                # Exponential backoff: 1s, 2s, 4s, ...
                wait_time = 2 ** attempt
                # FIX: the original printed "Retrying in Xs..." even on the
                # final attempt, when no retry would happen.
                print(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait_time}s...")
                await asyncio.sleep(wait_time)
            else:
                print(f"Attempt {attempt + 1} failed: {e}. Giving up.")
                return _empty_critique(str(e))

    # Only reachable when retries <= 0; the original implicitly returned None here.
    return _empty_critique("No attempts made (retries <= 0)")
async def extract_critiques(reviews: List[str]) -> List[Dict]:
    """
    Extract critique points from multiple reviews concurrently.

    Args:
        reviews: List of review texts.

    Returns:
        One dict of categorized critique points per valid review (a review is
        valid when it is a str with > 100 non-whitespace-stripped characters).
        Failed extractions yield an all-empty dict with an "error" key.
    """
    if not reviews:
        return []

    # Keep only string reviews with substantial content.
    valid_reviews = [r for r in reviews if isinstance(r, str) and len(r.strip()) > 100]
    if not valid_reviews:
        return []

    async def _staggered(delay: float, review: str) -> Dict:
        # FIX: the original slept between *creating* coroutines, which does not
        # stagger anything — coroutines only start once gather schedules them,
        # so all API calls still fired simultaneously and the sleeps merely
        # added dead latency. Delaying inside each task spaces the calls out.
        await asyncio.sleep(delay)
        return await extract_single_critique(review)

    tasks = [_staggered(i * 0.5, review) for i, review in enumerate(valid_reviews)]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Replace any raised exception with an explicit empty-critique record so
    # the output stays index-aligned with valid_reviews.
    critiques: List[Dict] = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Review {i} failed: {result}")
            critiques.append({
                "Methodology": [],
                "Experiments": [],
                "Clarity": [],
                "Significance": [],
                "Novelty": [],
                "error": str(result),
            })
        else:
            critiques.append(result)
    return critiques