""" Async HTTP client for external API calls. This module provides a pooled async HTTP client with retries, timeouts, and circuit breaker pattern for reliability. Usage: from async_http import http_client # GET request response = await http_client.get("https://api.example.com/data") # POST with JSON response = await http_client.post( "https://api.example.com/data", json={"key": "value"} ) """ import asyncio import time from dataclasses import dataclass, field from enum import Enum from typing import Any, Optional import os # Use httpx for async HTTP - fallback to aiohttp if available try: import httpx HTTP_CLIENT = "httpx" except ImportError: try: import aiohttp HTTP_CLIENT = "aiohttp" except ImportError: HTTP_CLIENT = None class CircuitState(Enum): """Circuit breaker states""" CLOSED = "closed" # Normal operation OPEN = "open" # Failing - reject requests HALF_OPEN = "half_open" # Testing recovery @dataclass class CircuitBreakerConfig: """Circuit breaker configuration""" failure_threshold: int = 5 # Failures before opening recovery_timeout: float = 30.0 # Seconds before half-open success_threshold: int = 2 # Successes to close from half-open @dataclass class CircuitBreaker: """ Circuit breaker for external services. Prevents cascading failures by stopping requests to failing services. """ config: CircuitBreakerConfig = field(default_factory=CircuitBreakerConfig) state: CircuitState = CircuitState.CLOSED failure_count: int = 0 success_count: int = 0 last_failure_time: float = 0 def can_execute(self) -> bool: """Check if request can proceed""" if self.state == CircuitState.CLOSED: return True if self.state == CircuitState.OPEN: # Check if recovery timeout elapsed if time.time() - self.last_failure_time >= self.config.recovery_timeout: self.state = CircuitState.HALF_OPEN self.success_count = 0 return True return False # Half-open: allow limited requests return True def record_success(self) -> None: """Record successful request""" if self.state == CircuitState.HALF_OPEN: self.success_count += 1 if self.success_count >= self.config.success_threshold: self.state = CircuitState.CLOSED self.failure_count = 0 else: self.failure_count = 0 def record_failure(self) -> None: """Record failed request""" self.failure_count += 1 self.last_failure_time = time.time() if self.state == CircuitState.HALF_OPEN: self.state = CircuitState.OPEN elif self.failure_count >= self.config.failure_threshold: self.state = CircuitState.OPEN class CircuitBreakerError(Exception): """Raised when circuit breaker is open""" pass @dataclass class HTTPResponse: """Unified HTTP response object""" status_code: int headers: dict body: bytes json_data: Optional[Any] = None elapsed: float = 0.0 @property def ok(self) -> bool: return 200 <= self.status_code < 300 def json(self) -> Any: if self.json_data is not None: return self.json_data import json self.json_data = json.loads(self.body) return self.json_data @property def text(self) -> str: return self.body.decode("utf-8") class AsyncHTTPClient: """ Async HTTP client with connection pooling, retries, and circuit breaker. """ def __init__( self, timeout: float = 30.0, max_retries: int = 3, retry_delay: float = 1.0, pool_size: int = 10, circuit_breaker_config: Optional[CircuitBreakerConfig] = None ): self.timeout = timeout self.max_retries = max_retries self.retry_delay = retry_delay self.pool_size = pool_size # Circuit breakers per host self._circuit_breakers: dict[str, CircuitBreaker] = {} self._circuit_config = circuit_breaker_config or CircuitBreakerConfig() # Client instance (lazy initialized) self._client = None self._lock = asyncio.Lock() async def _get_client(self): """Get or create HTTP client""" if self._client is None: async with self._lock: if self._client is None: if HTTP_CLIENT == "httpx": self._client = httpx.AsyncClient( timeout=httpx.Timeout(self.timeout), limits=httpx.Limits( max_connections=self.pool_size, max_keepalive_connections=self.pool_size // 2 ), follow_redirects=True ) elif HTTP_CLIENT == "aiohttp": import aiohttp connector = aiohttp.TCPConnector( limit=self.pool_size, keepalive_timeout=30 ) timeout = aiohttp.ClientTimeout(total=self.timeout) self._client = aiohttp.ClientSession( connector=connector, timeout=timeout ) return self._client def _get_circuit_breaker(self, url: str) -> CircuitBreaker: """Get circuit breaker for host""" from urllib.parse import urlparse host = urlparse(url).netloc if host not in self._circuit_breakers: self._circuit_breakers[host] = CircuitBreaker(config=self._circuit_config) return self._circuit_breakers[host] async def request( self, method: str, url: str, headers: Optional[dict] = None, params: Optional[dict] = None, json: Optional[Any] = None, data: Optional[Any] = None, timeout: Optional[float] = None, retries: Optional[int] = None ) -> HTTPResponse: """ Make HTTP request with retries and circuit breaker. Args: method: HTTP method url: Request URL headers: Optional headers params: Query parameters json: JSON body data: Form data timeout: Request timeout override retries: Max retries override Returns: HTTPResponse object """ circuit = self._get_circuit_breaker(url) if not circuit.can_execute(): raise CircuitBreakerError( f"Circuit breaker open for {url}. " f"Service is experiencing issues." ) max_retries = retries if retries is not None else self.max_retries last_error = None start_time = time.time() for attempt in range(max_retries + 1): try: response = await self._do_request( method=method, url=url, headers=headers, params=params, json=json, data=data, timeout=timeout or self.timeout ) circuit.record_success() response.elapsed = time.time() - start_time return response except Exception as e: last_error = e # Don't retry client errors (4xx) if hasattr(e, 'status_code') and 400 <= e.status_code < 500: circuit.record_failure() raise if attempt < max_retries: delay = self.retry_delay * (2 ** attempt) # Exponential backoff await asyncio.sleep(delay) else: circuit.record_failure() raise last_error or Exception("Request failed") async def _do_request( self, method: str, url: str, headers: Optional[dict], params: Optional[dict], json: Optional[Any], data: Optional[Any], timeout: float ) -> HTTPResponse: """Execute single request""" client = await self._get_client() if HTTP_CLIENT == "httpx": response = await client.request( method=method, url=url, headers=headers, params=params, json=json, data=data, timeout=timeout ) return HTTPResponse( status_code=response.status_code, headers=dict(response.headers), body=response.content ) elif HTTP_CLIENT == "aiohttp": async with client.request( method=method, url=url, headers=headers, params=params, json=json, data=data ) as response: body = await response.read() return HTTPResponse( status_code=response.status, headers=dict(response.headers), body=body ) else: # Fallback to urllib (sync) import urllib.request import urllib.parse if params: url = f"{url}?{urllib.parse.urlencode(params)}" req = urllib.request.Request(url, method=method) if headers: for k, v in headers.items(): req.add_header(k, v) body_bytes = None if json: import json as json_module body_bytes = json_module.dumps(json).encode() req.add_header("Content-Type", "application/json") elif data: body_bytes = urllib.parse.urlencode(data).encode() with urllib.request.urlopen(req, body_bytes, timeout=timeout) as response: return HTTPResponse( status_code=response.status, headers=dict(response.headers), body=response.read() ) async def get(self, url: str, **kwargs) -> HTTPResponse: """GET request""" return await self.request("GET", url, **kwargs) async def post(self, url: str, **kwargs) -> HTTPResponse: """POST request""" return await self.request("POST", url, **kwargs) async def put(self, url: str, **kwargs) -> HTTPResponse: """PUT request""" return await self.request("PUT", url, **kwargs) async def delete(self, url: str, **kwargs) -> HTTPResponse: """DELETE request""" return await self.request("DELETE", url, **kwargs) async def patch(self, url: str, **kwargs) -> HTTPResponse: """PATCH request""" return await self.request("PATCH", url, **kwargs) async def close(self): """Close HTTP client""" if self._client: if HTTP_CLIENT == "httpx": await self._client.aclose() elif HTTP_CLIENT == "aiohttp": await self._client.close() self._client = None @property def stats(self) -> dict: """Get client statistics""" return { "circuit_breakers": { host: { "state": cb.state.value, "failure_count": cb.failure_count, "success_count": cb.success_count } for host, cb in self._circuit_breakers.items() } } # Global HTTP client instance http_client = AsyncHTTPClient( timeout=float(os.environ.get('HTTP_TIMEOUT', '30')), max_retries=int(os.environ.get('HTTP_MAX_RETRIES', '3')), pool_size=int(os.environ.get('HTTP_POOL_SIZE', '10')) ) # Convenience functions async def fetch_json(url: str, **kwargs) -> Any: """Fetch JSON from URL""" response = await http_client.get(url, **kwargs) return response.json() async def post_json(url: str, data: Any, **kwargs) -> Any: """POST JSON and get JSON response""" response = await http_client.post(url, json=data, **kwargs) return response.json()