"""Tests for DLPService."""
|
|
|
|
from unittest.mock import AsyncMock, Mock, patch
|
|
|
|
import pytest
|
|
from google.cloud.dlp_v2 import types
|
|
|
|
from capa_de_integracion.config import Settings
|
|
from capa_de_integracion.services import DLPService
|
|
|
|
|
|
@pytest.fixture
def mock_settings():
    """Return a ``Settings``-spec'd mock with fixed GCP project and location values."""
    # Keyword arguments to Mock() are applied as attributes after creation,
    # which is equivalent to assigning them one by one.
    return Mock(
        spec=Settings,
        gcp_project_id="test-project",
        gcp_location="us-central1",
    )
|
|
|
|
|
|
@pytest.fixture
def service(mock_settings):
    """Build a ``DLPService`` from the mock settings.

    ``DlpServiceAsyncClient`` is patched with a mock for the duration of
    construction only; the patch is undone before the service is returned.
    """
    client_target = "capa_de_integracion.services.dlp.dlp_v2.DlpServiceAsyncClient"
    with patch(client_target):
        dlp_service = DLPService(mock_settings)
    return dlp_service
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_obfuscated_string_no_findings(service):
    """Text comes back unchanged when the DLP response has no findings."""
    mock_response = Mock()
    mock_response.result.findings = []

    service.dlp_client.inspect_content = AsyncMock(return_value=mock_response)

    text = "This is a safe text"
    result = await service.get_obfuscated_string(text, "test-template")

    assert result == text
    # inspect_content is an AsyncMock: assert_called_once() would also pass if
    # the coroutine were created but never awaited. Checking the await proves
    # the stubbed response was actually obtained.
    service.dlp_client.inspect_content.assert_awaited_once()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_obfuscated_string_with_credit_card(service):
    """A credit-card finding is masked so only the last four digits remain."""
    card_number = "4532123456789012"

    # Dotted keys configure attributes on child mocks (info_type.name, likelihood.value).
    card_finding = Mock(
        quote=card_number,
        **{
            "info_type.name": "CREDIT_CARD_NUMBER",
            "likelihood.value": 4,  # LIKELY (above threshold)
        },
    )

    dlp_response = Mock()
    dlp_response.result.findings = [card_finding]
    service.dlp_client.inspect_content = AsyncMock(return_value=dlp_response)

    result = await service.get_obfuscated_string(
        "My card number is 4532123456789012",
        "test-template",
    )

    assert "**** **** **** 9012" in result
    assert card_number not in result
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_obfuscated_string_with_email(service):
    """An e-mail finding is replaced by the ``[CORREO]`` tag."""
    email = "user@example.com"

    # Dotted keys configure attributes on child mocks (info_type.name, likelihood.value).
    email_finding = Mock(
        quote=email,
        **{
            "info_type.name": "EMAIL_ADDRESS",
            "likelihood.value": 5,  # VERY_LIKELY
        },
    )

    dlp_response = Mock()
    dlp_response.result.findings = [email_finding]
    service.dlp_client.inspect_content = AsyncMock(return_value=dlp_response)

    result = await service.get_obfuscated_string(
        "Contact me at user@example.com",
        "test-template",
    )

    assert "[CORREO]" in result
    assert email not in result
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_obfuscated_string_with_phone(service):
    """A phone-number finding is replaced by the ``[TELEFONO]`` tag."""
    phone = "555-1234"

    # Dotted keys configure attributes on child mocks (info_type.name, likelihood.value).
    phone_finding = Mock(
        quote=phone,
        **{"info_type.name": "PHONE_NUMBER", "likelihood.value": 4},
    )

    dlp_response = Mock()
    dlp_response.result.findings = [phone_finding]
    service.dlp_client.inspect_content = AsyncMock(return_value=dlp_response)

    result = await service.get_obfuscated_string("Call me at 555-1234", "test-template")

    assert "[TELEFONO]" in result
    assert phone not in result
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_obfuscated_string_filters_low_likelihood(service):
    """A finding whose likelihood is below the threshold leaves the text untouched."""
    # Dotted keys configure attributes on child mocks (info_type.name, likelihood.value).
    weak_finding = Mock(
        quote="maybe@test.com",
        **{
            "info_type.name": "EMAIL_ADDRESS",
            "likelihood.value": 2,  # UNLIKELY (below threshold of 3)
        },
    )

    dlp_response = Mock()
    dlp_response.result.findings = [weak_finding]
    service.dlp_client.inspect_content = AsyncMock(return_value=dlp_response)

    text = "Email: maybe@test.com"
    result = await service.get_obfuscated_string(text, "test-template")

    # The low-likelihood finding must not trigger any obfuscation.
    assert result == text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_obfuscated_string_handles_direccion(service):
    """Adjacent address-type findings end up as a single ``[DIRECCION]`` tag."""
    address_types = ("DIRECCION", "DIR_COLONIA", "DIR_CP")

    # One finding per address info type; dotted keys configure the child mocks.
    findings = [
        Mock(
            quote=f"test_{info_type}",
            **{"info_type.name": info_type, "likelihood.value": 4},
        )
        for info_type in address_types
    ]

    dlp_response = Mock()
    dlp_response.result.findings = findings
    service.dlp_client.inspect_content = AsyncMock(return_value=dlp_response)

    result = await service.get_obfuscated_string(
        "Address: test_DIRECCION, test_DIR_COLONIA, test_DIR_CP",
        "test-template",
    )

    # The repeated tags are expected to be merged into one.
    assert result == "Address: [DIRECCION]"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_get_obfuscated_string_error_returns_original(service):
    """When the DLP call raises, the input text is returned unmodified."""
    failing_call = AsyncMock(side_effect=Exception("DLP API error"))
    service.dlp_client.inspect_content = failing_call

    text = "Original text"
    result = await service.get_obfuscated_string(text, "test-template")

    assert result == text
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_close(service):
    """``close()`` awaits the DLP client's transport ``close`` exactly once."""
    service.dlp_client.transport.close = AsyncMock()

    await service.close()

    # transport.close is an AsyncMock: assert_called_once() would pass even if
    # the coroutine were never awaited, which would leave the transport open.
    service.dlp_client.transport.close.assert_awaited_once()
|
|
|
|
|
|
def test_get_last4_normal(service):
    """``_get_last4`` yields the final four digits, with or without spaces in the input."""
    cases = (
        ("1234567890", "7890"),
        ("1234 5678 9012 3456", "3456"),
    )
    for raw, expected in cases:
        assert service._get_last4(raw) == expected
|
|
|
|
|
|
def test_get_last4_short(service):
    """Inputs shorter than four characters are returned as they are."""
    for short_value in ("123", "12"):
        assert service._get_last4(short_value) == short_value
|
|
|
|
|
|
def test_get_last4_empty(service):
    """Empty and whitespace-only inputs both produce an empty string."""
    for blank in ("", " "):
        assert service._get_last4(blank) == ""
|
|
|
|
|
|
def test_clean_direccion(service):
    """Runs of ``[DIRECCION]`` tags collapse to one; a lone tag in text is kept."""
    cases = (
        ("[DIRECCION], [DIRECCION]", "[DIRECCION]"),
        ("[DIRECCION] [DIRECCION]", "[DIRECCION]"),
        ("[DIRECCION], [DIRECCION], [DIRECCION]", "[DIRECCION]"),
        ("Text [DIRECCION] more text", "Text [DIRECCION] more text"),
    )
    for raw, cleaned in cases:
        assert service._clean_direccion(raw) == cleaned
|
|
|
|
|
|
def test_get_replacement(service):
    """Known info types map to their fixed tag; an unknown type yields ``None``."""
    tagged_cases = (
        ("EMAIL_ADDRESS", "test@example.com", "[CORREO]"),
        ("PERSON_NAME", "John Doe", "[NOMBRE]"),
        ("CVV_NUMBER", "123", "[CVV]"),
        ("NIP", "1234", "[NIP]"),
        ("SALDO", "1000.00", "[SALDO]"),
        ("CLABE_INTERBANCARIA", "012345678901234567", "[CLABE]"),
    )
    for info_type, quote, tag in tagged_cases:
        assert service._get_replacement(info_type, quote) == tag

    assert service._get_replacement("UNKNOWN_TYPE", "value") is None
|
|
|
|
|
|
def test_get_replacement_credit_card(service):
    """A credit-card quote is replaced by a mask that keeps only the last four digits."""
    masked = service._get_replacement("CREDIT_CARD_NUMBER", "4532 1234 5678 9012")
    assert masked == "**** **** **** 9012"
|
|
|
|
|
|
def test_get_replacement_cuenta(service):
    """An account-number quote is replaced by asterisks followed by its last four digits."""
    masked = service._get_replacement("CUENTA", "12345678901234")
    assert masked == "**************1234"
|