import asyncio
import json
import os
from typing import Dict, List

from dotenv import load_dotenv
from openai import OpenAI
from pydantic import BaseModel

load_dotenv()

# OpenRouter exposes an OpenAI-compatible API, so the stock OpenAI client
# works against it with only the base URL overridden.
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

# Model to use for critique extraction
CRITIQUE_MODEL = "google/gemini-2.5-flash-lite"

# Canonical critique categories, shared by the schema and the error fallbacks.
_CATEGORIES = ("Methodology", "Experiments", "Clarity", "Significance", "Novelty")


class CritiquePoint(BaseModel):
    """Schema used to validate the model's JSON critique output.

    NOTE: pydantic deep-copies field defaults per instance, so the mutable
    ``[]`` defaults are safe here (unlike plain function defaults).
    """

    Methodology: List[str] = []
    Experiments: List[str] = []
    Clarity: List[str] = []
    Significance: List[str] = []
    Novelty: List[str] = []


def _empty_critique(error: str) -> Dict:
    """Return an all-empty critique dict annotated with *error*."""
    result: Dict = {category: [] for category in _CATEGORIES}
    result["error"] = error
    return result


async def extract_single_critique(review_text: str, retries: int = 5) -> Dict:
    """
    Extract critique points from a single review using OpenRouter (Gemini)

    Args:
        review_text: The review text to analyze
        retries: Maximum number of attempts (exponential backoff between them)

    Returns:
        Dictionary with categorized critique points. On failure this never
        raises: it returns an all-empty dict carrying an ``"error"`` key.
    """
    system_prompt = """
You are an expert at analyzing academic peer reviews. Extract key critique points from the review and categorize them.
Respond with ONLY valid JSON in this format:
{
    "Methodology": ["point1", "point2"],
    "Experiments": ["point1"],
    "Clarity": ["point1", "point2"],
    "Significance": ["point1"],
    "Novelty": ["point1"]
}
"""
    user_prompt = f"""
Extract key critique points from the following research paper review.
Categorize them into aspects: Methodology, Experiments, Clarity, Significance, Novelty.

Review:
{review_text}
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    for attempt in range(retries):
        try:
            # The OpenAI client is synchronous; run it in a worker thread so
            # the event loop is not blocked while the request is in flight.
            response = await asyncio.to_thread(
                client.chat.completions.create,
                model=CRITIQUE_MODEL,
                messages=messages,
                max_tokens=2048,
                response_format={"type": "json_object"},
            )
            if not response.choices or not response.choices[0].message.content.strip():
                raise ValueError("Empty response from API")

            result = json.loads(response.choices[0].message.content.strip())
            # Validate structure; missing categories fall back to [].
            critique = CritiquePoint(**result)
            return critique.model_dump()
        except Exception as e:
            error_msg = str(e)
            # A content-safety block will never succeed on retry; bail out now.
            if "safety" in error_msg.lower() or "blocked" in error_msg.lower():
                print(f"Content blocked by safety filters: {e}")
                return _empty_critique("Content blocked by safety filters")

            # Exponential backoff: 1s, 2s, 4s, ...
            wait_time = 2 ** attempt
            print(f"Attempt {attempt + 1} failed: {e}. Retrying in {wait_time}s...")
            if attempt < retries - 1:
                await asyncio.sleep(wait_time)
            else:
                return _empty_critique(str(e))


async def extract_critiques(reviews: List[str]) -> List[Dict]:
    """
    Extract critique points from multiple reviews

    Args:
        reviews: List of review texts

    Returns:
        List of dictionaries with categorized critique points, one per valid
        review (non-strings and reviews of <= 100 stripped characters are
        dropped). Failed extractions yield an all-empty dict with an
        ``"error"`` key so the output stays aligned with the valid reviews.
    """
    if not reviews:
        return []

    # Filter valid reviews (must be strings with substantial content)
    valid_reviews = [r for r in reviews if isinstance(r, str) and len(r.strip()) > 100]
    if not valid_reviews:
        return []

    async def _staggered(review: str, delay: float) -> Dict:
        # The delay lives *inside* the task: coroutines only start running
        # when gather() awaits them, so sleeping between coroutine creations
        # (the previous approach) staggered nothing and all requests hit the
        # API simultaneously. This spreads the actual request starts 0.5s apart.
        await asyncio.sleep(delay)
        return await extract_single_critique(review)

    tasks = [_staggered(review, 0.5 * i) for i, review in enumerate(valid_reviews)]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Filter out exceptions and return valid results
    critiques: List[Dict] = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Review {i} failed: {result}")
            critiques.append(_empty_critique(str(result)))
        else:
            critiques.append(result)
    return critiques