Files
searchbox/tests/test_mcp/conftest.py
Anibal Angulo b1021bcbd5 Make MCP server optional dependency
The fastmcp server code is now an optional dependency that can be
installed with the "mcp" extra. Core vector search functionality is
available without the MCP server dependency.
2025-09-26 17:12:37 +00:00

68 lines
1.9 KiB
Python

import subprocess
import time
import pytest
from fastembed import TextEmbedding
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, PointStruct, VectorParams
@pytest.fixture(scope="session")
def embedding_model():
return TextEmbedding()
@pytest.fixture(scope="session")
def qdrant_client(embedding_model: TextEmbedding):
client = QdrantClient(":memory:")
documents: list[str] = [
"Rick es el mas guapo",
"Los pulpos tienen tres corazones y sangre azul",
"Las cucarachas pueden vivir hasta una semana sin cabeza",
"Los koalas tienen huellas dactilares casi idénticas a las humanas",
"La miel nunca se echa a perder, incluso después de miles de años",
]
embeddings = list(embedding_model.embed(documents))
size = len(embeddings[0])
_ = client.recreate_collection(
collection_name="dummy_collection",
vectors_config=VectorParams(distance=Distance.COSINE, size=size),
)
for idx, (emb, document) in enumerate(zip(embeddings, documents)):
_ = client.upsert(
collection_name="dummy_collection",
points=[
PointStruct(id=idx, vector=emb.tolist(), payload={"text": document})
],
)
yield client
@pytest.fixture(scope="session", autouse=True)
def run_mcp():
# Start the MCP server in the background
process = subprocess.Popen(
["uv", "run", "vector-search-mcp"],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
# Give the server a moment to start up
time.sleep(2)
try:
yield "http://localhost:8000/sse"
finally:
# Clean up the process when tests are done
process.terminate()
try:
_ = process.wait(timeout=5)
except subprocess.TimeoutExpired:
process.kill()
_ = process.wait()