forked from innovacion/playground
Initial commit
This commit is contained in:
166
test/banana.py
Normal file
166
test/banana.py
Normal file
@@ -0,0 +1,166 @@
|
||||
import base64
|
||||
import json
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
import google.oauth2.service_account as sa
|
||||
import vertexai
|
||||
from vertexai.preview.vision_models import ImageGenerationModel
|
||||
|
||||
|
||||
class GeminiImageChat:
|
||||
"""Chat interactivo para generación de imágenes con Gemini"""
|
||||
|
||||
def __init__(self, credentials_path=None):
|
||||
"""Inicializa el cliente de Gemini"""
|
||||
# Buscar archivo de credenciales
|
||||
if credentials_path is None:
|
||||
credentials_path = Path(__file__).parent.parent / "bnt-ia-innovacion-new.json"
|
||||
|
||||
credentials_path = Path(credentials_path)
|
||||
|
||||
if not credentials_path.exists():
|
||||
raise ValueError(f"Archivo de credenciales no encontrado: {credentials_path}")
|
||||
|
||||
# Leer credenciales del archivo JSON
|
||||
with open(credentials_path, 'r') as f:
|
||||
credentials_json = json.load(f)
|
||||
|
||||
# Cargar credenciales usando el método correcto
|
||||
credentials = sa.Credentials.from_service_account_file(str(credentials_path))
|
||||
|
||||
# Inicializar Vertex AI
|
||||
vertexai.init(
|
||||
project=credentials_json.get('project_id'),
|
||||
credentials=credentials,
|
||||
location='us-central1'
|
||||
)
|
||||
|
||||
# Inicializar modelo de generación de imágenes
|
||||
self.model = ImageGenerationModel.from_pretrained("gemini-2.5-flash-image")
|
||||
self.output_dir = Path("generated_images")
|
||||
self.output_dir.mkdir(exist_ok=True)
|
||||
|
||||
def save_image(self, image_data, prompt):
|
||||
"""Guarda la imagen generada en disco"""
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
# Sanitizar el prompt para usar como parte del nombre
|
||||
safe_prompt = "".join(c for c in prompt[:30] if c.isalnum() or c in (' ', '_')).rstrip()
|
||||
safe_prompt = safe_prompt.replace(' ', '_')
|
||||
|
||||
filename = f"{timestamp}_{safe_prompt}.png"
|
||||
filepath = self.output_dir / filename
|
||||
|
||||
# Decodificar y guardar la imagen
|
||||
if isinstance(image_data, str):
|
||||
# Si es base64
|
||||
image_bytes = base64.b64decode(image_data)
|
||||
else:
|
||||
image_bytes = image_data
|
||||
|
||||
with open(filepath, 'wb') as f:
|
||||
f.write(image_bytes)
|
||||
|
||||
return filepath
|
||||
|
||||
def generate_image(self, prompt, aspect_ratio="1:1", save=True):
|
||||
"""Genera una imagen basada en el prompt"""
|
||||
print(f"\n{'='*60}")
|
||||
print(f"Generando imagen para: {prompt}")
|
||||
print(f"{'='*60}\n")
|
||||
|
||||
try:
|
||||
# Generar imágenes usando Vertex AI
|
||||
images = self.model.generate_images(
|
||||
prompt=prompt,
|
||||
number_of_images=1,
|
||||
language="es",
|
||||
aspect_ratio=aspect_ratio,
|
||||
safety_filter_level="block_some",
|
||||
person_generation="allow_adult",
|
||||
)
|
||||
|
||||
# Guardar imágenes generadas
|
||||
saved_files = []
|
||||
if images and save:
|
||||
for idx, image in enumerate(images.images):
|
||||
# Convertir imagen a bytes
|
||||
image_bytes = image._image_bytes
|
||||
|
||||
# Guardar usando el método existente
|
||||
filepath = self.save_image(image_bytes, prompt)
|
||||
saved_files.append(filepath)
|
||||
print(f"✓ Imagen {idx + 1} guardada en: {filepath}")
|
||||
|
||||
return {
|
||||
"images": saved_files,
|
||||
"success": True
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n✗ Error al generar imagen: {str(e)}")
|
||||
return {
|
||||
"images": [],
|
||||
"success": False,
|
||||
"error": str(e)
|
||||
}
|
||||
|
||||
def start_chat(self):
|
||||
"""Inicia el chat interactivo"""
|
||||
print("\n" + "="*60)
|
||||
print("🎨 CHAT DE GENERACIÓN DE IMÁGENES CON GEMINI")
|
||||
print("="*60)
|
||||
print("\nComandos disponibles:")
|
||||
print(" - Escribe un prompt para generar una imagen")
|
||||
print(" - 'config' - Cambiar configuración de imagen")
|
||||
print(" - 'salir' - Salir del chat")
|
||||
print("="*60 + "\n")
|
||||
|
||||
aspect_ratio = "1:1"
|
||||
|
||||
while True:
|
||||
try:
|
||||
user_input = input("\n💭 Prompt: ").strip()
|
||||
|
||||
if not user_input:
|
||||
continue
|
||||
|
||||
if user_input.lower() == 'salir':
|
||||
print("\n👋 ¡Hasta pronto!")
|
||||
break
|
||||
|
||||
elif user_input.lower() == 'config':
|
||||
print("\nConfiguración actual:")
|
||||
print(f" Aspect ratio: {aspect_ratio}")
|
||||
print("\nOpciones de aspect ratio: 1:1, 16:9, 9:16, 3:4, 4:3")
|
||||
|
||||
new_ratio = input("\nNuevo aspect ratio (Enter para mantener): ").strip()
|
||||
if new_ratio:
|
||||
aspect_ratio = new_ratio
|
||||
|
||||
print(f"\n✓ Configuración actualizada: {aspect_ratio}")
|
||||
|
||||
else:
|
||||
# Generar imagen
|
||||
self.generate_image(user_input, aspect_ratio)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n\n👋 ¡Hasta pronto!")
|
||||
break
|
||||
except Exception as e:
|
||||
print(f"\n✗ Error: {str(e)}")
|
||||
|
||||
|
||||
def main():
|
||||
"""Función principal"""
|
||||
try:
|
||||
chat = GeminiImageChat()
|
||||
chat.start_chat()
|
||||
except ValueError as e:
|
||||
print(f"✗ Error de configuración: {e}")
|
||||
print("\nAsegúrate de tener el archivo bnt-ia-innovacion-new.json en el directorio raíz")
|
||||
except Exception as e:
|
||||
print(f"✗ Error inesperado: {e}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
273
test/list_models.py
Normal file
273
test/list_models.py
Normal file
@@ -0,0 +1,273 @@
|
||||
import json
|
||||
import os
|
||||
import base64
|
||||
import mimetypes
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from google import genai
|
||||
from google.genai import types
|
||||
from google.genai.types import GenerateVideosConfig
|
||||
|
||||
# Leer credenciales
|
||||
credentials_path = Path(__file__).parent.parent / "bnt-ia-innovacion-new.json"
|
||||
|
||||
with open(credentials_path, 'r') as f:
|
||||
credentials_json = json.load(f)
|
||||
|
||||
# Configurar variable de entorno con las credenciales
|
||||
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = str(credentials_path)
|
||||
|
||||
# Crear cliente de genai
|
||||
client = genai.Client(
|
||||
vertexai=True,
|
||||
project=credentials_json.get('project_id'),
|
||||
location='us-central1'
|
||||
)
|
||||
|
||||
# Crear directorio para videos
|
||||
output_dir = Path("generated_videos")
|
||||
output_dir.mkdir(exist_ok=True)
|
||||
|
||||
print("="*60)
|
||||
print("CHAT DE GENERACIÓN DE VIDEOS CON GOOGLE VEO")
|
||||
print("="*60)
|
||||
print(f"\nProyecto: {credentials_json.get('project_id')}")
|
||||
print(f"Ubicación: us-central1")
|
||||
print(f"Modelo: veo-002")
|
||||
print(f"Directorio de salida: {output_dir}")
|
||||
print("="*60)
|
||||
|
||||
|
||||
def save_video(video_data, prompt, index=0):
|
||||
"""Guarda el video generado en disco"""
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
# Sanitizar el prompt para usar como parte del nombre
|
||||
safe_prompt = "".join(c for c in prompt[:30] if c.isalnum() or c in (' ', '_')).rstrip()
|
||||
safe_prompt = safe_prompt.replace(' ', '_')
|
||||
|
||||
filename = f"{timestamp}_{safe_prompt}_{index}.mp4"
|
||||
filepath = output_dir / filename
|
||||
|
||||
# Decodificar y guardar el video
|
||||
if isinstance(video_data, str):
|
||||
# Si es base64
|
||||
video_bytes = base64.b64decode(video_data)
|
||||
else:
|
||||
video_bytes = video_data
|
||||
|
||||
with open(filepath, 'wb') as f:
|
||||
f.write(video_bytes)
|
||||
|
||||
return filepath
|
||||
|
||||
|
||||
def load_reference_media(media_path):
|
||||
"""Carga una imagen o video de referencia y la convierte a Part"""
|
||||
try:
|
||||
media_path = Path(media_path)
|
||||
|
||||
if not media_path.exists():
|
||||
print(f"✗ Archivo no encontrado: {media_path}")
|
||||
return None
|
||||
|
||||
# Detectar tipo MIME
|
||||
mime_type, _ = mimetypes.guess_type(str(media_path))
|
||||
if mime_type is None:
|
||||
print(f"✗ No se pudo detectar el tipo de archivo: {media_path}")
|
||||
return None
|
||||
|
||||
if not (mime_type.startswith('image/') or mime_type.startswith('video/')):
|
||||
print(f"✗ Archivo no es una imagen o video válido: {media_path}")
|
||||
return None
|
||||
|
||||
# Leer archivo
|
||||
with open(media_path, 'rb') as f:
|
||||
media_bytes = f.read()
|
||||
|
||||
# Crear Part con el archivo
|
||||
return types.Part.from_bytes(
|
||||
data=media_bytes,
|
||||
mime_type=mime_type
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
print(f"✗ Error al cargar archivo: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def generate_video(prompt, reference_media=None, duration=5):
|
||||
"""Genera un video y lo guarda
|
||||
|
||||
Args:
|
||||
prompt: Texto descriptivo para generar el video
|
||||
reference_media: Lista de rutas a imágenes de referencia (opcional)
|
||||
duration: Duración del video en segundos (5 u 8)
|
||||
"""
|
||||
print(f"\n{'='*60}")
|
||||
print(f"Generando video: {prompt}")
|
||||
print(f"Duración: {duration} segundos")
|
||||
if reference_media:
|
||||
print(f"Con {len(reference_media)} imagen(es) de referencia")
|
||||
print(f"{'='*60}\n")
|
||||
|
||||
try:
|
||||
# Configurar bucket de salida (temporal en GCS)
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
safe_prompt = "".join(c for c in prompt[:20] if c.isalnum() or c in (' ', '_')).rstrip()
|
||||
safe_prompt = safe_prompt.replace(' ', '_')
|
||||
output_gcs_uri = f"gs://bnt-ia-innovacion-videos/veo_output/{timestamp}_{safe_prompt}/"
|
||||
|
||||
# Preparar imagen de referencia si existe
|
||||
reference_image = None
|
||||
if reference_media and len(reference_media) > 0:
|
||||
# Para Veo, solo se puede usar una imagen de referencia
|
||||
img_path = reference_media[0]
|
||||
print(f"📸 Cargando imagen de referencia: {img_path}")
|
||||
|
||||
# Subir imagen a GCS o usar directamente si ya está en GCS
|
||||
if img_path.startswith('gs://'):
|
||||
reference_image = types.Image(gcs_uri=img_path)
|
||||
print(f"✓ Usando imagen de GCS: {img_path}")
|
||||
else:
|
||||
# Por ahora mostrar advertencia
|
||||
print(f"⚠ La imagen debe estar en Google Cloud Storage (gs://)")
|
||||
print(f"⚠ Continuando sin imagen de referencia...")
|
||||
reference_image = None
|
||||
|
||||
print("\n⏳ Generando video con Veo (esto puede tardar varios minutos)...")
|
||||
print(f"📂 El video se guardará en: {output_gcs_uri}")
|
||||
start_time = time.time()
|
||||
|
||||
# Configuración de Veo
|
||||
config = GenerateVideosConfig(
|
||||
aspect_ratio="16:9",
|
||||
output_gcs_uri=output_gcs_uri,
|
||||
)
|
||||
|
||||
# Generar video con Veo
|
||||
if reference_image:
|
||||
operation = client.models.generate_videos(
|
||||
model="veo-3.1-generate-001",
|
||||
prompt=prompt,
|
||||
reference_image=reference_image,
|
||||
config=config,
|
||||
)
|
||||
else:
|
||||
operation = client.models.generate_videos(
|
||||
model="veo-3.1-generate-001",
|
||||
prompt=prompt,
|
||||
config=config,
|
||||
)
|
||||
|
||||
print(f"🔄 Operación iniciada: {operation.name}")
|
||||
print("⏳ Esperando a que se complete la generación...")
|
||||
|
||||
# Polling de la operación cada 15 segundos
|
||||
poll_count = 0
|
||||
while not operation.done:
|
||||
time.sleep(15)
|
||||
operation = client.operations.get(operation)
|
||||
poll_count += 1
|
||||
elapsed = time.time() - start_time
|
||||
print(f"⏱ {elapsed:.0f}s - Verificación #{poll_count} - Estado: {'Completado' if operation.done else 'En proceso...'}")
|
||||
|
||||
elapsed_time = time.time() - start_time
|
||||
print(f"\n✓ Generación completada en {elapsed_time:.2f} segundos")
|
||||
|
||||
# Obtener el resultado
|
||||
if hasattr(operation, 'result') and operation.result:
|
||||
if hasattr(operation.result, 'generated_videos') and operation.result.generated_videos:
|
||||
video_uri = operation.result.generated_videos[0].video.uri
|
||||
print(f"\n✓ Video generado exitosamente!")
|
||||
print(f"📹 URI del video en GCS: {video_uri}")
|
||||
|
||||
# Intentar descargar el video de GCS
|
||||
try:
|
||||
from google.cloud import storage
|
||||
|
||||
# Parsear la URI de GCS
|
||||
gcs_path = video_uri.replace('gs://', '')
|
||||
bucket_name = gcs_path.split('/')[0]
|
||||
blob_name = '/'.join(gcs_path.split('/')[1:])
|
||||
|
||||
print(f"\n⬇ Descargando video de GCS...")
|
||||
storage_client = storage.Client(project=credentials_json.get('project_id'))
|
||||
bucket = storage_client.bucket(bucket_name)
|
||||
blob = bucket.blob(blob_name)
|
||||
|
||||
# Guardar localmente
|
||||
local_filename = f"{timestamp}_{safe_prompt}.mp4"
|
||||
local_filepath = output_dir / local_filename
|
||||
blob.download_to_filename(str(local_filepath))
|
||||
|
||||
print(f"✓ Video descargado en: {local_filepath}")
|
||||
return True
|
||||
|
||||
except ImportError:
|
||||
print("\n⚠ Para descargar automáticamente, instala: pip install google-cloud-storage")
|
||||
print(f"📹 Puedes descargar el video manualmente desde: {video_uri}")
|
||||
return True
|
||||
except Exception as e:
|
||||
print(f"\n⚠ No se pudo descargar automáticamente: {e}")
|
||||
print(f"📹 Puedes descargar el video manualmente desde: {video_uri}")
|
||||
return True
|
||||
else:
|
||||
print("\n⚠ No se generaron videos en la respuesta")
|
||||
print(f"DEBUG - Resultado: {operation.result}")
|
||||
return False
|
||||
else:
|
||||
print("\n⚠ La operación no tiene resultado")
|
||||
print(f"DEBUG - Operación: {operation}")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
print(f"\n✗ Error al generar video: {str(e)}")
|
||||
import traceback
|
||||
print("\nTraceback completo:")
|
||||
traceback.print_exc()
|
||||
return False
|
||||
|
||||
|
||||
# Chat interactivo
|
||||
print("\nComandos:")
|
||||
print(" - Escribe un prompt para generar un video")
|
||||
print(" - Para especificar duración: prompt [5s] o [8s]")
|
||||
print(" - Para usar archivos de referencia: prompt | ruta1.jpg | video.mp4")
|
||||
print(" - Ejemplo: un gato saltando en slow motion [8s] | referencia.jpg")
|
||||
print(" - 'salir' para terminar\n")
|
||||
|
||||
while True:
|
||||
try:
|
||||
user_input = input("\n🎬 Prompt: ").strip()
|
||||
|
||||
if not user_input:
|
||||
continue
|
||||
|
||||
if user_input.lower() == 'salir':
|
||||
print("\n👋 ¡Hasta pronto!")
|
||||
break
|
||||
|
||||
# Extraer duración si está especificada
|
||||
duration = 5 # Por defecto 5 segundos
|
||||
if '[5s]' in user_input:
|
||||
duration = 5
|
||||
user_input = user_input.replace('[5s]', '').strip()
|
||||
elif '[8s]' in user_input:
|
||||
duration = 8
|
||||
user_input = user_input.replace('[8s]', '').strip()
|
||||
|
||||
# Verificar si hay archivos de referencia
|
||||
if '|' in user_input:
|
||||
parts = [p.strip() for p in user_input.split('|')]
|
||||
prompt = parts[0]
|
||||
reference_media = parts[1:] if len(parts) > 1 else None
|
||||
generate_video(prompt, reference_media, duration)
|
||||
else:
|
||||
generate_video(user_input, duration=duration)
|
||||
|
||||
except KeyboardInterrupt:
|
||||
print("\n\n👋 ¡Hasta pronto!")
|
||||
break
|
||||
except Exception as e:
|
||||
print(f"\n✗ Error: {str(e)}")
|
||||
Reference in New Issue
Block a user