.
This commit is contained in:
199
src/capa_de_integracion/services/dlp_service.py
Normal file
199
src/capa_de_integracion/services/dlp_service.py
Normal file
@@ -0,0 +1,199 @@
|
||||
"""
|
||||
Copyright 2025 Google. This software is provided as-is, without warranty or
|
||||
representation for any use or purpose. Your use of it is subject to your
|
||||
agreement with Google.
|
||||
|
||||
Data Loss Prevention service for obfuscating sensitive information.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import re
|
||||
from google.cloud import dlp_v2
|
||||
from google.cloud.dlp_v2 import types
|
||||
|
||||
from ..config import Settings
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DLPService:
|
||||
"""
|
||||
Service for detecting and obfuscating sensitive data using Google Cloud DLP.
|
||||
|
||||
Integrates with the DLP API to scan text for PII and other sensitive information,
|
||||
then obfuscates findings based on their info type.
|
||||
"""
|
||||
|
||||
def __init__(self, settings: Settings):
|
||||
"""
|
||||
Initialize DLP service.
|
||||
|
||||
Args:
|
||||
settings: Application settings
|
||||
"""
|
||||
self.settings = settings
|
||||
self.project_id = settings.gcp_project_id
|
||||
self.location = settings.gcp_location
|
||||
self.dlp_client = dlp_v2.DlpServiceAsyncClient()
|
||||
|
||||
logger.info("DLP Service initialized")
|
||||
|
||||
async def get_obfuscated_string(self, text: str, template_id: str) -> str:
|
||||
"""
|
||||
Inspect text for sensitive data and obfuscate findings.
|
||||
|
||||
Args:
|
||||
text: Text to inspect and obfuscate
|
||||
template_id: DLP inspect template ID
|
||||
|
||||
Returns:
|
||||
Obfuscated text with sensitive data replaced
|
||||
|
||||
Raises:
|
||||
Exception: If DLP API call fails (returns original text on error)
|
||||
"""
|
||||
if not text or not text.strip():
|
||||
return text
|
||||
|
||||
try:
|
||||
# Build content item
|
||||
byte_content_item = types.ByteContentItem(
|
||||
type_=types.ByteContentItem.BytesType.TEXT_UTF8,
|
||||
data=text.encode("utf-8"),
|
||||
)
|
||||
content_item = types.ContentItem(byte_item=byte_content_item)
|
||||
|
||||
# Build inspect config
|
||||
finding_limits = types.InspectConfig.FindingLimits(
|
||||
max_findings_per_item=0 # No limit
|
||||
)
|
||||
|
||||
inspect_config = types.InspectConfig(
|
||||
min_likelihood=types.Likelihood.VERY_UNLIKELY,
|
||||
limits=finding_limits,
|
||||
include_quote=True,
|
||||
)
|
||||
|
||||
# Build request
|
||||
inspect_template_name = f"projects/{self.project_id}/locations/{self.location}/inspectTemplates/{template_id}"
|
||||
parent = f"projects/{self.project_id}/locations/{self.location}"
|
||||
|
||||
request = types.InspectContentRequest(
|
||||
parent=parent,
|
||||
inspect_template_name=inspect_template_name,
|
||||
inspect_config=inspect_config,
|
||||
item=content_item,
|
||||
)
|
||||
|
||||
# Call DLP API
|
||||
response = await self.dlp_client.inspect_content(request=request)
|
||||
|
||||
findings_count = len(response.result.findings)
|
||||
logger.info(f"DLP {template_id} Findings: {findings_count}")
|
||||
|
||||
if findings_count > 0:
|
||||
return self._obfuscate_text(response, text)
|
||||
else:
|
||||
return text
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
f"Error during DLP inspection: {e}. Returning original text.",
|
||||
exc_info=True,
|
||||
)
|
||||
return text
|
||||
|
||||
def _obfuscate_text(self, response: types.InspectContentResponse, text: str) -> str:
|
||||
"""
|
||||
Obfuscate sensitive findings in text.
|
||||
|
||||
Args:
|
||||
response: DLP inspect content response with findings
|
||||
text: Original text
|
||||
|
||||
Returns:
|
||||
Text with sensitive data obfuscated
|
||||
"""
|
||||
# Filter findings by likelihood (> POSSIBLE, which is value 3)
|
||||
findings = [
|
||||
finding
|
||||
for finding in response.result.findings
|
||||
if finding.likelihood.value > 3
|
||||
]
|
||||
|
||||
# Sort by likelihood (descending)
|
||||
findings.sort(key=lambda f: f.likelihood.value, reverse=True)
|
||||
|
||||
for finding in findings:
|
||||
quote = finding.quote
|
||||
info_type = finding.info_type.name
|
||||
|
||||
logger.info(
|
||||
f"InfoType: {info_type} | Likelihood: {finding.likelihood.value}"
|
||||
)
|
||||
|
||||
# Obfuscate based on info type
|
||||
replacement = self._get_replacement(info_type, quote)
|
||||
if replacement:
|
||||
text = text.replace(quote, replacement)
|
||||
|
||||
# Clean up consecutive DIRECCION tags
|
||||
text = self._clean_direccion(text)
|
||||
|
||||
return text
|
||||
|
||||
def _get_replacement(self, info_type: str, quote: str) -> str | None:
|
||||
"""
|
||||
Get replacement text for a given info type.
|
||||
|
||||
Args:
|
||||
info_type: DLP info type name
|
||||
quote: Original sensitive text
|
||||
|
||||
Returns:
|
||||
Replacement text or None to skip
|
||||
"""
|
||||
replacements = {
|
||||
"CREDIT_CARD_NUMBER": f"**** **** **** {self._get_last4(quote)}",
|
||||
"CREDIT_CARD_EXPIRATION_DATE": "[FECHA_VENCIMIENTO_TARJETA]",
|
||||
"FECHA_VENCIMIENTO": "[FECHA_VENCIMIENTO_TARJETA]",
|
||||
"CVV_NUMBER": "[CVV]",
|
||||
"CVV": "[CVV]",
|
||||
"EMAIL_ADDRESS": "[CORREO]",
|
||||
"PERSON_NAME": "[NOMBRE]",
|
||||
"PHONE_NUMBER": "[TELEFONO]",
|
||||
"DIRECCION": "[DIRECCION]",
|
||||
"DIR_COLONIA": "[DIRECCION]",
|
||||
"DIR_DEL_MUN": "[DIRECCION]",
|
||||
"DIR_INTERIOR": "[DIRECCION]",
|
||||
"DIR_ESQUINA": "[DIRECCION]",
|
||||
"DIR_CIUDAD_EDO": "[DIRECCION]",
|
||||
"DIR_CP": "[DIRECCION]",
|
||||
"CLABE_INTERBANCARIA": "[CLABE]",
|
||||
"CLAVE_RASTREO_SPEI": "[CLAVE_RASTREO]",
|
||||
"NIP": "[NIP]",
|
||||
"SALDO": "[SALDO]",
|
||||
"CUENTA": f"**************{self._get_last4(quote)}",
|
||||
"NUM_ACLARACION": "[NUM_ACLARACION]",
|
||||
}
|
||||
|
||||
return replacements.get(info_type)
|
||||
|
||||
def _get_last4(self, quote: str) -> str:
|
||||
"""Extract last 4 characters from quote (removing spaces)."""
|
||||
clean_quote = quote.strip().replace(" ", "")
|
||||
if len(clean_quote) >= 4:
|
||||
return clean_quote[-4:]
|
||||
return clean_quote
|
||||
|
||||
def _clean_direccion(self, text: str) -> str:
|
||||
"""Clean up consecutive [DIRECCION] tags."""
|
||||
# Replace multiple [DIRECCION] tags separated by commas or spaces with single tag
|
||||
pattern = r"\[DIRECCION\](?:(?:,\s*|\s+)\[DIRECCION\])*"
|
||||
return re.sub(pattern, "[DIRECCION]", text).strip()
|
||||
|
||||
async def close(self):
|
||||
"""Close DLP client."""
|
||||
await self.dlp_client.transport.close()
|
||||
logger.info("DLP client closed")
|
||||
Reference in New Issue
Block a user