# MetaSearch/pipeline/disagreement_detection.py
"""Pairwise disagreement detection across peer reviews, via OpenRouter."""
import asyncio
import json
import os
from itertools import combinations
from typing import Dict, List

from dotenv import load_dotenv
from openai import OpenAI
from pydantic import BaseModel, Field

load_dotenv()

# Initialize OpenRouter client (OpenAI-compatible endpoint)
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

# Model to use for disagreement detection
DISAGREEMENT_MODEL = "google/gemini-2.5-flash-lite"


class DisagreementDetails(BaseModel):
    """Disagreement points grouped by review aspect."""

    Methodology: List[str] = Field(default_factory=list)
    Experiments: List[str] = Field(default_factory=list)
    Clarity: List[str] = Field(default_factory=list)
    Significance: List[str] = Field(default_factory=list)
    Novelty: List[str] = Field(default_factory=list)


class DisagreementResult(BaseModel):
    """Validated disagreement analysis for one pair of reviews."""

    review_pair: List[int]
    disagreement_score: float = Field(..., ge=0.0, le=1.0)
    disagreement_details: DisagreementDetails
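
# Illustrative only (the values below are made up): a raw dict payload from
# the LLM coerces into the nested schema, e.g.
#   DisagreementResult(
#       review_pair=[0, 1],
#       disagreement_score=0.7,
#       disagreement_details={"Methodology": ["R1 disputes the baseline"]},
#   ).model_dump()
# Aspects missing from the payload default to empty lists, and a score
# outside [0.0, 1.0] raises a pydantic ValidationError.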


def list_to_string(lst: List[str]) -> str:
    """Format a list of points as a bulleted string ("None" if empty)."""
    return "\n".join(f"- {item}" for item in lst) if lst else "None"
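
# Example output (deterministic, for reference):
#   list_to_string(["unclear notation", "no ablations"])
#   -> "- unclear notation\n- no ablations"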


async def compare_review_pair(
    review1: Dict,
    review2: Dict,
    idx1: int,
    idx2: int,
    retries: int = 5,
) -> Dict:
    """
    Compare two reviews and detect disagreements.

    Args:
        review1: First review's critique points
        review2: Second review's critique points
        idx1: Index of first review
        idx2: Index of second review
        retries: Maximum retry attempts

    Returns:
        Disagreement analysis results
    """
    system_prompt = """
You are an expert at analyzing academic peer review disagreements.
Compare reviews and identify disagreements across different aspects.
Respond with ONLY valid JSON in this exact format:
{
  "disagreement_score": 0.5,
  "disagreement_details": {
    "Methodology": ["specific disagreement point 1"],
    "Experiments": ["specific disagreement point 1"],
    "Clarity": [],
    "Significance": ["specific disagreement point 1"],
    "Novelty": []
  }
}
"""
    user_prompt = f"""
Compare the following two reviews and identify disagreements across different aspects.
Assess disagreement level (0.0 = perfect agreement, 1.0 = complete disagreement) and
list specific points of disagreement for each category.

Review 1:
Methodology: {list_to_string(review1.get('Methodology', []))}
Experiments: {list_to_string(review1.get('Experiments', []))}
Clarity: {list_to_string(review1.get('Clarity', []))}
Significance: {list_to_string(review1.get('Significance', []))}
Novelty: {list_to_string(review1.get('Novelty', []))}

Review 2:
Methodology: {list_to_string(review2.get('Methodology', []))}
Experiments: {list_to_string(review2.get('Experiments', []))}
Clarity: {list_to_string(review2.get('Clarity', []))}
Significance: {list_to_string(review2.get('Significance', []))}
Novelty: {list_to_string(review2.get('Novelty', []))}
"""
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    for attempt in range(retries):
        try:
            # The SDK call is blocking, so run it off the event loop
            response = await asyncio.to_thread(
                client.chat.completions.create,
                model=DISAGREEMENT_MODEL,
                messages=messages,
                max_tokens=2048,
                response_format={"type": "json_object"},
            )
            # Guard against missing choices and a None content field
            content = response.choices[0].message.content if response.choices else None
            if not content or not content.strip():
                raise ValueError("Empty response from API")
            result = json.loads(content.strip())

            # Validate structure against the Pydantic schema
            disagreement = DisagreementResult(
                review_pair=[idx1, idx2],
                disagreement_score=result["disagreement_score"],
                disagreement_details=result["disagreement_details"],
            )
            return disagreement.model_dump()
        except Exception as e:
            print(f"Disagreement detection attempt {attempt + 1} failed: {e}")
            if attempt < retries - 1:
                # Exponential backoff: 1s, 2s, 4s, ...
                await asyncio.sleep(2 ** attempt)
            else:
                # All retries exhausted: return a neutral fallback result
                return {
                    "review_pair": [idx1, idx2],
                    "disagreement_score": 0.0,
                    "disagreement_details": {
                        "Methodology": [],
                        "Experiments": [],
                        "Clarity": [],
                        "Significance": [],
                        "Novelty": [],
                    },
                    "error": str(e),
                }


async def detect_disagreements(critique_points: List[Dict]) -> List[Dict]:
    """
    Detect disagreements across all review pairs.

    Args:
        critique_points: List of critique point dictionaries

    Returns:
        List of disagreement analyses
    """
    if len(critique_points) < 2:
        return []

    # Generate all unordered review pairs; with at least two reviews this
    # is always non-empty
    review_pairs = list(combinations(range(len(critique_points)), 2))

    # Process pairs concurrently with staggered start times. Wrapping each
    # coroutine in a task starts it running immediately, so the sleep below
    # actually spaces out the API calls; a bare coroutine would not begin
    # executing until gather(), making the delay a no-op.
    tasks = []
    for idx1, idx2 in review_pairs:
        tasks.append(
            asyncio.create_task(
                compare_review_pair(
                    critique_points[idx1],
                    critique_points[idx2],
                    idx1,
                    idx2,
                )
            )
        )
        # Small delay between API call launches
        await asyncio.sleep(0.3)

    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Filter results
    disagreements = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Review pair {review_pairs[i]} failed: {result}")
        else:
            disagreements.append(result)
    return disagreements
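

if __name__ == "__main__":
    # Minimal smoke test; the critique points below are made up for
    # illustration. Requires OPENROUTER_API_KEY in the environment or a
    # local .env file.
    sample_critique_points = [
        {
            "Methodology": ["Baseline comparison lacks ablations."],
            "Novelty": ["Only an incremental extension of prior work."],
        },
        {
            "Methodology": ["Ablations are thorough and well chosen."],
            "Novelty": [],
        },
    ]
    analyses = asyncio.run(detect_disagreements(sample_critique_points))
    print(json.dumps(analyses, indent=2))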