"""Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose. Your use of it is subject to your agreement with Google. Data Loss Prevention service for obfuscating sensitive information. """ import logging import re from google.cloud import dlp_v2 from google.cloud.dlp_v2 import types from capa_de_integracion.config import Settings logger = logging.getLogger(__name__) class DLPService: """Service for detecting and obfuscating sensitive data using Google Cloud DLP. Integrates with the DLP API to scan text for PII and other sensitive information, then obfuscates findings based on their info type. """ def __init__(self, settings: Settings) -> None: """Initialize DLP service. Args: settings: Application settings """ self.settings = settings self.project_id = settings.gcp_project_id self.location = settings.gcp_location self.dlp_client = dlp_v2.DlpServiceAsyncClient() logger.info("DLP Service initialized") async def get_obfuscated_string(self, text: str, template_id: str) -> str: """Inspect text for sensitive data and obfuscate findings. Args: text: Text to inspect and obfuscate template_id: DLP inspect template ID Returns: Obfuscated text with sensitive data replaced Raises: Exception: If DLP API call fails (returns original text on error) """ try: # Build content item byte_content_item = types.ByteContentItem( type_=types.ByteContentItem.BytesType.TEXT_UTF8, data=text.encode("utf-8"), ) content_item = types.ContentItem(byte_item=byte_content_item) # Build inspect config finding_limits = types.InspectConfig.FindingLimits( max_findings_per_item=0, # No limit ) inspect_config = types.InspectConfig( min_likelihood=types.Likelihood.VERY_UNLIKELY, limits=finding_limits, include_quote=True, ) # Build request inspect_template_name = f"projects/{self.project_id}/locations/{self.location}/inspectTemplates/{template_id}" parent = f"projects/{self.project_id}/locations/{self.location}" request = types.InspectContentRequest( parent=parent, inspect_template_name=inspect_template_name, inspect_config=inspect_config, item=content_item, ) # Call DLP API response = await self.dlp_client.inspect_content(request=request) findings_count = len(response.result.findings) logger.info(f"DLP {template_id} Findings: {findings_count}") if findings_count > 0: return self._obfuscate_text(response, text) return text except Exception as e: logger.error( f"Error during DLP inspection: {e}. Returning original text.", exc_info=True, ) return text def _obfuscate_text(self, response: types.InspectContentResponse, text: str) -> str: """Obfuscate sensitive findings in text. Args: response: DLP inspect content response with findings text: Original text Returns: Text with sensitive data obfuscated """ # Filter findings by likelihood (> POSSIBLE, which is value 3) findings = [ finding for finding in response.result.findings if finding.likelihood.value > 3 ] # Sort by likelihood (descending) findings.sort(key=lambda f: f.likelihood.value, reverse=True) for finding in findings: quote = finding.quote info_type = finding.info_type.name logger.info( f"InfoType: {info_type} | Likelihood: {finding.likelihood.value}", ) # Obfuscate based on info type replacement = self._get_replacement(info_type, quote) if replacement: text = text.replace(quote, replacement) # Clean up consecutive DIRECCION tags return self._clean_direccion(text) def _get_replacement(self, info_type: str, quote: str) -> str | None: """Get replacement text for a given info type. Args: info_type: DLP info type name quote: Original sensitive text Returns: Replacement text or None to skip """ replacements = { "CREDIT_CARD_NUMBER": f"**** **** **** {self._get_last4(quote)}", "CREDIT_CARD_EXPIRATION_DATE": "[FECHA_VENCIMIENTO_TARJETA]", "FECHA_VENCIMIENTO": "[FECHA_VENCIMIENTO_TARJETA]", "CVV_NUMBER": "[CVV]", "CVV": "[CVV]", "EMAIL_ADDRESS": "[CORREO]", "PERSON_NAME": "[NOMBRE]", "PHONE_NUMBER": "[TELEFONO]", "DIRECCION": "[DIRECCION]", "DIR_COLONIA": "[DIRECCION]", "DIR_DEL_MUN": "[DIRECCION]", "DIR_INTERIOR": "[DIRECCION]", "DIR_ESQUINA": "[DIRECCION]", "DIR_CIUDAD_EDO": "[DIRECCION]", "DIR_CP": "[DIRECCION]", "CLABE_INTERBANCARIA": "[CLABE]", "CLAVE_RASTREO_SPEI": "[CLAVE_RASTREO]", "NIP": "[NIP]", "SALDO": "[SALDO]", "CUENTA": f"**************{self._get_last4(quote)}", "NUM_ACLARACION": "[NUM_ACLARACION]", } return replacements.get(info_type) def _get_last4(self, quote: str) -> str: """Extract last 4 characters from quote (removing spaces).""" clean_quote = quote.strip().replace(" ", "") if len(clean_quote) >= 4: return clean_quote[-4:] return clean_quote def _clean_direccion(self, text: str) -> str: """Clean up consecutive [DIRECCION] tags.""" # Replace multiple [DIRECCION] tags separated by commas or spaces with single tag pattern = r"\[DIRECCION\](?:(?:,\s*|\s+)\[DIRECCION\])*" return re.sub(pattern, "[DIRECCION]", text).strip() async def close(self) -> None: """Close DLP client.""" await self.dlp_client.transport.close() logger.info("DLP client closed")