MnemoCore / tests /test_api_security_limits.py
Granis87's picture
Upload folder using huggingface_hub
c3a3710 verified
"""
API Security Limits Tests
========================
Comprehensive tests for input validation and rate limiting.
"""
import pytest
from fastapi.testclient import TestClient
from unittest.mock import MagicMock, patch, AsyncMock
import sys
import os
# Ensure path is set
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from mnemocore.core.config import reset_config
API_KEY = "test-key"
# Setup mocks before importing app
mock_engine_cls = MagicMock()
mock_engine_instance = MagicMock()
mock_engine_instance.get_stats = AsyncMock(return_value={"status": "ok"})
mock_engine_instance.get_memory = AsyncMock(return_value=None)
mock_engine_instance.delete_memory = AsyncMock(return_value=True)
mock_engine_instance.store = AsyncMock(return_value="mem_id_123")
mock_engine_instance.query = AsyncMock(return_value=[("mem_id_123", 0.9)])
mock_engine_instance.initialize = AsyncMock(return_value=None)
mock_engine_instance.close = AsyncMock(return_value=None)
mock_engine_instance.define_concept = AsyncMock(return_value=None)
mock_engine_instance.reason_by_analogy = AsyncMock(return_value=[("result1", 0.8)])
mock_engine_cls.return_value = mock_engine_instance
# Mock container
mock_container = MagicMock()
mock_container.redis_storage = AsyncMock()
mock_container.redis_storage.check_health = AsyncMock(return_value=True)
mock_container.redis_storage.store_memory = AsyncMock()
mock_container.redis_storage.publish_event = AsyncMock()
mock_container.redis_storage.retrieve_memory = AsyncMock(return_value=None)
mock_container.redis_storage.delete_memory = AsyncMock()
mock_container.redis_storage.close = AsyncMock()
mock_container.qdrant_store = MagicMock()
# Setup pipeline mock
mock_pipeline = MagicMock()
mock_pipeline.__aenter__ = AsyncMock(return_value=mock_pipeline)
mock_pipeline.__aexit__ = AsyncMock(return_value=None)
mock_pipeline.incr = MagicMock()
mock_pipeline.expire = MagicMock()
mock_pipeline.execute = AsyncMock(return_value=[1, True])
mock_redis_client = MagicMock()
mock_redis_client.pipeline.return_value = mock_pipeline
mock_container.redis_storage.redis_client = mock_redis_client
# Patch before import
patcher1 = patch("mnemocore.api.main.HAIMEngine", mock_engine_cls)
patcher2 = patch("mnemocore.api.main.build_container", return_value=mock_container)
patcher1.start()
patcher2.start()
from mnemocore.api.main import app
@pytest.fixture(autouse=True)
def setup_env(monkeypatch):
monkeypatch.setenv("HAIM_API_KEY", API_KEY)
reset_config()
# Mock app state
app.state.engine = mock_engine_instance
app.state.container = mock_container
yield
reset_config()
# ============================================================================
# INPUT VALIDATION TESTS - Store Endpoint
# ============================================================================
def test_store_content_too_large():
"""Verify that content larger than 100,000 chars is rejected."""
with TestClient(app) as client:
large_content = "a" * 100001
response = client.post(
"/store",
json={"content": large_content},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
assert "String should have at most 100000 characters" in response.text
def test_store_content_valid():
"""Verify that content within limit is accepted."""
mock_memory = MagicMock(
id="mem_1", content="a" * 1000, metadata={}, ltp_strength=0.5,
created_at=MagicMock(isoformat=MagicMock(return_value="2024-01-01T00:00:00"))
)
mock_engine_instance.get_memory.return_value = mock_memory
mock_engine_instance.store.return_value = "mem_1"
with TestClient(app) as client:
valid_content = "a" * 1000
response = client.post(
"/store",
json={"content": valid_content},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 200
def test_store_content_empty():
"""Verify that empty content is rejected."""
with TestClient(app) as client:
response = client.post(
"/store",
json={"content": ""},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
def test_store_content_whitespace_only():
"""Verify that whitespace-only content is rejected."""
with TestClient(app) as client:
response = client.post(
"/store",
json={"content": " \n\t "},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
def test_store_metadata_too_many_keys():
"""Verify that metadata with too many keys is rejected."""
with TestClient(app) as client:
many_metadata = {f"k{i}": "v" for i in range(51)}
response = client.post(
"/store",
json={"content": "foo", "metadata": many_metadata},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
assert "Too many metadata keys" in response.text
def test_store_metadata_key_too_long():
"""Verify that metadata key longer than 64 chars is rejected."""
with TestClient(app) as client:
long_key = "k" * 65
response = client.post(
"/store",
json={"content": "foo", "metadata": {long_key: "val"}},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
assert "too long" in response.text
def test_store_metadata_value_too_long():
"""Verify that metadata value longer than 1000 chars is rejected."""
with TestClient(app) as client:
long_value = "v" * 1001
response = client.post(
"/store",
json={"content": "foo", "metadata": {"key": long_value}},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
assert "too long" in response.text
def test_store_metadata_invalid_key_characters():
"""Verify that metadata key with invalid characters is rejected."""
with TestClient(app) as client:
response = client.post(
"/store",
json={"content": "foo", "metadata": {"key$invalid": "val"}},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
assert "invalid characters" in response.text
def test_store_metadata_nested_structure():
"""Verify that nested metadata values are rejected."""
with TestClient(app) as client:
response = client.post(
"/store",
json={"content": "foo", "metadata": {"nested": {"key": "value"}}},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
assert "primitive type" in response.text
def test_store_agent_id_too_long():
"""Verify that agent_id longer than 256 chars is rejected."""
with TestClient(app) as client:
long_agent_id = "a" * 257
response = client.post(
"/store",
json={"content": "foo", "agent_id": long_agent_id},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
def test_store_agent_id_invalid_characters():
"""Verify that agent_id with invalid characters is rejected."""
with TestClient(app) as client:
response = client.post(
"/store",
json={"content": "foo", "agent_id": "agent$invalid"},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
def test_store_ttl_out_of_range():
"""Verify that TTL outside valid range is rejected."""
with TestClient(app) as client:
# TTL too small
response = client.post(
"/store",
json={"content": "foo", "ttl": 0},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
# TTL too large (> 1 year)
response = client.post(
"/store",
json={"content": "foo", "ttl": 86400 * 365 + 1},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
# ============================================================================
# INPUT VALIDATION TESTS - Query Endpoint
# ============================================================================
def test_query_too_large():
"""Verify query string limits."""
with TestClient(app) as client:
large_query = "q" * 20000
response = client.post(
"/query",
json={"query": large_query},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
assert "String should have at most 10000 characters" in response.text
def test_query_valid():
"""Verify query within limit is accepted."""
# Setup mock memory with tier attribute
mock_memory = MagicMock(
id="mem_1", content="test result", metadata={}, ltp_strength=0.5,
created_at=MagicMock(isoformat=MagicMock(return_value="2024-01-01T00:00:00")),
tier="hot"
)
mock_engine_instance.get_memory.return_value = mock_memory
mock_engine_instance.query.return_value = [("mem_1", 0.9)]
with TestClient(app) as client:
valid_query = "hello world"
response = client.post(
"/query",
json={"query": valid_query},
headers={"X-API-Key": API_KEY}
)
# It might return 200 or 500 depending on engine state, but NOT 422
assert response.status_code != 422
def test_query_empty():
"""Verify that empty query is rejected."""
with TestClient(app) as client:
response = client.post(
"/query",
json={"query": ""},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
def test_query_whitespace_only():
"""Verify that whitespace-only query is rejected."""
with TestClient(app) as client:
response = client.post(
"/query",
json={"query": " \n\t "},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
def test_query_top_k_out_of_range():
"""Verify that top_k outside valid range is rejected."""
with TestClient(app) as client:
# top_k too small
response = client.post(
"/query",
json={"query": "test", "top_k": 0},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
# top_k too large
response = client.post(
"/query",
json={"query": "test", "top_k": 101},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
# ============================================================================
# INPUT VALIDATION TESTS - Concept Endpoint
# ============================================================================
def test_concept_name_too_large():
"""Verify concept name limit."""
with TestClient(app) as client:
large_name = "n" * 10000
response = client.post(
"/concept",
json={"name": large_name, "attributes": {"key": "value"}},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
assert "String should have at most 256 characters" in response.text
def test_concept_name_empty():
"""Verify that empty concept name is rejected."""
with TestClient(app) as client:
response = client.post(
"/concept",
json={"name": "", "attributes": {"key": "value"}},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
def test_concept_name_invalid_characters():
"""Verify that concept name with invalid characters is rejected."""
with TestClient(app) as client:
response = client.post(
"/concept",
json={"name": "concept$invalid", "attributes": {"key": "value"}},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
def test_concept_attributes_too_many():
"""Verify that too many attributes are rejected."""
with TestClient(app) as client:
many_attributes = {f"k{i}": "v" for i in range(51)}
response = client.post(
"/concept",
json={"name": "test", "attributes": many_attributes},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
assert "Too many attributes" in response.text
def test_concept_attributes_empty():
"""Verify that empty attributes are rejected."""
with TestClient(app) as client:
response = client.post(
"/concept",
json={"name": "test", "attributes": {}},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
def test_concept_attribute_key_too_long():
"""Verify that attribute key longer than 64 chars is rejected."""
with TestClient(app) as client:
long_key = "k" * 65
response = client.post(
"/concept",
json={"name": "test", "attributes": {long_key: "val"}},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
assert "too long" in response.text
def test_concept_attribute_key_invalid_characters():
"""Verify that attribute key with invalid characters is rejected."""
with TestClient(app) as client:
response = client.post(
"/concept",
json={"name": "test", "attributes": {"key$invalid": "val"}},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
def test_concept_attribute_value_too_long():
"""Verify that attribute value longer than 1000 chars is rejected."""
with TestClient(app) as client:
long_value = "v" * 1001
response = client.post(
"/concept",
json={"name": "test", "attributes": {"key": long_value}},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
# ============================================================================
# INPUT VALIDATION TESTS - Analogy Endpoint
# ============================================================================
def test_analogy_source_concept_too_large():
"""Verify analogy source concept limit."""
with TestClient(app) as client:
large_str = "a" * 10000
response = client.post(
"/analogy",
json={
"source_concept": large_str,
"source_value": "val",
"target_concept": "target"
},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
assert "String should have at most 256 characters" in response.text
def test_analogy_empty_concept():
"""Verify that empty concept is rejected."""
with TestClient(app) as client:
response = client.post(
"/analogy",
json={
"source_concept": "",
"source_value": "val",
"target_concept": "target"
},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
def test_analogy_empty_value():
"""Verify that empty value is rejected."""
with TestClient(app) as client:
response = client.post(
"/analogy",
json={
"source_concept": "source",
"source_value": "",
"target_concept": "target"
},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
def test_analogy_target_concept_too_large():
"""Verify analogy target concept limit."""
with TestClient(app) as client:
large_str = "a" * 10000
response = client.post(
"/analogy",
json={
"source_concept": "source",
"source_value": "val",
"target_concept": large_str
},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 422
# ============================================================================
# RATE LIMITING TESTS - Store (100/minute)
# ============================================================================
def test_store_rate_limiter_within_limit():
"""Verify store requests within limit succeed."""
# Ensure pipeline execute returns count < limit (100 for store)
mock_pipeline.execute.return_value = [1, True]
mock_memory = MagicMock(
id="mem_1", content="test", metadata={}, ltp_strength=0.5,
created_at=MagicMock(isoformat=MagicMock(return_value="2024-01-01T00:00:00"))
)
mock_engine_instance.get_memory.return_value = mock_memory
mock_engine_instance.store.return_value = "mem_1"
with TestClient(app) as client:
response = client.post(
"/store",
json={"content": "test"},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 200
assert response.json()["ok"] is True
def test_store_rate_limiter_exceeded():
"""Verify store rate limit returns 429 with Retry-After header."""
# Simulate return value [count=101, expire_result=True] (Limit is 100 for store)
mock_pipeline.execute.return_value = [101, True]
with TestClient(app) as client:
response = client.post(
"/store",
json={"content": "test"},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 429
assert "Rate limit exceeded" in response.json()["detail"]
assert "Retry-After" in response.headers
def test_store_rate_limiter_retry_after_value():
"""Verify Retry-After header contains valid seconds."""
mock_pipeline.execute.return_value = [101, True]
with TestClient(app) as client:
response = client.post(
"/store",
json={"content": "test"},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 429
retry_after = response.headers.get("Retry-After")
assert retry_after is not None
# Should be a positive integer
assert int(retry_after) > 0
assert int(retry_after) <= 60 # Max window size
# ============================================================================
# RATE LIMITING TESTS - Query (500/minute)
# ============================================================================
def test_query_rate_limiter_within_limit():
"""Verify query requests within limit succeed."""
# Ensure pipeline execute returns count < limit (500 for query)
mock_pipeline.execute.return_value = [100, True]
# Setup mock memory with tier attribute
mock_memory = MagicMock(
id="mem_1", content="test result", metadata={}, ltp_strength=0.5,
created_at=MagicMock(isoformat=MagicMock(return_value="2024-01-01T00:00:00")),
tier="hot"
)
mock_engine_instance.get_memory.return_value = mock_memory
mock_engine_instance.query.return_value = [("mem_1", 0.9)]
with TestClient(app) as client:
response = client.post(
"/query",
json={"query": "test"},
headers={"X-API-Key": API_KEY}
)
# Should not be 429
assert response.status_code != 429
def test_query_rate_limiter_exceeded():
"""Verify query rate limit returns 429 with Retry-After header."""
# Simulate return value [count=501, expire_result=True] (Limit is 500 for query)
mock_pipeline.execute.return_value = [501, True]
with TestClient(app) as client:
response = client.post(
"/query",
json={"query": "test"},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 429
assert "Rate limit exceeded" in response.json()["detail"]
assert "Retry-After" in response.headers
# ============================================================================
# RATE LIMITING TESTS - Concept (100/minute)
# ============================================================================
def test_concept_rate_limiter_within_limit():
"""Verify concept requests within limit succeed."""
mock_pipeline.execute.return_value = [50, True]
with TestClient(app) as client:
response = client.post(
"/concept",
json={"name": "test", "attributes": {"key": "value"}},
headers={"X-API-Key": API_KEY}
)
assert response.status_code != 429
def test_concept_rate_limiter_exceeded():
"""Verify concept rate limit returns 429."""
mock_pipeline.execute.return_value = [101, True]
with TestClient(app) as client:
response = client.post(
"/concept",
json={"name": "test", "attributes": {"key": "value"}},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 429
# ============================================================================
# RATE LIMITING TESTS - Analogy (100/minute)
# ============================================================================
def test_analogy_rate_limiter_within_limit():
"""Verify analogy requests within limit succeed."""
mock_pipeline.execute.return_value = [50, True]
with TestClient(app) as client:
response = client.post(
"/analogy",
json={
"source_concept": "source",
"source_value": "val",
"target_concept": "target"
},
headers={"X-API-Key": API_KEY}
)
assert response.status_code != 429
def test_analogy_rate_limiter_exceeded():
"""Verify analogy rate limit returns 429."""
mock_pipeline.execute.return_value = [101, True]
with TestClient(app) as client:
response = client.post(
"/analogy",
json={
"source_concept": "source",
"source_value": "val",
"target_concept": "target"
},
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 429
# ============================================================================
# RATE LIMITING - Differentiated Limits Per Category
# ============================================================================
def test_rate_limit_different_categories():
"""Verify that different endpoints have different rate limits."""
with TestClient(app) as client:
# Get rate limit configuration
response = client.get("/rate-limits")
assert response.status_code == 200
limits = response.json()["limits"]
# Store: 100/min
assert limits["store"]["requests"] == 100
assert limits["store"]["window_seconds"] == 60
# Query: 500/min
assert limits["query"]["requests"] == 500
assert limits["query"]["window_seconds"] == 60
# Concept: 100/min
assert limits["concept"]["requests"] == 100
assert limits["concept"]["window_seconds"] == 60
# Analogy: 100/min
assert limits["analogy"]["requests"] == 100
assert limits["analogy"]["window_seconds"] == 60
def test_rate_limit_x_forwarded_for():
"""Verify that X-Forwarded-For header is used for client IP."""
mock_pipeline.execute.return_value = [1, True]
mock_memory = MagicMock(
id="mem_1", content="test", metadata={}, ltp_strength=0.5,
created_at=MagicMock(isoformat=MagicMock(return_value="2024-01-01T00:00:00"))
)
mock_engine_instance.get_memory.return_value = mock_memory
mock_engine_instance.store.return_value = "mem_1"
with TestClient(app) as client:
response = client.post(
"/store",
json={"content": "test"},
headers={
"X-API-Key": API_KEY,
"X-Forwarded-For": "10.0.0.1, 192.168.1.1"
}
)
# Should succeed (rate limit check should pass)
assert response.status_code == 200
# ============================================================================
# EDGE CASES - Memory ID Validation
# ============================================================================
def test_get_memory_invalid_id_empty():
"""Verify that empty memory_id is rejected."""
with TestClient(app) as client:
response = client.get(
"/memory/",
headers={"X-API-Key": API_KEY}
)
# Should return 404 or 405, not 500
assert response.status_code in [404, 405]
def test_get_memory_invalid_id_too_long():
"""Verify that memory_id longer than 256 chars is rejected."""
with TestClient(app) as client:
long_id = "a" * 300
response = client.get(
f"/memory/{long_id}",
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 400
def test_delete_memory_invalid_id_too_long():
"""Verify that memory_id longer than 256 chars is rejected for delete."""
with TestClient(app) as client:
long_id = "a" * 300
response = client.delete(
f"/memory/{long_id}",
headers={"X-API-Key": API_KEY}
)
assert response.status_code == 400