Spaces:
Sleeping
Sleeping
| """ | |
| Performance and Latency Tests | |
| ============================== | |
| Tests for p95 latency thresholds, concurrent request handling, and SLA compliance. | |
| """ | |
| import pytest | |
| import time | |
| import statistics | |
| import concurrent.futures | |
| from typing import List | |
# Performance thresholds (in seconds) — p95 SLA budgets asserted by the
# latency tests below. Keys match the f"{endpoint}_p95" naming used in
# TestLatencyThresholds.
THRESHOLDS = {
    "comparisons_p95": 0.5,  # 500ms for comparison reads
    "dashboard_p95": 0.3,    # 300ms for dashboard
    "export_p95": 2.0,       # 2000ms for exports
    "search_p95": 1.0,       # 1000ms for hotel search
    "auth_p95": 0.2,         # 200ms for auth operations
}

# Number of iterations for latency tests (export tests use fewer — see below)
LATENCY_ITERATIONS = 20
def calculate_p95(latencies: List[float]) -> float:
    """Return the 95th-percentile value of *latencies* (nearest-rank method).

    Args:
        latencies: Observed latencies in seconds; order does not matter.

    Returns:
        The smallest observed value such that at least 95% of the samples
        are less than or equal to it, or 0.0 for an empty input.
    """
    if not latencies:
        return 0.0
    sorted_latencies = sorted(latencies)
    # Nearest-rank percentile: the p95 is the element at 1-based rank
    # ceil(0.95 * n). The previous index of int(n * 0.95) was off by one —
    # for n=20 it selected index 19, i.e. the maximum (p100), not p95.
    # ceil is computed with integer arithmetic to avoid importing math.
    rank = -(-95 * len(sorted_latencies) // 100)
    return sorted_latencies[max(rank - 1, 0)]
class TestLatencyThresholds:
    """Tests for API endpoint latency SLAs"""

    def test_comparisons_endpoint_p95_latency(self, client, auth_headers):
        """Test that comparisons endpoint meets p95 latency SLA"""
        samples = []
        for _ in range(LATENCY_ITERATIONS):
            t0 = time.perf_counter()
            resp = client.get("/api/comparisons", headers=auth_headers)
            elapsed = time.perf_counter() - t0
            # Only time responses the endpoint actually served.
            if resp.status_code in (200, 404):
                samples.append(elapsed)
        if not samples:
            return
        p95 = calculate_p95(samples)
        avg = statistics.mean(samples)
        print(f"Comparisons - p95: {p95*1000:.1f}ms, avg: {avg*1000:.1f}ms")
        assert p95 < THRESHOLDS["comparisons_p95"], f"p95 latency {p95:.3f}s exceeds threshold"

    def test_dashboard_endpoint_p95_latency(self, client, auth_headers):
        """Test that dashboard endpoint meets p95 latency SLA"""
        samples = []
        for _ in range(LATENCY_ITERATIONS):
            t0 = time.perf_counter()
            resp = client.get("/api/dashboard", headers=auth_headers)
            elapsed = time.perf_counter() - t0
            if resp.status_code in (200, 403, 404):
                samples.append(elapsed)
        if not samples:
            return
        p95 = calculate_p95(samples)
        print(f"Dashboard - p95: {p95*1000:.1f}ms")
        assert p95 < THRESHOLDS["dashboard_p95"], f"p95 latency {p95:.3f}s exceeds threshold"

    def test_export_endpoint_p95_latency(self, client, auth_headers):
        """Test that export endpoint meets p95 latency SLA"""
        samples = []
        payload = {
            "format": "excel",
            "data": [{"hotel": f"Hotel {i}", "price": 100 + i} for i in range(10)],
        }
        for _ in range(10):  # Fewer iterations for expensive operation
            t0 = time.perf_counter()
            resp = client.post("/api/export", json=payload, headers=auth_headers)
            elapsed = time.perf_counter() - t0
            if resp.status_code in (200, 400, 422):
                samples.append(elapsed)
        if not samples:
            return
        p95 = calculate_p95(samples)
        print(f"Export - p95: {p95*1000:.1f}ms")
        assert p95 < THRESHOLDS["export_p95"], f"p95 latency {p95:.3f}s exceeds threshold"

    def test_auth_endpoint_p95_latency(self, client):
        """Test that auth endpoints meet p95 latency SLA"""
        samples = []
        for _ in range(LATENCY_ITERATIONS):
            t0 = time.perf_counter()
            resp = client.get("/api/health")  # Lightweight auth check
            elapsed = time.perf_counter() - t0
            if resp.status_code == 200:
                samples.append(elapsed)
        if not samples:
            return
        p95 = calculate_p95(samples)
        print(f"Health/Auth - p95: {p95*1000:.1f}ms")
        assert p95 < THRESHOLDS["auth_p95"], f"p95 latency {p95:.3f}s exceeds threshold"
class TestConcurrentRequests:
    """Tests for concurrent request handling"""

    def test_concurrent_reads(self, client, auth_headers):
        """Test that API handles concurrent read requests"""
        workers = 10

        def timed_get(_):
            t0 = time.perf_counter()
            resp = client.get("/api/dashboard", headers=auth_headers)
            return resp.status_code, time.perf_counter() - t0

        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
            outcomes = list(pool.map(timed_get, range(workers)))

        codes = [code for code, _ in outcomes]
        durations = [dur for _, dur in outcomes]
        # Should not have server errors
        assert 500 not in codes
        # Average latency should still be reasonable
        mean_latency = statistics.mean(durations)
        print(f"Concurrent reads - avg latency: {mean_latency*1000:.1f}ms")
        assert mean_latency < 2.0, "Concurrent request latency too high"

    def test_concurrent_writes(self, client, auth_headers):
        """Test that API handles concurrent write requests"""
        workers = 5  # Fewer concurrent writes

        def timed_post(i):
            t0 = time.perf_counter()
            resp = client.post("/api/export", json={
                "format": "excel",
                "data": [{"hotel": f"Test {i}", "price": 100}]
            }, headers=auth_headers)
            return resp.status_code, time.perf_counter() - t0

        with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
            outcomes = list(pool.map(timed_post, range(workers)))

        # Should not crash
        assert 500 not in [code for code, _ in outcomes]

    def test_connection_pool_under_load(self, client, auth_headers):
        """Test that connection pooling handles load"""
        total = 50
        ok = sum(
            1 for _ in range(total)
            if client.get("/api/health").status_code == 200
        )
        rate = ok / total
        print(f"Connection pool - success rate: {rate*100:.1f}%")
        assert rate >= 0.95, "Success rate too low under load"
class TestScraperPerformance:
    """Tests for scraper job performance"""

    def test_scrape_request_timeout(self, client, auth_headers):
        """Test that scrape requests have appropriate timeouts"""
        t0 = time.perf_counter()
        resp = client.post("/api/scrape", json={
            "url": "https://example.com/slow-page"
        }, headers=auth_headers, timeout=30)
        elapsed = time.perf_counter() - t0
        # Should not take longer than timeout
        assert elapsed < 30, "Scrape request exceeded timeout"
        # Should return quickly with validation error or queued status
        if resp.status_code != 404:
            assert elapsed < 5, "Scrape validation should be fast"

    def test_scraper_retry_backoff(self, client, auth_headers):
        """Test that scraper implements proper retry backoff"""
        # This is more of an integration test
        # Verifies that retries don't overwhelm the system
        resp = client.post("/api/scrape", json={
            "url": "https://httpstat.us/503"  # Will return 503
        }, headers=auth_headers)
        # Should handle gracefully
        assert resp.status_code != 500
class TestDatabasePerformance:
    """Tests for database query performance"""

    def test_user_lookup_performance(self, client, auth_headers):
        """Test that user lookup is fast"""
        samples = []
        for _ in range(10):
            t0 = time.perf_counter()
            resp = client.get("/api/auth/me", headers=auth_headers)
            elapsed = time.perf_counter() - t0
            if resp.status_code == 200:
                samples.append(elapsed)
        if not samples:
            return
        avg = statistics.mean(samples)
        print(f"User lookup - avg: {avg*1000:.1f}ms")
        assert avg < 0.1, "User lookup too slow"

    def test_hotel_list_performance(self, client, auth_headers):
        """Test that hotel listing is performant"""
        samples = []
        for _ in range(10):
            t0 = time.perf_counter()
            resp = client.get("/api/hotels", headers=auth_headers)
            elapsed = time.perf_counter() - t0
            if resp.status_code in (200, 404):
                samples.append(elapsed)
        if not samples:
            return
        avg = statistics.mean(samples)
        print(f"Hotel list - avg: {avg*1000:.1f}ms")
        assert avg < 0.5, "Hotel listing too slow"
class TestResourceUsage:
    """Tests for resource usage patterns"""

    def test_memory_stable_under_load(self, client, auth_headers):
        """Test that memory usage is stable under repeated requests"""
        import gc

        # Force garbage collection before test
        gc.collect()
        # Make many requests, collecting periodically to keep RSS flat
        for i in range(100):
            client.get("/api/health")
            if i % 20 == 0:
                gc.collect()
        # Should complete without memory issues
        # (Python's GC should handle this)

    def test_no_connection_leaks(self, client, auth_headers):
        """Test that connections are properly closed"""
        # Make requests in a loop
        for _ in range(50):
            client.get("/api/dashboard", headers=auth_headers)
        # Connection should be reused or properly closed
        # Final request should still work
        assert client.get("/api/health").status_code == 200
| # Custom pytest markers | |
def pytest_configure(config):
    """Register this module's custom pytest markers."""
    for marker in (
        "performance: mark test as performance test",
        "slow: mark test as slow running",
    ):
        config.addinivalue_line("markers", marker)