V1 de backend funcional
This commit is contained in:
243
backend/app/services/file_service.py
Normal file
243
backend/app/services/file_service.py
Normal file
@@ -0,0 +1,243 @@
|
||||
import os
|
||||
import re
|
||||
from typing import Optional, Tuple
|
||||
import logging
|
||||
|
||||
from ..services.azure_service import azure_service
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class FileService:
|
||||
"""
|
||||
Servicio para lógica de negocio de archivos
|
||||
"""
|
||||
|
||||
async def check_file_exists(self, filename: str, tema: str = "") -> bool:
|
||||
"""
|
||||
Verificar si un archivo ya existe en el almacenamiento
|
||||
|
||||
Args:
|
||||
filename: Nombre del archivo
|
||||
tema: Tema donde buscar el archivo
|
||||
|
||||
Returns:
|
||||
bool: True si el archivo existe, False si no
|
||||
"""
|
||||
try:
|
||||
await azure_service.get_file_info(filename, tema)
|
||||
return True
|
||||
except FileNotFoundError:
|
||||
return False
|
||||
except Exception as e:
|
||||
logger.error(f"Error verificando existencia del archivo '{filename}': {e}")
|
||||
return False
|
||||
|
||||
def generate_new_filename(self, original_filename: str, existing_files: list = None) -> str:
|
||||
"""
|
||||
Generar un nuevo nombre de archivo si ya existe uno igual
|
||||
|
||||
Args:
|
||||
original_filename: Nombre original del archivo
|
||||
existing_files: Lista de archivos existentes (opcional)
|
||||
|
||||
Returns:
|
||||
str: Nuevo nombre de archivo (ej: archivo_1.pdf)
|
||||
"""
|
||||
# Separar nombre y extensión
|
||||
name, extension = os.path.splitext(original_filename)
|
||||
|
||||
# Buscar si ya tiene un número al final (ej: archivo_1.pdf)
|
||||
match = re.search(r'(.+)_(\d+)$', name)
|
||||
if match:
|
||||
base_name = match.group(1)
|
||||
current_number = int(match.group(2))
|
||||
else:
|
||||
base_name = name
|
||||
current_number = 0
|
||||
|
||||
# Incrementar número hasta encontrar uno disponible
|
||||
counter = current_number + 1
|
||||
new_filename = f"{base_name}_{counter}{extension}"
|
||||
|
||||
# Si tenemos lista de archivos existentes, verificar contra ella
|
||||
if existing_files:
|
||||
while new_filename in existing_files:
|
||||
counter += 1
|
||||
new_filename = f"{base_name}_{counter}{extension}"
|
||||
|
||||
return new_filename
|
||||
|
||||
async def handle_file_conflict(self, filename: str, tema: str = "") -> Tuple[bool, Optional[str]]:
|
||||
"""
|
||||
Manejar conflicto cuando un archivo ya existe
|
||||
|
||||
Args:
|
||||
filename: Nombre del archivo
|
||||
tema: Tema donde está el archivo
|
||||
|
||||
Returns:
|
||||
Tuple[bool, Optional[str]]: (existe_conflicto, nombre_sugerido)
|
||||
"""
|
||||
exists = await self.check_file_exists(filename, tema)
|
||||
|
||||
if not exists:
|
||||
return False, None
|
||||
|
||||
# Generar nombre alternativo
|
||||
suggested_name = self.generate_new_filename(filename)
|
||||
|
||||
# Verificar que el nombre sugerido tampoco exista
|
||||
counter = 1
|
||||
while await self.check_file_exists(suggested_name, tema):
|
||||
name, extension = os.path.splitext(filename)
|
||||
# Buscar base sin número
|
||||
match = re.search(r'(.+)_(\d+)$', name)
|
||||
base_name = match.group(1) if match else name
|
||||
counter += 1
|
||||
suggested_name = f"{base_name}_{counter}{extension}"
|
||||
|
||||
return True, suggested_name
|
||||
|
||||
def validate_filename(self, filename: str) -> Tuple[bool, Optional[str]]:
|
||||
"""
|
||||
Validar que el nombre del archivo sea válido
|
||||
|
||||
Args:
|
||||
filename: Nombre del archivo a validar
|
||||
|
||||
Returns:
|
||||
Tuple[bool, Optional[str]]: (es_valido, mensaje_error)
|
||||
"""
|
||||
if not filename:
|
||||
return False, "Nombre de archivo requerido"
|
||||
|
||||
# Verificar caracteres no permitidos
|
||||
invalid_chars = r'[<>:"/\\|?*]'
|
||||
if re.search(invalid_chars, filename):
|
||||
return False, "El nombre contiene caracteres no permitidos: < > : \" / \\ | ? *"
|
||||
|
||||
# Verificar longitud
|
||||
if len(filename) > 255:
|
||||
return False, "Nombre de archivo demasiado largo (máximo 255 caracteres)"
|
||||
|
||||
# Verificar nombres reservados (Windows)
|
||||
reserved_names = [
|
||||
'CON', 'PRN', 'AUX', 'NUL',
|
||||
'COM1', 'COM2', 'COM3', 'COM4', 'COM5', 'COM6', 'COM7', 'COM8', 'COM9',
|
||||
'LPT1', 'LPT2', 'LPT3', 'LPT4', 'LPT5', 'LPT6', 'LPT7', 'LPT8', 'LPT9'
|
||||
]
|
||||
name_without_ext = os.path.splitext(filename)[0].upper()
|
||||
if name_without_ext in reserved_names:
|
||||
return False, f"'{name_without_ext}' es un nombre reservado del sistema"
|
||||
|
||||
return True, None
|
||||
|
||||
def validate_file_extension(self, filename: str) -> Tuple[bool, Optional[str]]:
|
||||
"""
|
||||
Validar que la extensión del archivo esté permitida
|
||||
|
||||
Args:
|
||||
filename: Nombre del archivo
|
||||
|
||||
Returns:
|
||||
Tuple[bool, Optional[str]]: (es_valido, mensaje_error)
|
||||
"""
|
||||
allowed_extensions = ['.pdf', '.doc', '.docx', '.xls', '.xlsx', '.ppt', '.pptx', '.txt', '.csv']
|
||||
|
||||
file_extension = os.path.splitext(filename)[1].lower()
|
||||
|
||||
if not file_extension:
|
||||
return False, "Archivo debe tener una extensión"
|
||||
|
||||
if file_extension not in allowed_extensions:
|
||||
return False, f"Extensión no permitida. Permitidas: {', '.join(allowed_extensions)}"
|
||||
|
||||
return True, None
|
||||
|
||||
def validate_file_size(self, file_size: int) -> Tuple[bool, Optional[str]]:
|
||||
"""
|
||||
Validar que el tamaño del archivo esté dentro de los límites
|
||||
|
||||
Args:
|
||||
file_size: Tamaño del archivo en bytes
|
||||
|
||||
Returns:
|
||||
Tuple[bool, Optional[str]]: (es_valido, mensaje_error)
|
||||
"""
|
||||
max_size = 100 * 1024 * 1024 # 100MB
|
||||
|
||||
if file_size <= 0:
|
||||
return False, "El archivo está vacío"
|
||||
|
||||
if file_size > max_size:
|
||||
max_size_mb = max_size / (1024 * 1024)
|
||||
return False, f"Archivo demasiado grande. Tamaño máximo: {max_size_mb}MB"
|
||||
|
||||
return True, None
|
||||
|
||||
def validate_tema(self, tema: str) -> Tuple[bool, Optional[str]]:
|
||||
"""
|
||||
Validar que el tema sea válido
|
||||
|
||||
Args:
|
||||
tema: Nombre del tema
|
||||
|
||||
Returns:
|
||||
Tuple[bool, Optional[str]]: (es_valido, mensaje_error)
|
||||
"""
|
||||
if not tema:
|
||||
return True, None # Tema opcional
|
||||
|
||||
# Verificar caracteres permitidos
|
||||
if not re.match(r'^[a-zA-Z0-9\-_\s]+$', tema):
|
||||
return False, "Tema solo puede contener letras, números, guiones y espacios"
|
||||
|
||||
# Verificar longitud
|
||||
if len(tema) > 50:
|
||||
return False, "Nombre de tema demasiado largo (máximo 50 caracteres)"
|
||||
|
||||
return True, None
|
||||
|
||||
def clean_tema_name(self, tema: str) -> str:
|
||||
"""
|
||||
Limpiar nombre de tema para Azure Storage
|
||||
|
||||
Args:
|
||||
tema: Nombre del tema original
|
||||
|
||||
Returns:
|
||||
str: Nombre limpio para usar como carpeta
|
||||
"""
|
||||
if not tema:
|
||||
return ""
|
||||
|
||||
# Convertir a minúsculas y reemplazar espacios con guiones
|
||||
cleaned = tema.lower().strip()
|
||||
cleaned = re.sub(r'\s+', '-', cleaned) # Espacios múltiples a un guion
|
||||
cleaned = re.sub(r'[^a-z0-9\-_]', '', cleaned) # Solo caracteres permitidos
|
||||
cleaned = re.sub(r'-+', '-', cleaned) # Guiones múltiples a uno
|
||||
cleaned = cleaned.strip('-') # Quitar guiones al inicio/final
|
||||
|
||||
return cleaned
|
||||
|
||||
async def get_existing_files_in_tema(self, tema: str = "") -> list:
|
||||
"""
|
||||
Obtener lista de nombres de archivos existentes en un tema
|
||||
|
||||
Args:
|
||||
tema: Tema donde buscar archivos
|
||||
|
||||
Returns:
|
||||
list: Lista de nombres de archivos
|
||||
"""
|
||||
try:
|
||||
files_data = await azure_service.list_files(tema)
|
||||
return [file_data["name"] for file_data in files_data]
|
||||
except Exception as e:
|
||||
logger.error(f"Error obteniendo archivos del tema '{tema}': {e}")
|
||||
return []
|
||||
|
||||
|
||||
# Instancia global del servicio
|
||||
file_service = FileService()
|
||||
Reference in New Issue
Block a user