| | """ |
| | API Endpoints for Source Pool Management |
| | Provides endpoints for managing source pools, rotation, and monitoring |
| | """ |
| |
|
| | from datetime import datetime |
| | from typing import Optional, List |
| | from fastapi import APIRouter, HTTPException, Body |
| | from pydantic import BaseModel, Field |
| |
|
| | from database.db_manager import db_manager |
| | from monitoring.source_pool_manager import SourcePoolManager |
| | from utils.logger import setup_logger |
| |
|
| | logger = setup_logger("pool_api") |
| |
|
| | |
| | router = APIRouter(prefix="/api/pools", tags=["source_pools"]) |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | class CreatePoolRequest(BaseModel): |
| | """Request model for creating a pool""" |
| | name: str = Field(..., description="Pool name") |
| | category: str = Field(..., description="Pool category") |
| | description: Optional[str] = Field(None, description="Pool description") |
| | rotation_strategy: str = Field("round_robin", description="Rotation strategy") |
| |
|
| |
|
| | class AddMemberRequest(BaseModel): |
| | """Request model for adding a member to a pool""" |
| | provider_id: int = Field(..., description="Provider ID") |
| | priority: int = Field(1, description="Provider priority") |
| | weight: int = Field(1, description="Provider weight") |
| |
|
| |
|
| | class UpdatePoolRequest(BaseModel): |
| | """Request model for updating a pool""" |
| | rotation_strategy: Optional[str] = Field(None, description="Rotation strategy") |
| | enabled: Optional[bool] = Field(None, description="Pool enabled status") |
| | description: Optional[str] = Field(None, description="Pool description") |
| |
|
| |
|
| | class UpdateMemberRequest(BaseModel): |
| | """Request model for updating a pool member""" |
| | priority: Optional[int] = Field(None, description="Provider priority") |
| | weight: Optional[int] = Field(None, description="Provider weight") |
| | enabled: Optional[bool] = Field(None, description="Member enabled status") |
| |
|
| |
|
| | class TriggerRotationRequest(BaseModel): |
| | """Request model for triggering manual rotation""" |
| | reason: str = Field("manual", description="Rotation reason") |
| |
|
| |
|
| | class FailoverRequest(BaseModel): |
| | """Request model for triggering failover""" |
| | failed_provider_id: int = Field(..., description="Failed provider ID") |
| | reason: str = Field("manual_failover", description="Failover reason") |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | @router.get("") |
| | async def list_pools(): |
| | """ |
| | Get list of all source pools with their status |
| | |
| | Returns: |
| | List of source pools with status information |
| | """ |
| | try: |
| | session = db_manager.get_session() |
| | pool_manager = SourcePoolManager(session) |
| |
|
| | pools_status = pool_manager.get_all_pools_status() |
| |
|
| | session.close() |
| |
|
| | return { |
| | "pools": pools_status, |
| | "total": len(pools_status), |
| | "timestamp": datetime.utcnow().isoformat() |
| | } |
| |
|
| | except Exception as e: |
| | logger.error(f"Error listing pools: {e}", exc_info=True) |
| | raise HTTPException(status_code=500, detail=f"Failed to list pools: {str(e)}") |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | @router.post("") |
| | async def create_pool(request: CreatePoolRequest): |
| | """ |
| | Create a new source pool |
| | |
| | Args: |
| | request: Pool creation request |
| | |
| | Returns: |
| | Created pool information |
| | """ |
| | try: |
| | session = db_manager.get_session() |
| | pool_manager = SourcePoolManager(session) |
| |
|
| | pool = pool_manager.create_pool( |
| | name=request.name, |
| | category=request.category, |
| | description=request.description, |
| | rotation_strategy=request.rotation_strategy |
| | ) |
| |
|
| | session.close() |
| |
|
| | return { |
| | "pool_id": pool.id, |
| | "name": pool.name, |
| | "category": pool.category, |
| | "rotation_strategy": pool.rotation_strategy, |
| | "created_at": pool.created_at.isoformat(), |
| | "message": f"Pool '{pool.name}' created successfully" |
| | } |
| |
|
| | except Exception as e: |
| | logger.error(f"Error creating pool: {e}", exc_info=True) |
| | raise HTTPException(status_code=500, detail=f"Failed to create pool: {str(e)}") |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | @router.get("/{pool_id}") |
| | async def get_pool_status(pool_id: int): |
| | """ |
| | Get detailed status of a specific pool |
| | |
| | Args: |
| | pool_id: Pool ID |
| | |
| | Returns: |
| | Detailed pool status |
| | """ |
| | try: |
| | session = db_manager.get_session() |
| | pool_manager = SourcePoolManager(session) |
| |
|
| | pool_status = pool_manager.get_pool_status(pool_id) |
| |
|
| | session.close() |
| |
|
| | if not pool_status: |
| | raise HTTPException(status_code=404, detail=f"Pool {pool_id} not found") |
| |
|
| | return pool_status |
| |
|
| | except HTTPException: |
| | raise |
| | except Exception as e: |
| | logger.error(f"Error getting pool status: {e}", exc_info=True) |
| | raise HTTPException(status_code=500, detail=f"Failed to get pool status: {str(e)}") |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | @router.put("/{pool_id}") |
| | async def update_pool(pool_id: int, request: UpdatePoolRequest): |
| | """ |
| | Update pool configuration |
| | |
| | Args: |
| | pool_id: Pool ID |
| | request: Update request |
| | |
| | Returns: |
| | Updated pool information |
| | """ |
| | try: |
| | session = db_manager.get_session() |
| |
|
| | |
| | from database.models import SourcePool |
| | pool = session.query(SourcePool).filter_by(id=pool_id).first() |
| |
|
| | if not pool: |
| | session.close() |
| | raise HTTPException(status_code=404, detail=f"Pool {pool_id} not found") |
| |
|
| | |
| | if request.rotation_strategy is not None: |
| | pool.rotation_strategy = request.rotation_strategy |
| | if request.enabled is not None: |
| | pool.enabled = request.enabled |
| | if request.description is not None: |
| | pool.description = request.description |
| |
|
| | pool.updated_at = datetime.utcnow() |
| |
|
| | session.commit() |
| | session.refresh(pool) |
| |
|
| | result = { |
| | "pool_id": pool.id, |
| | "name": pool.name, |
| | "rotation_strategy": pool.rotation_strategy, |
| | "enabled": pool.enabled, |
| | "updated_at": pool.updated_at.isoformat(), |
| | "message": f"Pool '{pool.name}' updated successfully" |
| | } |
| |
|
| | session.close() |
| |
|
| | return result |
| |
|
| | except HTTPException: |
| | raise |
| | except Exception as e: |
| | logger.error(f"Error updating pool: {e}", exc_info=True) |
| | raise HTTPException(status_code=500, detail=f"Failed to update pool: {str(e)}") |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | @router.delete("/{pool_id}") |
| | async def delete_pool(pool_id: int): |
| | """ |
| | Delete a source pool |
| | |
| | Args: |
| | pool_id: Pool ID |
| | |
| | Returns: |
| | Deletion confirmation |
| | """ |
| | try: |
| | session = db_manager.get_session() |
| |
|
| | from database.models import SourcePool |
| | pool = session.query(SourcePool).filter_by(id=pool_id).first() |
| |
|
| | if not pool: |
| | session.close() |
| | raise HTTPException(status_code=404, detail=f"Pool {pool_id} not found") |
| |
|
| | pool_name = pool.name |
| | session.delete(pool) |
| | session.commit() |
| | session.close() |
| |
|
| | return { |
| | "message": f"Pool '{pool_name}' deleted successfully", |
| | "pool_id": pool_id |
| | } |
| |
|
| | except HTTPException: |
| | raise |
| | except Exception as e: |
| | logger.error(f"Error deleting pool: {e}", exc_info=True) |
| | raise HTTPException(status_code=500, detail=f"Failed to delete pool: {str(e)}") |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | @router.post("/{pool_id}/members") |
| | async def add_pool_member(pool_id: int, request: AddMemberRequest): |
| | """ |
| | Add a provider to a pool |
| | |
| | Args: |
| | pool_id: Pool ID |
| | request: Add member request |
| | |
| | Returns: |
| | Created member information |
| | """ |
| | try: |
| | session = db_manager.get_session() |
| | pool_manager = SourcePoolManager(session) |
| |
|
| | member = pool_manager.add_to_pool( |
| | pool_id=pool_id, |
| | provider_id=request.provider_id, |
| | priority=request.priority, |
| | weight=request.weight |
| | ) |
| |
|
| | |
| | from database.models import Provider |
| | provider = session.query(Provider).get(request.provider_id) |
| |
|
| | session.close() |
| |
|
| | return { |
| | "member_id": member.id, |
| | "pool_id": pool_id, |
| | "provider_id": request.provider_id, |
| | "provider_name": provider.name if provider else None, |
| | "priority": member.priority, |
| | "weight": member.weight, |
| | "message": f"Provider added to pool successfully" |
| | } |
| |
|
| | except Exception as e: |
| | logger.error(f"Error adding pool member: {e}", exc_info=True) |
| | raise HTTPException(status_code=500, detail=f"Failed to add pool member: {str(e)}") |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | @router.put("/{pool_id}/members/{provider_id}") |
| | async def update_pool_member( |
| | pool_id: int, |
| | provider_id: int, |
| | request: UpdateMemberRequest |
| | ): |
| | """ |
| | Update a pool member configuration |
| | |
| | Args: |
| | pool_id: Pool ID |
| | provider_id: Provider ID |
| | request: Update request |
| | |
| | Returns: |
| | Updated member information |
| | """ |
| | try: |
| | session = db_manager.get_session() |
| |
|
| | from database.models import PoolMember |
| | member = ( |
| | session.query(PoolMember) |
| | .filter_by(pool_id=pool_id, provider_id=provider_id) |
| | .first() |
| | ) |
| |
|
| | if not member: |
| | session.close() |
| | raise HTTPException( |
| | status_code=404, |
| | detail=f"Member not found in pool {pool_id}" |
| | ) |
| |
|
| | |
| | if request.priority is not None: |
| | member.priority = request.priority |
| | if request.weight is not None: |
| | member.weight = request.weight |
| | if request.enabled is not None: |
| | member.enabled = request.enabled |
| |
|
| | session.commit() |
| | session.refresh(member) |
| |
|
| | result = { |
| | "pool_id": pool_id, |
| | "provider_id": provider_id, |
| | "priority": member.priority, |
| | "weight": member.weight, |
| | "enabled": member.enabled, |
| | "message": "Pool member updated successfully" |
| | } |
| |
|
| | session.close() |
| |
|
| | return result |
| |
|
| | except HTTPException: |
| | raise |
| | except Exception as e: |
| | logger.error(f"Error updating pool member: {e}", exc_info=True) |
| | raise HTTPException(status_code=500, detail=f"Failed to update pool member: {str(e)}") |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | @router.delete("/{pool_id}/members/{provider_id}") |
| | async def remove_pool_member(pool_id: int, provider_id: int): |
| | """ |
| | Remove a provider from a pool |
| | |
| | Args: |
| | pool_id: Pool ID |
| | provider_id: Provider ID |
| | |
| | Returns: |
| | Deletion confirmation |
| | """ |
| | try: |
| | session = db_manager.get_session() |
| |
|
| | from database.models import PoolMember |
| | member = ( |
| | session.query(PoolMember) |
| | .filter_by(pool_id=pool_id, provider_id=provider_id) |
| | .first() |
| | ) |
| |
|
| | if not member: |
| | session.close() |
| | raise HTTPException( |
| | status_code=404, |
| | detail=f"Member not found in pool {pool_id}" |
| | ) |
| |
|
| | session.delete(member) |
| | session.commit() |
| | session.close() |
| |
|
| | return { |
| | "message": "Provider removed from pool successfully", |
| | "pool_id": pool_id, |
| | "provider_id": provider_id |
| | } |
| |
|
| | except HTTPException: |
| | raise |
| | except Exception as e: |
| | logger.error(f"Error removing pool member: {e}", exc_info=True) |
| | raise HTTPException(status_code=500, detail=f"Failed to remove pool member: {str(e)}") |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | @router.post("/{pool_id}/rotate") |
| | async def trigger_rotation(pool_id: int, request: TriggerRotationRequest): |
| | """ |
| | Trigger manual rotation to next provider in pool |
| | |
| | Args: |
| | pool_id: Pool ID |
| | request: Rotation request |
| | |
| | Returns: |
| | New provider information |
| | """ |
| | try: |
| | session = db_manager.get_session() |
| | pool_manager = SourcePoolManager(session) |
| |
|
| | provider = pool_manager.get_next_provider(pool_id) |
| |
|
| | session.close() |
| |
|
| | if not provider: |
| | raise HTTPException( |
| | status_code=404, |
| | detail=f"No available providers in pool {pool_id}" |
| | ) |
| |
|
| | return { |
| | "pool_id": pool_id, |
| | "provider_id": provider.id, |
| | "provider_name": provider.name, |
| | "timestamp": datetime.utcnow().isoformat(), |
| | "message": f"Rotated to provider '{provider.name}'" |
| | } |
| |
|
| | except HTTPException: |
| | raise |
| | except Exception as e: |
| | logger.error(f"Error triggering rotation: {e}", exc_info=True) |
| | raise HTTPException(status_code=500, detail=f"Failed to trigger rotation: {str(e)}") |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | @router.post("/{pool_id}/failover") |
| | async def trigger_failover(pool_id: int, request: FailoverRequest): |
| | """ |
| | Trigger failover from a failed provider |
| | |
| | Args: |
| | pool_id: Pool ID |
| | request: Failover request |
| | |
| | Returns: |
| | New provider information |
| | """ |
| | try: |
| | session = db_manager.get_session() |
| | pool_manager = SourcePoolManager(session) |
| |
|
| | provider = pool_manager.failover( |
| | pool_id=pool_id, |
| | failed_provider_id=request.failed_provider_id, |
| | reason=request.reason |
| | ) |
| |
|
| | session.close() |
| |
|
| | if not provider: |
| | raise HTTPException( |
| | status_code=404, |
| | detail=f"No alternative providers available in pool {pool_id}" |
| | ) |
| |
|
| | return { |
| | "pool_id": pool_id, |
| | "failed_provider_id": request.failed_provider_id, |
| | "new_provider_id": provider.id, |
| | "new_provider_name": provider.name, |
| | "timestamp": datetime.utcnow().isoformat(), |
| | "message": f"Failover successful: switched to '{provider.name}'" |
| | } |
| |
|
| | except HTTPException: |
| | raise |
| | except Exception as e: |
| | logger.error(f"Error triggering failover: {e}", exc_info=True) |
| | raise HTTPException(status_code=500, detail=f"Failed to trigger failover: {str(e)}") |
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | @router.get("/{pool_id}/history") |
| | async def get_rotation_history(pool_id: int, limit: int = 50): |
| | """ |
| | Get rotation history for a pool |
| | |
| | Args: |
| | pool_id: Pool ID |
| | limit: Maximum number of records to return |
| | |
| | Returns: |
| | List of rotation history records |
| | """ |
| | try: |
| | session = db_manager.get_session() |
| |
|
| | from database.models import RotationHistory, Provider |
| | history = ( |
| | session.query(RotationHistory) |
| | .filter_by(pool_id=pool_id) |
| | .order_by(RotationHistory.timestamp.desc()) |
| | .limit(limit) |
| | .all() |
| | ) |
| |
|
| | history_list = [] |
| | for record in history: |
| | from_provider = None |
| | if record.from_provider_id: |
| | from_prov = session.query(Provider).get(record.from_provider_id) |
| | from_provider = from_prov.name if from_prov else None |
| |
|
| | to_prov = session.query(Provider).get(record.to_provider_id) |
| | to_provider = to_prov.name if to_prov else None |
| |
|
| | history_list.append({ |
| | "id": record.id, |
| | "timestamp": record.timestamp.isoformat(), |
| | "from_provider": from_provider, |
| | "to_provider": to_provider, |
| | "reason": record.rotation_reason, |
| | "success": record.success, |
| | "notes": record.notes |
| | }) |
| |
|
| | session.close() |
| |
|
| | return { |
| | "pool_id": pool_id, |
| | "history": history_list, |
| | "total": len(history_list) |
| | } |
| |
|
| | except Exception as e: |
| | logger.error(f"Error getting rotation history: {e}", exc_info=True) |
| | raise HTTPException(status_code=500, detail=f"Failed to get rotation history: {str(e)}") |
| |
|
| |
|
| | logger.info("Pool API endpoints module loaded successfully") |
| |
|