forked from innovacion/searchbox
The fastmcp server code is now an optional dependency that can be installed with the "mcp" extra. Core vector search functionality is available without the MCP server dependency.
211 lines
7.9 KiB
Python
211 lines
7.9 KiB
Python
"""Abstract base engine for vector search operations.
|
|
|
|
This module defines the abstract interface for vector search engines using
|
|
generic types to ensure type safety across different backend implementations.
|
|
|
|
The BaseEngine class uses three generic type parameters:
- ResponseType: The raw response type returned by the backend's search API
- ConditionType: The backend-specific filter/condition type
- ChunkType: The backend-specific chunk type produced for uploads
|
|
|
|
This design allows each engine implementation to use its native types while
|
|
maintaining a consistent interface for the semantic search workflow.
|
|
"""
|
|
|
|
from abc import ABC, abstractmethod
|
|
from typing import TypeVar
|
|
|
|
from ..models import Chunk, Condition, SearchRow
|
|
|
|
# Module-level type variables for the three generic slots of an engine.
# NOTE(review): BaseEngine declares its own PEP 695 type parameters with the
# same names, so the class does not bind to these objects. They are kept
# because other modules may import them -- verify before removing.
ResponseType = TypeVar("ResponseType")  # raw backend search response
ConditionType = TypeVar("ConditionType")  # backend-specific filter type
ChunkType = TypeVar("ChunkType")  # backend-specific chunk type
|
|
|
|
# Public API of this module: only the abstract engine base class is exported.
__all__ = ["BaseEngine"]
|
|
|
|
|
|
class BaseEngine[ResponseType, ConditionType, ChunkType](ABC):
|
|
"""Abstract base class for vector search engines.
|
|
|
|
This class defines the interface that all vector search engine implementations
|
|
must follow. It uses generic types to ensure type safety while allowing
|
|
different backends to use their native response and condition types.
|
|
|
|
Type Parameters:
|
|
ResponseType: The raw response type returned by the backend's search API.
|
|
For example, list[ScoredPoint] for Qdrant.
|
|
ConditionType: The backend-specific filter/condition type.
|
|
For example, models.Filter for Qdrant.
|
|
ChunkType: The backend-specific chunk type.
|
|
For example, models.Point for Qdrant.
|
|
|
|
The class implements the Template Method pattern where semantic_search()
|
|
orchestrates calls to the abstract methods that subclasses must implement.
|
|
|
|
Example:
|
|
>>> class MyEngine(BaseEngine[MyResponse, MyCondition]):
|
|
... def transform_conditions(self, conditions):
|
|
... # Convert generic Condition objects to MyCondition
|
|
... return my_condition
|
|
...
|
|
... def transform_response(self, response):
|
|
... # Convert MyResponse to list[SearchRow]
|
|
... return search_rows
|
|
...
|
|
... async def run_similarity_query(self, embedding, collection, ...):
|
|
... # Execute backend-specific search
|
|
... return my_response
|
|
|
|
"""
|
|
|
|
@abstractmethod
|
|
def transform_conditions(
|
|
self, conditions: list[Condition] | None
|
|
) -> ConditionType | None:
|
|
"""Transform generic conditions to backend-specific filter format.
|
|
|
|
This method converts the generic Condition objects (Match, MatchAny,
|
|
MatchExclude) into the specific filter format required by the backend
|
|
vector database.
|
|
|
|
Args:
|
|
conditions: List of generic condition objects to apply, or None
|
|
for no filtering.
|
|
|
|
Returns:
|
|
Backend-specific filter object, or None if no conditions provided.
|
|
The exact type depends on the ConditionType generic parameter.
|
|
|
|
Example:
|
|
For Qdrant, this might convert:
|
|
>>> conditions = [Match(key="category", value="tech")]
|
|
>>> qdrant_filter = transform_conditions(conditions)
|
|
>>> # Returns models.Filter(must=[...])
|
|
|
|
"""
|
|
...
|
|
|
|
@abstractmethod
|
|
def transform_response(self, response: ResponseType) -> list[SearchRow]:
|
|
"""Transform backend-specific response to standardized SearchRow format.
|
|
|
|
This method converts the raw response from the backend vector database
|
|
into a list of SearchRow objects with standardized structure.
|
|
|
|
Args:
|
|
response: Raw response from the backend search API. The exact type
|
|
depends on the ResponseType generic parameter.
|
|
|
|
Returns:
|
|
List of SearchRow objects containing chunk_id, score, and payload
|
|
for each search result.
|
|
|
|
Example:
|
|
For Qdrant, this might convert:
|
|
>>> response = [ScoredPoint(id=1, score=0.9, payload={...})]
|
|
>>> search_rows = transform_response(response)
|
|
>>> # Returns [SearchRow(chunk_id="1", score=0.9, payload={...})]
|
|
|
|
"""
|
|
...
|
|
|
|
@abstractmethod
|
|
async def run_similarity_query(
|
|
self,
|
|
embedding: list[float],
|
|
collection: str,
|
|
limit: int = 10,
|
|
conditions: ConditionType | None = None,
|
|
threshold: float | None = None,
|
|
) -> ResponseType:
|
|
"""Execute similarity search query against the backend vector database.
|
|
|
|
This method performs the actual vector similarity search using the
|
|
backend's native API. It accepts backend-specific conditions and
|
|
returns the raw backend response.
|
|
|
|
Args:
|
|
embedding: Query vector as a list of floats.
|
|
collection: Name of the collection/index to search in.
|
|
limit: Maximum number of results to return. Defaults to 10.
|
|
conditions: Backend-specific filter conditions, or None for no filtering.
|
|
threshold: Minimum similarity score threshold, or None for no threshold.
|
|
|
|
Returns:
|
|
Raw response from the backend API. The exact type depends on the
|
|
ResponseType generic parameter.
|
|
|
|
Example:
|
|
For Qdrant:
|
|
>>> response = await run_similarity_query(
|
|
... embedding=[0.1, 0.2, 0.3],
|
|
... collection="documents",
|
|
... limit=5,
|
|
... conditions=models.Filter(...),
|
|
... threshold=0.7
|
|
... )
|
|
>>> # Returns list[models.ScoredPoint]
|
|
|
|
"""
|
|
...
|
|
|
|
async def semantic_search(
|
|
self,
|
|
embedding: list[float],
|
|
collection: str,
|
|
limit: int = 10,
|
|
conditions: list[Condition] | None = None,
|
|
threshold: float | None = None,
|
|
) -> list[SearchRow]:
|
|
"""Perform semantic search with generic interface.
|
|
|
|
This is the main public method that orchestrates the complete search
|
|
workflow. It handles the conversion between generic types and backend-
|
|
specific types, making it easy to use regardless of the underlying
|
|
vector database.
|
|
|
|
The method follows this workflow:
|
|
1. Transform generic conditions to backend-specific format
|
|
2. Execute the similarity query using backend API
|
|
3. Transform the response to standardized SearchRow format
|
|
|
|
Args:
|
|
embedding: Query vector as a list of floats.
|
|
collection: Name of the collection/index to search in.
|
|
limit: Maximum number of results to return. Defaults to 10.
|
|
conditions: List of generic filter conditions, or None for no filtering.
|
|
threshold: Minimum similarity score threshold, or None for no threshold.
|
|
|
|
Returns:
|
|
List of SearchRow objects with chunk_id, score, and payload.
|
|
|
|
Example:
|
|
>>> results = await engine.semantic_search(
|
|
... embedding=[0.1, 0.2, 0.3, 0.4, 0.5],
|
|
... collection="documents",
|
|
... limit=5,
|
|
... conditions=[Match(key="category", value="tech")],
|
|
... threshold=0.7
|
|
... )
|
|
>>> for result in results:
|
|
... print(f"ID: {result.chunk_id}, Score: {result.score}")
|
|
|
|
"""
|
|
transformed_conditions = self.transform_conditions(conditions)
|
|
response = await self.run_similarity_query(
|
|
embedding, collection, limit, transformed_conditions, threshold
|
|
)
|
|
return self.transform_response(response)
|
|
|
|
@abstractmethod
|
|
async def create_index(self, name: str, size: int) -> bool: ...
|
|
|
|
@abstractmethod
|
|
def transform_chunk(self, chunk: Chunk) -> ChunkType: ...
|
|
|
|
@abstractmethod
|
|
async def run_upload_chunk(self, index_name: str, chunk: ChunkType) -> bool: ...
|
|
|
|
async def upload_chunk(self, index_name: str, chunk: Chunk) -> bool:
|
|
transformed_chunk = self.transform_chunk(chunk)
|
|
return await self.run_upload_chunk(index_name, transformed_chunk)
|