# InnSight-Backend / api / async_http.py
# jackonthemike's picture
# feat: Sync backend updates including AI Revenue Analyst
# cef0de3
"""
Async HTTP client for external API calls.

This module provides a pooled async HTTP client with retries,
timeouts, and a circuit breaker pattern for reliability.

Usage:
    from async_http import http_client

    # GET request
    response = await http_client.get("https://api.example.com/data")

    # POST with JSON
    response = await http_client.post(
        "https://api.example.com/data",
        json={"key": "value"}
    )
"""
import asyncio
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional
import os
# Use httpx for async HTTP - fallback to aiohttp if available
try:
import httpx
HTTP_CLIENT = "httpx"
except ImportError:
try:
import aiohttp
HTTP_CLIENT = "aiohttp"
except ImportError:
HTTP_CLIENT = None
class CircuitState(Enum):
    """States a circuit breaker can be in."""

    # Normal operation: requests are allowed
    CLOSED = "closed"
    # Service is failing: requests are rejected
    OPEN = "open"
    # Recovery is being tested: requests are allowed through again
    HALF_OPEN = "half_open"
@dataclass
class CircuitBreakerConfig:
    """Thresholds and timings that drive a circuit breaker."""

    # How many failures are tolerated before the circuit opens
    failure_threshold: int = 5
    # How many seconds an open circuit waits before going half-open
    recovery_timeout: float = 30.0
    # How many half-open successes are needed to close the circuit
    success_threshold: int = 2
@dataclass
class CircuitBreaker:
    """
    Circuit breaker guarding calls to one external service.

    While the circuit is OPEN, ``can_execute`` refuses requests so a failing
    service is not hit again until ``config.recovery_timeout`` seconds have
    passed since the last recorded failure.
    """

    config: CircuitBreakerConfig = field(default_factory=CircuitBreakerConfig)
    state: CircuitState = CircuitState.CLOSED
    failure_count: int = 0
    success_count: int = 0
    last_failure_time: float = 0

    def can_execute(self) -> bool:
        """Return True when a request may proceed.

        An OPEN circuit whose recovery timeout has elapsed is switched to
        HALF_OPEN (with the success counter reset) as a side effect.
        """
        if self.state != CircuitState.OPEN:
            # CLOSED and HALF_OPEN both let requests through
            return True

        seconds_since_failure = time.time() - self.last_failure_time
        if seconds_since_failure >= self.config.recovery_timeout:
            self.state = CircuitState.HALF_OPEN
            self.success_count = 0
            return True
        return False

    def record_success(self) -> None:
        """Register a successful request."""
        if self.state != CircuitState.HALF_OPEN:
            # Outside half-open a success just clears the failure tally
            self.failure_count = 0
            return

        self.success_count += 1
        if self.success_count >= self.config.success_threshold:
            # Enough successful probes: resume normal operation
            self.state = CircuitState.CLOSED
            self.failure_count = 0

    def record_failure(self) -> None:
        """Register a failed request and open the circuit when warranted."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        was_probing = self.state == CircuitState.HALF_OPEN
        # Any half-open failure re-opens; otherwise open at the threshold
        if was_probing or self.failure_count >= self.config.failure_threshold:
            self.state = CircuitState.OPEN
class CircuitBreakerError(Exception):
    """Error signalling that the circuit breaker is open."""
@dataclass
class HTTPResponse:
    """Unified HTTP response object.

    Holds the status code, headers and raw body of a response, plus a lazily
    parsed JSON cache and the elapsed time filled in by the caller.
    """

    status_code: int
    headers: dict
    body: bytes
    # Cache for the parsed JSON body; populated by the first json() call
    json_data: Optional[Any] = None
    elapsed: float = 0.0

    @property
    def ok(self) -> bool:
        """True when the status code is in the 2xx range."""
        return 200 <= self.status_code < 300

    def json(self) -> Any:
        """Parse the body as JSON, caching the result in ``json_data``.

        Raises:
            ValueError: if the body is not valid JSON (raised by json.loads).
        """
        if self.json_data is None:
            import json

            self.json_data = json.loads(self.body)
        return self.json_data

    def _declared_charset(self) -> Optional[str]:
        """Return the charset named in the Content-Type header, if any."""
        for name, value in (self.headers or {}).items():
            # Header names are case-insensitive
            if str(name).lower() != "content-type":
                continue
            # Skip the media type itself; only the parameters matter here
            for parameter in str(value).split(";")[1:]:
                key, _, raw = parameter.partition("=")
                if key.strip().lower() == "charset":
                    return raw.strip().strip("\"'") or None
        return None

    @property
    def text(self) -> str:
        """Body decoded to text.

        Uses the charset declared in the Content-Type header when present,
        otherwise UTF-8. An unknown charset name falls back to UTF-8.

        Raises:
            UnicodeDecodeError: if the body is not valid in the chosen charset.
        """
        charset = self._declared_charset() or "utf-8"
        try:
            return self.body.decode(charset)
        except LookupError:
            # The server named a codec Python does not know
            return self.body.decode("utf-8")
class AsyncHTTPClient:
    """
    Async HTTP client with connection pooling, retries, and circuit breaker.

    The backend is selected by the module-level ``HTTP_CLIENT`` value:
    "httpx", "aiohttp", or anything else for a urllib-based fallback.
    One circuit breaker is kept per host (the URL's netloc).
    """

    def __init__(
        self,
        timeout: float = 30.0,
        max_retries: int = 3,
        retry_delay: float = 1.0,
        pool_size: int = 10,
        circuit_breaker_config: Optional[CircuitBreakerConfig] = None
    ):
        """
        Store the settings; the underlying HTTP client is created lazily.

        Args:
            timeout: Default request timeout in seconds.
            max_retries: Default number of retries after the first attempt.
            retry_delay: Base delay in seconds for the exponential backoff.
            pool_size: Connection limit handed to the underlying client.
            circuit_breaker_config: Config shared by all per-host breakers;
                a default CircuitBreakerConfig is used when omitted.
        """
        self.timeout = timeout
        self.max_retries = max_retries
        self.retry_delay = retry_delay
        self.pool_size = pool_size

        # Circuit breakers per host
        self._circuit_breakers: dict[str, CircuitBreaker] = {}
        self._circuit_config = circuit_breaker_config or CircuitBreakerConfig()

        # Client instance (lazy initialized)
        self._client = None
        self._lock = asyncio.Lock()

    async def _get_client(self):
        """Get or create the underlying HTTP client.

        Returns None when neither httpx nor aiohttp is selected; the urllib
        fallback does not need a client object.
        """
        if self._client is None:
            async with self._lock:
                # Re-check after acquiring the lock: another task may have
                # created the client while this one was waiting.
                if self._client is None:
                    if HTTP_CLIENT == "httpx":
                        self._client = httpx.AsyncClient(
                            timeout=httpx.Timeout(self.timeout),
                            limits=httpx.Limits(
                                max_connections=self.pool_size,
                                max_keepalive_connections=self.pool_size // 2
                            ),
                            follow_redirects=True
                        )
                    elif HTTP_CLIENT == "aiohttp":
                        import aiohttp

                        connector = aiohttp.TCPConnector(
                            limit=self.pool_size,
                            keepalive_timeout=30
                        )
                        timeout = aiohttp.ClientTimeout(total=self.timeout)
                        self._client = aiohttp.ClientSession(
                            connector=connector,
                            timeout=timeout
                        )
        return self._client

    def _get_circuit_breaker(self, url: str) -> CircuitBreaker:
        """Get (creating on first use) the circuit breaker for the URL's host."""
        from urllib.parse import urlparse

        host = urlparse(url).netloc
        if host not in self._circuit_breakers:
            self._circuit_breakers[host] = CircuitBreaker(config=self._circuit_config)
        return self._circuit_breakers[host]

    async def request(
        self,
        method: str,
        url: str,
        headers: Optional[dict] = None,
        params: Optional[dict] = None,
        json: Optional[Any] = None,
        data: Optional[Any] = None,
        timeout: Optional[float] = None,
        retries: Optional[int] = None
    ) -> HTTPResponse:
        """
        Make HTTP request with retries and circuit breaker.

        Args:
            method: HTTP method
            url: Request URL
            headers: Optional headers
            params: Query parameters
            json: JSON body
            data: Form data
            timeout: Request timeout override
            retries: Max retries override

        Returns:
            HTTPResponse object; ``elapsed`` covers all attempts and backoff.

        Raises:
            CircuitBreakerError: if the host's circuit breaker rejects the call.
            Exception: the last error raised once all attempts are exhausted.
        """
        circuit = self._get_circuit_breaker(url)

        if not circuit.can_execute():
            raise CircuitBreakerError(
                f"Circuit breaker open for {url}. "
                f"Service is experiencing issues."
            )

        max_retries = retries if retries is not None else self.max_retries
        last_error = None
        start_time = time.time()

        for attempt in range(max_retries + 1):
            try:
                response = await self._do_request(
                    method=method,
                    url=url,
                    headers=headers,
                    params=params,
                    json=json,
                    data=data,
                    timeout=timeout or self.timeout
                )
                circuit.record_success()
                response.elapsed = time.time() - start_time
                return response
            except Exception as e:
                last_error = e

                # Don't retry client errors (4xx).
                # NOTE(review): this only matches exceptions that carry a
                # ``status_code`` attribute; _do_request returns non-2xx
                # replies as normal responses, so they are neither retried
                # nor counted as failures - confirm that is intended.
                if hasattr(e, 'status_code') and 400 <= e.status_code < 500:
                    circuit.record_failure()
                    raise

                if attempt < max_retries:
                    delay = self.retry_delay * (2 ** attempt)  # Exponential backoff
                    await asyncio.sleep(delay)
                else:
                    # Only the final failed attempt counts against the circuit
                    circuit.record_failure()

        raise last_error or Exception("Request failed")

    async def _do_request(
        self,
        method: str,
        url: str,
        headers: Optional[dict],
        params: Optional[dict],
        json: Optional[Any],
        data: Optional[Any],
        timeout: float
    ) -> HTTPResponse:
        """Execute a single request on the selected backend, without retries."""
        client = await self._get_client()

        if HTTP_CLIENT == "httpx":
            response = await client.request(
                method=method,
                url=url,
                headers=headers,
                params=params,
                json=json,
                data=data,
                timeout=timeout
            )
            return HTTPResponse(
                status_code=response.status_code,
                headers=dict(response.headers),
                body=response.content
            )

        elif HTTP_CLIENT == "aiohttp":
            import aiohttp

            async with client.request(
                method=method,
                url=url,
                headers=headers,
                params=params,
                json=json,
                data=data,
                # Honour the per-request timeout instead of always using the
                # session-wide default set in _get_client.
                timeout=aiohttp.ClientTimeout(total=timeout)
            ) as response:
                body = await response.read()
                return HTTPResponse(
                    status_code=response.status,
                    headers=dict(response.headers),
                    body=body
                )

        else:
            # Fallback to urllib, which is blocking: run it in the default
            # executor so the event loop is not stalled during the request.
            loop = asyncio.get_running_loop()
            return await loop.run_in_executor(
                None,
                lambda: self._blocking_urllib_request(
                    method, url, headers, params, json, data, timeout
                )
            )

    @staticmethod
    def _blocking_urllib_request(
        method: str,
        url: str,
        headers: Optional[dict],
        params: Optional[dict],
        json_body: Optional[Any],
        data: Optional[Any],
        timeout: float
    ) -> HTTPResponse:
        """Perform one synchronous request with urllib (fallback backend)."""
        import json as json_module
        import urllib.error
        import urllib.parse
        import urllib.request

        if params:
            # Extend an existing query string rather than adding a second "?"
            separator = "&" if "?" in url else "?"
            url = f"{url}{separator}{urllib.parse.urlencode(params)}"

        req = urllib.request.Request(url, method=method)
        for key, value in (headers or {}).items():
            req.add_header(key, value)

        body_bytes = None
        if json_body is not None:
            # ``is not None`` so falsy payloads such as {} or [] are still sent
            body_bytes = json_module.dumps(json_body).encode()
            req.add_header("Content-Type", "application/json")
        elif data:
            if isinstance(data, (bytes, bytearray)):
                body_bytes = bytes(data)
            elif isinstance(data, str):
                body_bytes = data.encode()
            else:
                body_bytes = urllib.parse.urlencode(data).encode()

        try:
            with urllib.request.urlopen(req, body_bytes, timeout=timeout) as response:
                return HTTPResponse(
                    status_code=response.status,
                    headers=dict(response.headers),
                    body=response.read()
                )
        except urllib.error.HTTPError as err:
            # urllib raises on 4xx/5xx; hand the reply back as a response so
            # this backend matches the httpx and aiohttp branches.
            try:
                payload = err.read()
            finally:
                err.close()
            return HTTPResponse(
                status_code=err.code,
                headers=dict(err.headers),
                body=payload
            )

    async def get(self, url: str, **kwargs) -> HTTPResponse:
        """GET request; keyword arguments are forwarded to ``request``."""
        return await self.request("GET", url, **kwargs)

    async def post(self, url: str, **kwargs) -> HTTPResponse:
        """POST request; keyword arguments are forwarded to ``request``."""
        return await self.request("POST", url, **kwargs)

    async def put(self, url: str, **kwargs) -> HTTPResponse:
        """PUT request; keyword arguments are forwarded to ``request``."""
        return await self.request("PUT", url, **kwargs)

    async def delete(self, url: str, **kwargs) -> HTTPResponse:
        """DELETE request; keyword arguments are forwarded to ``request``."""
        return await self.request("DELETE", url, **kwargs)

    async def patch(self, url: str, **kwargs) -> HTTPResponse:
        """PATCH request; keyword arguments are forwarded to ``request``."""
        return await self.request("PATCH", url, **kwargs)

    async def close(self):
        """Close the underlying HTTP client, if one was created."""
        if self._client:
            if HTTP_CLIENT == "httpx":
                await self._client.aclose()
            elif HTTP_CLIENT == "aiohttp":
                await self._client.close()
            # Drop the reference so a later request creates a fresh client
            self._client = None

    @property
    def stats(self) -> dict:
        """Get client statistics: state and counters of each host's breaker."""
        return {
            "circuit_breakers": {
                host: {
                    "state": cb.state.value,
                    "failure_count": cb.failure_count,
                    "success_count": cb.success_count
                }
                for host, cb in self._circuit_breakers.items()
            }
        }
# Shared module-level client; each setting is read from the environment
# and falls back to the default shown when the variable is unset.
http_client = AsyncHTTPClient(
    timeout=float(os.getenv("HTTP_TIMEOUT", "30")),
    max_retries=int(os.getenv("HTTP_MAX_RETRIES", "3")),
    pool_size=int(os.getenv("HTTP_POOL_SIZE", "10")),
)
# Convenience functions
async def fetch_json(url: str, **kwargs) -> Any:
    """GET ``url`` with the shared client and return the parsed JSON body.

    Extra keyword arguments are passed through to ``http_client.get``.
    """
    reply = await http_client.get(url, **kwargs)
    parsed = reply.json()
    return parsed
async def post_json(url: str, data: Any, **kwargs) -> Any:
    """POST ``data`` as a JSON body to ``url`` and return the parsed JSON reply.

    Extra keyword arguments are passed through to ``http_client.post``.
    """
    reply = await http_client.post(url, json=data, **kwargs)
    parsed = reply.json()
    return parsed