Files
luma/backend/app/services/file_service.py
2025-09-08 21:46:10 +00:00

243 lines
8.2 KiB
Python

import os
import re
from typing import Optional, Tuple
import logging
from ..services.azure_service import azure_service
logger = logging.getLogger(__name__)
class FileService:
"""
Servicio para lógica de negocio de archivos
"""
async def check_file_exists(self, filename: str, tema: str = "") -> bool:
"""
Verificar si un archivo ya existe en el almacenamiento
Args:
filename: Nombre del archivo
tema: Tema donde buscar el archivo
Returns:
bool: True si el archivo existe, False si no
"""
try:
await azure_service.get_file_info(filename, tema)
return True
except FileNotFoundError:
return False
except Exception as e:
logger.error(f"Error verificando existencia del archivo '{filename}': {e}")
return False
def generate_new_filename(self, original_filename: str, existing_files: list = None) -> str:
"""
Generar un nuevo nombre de archivo si ya existe uno igual
Args:
original_filename: Nombre original del archivo
existing_files: Lista de archivos existentes (opcional)
Returns:
str: Nuevo nombre de archivo (ej: archivo_1.pdf)
"""
# Separar nombre y extensión
name, extension = os.path.splitext(original_filename)
# Buscar si ya tiene un número al final (ej: archivo_1.pdf)
match = re.search(r'(.+)_(\d+)$', name)
if match:
base_name = match.group(1)
current_number = int(match.group(2))
else:
base_name = name
current_number = 0
# Incrementar número hasta encontrar uno disponible
counter = current_number + 1
new_filename = f"{base_name}_{counter}{extension}"
# Si tenemos lista de archivos existentes, verificar contra ella
if existing_files:
while new_filename in existing_files:
counter += 1
new_filename = f"{base_name}_{counter}{extension}"
return new_filename
async def handle_file_conflict(self, filename: str, tema: str = "") -> Tuple[bool, Optional[str]]:
"""
Manejar conflicto cuando un archivo ya existe
Args:
filename: Nombre del archivo
tema: Tema donde está el archivo
Returns:
Tuple[bool, Optional[str]]: (existe_conflicto, nombre_sugerido)
"""
exists = await self.check_file_exists(filename, tema)
if not exists:
return False, None
# Generar nombre alternativo
suggested_name = self.generate_new_filename(filename)
# Verificar que el nombre sugerido tampoco exista
counter = 1
while await self.check_file_exists(suggested_name, tema):
name, extension = os.path.splitext(filename)
# Buscar base sin número
match = re.search(r'(.+)_(\d+)$', name)
base_name = match.group(1) if match else name
counter += 1
suggested_name = f"{base_name}_{counter}{extension}"
return True, suggested_name
def validate_filename(self, filename: str) -> Tuple[bool, Optional[str]]:
"""
Validar que el nombre del archivo sea válido
Args:
filename: Nombre del archivo a validar
Returns:
Tuple[bool, Optional[str]]: (es_valido, mensaje_error)
"""
if not filename:
return False, "Nombre de archivo requerido"
# Verificar caracteres no permitidos
invalid_chars = r'[<>:"/\\|?*]'
if re.search(invalid_chars, filename):
return False, "El nombre contiene caracteres no permitidos: < > : \" / \\ | ? *"
# Verificar longitud
if len(filename) > 255:
return False, "Nombre de archivo demasiado largo (máximo 255 caracteres)"
# Verificar nombres reservados (Windows)
reserved_names = [
'CON', 'PRN', 'AUX', 'NUL',
'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9'
]
name_without_ext = os.path.splitext(filename)[0].upper()
if name_without_ext in reserved_names:
return False, f"'{name_without_ext}' es un nombre reservado del sistema"
return True, None
def validate_file_extension(self, filename: str) -> Tuple[bool, Optional[str]]:
"""
Validar que la extensión del archivo esté permitida
Args:
filename: Nombre del archivo
Returns:
Tuple[bool, Optional[str]]: (es_valido, mensaje_error)
"""
allowed_extensions = ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt', '.csv']
file_extension = os.path.splitext(filename)[1].lower()
if not file_extension:
return False, "Archivo debe tener una extensión"
if file_extension not in allowed_extensions:
return False, f"Extensión no permitida. Permitidas: {', '.join(allowed_extensions)}"
return True, None
def validate_file_size(self, file_size: int) -> Tuple[bool, Optional[str]]:
"""
Validar que el tamaño del archivo esté dentro de los límites
Args:
file_size: Tamaño del archivo en bytes
Returns:
Tuple[bool, Optional[str]]: (es_valido, mensaje_error)
"""
max_size = 100 * 1024 * 1024 # 100MB
if file_size <= 0:
return False, "El archivo está vacío"
if file_size > max_size:
max_size_mb = max_size / (1024 * 1024)
return False, f"Archivo demasiado grande. Tamaño máximo: {max_size_mb}MB"
return True, None
def validate_tema(self, tema: str) -> Tuple[bool, Optional[str]]:
"""
Validar que el tema sea válido
Args:
tema: Nombre del tema
Returns:
Tuple[bool, Optional[str]]: (es_valido, mensaje_error)
"""
if not tema:
return True, None # Tema opcional
# Verificar caracteres permitidos
if not re.match(r'^[a-zA-Z0-9\-_\s]+$', tema):
return False, "Tema solo puede contener letras, números, guiones y espacios"
# Verificar longitud
if len(tema) > 50:
return False, "Nombre de tema demasiado largo (máximo 50 caracteres)"
return True, None
def clean_tema_name(self, tema: str) -> str:
"""
Limpiar nombre de tema para Azure Storage
Args:
tema: Nombre del tema original
Returns:
str: Nombre limpio para usar como carpeta
"""
if not tema:
return ""
# Convertir a minúsculas y reemplazar espacios con guiones
cleaned = tema.lower().strip()
cleaned = re.sub(r'\s+', '-', cleaned) # Espacios múltiples a un guion
cleaned = re.sub(r'[^a-z0-9\-_]', '', cleaned) # Solo caracteres permitidos
cleaned = re.sub(r'-+', '-', cleaned) # Guiones múltiples a uno
cleaned = cleaned.strip('-') # Quitar guiones al inicio/final
return cleaned
async def get_existing_files_in_tema(self, tema: str = "") -> list:
"""
Obtener lista de nombres de archivos existentes en un tema
Args:
tema: Tema donde buscar archivos
Returns:
list: Lista de nombres de archivos
"""
try:
files_data = await azure_service.list_files(tema)
return [file_data["name"] for file_data in files_data]
except Exception as e:
logger.error(f"Error obteniendo archivos del tema '{tema}': {e}")
return []
# Instancia global del servicio
file_service = FileService()