import asyncio
import json
import os
import re
from typing import Dict, List

from dotenv import load_dotenv
from openai import OpenAI
from pydantic import BaseModel

load_dotenv()

# Initialize OpenRouter client
client = OpenAI(
    base_url="https://openrouter.ai/api/v1",
    api_key=os.getenv("OPENROUTER_API_KEY"),
)

# Priority list of models to try:
# 1. DeepSeek R1 (best reasoning, most expensive)
# 2. DeepSeek R1 Distill (good reasoning, cheaper)
# 3. Gemini 2.5 Flash Lite (cheap, fast fallback)
MODELS = [
    "deepseek/deepseek-r1",
    "deepseek/deepseek-r1-distill-llama-70b",
    "google/gemini-2.5-flash-lite",
]


class ResolutionDetails(BaseModel):
    accepted_critique_points: Dict[str, List[str]]
    rejected_critique_points: Dict[str, List[str]]
    final_resolution_summary: str


class DisagreementResolutionResult(BaseModel):
    review_pair: List[int]
    resolution_details: ResolutionDetails


def construct_resolution_prompt(
    paper_title: str,
    paper_abstract: str,
    disagreement: Dict,
    combined_critiques: Dict,
    sota_results: str,
    retrieved_evidence: Dict,
) -> tuple:
    """Construct the system and user prompts for disagreement resolution."""
    system_prompt = """
You are an AI specialized in resolving academic peer review disagreements.
Your task is to analyze critiques, verify evidence, and provide a structured resolution.

IMPORTANT: detailed reasoning is allowed, but the FINAL output must be valid JSON only.

Respond in the following JSON format:
{
    "accepted_critique_points": {"category": ["critique_1", "critique_2"]},
    "rejected_critique_points": {"category": ["critique_3"]},
    "final_resolution_summary": "After analyzing critiques and evidence, we conclude that..."
}
"""

    disagreement_details = disagreement.get('disagreement_details', {})
    disagreement_score = disagreement.get('disagreement_score', 0.0)

    user_prompt = f"""
### **Paper Details**
**Title:** {paper_title}
**Abstract:** {paper_abstract}

### **Reviewer Disagreement (Score: {disagreement_score})**
- **Methodology:** {', '.join(disagreement_details.get('Methodology', ['N/A']))}
- **Experiments:** {', '.join(disagreement_details.get('Experiments', ['N/A']))}
- **Clarity:** {', '.join(disagreement_details.get('Clarity', ['N/A']))}
- **Significance:** {', '.join(disagreement_details.get('Significance', ['N/A']))}
- **Novelty:** {', '.join(disagreement_details.get('Novelty', ['N/A']))}

### **Supporting Information**
**Combined Critique Points from Reviews:**
{json.dumps(combined_critiques, indent=2)}

**State-of-the-Art (SoTA) Findings:**
{sota_results[:2000]}

**Retrieved Evidence:**
{json.dumps(retrieved_evidence, indent=2)[:2000]}

### **Resolution Task**
1. Validate critique points and categorize them into accepted or rejected.
2. Compare with SoTA research and retrieved evidence.
3. Provide a final resolution summary explaining whether the disagreement is justified.

Respond with ONLY valid JSON.
"""
    return system_prompt, user_prompt


def extract_json_from_text(text: str) -> Dict:
    """Robustly extract JSON from text that might contain markdown or thinking traces."""
    # 1. Try a straightforward parse
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        pass

    # 2. Try extracting from a markdown code block
    if "```json" in text:
        match = re.search(r"```json(.*?)```", text, re.DOTALL)
        if match:
            try:
                return json.loads(match.group(1).strip())
            except json.JSONDecodeError:
                pass

    # 3. Regex search for the outermost curly braces.
    # This handles cases where DeepSeek outputs reasoning/thinking text
    # before the JSON.
    try:
        match = re.search(r"(\{.*\})", text, re.DOTALL)
        if match:
            return json.loads(match.group(1))
    except json.JSONDecodeError:
        pass

    raise ValueError("Could not extract valid JSON from model response")


async def resolve_single_disagreement(
    paper_title: str,
    paper_abstract: str,
    disagreement: Dict,
    combined_critiques: Dict,
    sota_results: str,
    retrieved_evidence: Dict,
    retries: int = 3,  # Reduced retries since we have model fallback
) -> Dict:
    """Resolve a single disagreement with model fallback and token limits."""
    system_prompt, user_prompt = construct_resolution_prompt(
        paper_title, paper_abstract, disagreement,
        combined_critiques, sota_results, retrieved_evidence
    )

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    last_exception = None

    # Loop through available models in case of errors
    # (402 Payment Required, 429 Rate Limit)
    for model in MODELS:
        print(f"Attempting resolution with model: {model}")

        for attempt in range(retries):
            try:
                response = await asyncio.to_thread(
                    client.chat.completions.create,
                    model=model,
                    messages=messages,
                    # CRITICAL FIX: limit max_tokens to prevent "Insufficient
                    # Credits" errors. OpenRouter reserves credits based on
                    # this number.
                    max_tokens=4096,
                    response_format={"type": "json_object"},
                )

                # Guard against missing choices and against content being None
                if not response.choices or not (response.choices[0].message.content or "").strip():
                    raise ValueError("Empty response from AI")

                content = response.choices[0].message.content.strip()
                llm_output = extract_json_from_text(content)

                # Validate required keys
                required_keys = {
                    "accepted_critique_points",
                    "rejected_critique_points",
                    "final_resolution_summary",
                }
                if not required_keys.issubset(llm_output.keys()):
                    raise ValueError(f"Missing keys. Present: {list(llm_output.keys())}")

                # Validate structure
                resolution = DisagreementResolutionResult(
                    review_pair=disagreement.get('review_pair', [0, 1]),
                    resolution_details=ResolutionDetails(**llm_output),
                )
                return resolution.model_dump()

            except Exception as e:
                last_exception = e
                error_msg = str(e)
                print(f"Model {model} - Attempt {attempt + 1} failed: {error_msg}")

                # Immediate fallback on payment errors
                if "402" in error_msg or "insufficient_quota" in error_msg:
                    print("Insufficient credits detected. Switching to cheaper model...")
                    break  # Break the retry loop to move on to the next model

                # Exponential backoff before the next retry
                wait_time = 2 ** attempt
                if attempt < retries - 1:
                    await asyncio.sleep(wait_time)

    # If all models and retries fail
    return {
        "review_pair": disagreement.get('review_pair', [0, 1]),
        "resolution_details": {
            "accepted_critique_points": {},
            "rejected_critique_points": {},
            "final_resolution_summary": (
                "Failed to resolve disagreement after trying multiple models. "
                f"Final Error: {str(last_exception)}"
            ),
        },
        "error": str(last_exception),
    }
async def resolve_disagreements(
    paper_title: str,
    paper_abstract: str,
    disagreements: List[Dict],
    critique_points: List[Dict],
    search_results: Dict,
) -> List[Dict]:
    """Resolve all disagreements for a paper."""
    if not disagreements:
        return []

    combined_critiques = search_results.get('Combined_Critiques', {})
    sota_results = search_results.get('SoTA_Results', '')
    retrieved_evidence = search_results.get('Retrieved_Evidence', {})

    # Schedule resolutions as tasks, staggering their starts for rate limiting.
    # create_task is needed here: a bare coroutine would not start running
    # until gather, so sleeping between plain appends would not actually
    # space out the API calls.
    tasks = []
    for disagreement in disagreements:
        tasks.append(
            asyncio.create_task(
                resolve_single_disagreement(
                    paper_title,
                    paper_abstract,
                    disagreement,
                    combined_critiques,
                    sota_results,
                    retrieved_evidence,
                )
            )
        )
        # Delay between API calls
        await asyncio.sleep(1)

    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Filter out failures
    resolutions = []
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Resolution {i} failed: {result}")
        else:
            resolutions.append(result)

    return resolutions
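

# --- Usage sketch (illustrative only) ---
# A minimal driver showing how the functions above fit together. All paper
# metadata, disagreement records, and search results below are hypothetical
# placeholders, not real data; in practice these inputs come from upstream
# review-parsing and evidence-retrieval steps.
if __name__ == "__main__":
    # Sanity check: the extractor should recover JSON wrapped in reasoning text.
    wrapped = 'Some <think>reasoning</think> text {"a": 1}'
    assert extract_json_from_text(wrapped) == {"a": 1}

    example_disagreements = [
        {
            "review_pair": [0, 1],
            "disagreement_score": 0.72,
            "disagreement_details": {
                "Methodology": ["Reviewer 1 questions the ablation design"],
                "Novelty": ["Reviewer 2 considers the approach incremental"],
            },
        }
    ]
    example_search_results = {
        "Combined_Critiques": {"Methodology": ["Ablations omit a key baseline"]},
        "SoTA_Results": "Placeholder summary of related state-of-the-art work.",
        "Retrieved_Evidence": {"supporting": ["Placeholder evidence snippet"]},
    }

    resolutions = asyncio.run(
        resolve_disagreements(
            paper_title="An Example Paper Title",
            paper_abstract="A placeholder abstract describing the method.",
            disagreements=example_disagreements,
            critique_points=[],
            search_results=example_search_results,
        )
    )
    print(json.dumps(resolutions, indent=2))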