| | """ |
| | Authentication and Security for API Endpoints |
| | """ |
| |
|
import hmac

from fastapi import Security, HTTPException, status, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

from config import config
| |
|
# Bearer-token extractor used as the dependency of verify_token.
# auto_error=False: verify_token itself handles absent credentials
# (it checks `if not credentials`), so the scheme is configured not to
# reject such requests on its own.
security = HTTPBearer(auto_error=False)
| |
|
| |
|
async def verify_token(credentials: HTTPAuthorizationCredentials = Security(security)):
    """Verify the bearer token supplied with the request.

    Authentication is skipped entirely when ``config.API_TOKENS`` is
    empty/falsy.

    Args:
        credentials: Bearer credentials resolved through ``security``;
            falsy when the request carried none.

    Returns:
        ``None`` when no tokens are configured, otherwise the accepted
        token string.

    Raises:
        HTTPException: 401 when credentials are missing, or when the
            supplied token matches none of the configured tokens.
    """
    if not config.API_TOKENS:
        return None

    if not credentials:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication required",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # Compare as bytes: hmac.compare_digest rejects non-ASCII str values.
    supplied = credentials.credentials.encode("utf-8")

    # Constant-time comparison against every configured token, without an
    # early exit, so response timing does not reveal how much of a token
    # matched or which entry matched.
    # NOTE(review): assumes config.API_TOKENS is an iterable of str tokens
    # (list/set/tuple/dict keys) — confirm against config. Non-str entries
    # are skipped; they could never equal the supplied str anyway.
    matched = False
    for token in config.API_TOKENS:
        if isinstance(token, str):
            matched |= hmac.compare_digest(supplied, token.encode("utf-8"))

    if not matched:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication token",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return credentials.credentials
| |
|
| |
|
async def verify_ip(request: Request):
    """Verify the caller's IP address against the configured whitelist.

    The check is skipped when ``config.ALLOWED_IPS`` is empty/falsy.

    Args:
        request: Incoming request; its ``client.host`` is the address
            that is checked.

    Returns:
        ``True`` when no whitelist is configured or the client address is
        in ``config.ALLOWED_IPS``.

    Raises:
        HTTPException: 403 when the client address is unavailable or is
            not in ``config.ALLOWED_IPS``.
    """
    if not config.ALLOWED_IPS:
        return True

    # request.client may be None when the server provides no peer address;
    # treat that as "not whitelisted" instead of failing with an
    # AttributeError (which would surface as a 500).
    client = request.client
    client_ip = client.host if client is not None else None

    if client_ip is None or client_ip not in config.ALLOWED_IPS:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="IP not whitelisted",
        )

    return True
| |
|