Spaces:
Sleeping
Sleeping
"""
Security Tests for InnSight-AI API
===================================
Tests for RBAC, tenant isolation, rate limiting, input validation, and PII safety.
"""
| import pytest | |
| import time | |
| import os | |
class TestRBAC:
    """Role-based access control checks for admin and protected routes."""

    def test_admin_endpoint_requires_admin_role(self, client, auth_headers):
        """The admin user listing must turn away a non-admin caller."""
        # The request is sent with the regular user's headers.
        reply = client.get("/api/admin/users", headers=auth_headers)

        # Both 401 and 403 count as a rejection here.
        rejected = [401, 403]
        assert reply.status_code in rejected

    def test_admin_analytics_requires_admin(self, client, auth_headers):
        """The analytics route must not be served to a non-admin caller."""
        reply = client.get("/api/admin/analytics", headers=auth_headers)

        # 404 is tolerated because the endpoint may not exist at all.
        acceptable = [401, 403, 404]
        assert reply.status_code in acceptable

    def test_admin_can_list_users(self, client, admin_auth_headers):
        """An admin caller may list users."""
        reply = client.get("/api/admin/users", headers=admin_auth_headers)

        # Success, or 404 when the route is not implemented.
        acceptable = [200, 404]
        assert reply.status_code in acceptable

    def test_regular_user_cannot_create_admin(self, client, auth_headers):
        """A regular user must not be able to create an admin account."""
        # NOTE(review): the address literal looks like e-mail-obfuscation
        # residue from a web export - confirm the intended value.
        new_admin = {
            "email": "[email protected]",
            "full_name": "New Admin",
            "is_admin": True,
        }
        reply = client.post("/api/admin/users", json=new_admin, headers=auth_headers)

        rejected = [401, 403]
        assert reply.status_code in rejected

    def test_unauthenticated_cannot_access_protected_routes(self, client):
        """Requests sent without any auth headers must be refused."""
        guarded = (
            ("GET", "/api/auth/me"),
            ("GET", "/api/hotels"),
            ("POST", "/api/export"),
            ("GET", "/api/comparisons"),
        )
        for verb, route in guarded:
            # Anything that is not a GET is issued as a POST with an empty body.
            reply = client.get(route) if verb == "GET" else client.post(route, json={})

            # 405 is accepted alongside the auth failures.
            assert reply.status_code in [401, 403, 405], f"Route {route} should require auth"
class TestTenantIsolation:
    """Tenant isolation tests - users can only see their own data."""

    @staticmethod
    def _walk_records(payload):
        """Yield every dict found anywhere inside a decoded JSON payload.

        The response schema is not known here, so the payload is walked
        generically instead of assuming a particular envelope.
        """
        pending = [payload]
        while pending:
            node = pending.pop()
            if isinstance(node, dict):
                yield node
                pending.extend(node.values())
            elif isinstance(node, list):
                pending.extend(node)

    def test_user_cannot_see_other_users_data(self, client, auth_headers):
        """Test that users cannot access data from other users.

        The request asks for user 999's hotels while authenticated as the
        regular test user. The server may reject it or ignore the parameter,
        but it must not crash and must not return records owned by user 999.
        """
        response = client.get("/api/hotels?user_id=999", headers=auth_headers)

        assert response.status_code != 500, "Foreign user_id filter crashed the server"

        if response.status_code == 200:
            # Assumes the authenticated test user is not user 999 and that
            # ownership, when exposed, is reported as `user_id` - confirm
            # against the API schema.
            leaked = [
                record
                for record in self._walk_records(response.json())
                if str(record.get("user_id")) == "999"
            ]
            assert not leaked, f"Response exposed records of user 999: {leaked!r}"

    def test_user_cannot_export_other_users_comparisons(self, client, auth_headers):
        """Test that users cannot export other users' comparison data."""
        response = client.post("/api/export", json={
            "format": "excel",
            "comparison_id": 999,  # Non-existent or other user's comparison
            "user_id": 999,  # Attempt to specify other user
        }, headers=auth_headers)

        # Any client-error rejection is acceptable; a successful export is not.
        assert response.status_code in [400, 403, 404, 422]
class TestRateLimiting:
    """Rate limiting tests for sensitive endpoints.

    Rate limits may be disabled in test mode, so these tests do not demand a
    429. They assert the weaker invariants that always hold: rapid requests
    never produce a server error, and failed logins are never accepted.
    """

    def test_scrape_endpoint_is_rate_limited(self, client, auth_headers):
        """Fire 20 rapid scrape requests and check the server stays healthy.

        Expected outcomes per request are success, a rate limit (429), a
        missing endpoint (404) or a validation error (422).
        """
        statuses = []
        for _ in range(20):
            response = client.post("/api/scrape", json={
                "url": "https://example.com/hotel"
            }, headers=auth_headers)
            statuses.append(response.status_code)
            time.sleep(0.05)  # Small delay

        assert all(code < 500 for code in statuses), (
            f"Scrape endpoint returned a server error under load: {statuses}"
        )

    def test_export_endpoint_is_rate_limited(self, client, auth_headers):
        """Fire 15 export requests; each must succeed or be rejected cleanly."""
        statuses = []
        for _ in range(15):
            response = client.post("/api/export", json={
                "format": "excel",
                "data": [{"hotel": "test", "price": 100}]
            }, headers=auth_headers)
            statuses.append(response.status_code)
            time.sleep(0.1)

        assert all(code < 500 for code in statuses), (
            f"Export endpoint returned a server error under load: {statuses}"
        )

    def test_login_has_brute_force_protection(self, client):
        """Repeated failed logins must be rate limited or keep being rejected."""
        # NOTE(review): the address literal looks like e-mail-obfuscation
        # residue from a web export - confirm the intended value.
        for attempt in range(10):
            response = client.post("/api/auth/login", json={
                "email": "[email protected]",
                "password": f"wrongpassword{attempt}"
            })
            status = response.status_code

            # A wrong password must never authenticate, whichever rejection
            # code (401, 429, ...) the implementation chooses.
            assert not 200 <= status < 300, (
                f"Login attempt {attempt} with a wrong password returned {status}"
            )
            assert status < 500, f"Login attempt {attempt} crashed with {status}"
class TestInputValidation:
    """Input validation and injection prevention tests."""

    def test_sql_injection_in_email(self, client):
        """SQL-injection payloads in the login email must be rejected safely.

        Each payload must neither crash the server (500) nor result in a
        successful login (2xx).
        """
        # NOTE(review): the `[email protected]` fragments look like
        # e-mail-obfuscation residue from a web export - confirm the
        # intended payloads.
        malicious_emails = [
            "[email protected]'; DROP TABLE users; --",
            "' OR '1'='1",
            "admin'[email protected]",
        ]
        for email in malicious_emails:
            response = client.post("/api/auth/login", json={
                "email": email,
                "password": "testpassword"
            })
            status = response.status_code
            assert status != 500, f"Login crashed on injected email {email!r}"
            assert not 200 <= status < 300, (
                f"Injected email {email!r} was accepted with status {status}"
            )

    def test_xss_in_user_name(self, client):
        """A script tag submitted as the full name must not be echoed back raw."""
        import uuid

        # A random suffix keeps the address unique on every run.
        unique_email = f"xss_test_{uuid.uuid4().hex[:8]}@test.com"
        response = client.post("/api/auth/register", json={
            "email": unique_email,
            "password": "SecurePass123!",
            "full_name": "<script>alert('xss')</script>"
        })

        # Registration may answer 200 or 201 Created; inspect the body on both.
        if response.status_code in (200, 201):
            user = response.json().get("user") or {}
            # `or ""` guards against a null full_name in the response.
            assert "<script>" not in (user.get("full_name") or "")

    def test_path_traversal_in_export_download(self, client, auth_headers):
        """Path-traversal sequences in the download path must be blocked."""
        malicious_paths = [
            "../../../etc/passwd",
            "..\\..\\..\\windows\\system32\\config\\sam",
            "/etc/passwd",
            "....//....//....//etc/passwd",
        ]
        for path in malicious_paths:
            response = client.get(f"/api/export/download/{path}", headers=auth_headers)
            assert response.status_code in [400, 404, 422], (
                f"Traversal path {path!r} returned {response.status_code}"
            )

    def test_oversized_payload_rejected(self, client, auth_headers):
        """A very large export payload must be rejected or handled gracefully."""
        # 100,000 small rows - a multi-megabyte JSON body.
        large_data = [{"hotel": f"Hotel {i}", "price": 100} for i in range(100000)]
        response = client.post("/api/export", json={
            "format": "excel",
            "data": large_data
        }, headers=auth_headers)

        # Success is allowed; a crash is not.
        assert response.status_code in [200, 400, 413, 422]

    def test_special_characters_in_hotel_name(self, client, auth_headers):
        """Non-ASCII text, emoji, quotes and angle brackets must not crash."""
        response = client.post("/api/hotels", json={
            "name": "מלון ישראלי 🏨 with \"quotes\" and <brackets>",
            "booking_url": "https://example.com",
        }, headers=auth_headers)

        # Should not crash
        assert response.status_code != 500
class TestPIISafety:
    """Tests for PII (Personally Identifiable Information) handling.

    Every check here runs only when the request returns HTTP 200; any other
    status leaves the test passing without inspecting a body.
    """

    def test_password_not_returned_in_user_response(self, client, auth_headers):
        """The current-user response must not carry password fields."""
        response = client.get("/api/auth/me", headers=auth_headers)
        if response.status_code == 200:
            user = response.json()
            assert "password" not in user
            assert "password_hash" not in user

    def test_totp_secret_not_exposed(self, client, auth_headers):
        """The current-user response must not carry the TOTP secret."""
        response = client.get("/api/auth/me", headers=auth_headers)
        if response.status_code == 200:
            user = response.json()
            assert "totp_secret" not in user

    def test_admin_user_list_excludes_sensitive_data(self, client, admin_auth_headers):
        """The admin user list must not include passwords or secrets.

        Accepts either a bare JSON list of users or an object with a
        ``users`` key.
        """
        response = client.get("/api/admin/users", headers=admin_auth_headers)
        if response.status_code == 200:
            data = response.json()
            # Check the type before calling .get(): a list has no .get().
            users = data if isinstance(data, list) else data.get("users", [])
            for user in users:
                for field in ("password", "password_hash", "totp_secret"):
                    assert field not in user, f"Admin user list exposes {field!r}"

    def test_export_does_not_include_pii(self, client, auth_headers):
        """The export response body must not mention passwords or TOTP."""
        response = client.post("/api/export", json={
            "format": "excel",
            "data": [{"hotel": "Test", "price": 100}]
        }, headers=auth_headers)
        if response.status_code == 200:
            # NOTE(review): if the body is a compressed spreadsheet archive,
            # a plain-text scan may not see cell contents - confirm the
            # export response format.
            content = response.content.decode('utf-8', errors='ignore').lower()
            assert "password" not in content
            assert "totp" not in content
# Additional fixtures for admin auth
@pytest.fixture
def admin_auth_headers(client):
    """Return mock Authorization headers for an admin user.

    Registered as a pytest fixture so the tests in this module that request
    ``admin_auth_headers`` can have it injected.

    No login is performed: a real admin login would need 2FA, so a fixed
    mock bearer token is returned instead. ``client`` is accepted but not
    used.

    NOTE(review): whether the API accepts this mock token is not visible
    from this file - confirm, or replace with a real admin login that
    bypasses 2FA.
    """
    return {"Authorization": "Bearer mock_admin_token_for_testing"}