Files
knowledge-search-mcp/src/knowledge_search_mcp/clients/base.py
Anibal Angulo d39b8a6ea7
All checks were successful
CI / lint (pull_request) Successful in 10s
CI / typecheck (pull_request) Successful in 11s
CI / test (pull_request) Successful in 25s
Add CI
2026-03-05 21:59:22 +00:00

31 lines
1.0 KiB
Python

"""Base client with shared aiohttp session management."""
import aiohttp
class BaseGoogleCloudClient:
    """Base class that owns a single, lazily created aiohttp session.

    The session is built on the first call to ``_get_aio_session`` and is
    reused by later calls until it has been closed, at which point the next
    call builds a replacement.

    Subclasses can tune the connection pool and the request timeout by
    overriding the class attributes below; the defaults match the values
    this class has always used.
    """

    # Upper bound on simultaneous connections across all hosts.
    POOL_LIMIT: int = 300
    # Upper bound on simultaneous connections to any single host.
    POOL_LIMIT_PER_HOST: int = 50
    # Total time budget, in seconds, applied to requests made on the session.
    REQUEST_TIMEOUT_SECONDS: float = 60

    def __init__(self) -> None:
        """Initialize session tracking; no session is created yet."""
        self._aio_session: aiohttp.ClientSession | None = None

    def _get_aio_session(self) -> aiohttp.ClientSession:
        """Return the shared aiohttp session, creating it when needed.

        A new session (with its own pooled connector) is created when none
        exists yet or when the previous one has been closed.

        Returns:
            The open session cached on this instance.
        """
        session = self._aio_session
        if session is None or session.closed:
            connector = aiohttp.TCPConnector(
                limit=self.POOL_LIMIT,
                limit_per_host=self.POOL_LIMIT_PER_HOST,
            )
            timeout = aiohttp.ClientTimeout(total=self.REQUEST_TIMEOUT_SECONDS)
            session = aiohttp.ClientSession(
                timeout=timeout,
                connector=connector,
            )
            self._aio_session = session
        return session

    async def close(self) -> None:
        """Close the aiohttp session if one exists and is still open.

        Calling this when no session was ever created, or when it is
        already closed, does nothing. The closed session object stays
        referenced, and ``_get_aio_session`` replaces it on its next call.
        """
        session = self._aio_session
        if session is not None and not session.closed:
            await session.close()