import asyncio
import json
import os
from itertools import combinations
from typing import Dict, List

from dotenv import load_dotenv
from openai import OpenAI
from pydantic import BaseModel, Field

load_dotenv()

# Initialize the OpenRouter client (OpenAI-compatible API)
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

# Model to use for disagreement detection
DISAGREEMENT_MODEL = "google/gemini-2.5-flash-lite"


class DisagreementDetails(BaseModel):
    Methodology: List[str] = Field(default_factory=list)
    Experiments: List[str] = Field(default_factory=list)
    Clarity: List[str] = Field(default_factory=list)
    Significance: List[str] = Field(default_factory=list)
    Novelty: List[str] = Field(default_factory=list)


class DisagreementResult(BaseModel):
    review_pair: List[int]
    disagreement_score: float = Field(..., ge=0.0, le=1.0)
    disagreement_details: DisagreementDetails


def list_to_string(lst: List[str]) -> str:
    """Format a list of critique points as a bulleted string."""
    return "\n".join(f"- {item}" for item in lst) if lst else "None"


async def compare_review_pair(
    review1: Dict, review2: Dict, idx1: int, idx2: int, retries: int = 5
) -> Dict:
    """
    Compare two reviews and detect disagreements.

    Args:
        review1: First review's critique points
        review2: Second review's critique points
        idx1: Index of first review
        idx2: Index of second review
        retries: Maximum retry attempts

    Returns:
        Disagreement analysis results
    """
    system_prompt = """
You are an expert at analyzing academic peer review disagreements.
Compare reviews and identify disagreements across different aspects.
Respond with ONLY valid JSON in this exact format:
{
    "disagreement_score": 0.5,
    "disagreement_details": {
        "Methodology": ["specific disagreement point 1"],
        "Experiments": ["specific disagreement point 1"],
        "Clarity": [],
        "Significance": ["specific disagreement point 1"],
        "Novelty": []
    }
}
"""

    user_prompt = f"""
Compare the following two reviews and identify disagreements across different aspects.
Assess the disagreement level (0.0 = perfect agreement, 1.0 = complete disagreement)
and list specific points of disagreement for each category.
Review 1:
Methodology: {list_to_string(review1.get('Methodology', []))}
Experiments: {list_to_string(review1.get('Experiments', []))}
Clarity: {list_to_string(review1.get('Clarity', []))}
Significance: {list_to_string(review1.get('Significance', []))}
Novelty: {list_to_string(review1.get('Novelty', []))}

Review 2:
Methodology: {list_to_string(review2.get('Methodology', []))}
Experiments: {list_to_string(review2.get('Experiments', []))}
Clarity: {list_to_string(review2.get('Clarity', []))}
Significance: {list_to_string(review2.get('Significance', []))}
Novelty: {list_to_string(review2.get('Novelty', []))}
"""

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    for attempt in range(retries):
        try:
            # The OpenAI client is synchronous, so run the call in a worker
            # thread to avoid blocking the event loop.
            response = await asyncio.to_thread(
                client.chat.completions.create,
                model=DISAGREEMENT_MODEL,
                messages=messages,
                max_tokens=2048,
                response_format={"type": "json_object"},
            )
            content = ""
            if response.choices:
                content = (response.choices[0].message.content or "").strip()
            if not content:
                raise ValueError("Empty response from API")
            result = json.loads(content)

            # Validate the structure against the Pydantic models
            disagreement = DisagreementResult(
                review_pair=[idx1, idx2],
                disagreement_score=result["disagreement_score"],
                disagreement_details=result["disagreement_details"],
            )
            return disagreement.model_dump()
        except Exception as e:
            print(f"Disagreement detection attempt {attempt + 1} failed: {e}")
            if attempt < retries - 1:
                # Exponential backoff before retrying
                await asyncio.sleep(2 ** attempt)
            else:
                # All retries exhausted: return a neutral result with the error attached
                return {
                    "review_pair": [idx1, idx2],
                    "disagreement_score": 0.0,
                    "disagreement_details": {
                        "Methodology": [],
                        "Experiments": [],
                        "Clarity": [],
                        "Significance": [],
                        "Novelty": [],
                    },
                    "error": str(e),
                }


async def detect_disagreements(critique_points: List[Dict]) -> List[Dict]:
    """
    Detect disagreements across all review pairs.

    Args:
        critique_points: List of critique point dictionaries

    Returns:
        List of disagreement analyses
    """
    if len(critique_points) < 2:
        return []

    # Generate all unordered review pairs
    review_pairs = list(combinations(range(len(critique_points)), 2))

    async def staggered(delay: float, idx1: int, idx2: int) -> Dict:
        # Stagger the start of each comparison to space out API calls.
        # (Sleeping while building the task list would not rate-limit anything:
        # bare coroutines only begin running once gathered.)
        await asyncio.sleep(delay)
        return await compare_review_pair(
            critique_points[idx1], critique_points[idx2], idx1, idx2
        )

    tasks = [
        staggered(i * 0.3, idx1, idx2)
        for i, (idx1, idx2) in enumerate(review_pairs)
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Keep successful analyses; log pairs that still failed after retries
    disagreements = []
    for pair, result in zip(review_pairs, results):
        if isinstance(result, Exception):
            print(f"Review pair {pair} failed: {result}")
        else:
            disagreements.append(result)
    return disagreements
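

# Example usage: a minimal sketch of running the pipeline end to end over a
# toy pair of reviews. The sample critique points below are hypothetical,
# purely for illustration; real input would come from an upstream review-
# parsing step that produces dicts keyed by the five aspect names above.
# Assumes OPENROUTER_API_KEY is set in the environment (or in a .env file
# picked up by load_dotenv).
if __name__ == "__main__":
    sample_critiques = [
        {
            "Methodology": ["The ablation design is sound."],
            "Experiments": ["Results cover three standard benchmarks."],
            "Clarity": [],
            "Significance": ["Impact on the field is modest."],
            "Novelty": [],
        },
        {
            "Methodology": ["The ablation study omits key baselines."],
            "Experiments": ["Benchmark coverage is too narrow."],
            "Clarity": ["Section 3 is hard to follow."],
            "Significance": ["A meaningful step forward for the field."],
            "Novelty": [],
        },
    ]
    analyses = asyncio.run(detect_disagreements(sample_critiques))
    print(json.dumps(analyses, indent=2))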