Spaces:
Sleeping
Sleeping
| """ | |
| Async HTTP client for external API calls. | |
| This module provides a pooled async HTTP client with retries, | |
| timeouts, and circuit breaker pattern for reliability. | |
| Usage: | |
| from async_http import http_client | |
| # GET request | |
| response = await http_client.get("https://api.example.com/data") | |
| # POST with JSON | |
| response = await http_client.post( | |
| "https://api.example.com/data", | |
| json={"key": "value"} | |
| ) | |
| """ | |
| import asyncio | |
| import time | |
| from dataclasses import dataclass, field | |
| from enum import Enum | |
| from typing import Any, Optional | |
| import os | |
| # Use httpx for async HTTP - fallback to aiohttp if available | |
| try: | |
| import httpx | |
| HTTP_CLIENT = "httpx" | |
| except ImportError: | |
| try: | |
| import aiohttp | |
| HTTP_CLIENT = "aiohttp" | |
| except ImportError: | |
| HTTP_CLIENT = None | |
class CircuitState(Enum):
    """The three states a circuit breaker can be in."""

    # Requests flow through as usual.
    CLOSED = "closed"
    # The service is considered failing; requests are rejected.
    OPEN = "open"
    # Recovery is being probed after the open period.
    HALF_OPEN = "half_open"
@dataclass
class CircuitBreakerConfig:
    """
    Circuit breaker configuration.

    Declared as a dataclass so that each threshold can be overridden through
    the constructor, e.g. ``CircuitBreakerConfig(failure_threshold=3)``.
    Calling it with no arguments yields the defaults below.
    """

    # Number of recorded failures at which a closed circuit opens.
    failure_threshold: int = 5
    # Seconds an open circuit waits before moving to half-open.
    recovery_timeout: float = 30.0
    # Successes needed while half-open to close the circuit again.
    success_threshold: int = 2
@dataclass
class CircuitBreaker:
    """
    Circuit breaker for external services.

    Prevents cascading failures by stopping requests
    to failing services.

    The class must be a dataclass: ``config`` is declared with
    ``field(default_factory=...)``, which only yields a real config instance
    when the dataclass machinery generates ``__init__``. It also lets callers
    construct the breaker as ``CircuitBreaker(config=...)``.
    """

    # Thresholds and timeouts that drive the state transitions.
    config: CircuitBreakerConfig = field(default_factory=CircuitBreakerConfig)
    # Current state; a new breaker starts closed.
    state: CircuitState = CircuitState.CLOSED
    # Failures recorded since the counter was last reset.
    failure_count: int = 0
    # Successes recorded since the circuit last entered half-open.
    success_count: int = 0
    # time.time() value captured at the most recent failure.
    last_failure_time: float = 0

    def can_execute(self) -> bool:
        """
        Check if a request can proceed.

        An open circuit moves to half-open (and resets the success counter)
        once ``config.recovery_timeout`` seconds have passed since the last
        recorded failure.

        Returns:
            True when the circuit is closed or half-open, or has just moved
            from open to half-open; False while it is open and still waiting.
        """
        if self.state == CircuitState.CLOSED:
            return True

        if self.state == CircuitState.OPEN:
            # Check if recovery timeout elapsed
            if time.time() - self.last_failure_time >= self.config.recovery_timeout:
                self.state = CircuitState.HALF_OPEN
                self.success_count = 0
                return True
            return False

        # Half-open: every request is let through; record_success() and
        # record_failure() decide where the circuit goes next.
        return True

    def record_success(self) -> None:
        """
        Record a successful request.

        While half-open, successes are counted and the circuit closes (with
        the failure counter reset) once ``config.success_threshold`` is
        reached. In any other state the failure counter is reset.
        """
        if self.state == CircuitState.HALF_OPEN:
            self.success_count += 1
            if self.success_count >= self.config.success_threshold:
                self.state = CircuitState.CLOSED
                self.failure_count = 0
        else:
            self.failure_count = 0

    def record_failure(self) -> None:
        """
        Record a failed request.

        A failure while half-open reopens the circuit immediately; otherwise
        the circuit opens once ``config.failure_threshold`` failures have
        accumulated.
        """
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.state == CircuitState.HALF_OPEN:
            self.state = CircuitState.OPEN
        elif self.failure_count >= self.config.failure_threshold:
            self.state = CircuitState.OPEN
class CircuitBreakerError(Exception):
    """Error raised when the circuit breaker is open."""
@dataclass
class HTTPResponse:
    """
    Unified HTTP response object.

    Declared as a dataclass so it can be built with keyword arguments
    (``status_code``, ``headers``, ``body``); without the generated
    ``__init__`` such a call raises TypeError.
    """

    # Numeric HTTP status of the response.
    status_code: int
    # Response headers as a plain dict.
    headers: dict
    # Raw response payload.
    body: bytes
    # Cache for the decoded JSON payload; filled on the first json() call.
    json_data: Optional[Any] = None
    # Seconds taken by the request; 0.0 until a caller sets it.
    elapsed: float = 0.0

    def ok(self) -> bool:
        """Return True when the status code is in the 2xx range."""
        return 200 <= self.status_code < 300

    def json(self) -> Any:
        """
        Decode the body as JSON and cache the result.

        Returns:
            The decoded payload. A cached value is reused when it is not
            None, so a payload that decodes to None is parsed on every call.

        Raises:
            json.JSONDecodeError: If the body is not valid JSON.
        """
        if self.json_data is not None:
            return self.json_data
        import json
        self.json_data = json.loads(self.body)
        return self.json_data

    def text(self) -> str:
        """Return the body decoded as UTF-8."""
        return self.body.decode("utf-8")
class AsyncHTTPClient:
    """
    Async HTTP client with connection pooling, retries, and circuit breaker.

    The backend is chosen by the module-level ``HTTP_CLIENT`` value: httpx,
    aiohttp, or (when neither is importable) the standard library's urllib
    executed in a worker thread. One circuit breaker is kept per host.
    """

    def __init__(
        self,
        timeout: float = 30.0,
        max_retries: int = 3,
        retry_delay: float = 1.0,
        pool_size: int = 10,
        circuit_breaker_config: Optional[CircuitBreakerConfig] = None
    ):
        """
        Store the client settings; the backend client is created lazily.

        Args:
            timeout: Default request timeout in seconds
            max_retries: Default number of retries after the first attempt
            retry_delay: Base delay in seconds for the exponential backoff
            pool_size: Connection pool limit passed to the backend
            circuit_breaker_config: Config shared by all per-host breakers;
                a default CircuitBreakerConfig is used when omitted
        """
        self.timeout = timeout
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.pool_size = pool_size

        # Circuit breakers per host
        self._circuit_breakers: dict[str, CircuitBreaker] = {}
        self._circuit_config = circuit_breaker_config or CircuitBreakerConfig()

        # Client instance (lazy initialized)
        self._client = None
        self._lock = asyncio.Lock()

    async def _get_client(self):
        """
        Get or create the backend HTTP client.

        Returns:
            An ``httpx.AsyncClient`` or ``aiohttp.ClientSession`` depending on
            ``HTTP_CLIENT``, or None when neither backend is available.
        """
        if self._client is None:
            async with self._lock:
                # Re-check under the lock: another task may have created the
                # client while this one was waiting.
                if self._client is None:
                    if HTTP_CLIENT == "httpx":
                        self._client = httpx.AsyncClient(
                            timeout=httpx.Timeout(self.timeout),
                            limits=httpx.Limits(
                                max_connections=self.pool_size,
                                max_keepalive_connections=self.pool_size // 2
                            ),
                            follow_redirects=True
                        )
                    elif HTTP_CLIENT == "aiohttp":
                        import aiohttp
                        connector = aiohttp.TCPConnector(
                            limit=self.pool_size,
                            keepalive_timeout=30
                        )
                        timeout = aiohttp.ClientTimeout(total=self.timeout)
                        self._client = aiohttp.ClientSession(
                            connector=connector,
                            timeout=timeout
                        )
        return self._client

    def _get_circuit_breaker(self, url: str) -> CircuitBreaker:
        """Get (creating on first use) the circuit breaker for the URL's host."""
        from urllib.parse import urlparse
        host = urlparse(url).netloc
        if host not in self._circuit_breakers:
            self._circuit_breakers[host] = CircuitBreaker(config=self._circuit_config)
        return self._circuit_breakers[host]

    async def request(
        self,
        method: str,
        url: str,
        headers: Optional[dict] = None,
        params: Optional[dict] = None,
        json: Optional[Any] = None,
        data: Optional[Any] = None,
        timeout: Optional[float] = None,
        retries: Optional[int] = None
    ) -> HTTPResponse:
        """
        Make HTTP request with retries and circuit breaker.

        Args:
            method: HTTP method
            url: Request URL
            headers: Optional headers
            params: Query parameters
            json: JSON body
            data: Form data
            timeout: Request timeout override
            retries: Max retries override

        Returns:
            HTTPResponse object

        Raises:
            CircuitBreakerError: If the breaker for the URL's host is open.
            Exception: The last error raised by the backend once all attempts
                are used up, or immediately for an error carrying a 4xx
                ``status_code`` attribute.
        """
        circuit = self._get_circuit_breaker(url)

        if not circuit.can_execute():
            raise CircuitBreakerError(
                f"Circuit breaker open for {url}. "
                f"Service is experiencing issues."
            )

        max_retries = retries if retries is not None else self.max_retries
        last_error = None
        start_time = time.time()

        # NOTE(review): no backend here raises for HTTP error statuses, so
        # 4xx/5xx responses are returned as ordinary responses and recorded
        # as successes for the circuit breaker - confirm this is intended.
        for attempt in range(max_retries + 1):
            try:
                response = await self._do_request(
                    method=method,
                    url=url,
                    headers=headers,
                    params=params,
                    json=json,
                    data=data,
                    timeout=timeout or self.timeout
                )
                circuit.record_success()
                # Elapsed time spans all attempts, including backoff sleeps.
                response.elapsed = time.time() - start_time
                return response
            except Exception as e:
                last_error = e

                # Don't retry client errors (4xx)
                if hasattr(e, 'status_code') and 400 <= e.status_code < 500:
                    circuit.record_failure()
                    raise

                if attempt < max_retries:
                    delay = self.retry_delay * (2 ** attempt)  # Exponential backoff
                    await asyncio.sleep(delay)
                else:
                    # Only the final failed attempt counts against the breaker.
                    circuit.record_failure()

        # Reached after the last attempt failed, or when the loop never ran
        # (negative retry count), in which case there is no stored error.
        raise last_error or Exception("Request failed")

    async def _do_request(
        self,
        method: str,
        url: str,
        headers: Optional[dict],
        params: Optional[dict],
        json: Optional[Any],
        data: Optional[Any],
        timeout: float
    ) -> HTTPResponse:
        """
        Execute a single request on the active backend.

        Args:
            method: HTTP method
            url: Request URL
            headers: Optional headers
            params: Query parameters
            json: JSON body
            data: Form data
            timeout: Timeout in seconds for this request

        Returns:
            HTTPResponse built from the backend's response
        """
        client = await self._get_client()

        if HTTP_CLIENT == "httpx":
            response = await client.request(
                method=method,
                url=url,
                headers=headers,
                params=params,
                json=json,
                data=data,
                timeout=timeout
            )
            return HTTPResponse(
                status_code=response.status_code,
                headers=dict(response.headers),
                body=response.content
            )

        if HTTP_CLIENT == "aiohttp":
            import aiohttp
            async with client.request(
                method=method,
                url=url,
                headers=headers,
                params=params,
                json=json,
                data=data,
                # Honour the per-request timeout instead of silently falling
                # back to the session-wide default.
                timeout=aiohttp.ClientTimeout(total=timeout)
            ) as response:
                body = await response.read()
                return HTTPResponse(
                    status_code=response.status,
                    headers=dict(response.headers),
                    body=body
                )

        # Fallback to urllib (sync). It blocks, so it runs in the default
        # executor to keep the event loop responsive.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            self._urllib_request,
            method, url, headers, params, json, data, timeout
        )

    def _urllib_request(
        self,
        method: str,
        url: str,
        headers: Optional[dict],
        params: Optional[dict],
        json: Optional[Any],
        data: Optional[Any],
        timeout: float
    ) -> HTTPResponse:
        """Blocking urllib request used when no async backend is installed."""
        import json as json_module
        import urllib.error
        import urllib.parse
        import urllib.request

        if params:
            # Extend an existing query string rather than adding a second "?".
            separator = "&" if "?" in url else "?"
            url = f"{url}{separator}{urllib.parse.urlencode(params)}"

        req = urllib.request.Request(url, method=method)
        for k, v in (headers or {}).items():
            req.add_header(k, v)

        body_bytes = None
        if json is not None:
            # "is not None" so falsy payloads such as {} or [] are still sent.
            body_bytes = json_module.dumps(json).encode()
            req.add_header("Content-Type", "application/json")
        elif data:
            if isinstance(data, (bytes, bytearray)):
                body_bytes = bytes(data)
            elif isinstance(data, str):
                body_bytes = data.encode()
            else:
                body_bytes = urllib.parse.urlencode(data).encode()

        try:
            with urllib.request.urlopen(req, body_bytes, timeout=timeout) as response:
                return HTTPResponse(
                    status_code=response.status,
                    headers=dict(response.headers),
                    body=response.read()
                )
        except urllib.error.HTTPError as err:
            # urllib raises for 4xx/5xx, while the other backends return the
            # response; convert it so callers see the same behaviour.
            try:
                return HTTPResponse(
                    status_code=err.code,
                    headers=dict(err.headers or {}),
                    body=err.read()
                )
            finally:
                err.close()

    async def get(self, url: str, **kwargs) -> HTTPResponse:
        """GET request"""
        return await self.request("GET", url, **kwargs)

    async def post(self, url: str, **kwargs) -> HTTPResponse:
        """POST request"""
        return await self.request("POST", url, **kwargs)

    async def put(self, url: str, **kwargs) -> HTTPResponse:
        """PUT request"""
        return await self.request("PUT", url, **kwargs)

    async def delete(self, url: str, **kwargs) -> HTTPResponse:
        """DELETE request"""
        return await self.request("DELETE", url, **kwargs)

    async def patch(self, url: str, **kwargs) -> HTTPResponse:
        """PATCH request"""
        return await self.request("PATCH", url, **kwargs)

    async def close(self):
        """Close the backend client, if one was created, and forget it."""
        if self._client:
            if HTTP_CLIENT == "httpx":
                await self._client.aclose()
            elif HTTP_CLIENT == "aiohttp":
                await self._client.close()
            self._client = None

    def stats(self) -> dict:
        """
        Get client statistics.

        Returns:
            A dict with a ``"circuit_breakers"`` entry mapping each host to
            its breaker's state value, failure count and success count.
        """
        return {
            "circuit_breakers": {
                host: {
                    "state": cb.state.value,
                    "failure_count": cb.failure_count,
                    "success_count": cb.success_count
                }
                for host, cb in self._circuit_breakers.items()
            }
        }
# Shared module-level client; HTTP_TIMEOUT, HTTP_MAX_RETRIES and
# HTTP_POOL_SIZE override the defaults when set in the environment.
http_client = AsyncHTTPClient(
    timeout=float(os.getenv('HTTP_TIMEOUT', '30')),
    max_retries=int(os.getenv('HTTP_MAX_RETRIES', '3')),
    pool_size=int(os.getenv('HTTP_POOL_SIZE', '10')),
)
| # Convenience functions | |
async def fetch_json(url: str, **kwargs) -> Any:
    """GET the URL with the shared client and return the decoded JSON body."""
    return (await http_client.get(url, **kwargs)).json()
async def post_json(url: str, data: Any, **kwargs) -> Any:
    """POST data as a JSON body with the shared client; return the decoded JSON reply."""
    reply = await http_client.post(url, json=data, **kwargs)
    return reply.json()