243 lines
8.2 KiB
Python
243 lines
8.2 KiB
Python
import os
|
|
import re
|
|
from typing import Optional, Tuple
|
|
import logging
|
|
|
|
from ..services.azure_service import azure_service
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class FileService:
|
|
"""
|
|
Servicio para lógica de negocio de archivos
|
|
"""
|
|
|
|
async def check_file_exists(self, filename: str, tema: str = "") -> bool:
|
|
"""
|
|
Verificar si un archivo ya existe en el almacenamiento
|
|
|
|
Args:
|
|
filename: Nombre del archivo
|
|
tema: Tema donde buscar el archivo
|
|
|
|
Returns:
|
|
bool: True si el archivo existe, False si no
|
|
"""
|
|
try:
|
|
await azure_service.get_file_info(filename, tema)
|
|
return True
|
|
except FileNotFoundError:
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"Error verificando existencia del archivo '{filename}': {e}")
|
|
return False
|
|
|
|
def generate_new_filename(self, original_filename: str, existing_files: list = None) -> str:
|
|
"""
|
|
Generar un nuevo nombre de archivo si ya existe uno igual
|
|
|
|
Args:
|
|
original_filename: Nombre original del archivo
|
|
existing_files: Lista de archivos existentes (opcional)
|
|
|
|
Returns:
|
|
str: Nuevo nombre de archivo (ej: archivo_1.pdf)
|
|
"""
|
|
# Separar nombre y extensión
|
|
name, extension = os.path.splitext(original_filename)
|
|
|
|
# Buscar si ya tiene un número al final (ej: archivo_1.pdf)
|
|
match = re.search(r'(.+)_(\d+)$', name)
|
|
if match:
|
|
base_name = match.group(1)
|
|
current_number = int(match.group(2))
|
|
else:
|
|
base_name = name
|
|
current_number = 0
|
|
|
|
# Incrementar número hasta encontrar uno disponible
|
|
counter = current_number + 1
|
|
new_filename = f"{base_name}_{counter}{extension}"
|
|
|
|
# Si tenemos lista de archivos existentes, verificar contra ella
|
|
if existing_files:
|
|
while new_filename in existing_files:
|
|
counter += 1
|
|
new_filename = f"{base_name}_{counter}{extension}"
|
|
|
|
return new_filename
|
|
|
|
async def handle_file_conflict(self, filename: str, tema: str = "") -> Tuple[bool, Optional[str]]:
|
|
"""
|
|
Manejar conflicto cuando un archivo ya existe
|
|
|
|
Args:
|
|
filename: Nombre del archivo
|
|
tema: Tema donde está el archivo
|
|
|
|
Returns:
|
|
Tuple[bool, Optional[str]]: (existe_conflicto, nombre_sugerido)
|
|
"""
|
|
exists = await self.check_file_exists(filename, tema)
|
|
|
|
if not exists:
|
|
return False, None
|
|
|
|
# Generar nombre alternativo
|
|
suggested_name = self.generate_new_filename(filename)
|
|
|
|
# Verificar que el nombre sugerido tampoco exista
|
|
counter = 1
|
|
while await self.check_file_exists(suggested_name, tema):
|
|
name, extension = os.path.splitext(filename)
|
|
# Buscar base sin número
|
|
match = re.search(r'(.+)_(\d+)$', name)
|
|
base_name = match.group(1) if match else name
|
|
counter += 1
|
|
suggested_name = f"{base_name}_{counter}{extension}"
|
|
|
|
return True, suggested_name
|
|
|
|
def validate_filename(self, filename: str) -> Tuple[bool, Optional[str]]:
|
|
"""
|
|
Validar que el nombre del archivo sea válido
|
|
|
|
Args:
|
|
filename: Nombre del archivo a validar
|
|
|
|
Returns:
|
|
Tuple[bool, Optional[str]]: (es_valido, mensaje_error)
|
|
"""
|
|
if not filename:
|
|
return False, "Nombre de archivo requerido"
|
|
|
|
# Verificar caracteres no permitidos
|
|
invalid_chars = r'[<>:"/\\|?*]'
|
|
if re.search(invalid_chars, filename):
|
|
return False, "El nombre contiene caracteres no permitidos: < > : \" / \\ | ? *"
|
|
|
|
# Verificar longitud
|
|
if len(filename) > 255:
|
|
return False, "Nombre de archivo demasiado largo (máximo 255 caracteres)"
|
|
|
|
# Verificar nombres reservados (Windows)
|
|
reserved_names = [
|
|
'CON', 'PRN', 'AUX', 'NUL',
|
|
'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
|
|
'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9'
|
|
]
|
|
name_without_ext = os.path.splitext(filename)[0].upper()
|
|
if name_without_ext in reserved_names:
|
|
return False, f"'{name_without_ext}' es un nombre reservado del sistema"
|
|
|
|
return True, None
|
|
|
|
def validate_file_extension(self, filename: str) -> Tuple[bool, Optional[str]]:
|
|
"""
|
|
Validar que la extensión del archivo esté permitida
|
|
|
|
Args:
|
|
filename: Nombre del archivo
|
|
|
|
Returns:
|
|
Tuple[bool, Optional[str]]: (es_valido, mensaje_error)
|
|
"""
|
|
allowed_extensions = ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt', '.csv']
|
|
|
|
file_extension = os.path.splitext(filename)[1].lower()
|
|
|
|
if not file_extension:
|
|
return False, "Archivo debe tener una extensión"
|
|
|
|
if file_extension not in allowed_extensions:
|
|
return False, f"Extensión no permitida. Permitidas: {', '.join(allowed_extensions)}"
|
|
|
|
return True, None
|
|
|
|
def validate_file_size(self, file_size: int) -> Tuple[bool, Optional[str]]:
|
|
"""
|
|
Validar que el tamaño del archivo esté dentro de los límites
|
|
|
|
Args:
|
|
file_size: Tamaño del archivo en bytes
|
|
|
|
Returns:
|
|
Tuple[bool, Optional[str]]: (es_valido, mensaje_error)
|
|
"""
|
|
max_size = 100 * 1024 * 1024 # 100MB
|
|
|
|
if file_size <= 0:
|
|
return False, "El archivo está vacío"
|
|
|
|
if file_size > max_size:
|
|
max_size_mb = max_size / (1024 * 1024)
|
|
return False, f"Archivo demasiado grande. Tamaño máximo: {max_size_mb}MB"
|
|
|
|
return True, None
|
|
|
|
def validate_tema(self, tema: str) -> Tuple[bool, Optional[str]]:
|
|
"""
|
|
Validar que el tema sea válido
|
|
|
|
Args:
|
|
tema: Nombre del tema
|
|
|
|
Returns:
|
|
Tuple[bool, Optional[str]]: (es_valido, mensaje_error)
|
|
"""
|
|
if not tema:
|
|
return True, None # Tema opcional
|
|
|
|
# Verificar caracteres permitidos
|
|
if not re.match(r'^[a-zA-Z0-9\-_\s]+$', tema):
|
|
return False, "Tema solo puede contener letras, números, guiones y espacios"
|
|
|
|
# Verificar longitud
|
|
if len(tema) > 50:
|
|
return False, "Nombre de tema demasiado largo (máximo 50 caracteres)"
|
|
|
|
return True, None
|
|
|
|
def clean_tema_name(self, tema: str) -> str:
|
|
"""
|
|
Limpiar nombre de tema para Azure Storage
|
|
|
|
Args:
|
|
tema: Nombre del tema original
|
|
|
|
Returns:
|
|
str: Nombre limpio para usar como carpeta
|
|
"""
|
|
if not tema:
|
|
return ""
|
|
|
|
# Convertir a minúsculas y reemplazar espacios con guiones
|
|
cleaned = tema.lower().strip()
|
|
cleaned = re.sub(r'\s+', '-', cleaned) # Espacios múltiples a un guion
|
|
cleaned = re.sub(r'[^a-z0-9\-_]', '', cleaned) # Solo caracteres permitidos
|
|
cleaned = re.sub(r'-+', '-', cleaned) # Guiones múltiples a uno
|
|
cleaned = cleaned.strip('-') # Quitar guiones al inicio/final
|
|
|
|
return cleaned
|
|
|
|
async def get_existing_files_in_tema(self, tema: str = "") -> list:
|
|
"""
|
|
Obtener lista de nombres de archivos existentes en un tema
|
|
|
|
Args:
|
|
tema: Tema donde buscar archivos
|
|
|
|
Returns:
|
|
list: Lista de nombres de archivos
|
|
"""
|
|
try:
|
|
files_data = await azure_service.list_files(tema)
|
|
return [file_data["name"] for file_data in files_data]
|
|
except Exception as e:
|
|
logger.error(f"Error obteniendo archivos del tema '{tema}': {e}")
|
|
return []
|
|
|
|
|
|
# Instancia global del servicio
|
|
file_service = FileService() |