"""
Security Tests for InnSight-AI API
===================================
Tests for RBAC, tenant isolation, rate limiting, input validation, and PII safety.
"""

import os
import time

import pytest


class TestRBAC:
    """Role-Based Access Control tests.

    Each test issues a request through the ``client`` fixture and checks only
    the HTTP status code of the response.
    """

    def test_admin_endpoint_requires_admin_role(self, client, auth_headers):
        """Test that admin endpoints reject non-admin users."""
        # Regular user trying to access admin endpoint
        response = client.get("/api/admin/users", headers=auth_headers)

        # Should be forbidden for non-admin
        assert response.status_code in [401, 403]

    def test_admin_analytics_requires_admin(self, client, auth_headers):
        """Test that analytics endpoints require admin role."""
        response = client.get("/api/admin/analytics", headers=auth_headers)

        assert response.status_code in [401, 403, 404]  # 404 if endpoint doesn't exist

    def test_admin_can_list_users(self, client, admin_auth_headers):
        """Test that admin can list users."""
        response = client.get("/api/admin/users", headers=admin_auth_headers)

        # Admin should have access (or 404 if not implemented)
        assert response.status_code in [200, 404]

    def test_regular_user_cannot_create_admin(self, client, auth_headers):
        """Test that regular users cannot create admin accounts."""
        response = client.post("/api/admin/users", json={
            "email": "newadmin@test.com",
            "full_name": "New Admin",
            "is_admin": True,
        }, headers=auth_headers)

        assert response.status_code in [401, 403]

    def test_unauthenticated_cannot_access_protected_routes(self, client):
        """Test that unauthenticated requests are rejected.

        Every route is called without any headers. GET routes are fetched
        as-is; all other methods are sent as a POST with an empty JSON body.
        """
        protected_routes = [
            ("GET", "/api/auth/me"),
            ("GET", "/api/hotels"),
            ("POST", "/api/export"),
            ("GET", "/api/comparisons"),
        ]

        for method, route in protected_routes:
            if method == "GET":
                response = client.get(route)
            else:
                response = client.post(route, json={})

            # Should require authentication (405 is also tolerated here)
            assert response.status_code in [401, 403, 405], f"Route {route} should require auth"


class TestTenantIsolation:
    """Tenant isolation tests - users can only see their own data"""

    def test_user_cannot_see_other_users_data(self, client, auth_headers):
        """Test that users cannot access data from other users.

        NOTE(review): this test currently only checks that a 200 response
        carries a decodable JSON body; it makes no assertion about which
        records are returned.
        """
        # Try to access another user's hotels
        response = client.get("/api/hotels?user_id=999", headers=auth_headers)

        # Should not return other user's data
        if response.status_code == 200:
            # Decoding raises if the body is not valid JSON.
            response.json()
            # Data should be filtered to current user
            # (exact validation depends on implementation)

    def test_user_cannot_export_other_users_comparisons(self, client, auth_headers):
        """Test that users cannot export other users' comparison data."""
        response = client.post("/api/export", json={
            "format": "excel",
            "comparison_id": 999,  # Non-existent or other user's comparison
            "user_id": 999,  # Attempt to specify other user
        }, headers=auth_headers)

        # Should not allow export of other user's data
        assert response.status_code in [400, 403, 404, 422]


class TestRateLimiting:
    """Rate limiting tests for sensitive endpoints"""

    @pytest.mark.slow
    def test_scrape_endpoint_is_rate_limited(self, client, auth_headers):
        """Test that scraping endpoint has rate limits.

        NOTE(review): the status codes are collected but never asserted on,
        so this test only documents behavior and cannot fail on a missing
        rate limit.
        """
        # Make multiple rapid requests
        responses = []
        for _ in range(20):
            response = client.post("/api/scrape", json={
                "url": "https://example.com/hotel",
            }, headers=auth_headers)
            responses.append(response.status_code)
            time.sleep(0.05)  # Small delay

        # At some point should hit rate limit (429)
        # Or endpoint might not exist (404) or require different params (422)
        # This test documents behavior

    @pytest.mark.slow
    def test_export_endpoint_is_rate_limited(self, client, auth_headers):
        """Test that export endpoint has rate limits.

        NOTE(review): the status codes are collected but never asserted on,
        so this test cannot fail on a missing rate limit.
        """
        responses = []
        for _ in range(15):
            response = client.post("/api/export", json={
                "format": "excel",
                "data": [{"hotel": "test", "price": 100}],
            }, headers=auth_headers)
            responses.append(response.status_code)
            time.sleep(0.1)

        # Should eventually hit rate limit or succeed
        # Rate limits may be disabled in test mode

    def test_login_has_brute_force_protection(self, client):
        """Test that login endpoint has brute force protection.

        Sends ten logins with a different wrong password each time and checks
        that none of them is answered with a 2xx status.
        """
        # Make multiple failed login attempts
        for i in range(10):
            response = client.post("/api/auth/login", json={
                "email": "test@example.com",
                "password": f"wrongpassword{i}",
            })

            # Should either rate limit or continue to reject; whichever the
            # implementation does, a wrong password must never be accepted.
            assert not 200 <= response.status_code < 300, (
                f"Failed login attempt {i} was unexpectedly accepted"
            )


class TestInputValidation:
    """Input validation and injection prevention tests"""

    def test_sql_injection_in_email(self, client):
        """Test that SQL injection in email is handled safely."""
        malicious_emails = [
            "test@example.com'; DROP TABLE users; --",
            "' OR '1'='1",
            "admin'--@example.com",
        ]

        for email in malicious_emails:
            response = client.post("/api/auth/login", json={
                "email": email,
                "password": "testpassword",
            })

            # Should not crash and should reject invalid input
            assert response.status_code != 500

    def test_xss_in_user_name(self, client):
        """Test that XSS in user name is sanitized.

        Registers a user under a freshly generated e-mail address and, when
        registration answers 200, checks the returned name for script tags.
        """
        import uuid

        # Random suffix so repeated runs do not reuse the same address.
        unique_email = f"xss_test_{uuid.uuid4().hex[:8]}@test.com"

        # NOTE(review): in the reviewed copy of this file "full_name" was an
        # empty string and the test broke off in the middle of its final
        # assert. The script payload and the assertion below are reconstructed
        # from this test's docstring and comment - confirm against upstream,
        # including the name of the response field.
        response = client.post("/api/auth/register", json={
            "email": unique_email,
            "password": "SecurePass123!",
            "full_name": "<script>alert('xss')</script>",
        })

        if response.status_code == 200:
            user = response.json().get("user", {})
            # Name should not contain raw script tags
            assert "<script>" not in str(user.get("full_name", ""))