Source code for mcp_multi_server.sync_client

"""
Synchronous wrapper for MCP MultiServerClient with background event loop.

This module provides SyncMultiServerClient, a context manager that wraps
the async MultiServerClient from mcp_multi_server in a synchronous interface
using a background thread with persistent event loop.
"""

import asyncio
import atexit
import logging
import threading
from concurrent.futures import TimeoutError as FuturesTimeoutError
from datetime import timedelta
from pathlib import Path
from typing import (
    Any,
    Dict,
    Literal,
    Optional,
    Union,
)

from pydantic import AnyUrl

from mcp.shared.session import ProgressFnT
from mcp.types import (
    CallToolResult,
    EmptyResult,
    GetPromptResult,
    ListPromptsResult,
    ListResourcesResult,
    ListResourceTemplatesResult,
    ListToolsResult,
    LoggingLevel,
    PaginatedRequestParams,
    ReadResourceResult,
    TextContent,
)

from .client import MultiServerClient


logger = logging.getLogger(__name__)


[docs] class SyncMultiServerClient: """Manages MCP multi server client in a background thread with persistent event loop. This class provides a synchronous interface to the async MultiServerClient, making it easier to integrate with synchronous code while maintaining the efficiency of async operations. Usage: # Context manager (recommended) with SyncMultiServerClient(config_path) as client: tools = client.list_tools() result = client.call_tool("tool_name", {"arg": "value"}) # Manual lifecycle management client = SyncMultiServerClient(config_path) tools = client.list_tools() # ... use client ... client.shutdown() Thread Safety: All public methods are thread-safe, using asyncio.run_coroutine_threadsafe() to schedule operations on the background event loop. """
[docs] def __init__( self, config_path: Optional[Union[str, Path]] = None, config_dict: Optional[Dict[str, Any]] = None, ): """Initialize SyncMultiServerClient. Starts background thread and initializes MCP client during construction. Automatically registers cleanup handler to ensure proper shutdown on program exit. Args: config_path: Path to MCP configuration file. config_dict: Configuration dictionary (alternative to config_path). Raises: ValueError: If neither or both config_path and config_dict are provided. Exception: If MCP client initialization fails. """ if (config_path is None) == (config_dict is None): raise ValueError("Exactly one of config_path or config_dict must be provided") self.config_path = config_path self.config_dict = config_dict self.mcp_client: Optional[MultiServerClient] = None self.loop: Optional[asyncio.AbstractEventLoop] = None self.thread: Optional[threading.Thread] = None self._shutdown = False self._init_complete = threading.Event() # For cross-thread signaling self._loop_ready = threading.Event() # Signal when event loop is ready self._lifecycle_future = None # Will hold the lifecycle task future # Start background thread self._start_background_loop() # Start long-running lifecycle management task self._lifecycle_future = asyncio.run_coroutine_threadsafe( self._manage_client_lifecycle(), self.loop # type: ignore ) # Wait for initialization to complete (blocks until MCP client is ready) if not self._init_complete.wait(timeout=30): raise RuntimeError("MCP client initialization timed out after 30 seconds") # Check if lifecycle task failed during initialization # (future is done = error occurred, future still running = success) if self._lifecycle_future.done(): exc = self._lifecycle_future.exception() if exc: raise exc # Register automatic cleanup on program exit # This ensures MCP client is properly shutdown when the program terminates, atexit.register(self.shutdown)
[docs] @classmethod def from_config(cls, config_path: Union[str, Path]) -> "SyncMultiServerClient": """Create a client from a configuration file path. This is a convenience class method that's equivalent to calling the constructor with a config_path argument. Args: config_path: Path to the JSON configuration file. Returns: A new SyncMultiServerClient instance. Examples: >>> client = SyncMultiServerClient.from_config("mcp_config.json") """ return cls(config_path=config_path)
[docs] @classmethod def from_dict(cls, config_dict: Dict[str, Any]) -> "SyncMultiServerClient": """Create a client from a configuration dictionary. This method allows programmatic configuration without needing a JSON file. Args: config_dict: Dictionary containing server configurations in the same format as the JSON file (with "mcpServers" key). Returns: A new SyncMultiServerClient instance with the provided configuration. Examples: >>> config = { ... "mcpServers": { ... "tool_server": { ... "command": "python", ... "args": ["-m", "my_package.tool_server"] ... } ... } ... } >>> client = SyncMultiServerClient.from_dict(config) """ return cls(config_dict=config_dict)
def _start_background_loop(self) -> None: """Start background thread with event loop.""" self.thread = threading.Thread(target=self._run_event_loop, daemon=True, name="MCPClientThread") self.thread.start() # Wait for loop to be ready (blocks efficiently, no CPU burn) self._loop_ready.wait() def _run_event_loop(self) -> None: """Run persistent event loop in background thread.""" self.loop = asyncio.new_event_loop() asyncio.set_event_loop(self.loop) self._loop_ready.set() # Signal that loop is ready self.loop.run_forever() async def _manage_client_lifecycle(self) -> None: """Long-running task that manages the entire MCP client context lifecycle. This ensures __aenter__ and __aexit__ happen in the same async task, preventing "cancel scope in different task" errors. Flow: 1. Enter MCP client async context (__aenter__) 2. Signal initialization complete 3. Stay alive until shutdown requested 4. Exit context (__aexit__) in the SAME task """ try: # Enter the MCP client context (creates cancel scope in THIS task) if self.config_path is not None: self.mcp_client = MultiServerClient.from_config(self.config_path) else: self.mcp_client = MultiServerClient.from_dict(self.config_dict) # type: ignore await self.mcp_client.__aenter__() # Signal that initialization is complete self._init_complete.set() # Stay alive until shutdown is requested # This keeps the cancel scope alive in this task while not self._shutdown: await asyncio.sleep(0.1) except Exception as e: logger.error("Error in MCP client lifecycle: %s", e) self._init_complete.set() # Unblock __init__ even on error raise finally: # Exit the context in the SAME task (no cancel scope error!) if self.mcp_client is not None: try: await self.mcp_client.__aexit__(None, None, None) except Exception as e: logger.error("Error closing MCP client: %s", e)
[docs] def shutdown(self) -> None: """Shutdown background thread and cleanup MCP client. Safe to call multiple times. Waits up to 10 seconds for graceful cleanup. """ logger.debug("Shutting down SyncMultiServerClient...") if self.loop is not None and not self._shutdown: self._shutdown = True # Deadlock prevention: if called from event loop thread, # we can't block waiting on the lifecycle future if threading.current_thread() is self.thread: self.loop.call_soon(self.loop.stop) return try: # Signal shutdown and wait for lifecycle task to complete # The task will exit the MCP client context properly if self._lifecycle_future is not None: self._lifecycle_future.result(timeout=10) logger.debug("MCP client closed successfully") except Exception as e: # Errors expected during interpreter shutdown logger.warning("Error during MCP client shutdown: %s", e) try: # Stop event loop self.loop.call_soon_threadsafe(self.loop.stop) # Wait for thread to finish if self.thread is not None: self.thread.join(timeout=5) except Exception: # Thread might already be stopped during interpreter shutdown pass
def __enter__(self) -> "SyncMultiServerClient": """Enter context manager.""" return self def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> Literal[False]: """Exit context manager and cleanup.""" self.shutdown() return False # Don't suppress exceptions
[docs] def set_logging_level(self, level: LoggingLevel) -> EmptyResult: """Set the logging level for the multi-server client and the MCP connected servers. Args: level: Logging level as a string in lower case (e.g., "debug", "info", "notice", "warning", "error", "critical", "alert", "emergency") as defined in MCP LoggingLevel. Note: The following mappings of MCP to Python logging leves are applied: - "notice" -> "WARNING" - "alert" and "emergency" -> "CRITICAL" Examples: >>> SyncMultiServerClient.set_logging_level("debug") """ if level not in {"debug", "info", "notice", "warning", "error", "critical", "alert", "emergency"}: raise ValueError( f"""" Invalid logging level: {level}. See: https://modelcontextprotocol.github.io/python-sdk/api/#mcp.ClientSession.set_logging_level") """ ) if level == "notice": level = "warning" elif level in ("alert", "emergency"): level = "critical" if self.loop is None or self.mcp_client is None: raise RuntimeError("MCP client not initialized") # Schedule async call on background event loop future = asyncio.run_coroutine_threadsafe(self._set_logging_level_async(level), self.loop) future.result() # Wait for completion return EmptyResult()
async def _set_logging_level_async(self, level: LoggingLevel) -> None: """Async implementation of set_logging_level (runs in background loop).""" if self.mcp_client is None: raise ValueError("MCP client not initialized") await self.mcp_client.set_logging_level(level=level) @property def capabilities(self) -> Dict[str, Any]: """Get combined capabilities from all connected MCP servers. Returns: Dictionary containing combined capabilities from all servers. Returns empty dict if client not initialized or error occurs. """ if self.mcp_client is None: return {} try: return self.mcp_client.capabilities except Exception as e: logger.error("Error getting MCP capabilities: %s", e) return {}
[docs] def list_tools( self, cursor: Optional[str] = None, *, params: Optional[PaginatedRequestParams] = None, **kwargs: Any ) -> ListToolsResult: """List available MCP tools in raw MCP format. Returns: List of MCP Tool objects (not converted to OpenAI format). Returns empty list if client not initialized or error occurs. """ if self.mcp_client is None: return ListToolsResult(tools=[]) try: return self.mcp_client.list_tools(cursor=cursor, params=params, **kwargs) except Exception as e: logger.error("Error listing MCP tools: %s", e) return ListToolsResult(tools=[])
[docs] def list_prompts( self, cursor: Optional[str] = None, *, params: Optional[PaginatedRequestParams] = None, **kwargs: Any ) -> ListPromptsResult: """Get combined list of all prompts from all servers. Returns: ListPromptsResult containing all prompts from all servers with server attribution in the serverName meta field. Returns empty list if client not initialized or error occurs. """ if self.mcp_client is None: return ListPromptsResult(prompts=[], nextCursor=None) try: return self.mcp_client.list_prompts(cursor=cursor, params=params, **kwargs) except Exception as e: logger.error("Error listing prompts: %s", e) return ListPromptsResult(prompts=[], nextCursor=None)
[docs] def list_resources( self, cursor: Optional[str] = None, *, params: Optional[PaginatedRequestParams] = None, use_namespace: bool = True, **kwargs: Any, ) -> ListResourcesResult: """Get combined list of all resources from all servers. Args: use_namespace: Whether to namespace URIs with server name (default True). When True, URIs are in format "server_name:original_uri" for auto-routing. Returns: ListResourcesResult containing all resources from all servers with server attribution in the serverName meta field. Returns empty list if client not initialized or error occurs. """ if self.mcp_client is None: return ListResourcesResult(resources=[], nextCursor=None) try: return self.mcp_client.list_resources(cursor=cursor, params=params, use_namespace=use_namespace, **kwargs) except Exception as e: logger.error("Error listing resources: %s", e) return ListResourcesResult(resources=[], nextCursor=None)
[docs] def list_resource_templates( self, cursor: Optional[str] = None, *, params: Optional[PaginatedRequestParams] = None, use_namespace: bool = True, **kwargs: Any, ) -> ListResourceTemplatesResult: """Get combined list of all resource templates from all servers. Args: use_namespace: Whether to namespace URI templates with server name (default True). When True, templates are in format "server_name:original_template". Returns: ListResourceTemplatesResult containing all templates from all servers with server attribution in the serverName meta field. Returns empty list if client not initialized or error occurs. """ if self.mcp_client is None: return ListResourceTemplatesResult(resourceTemplates=[], nextCursor=None) try: return self.mcp_client.list_resource_templates( cursor=cursor, params=params, use_namespace=use_namespace, **kwargs ) except Exception as e: logger.error("Error listing resource templates: %s", e) return ListResourceTemplatesResult(resourceTemplates=[], nextCursor=None)
def _create_error_result(self, error_message: str) -> CallToolResult: """Create a CallToolResult indicating an error. Args: error_message: The error message to include in the result. Returns: CallToolResult with isError=True and the error message in content. """ return CallToolResult( content=[TextContent(type="text", text=error_message)], isError=True, )
[docs] def call_tool( self, name: str, arguments: Dict, read_timeout_seconds: Optional[timedelta] = None, progress_callback: Optional[ProgressFnT] = None, timeout: Optional[float] = None, *, meta: Optional[dict[str, Any]] = None, server_name: Optional[str] = None, **kwargs: Any, ) -> CallToolResult: """Call MCP tool synchronously with optional timeout. Args: name: Name of the tool to call arguments: Tool arguments as dictionary timeout: Maximum seconds to wait. None means wait forever. Returns: CallToolResult object. If timeout occurs, returns an error result. """ if self.loop is None or self.mcp_client is None: return self._create_error_result("MCP client not initialized") future = asyncio.run_coroutine_threadsafe( self._call_tool_async( name, arguments, read_timeout_seconds=read_timeout_seconds, progress_callback=progress_callback, meta=meta, server_name=server_name, **kwargs, ), self.loop, ) try: return future.result(timeout=timeout) except FuturesTimeoutError: return self._create_error_result(f"MCP tool '{name}' timed out after {timeout} seconds")
async def _call_tool_async( self, name: str, arguments: Dict, read_timeout_seconds: Optional[timedelta] = None, progress_callback: Optional[ProgressFnT] = None, *, meta: Optional[dict[str, Any]] = None, server_name: Optional[str] = None, **kwargs: Any, ) -> CallToolResult: """Async implementation of tool call (runs in background loop). Args: name: Name of the tool to call arguments: Tool arguments Returns: Processed tool result as CallToolResult object """ try: if self.mcp_client is None: raise ValueError("MCP client not initialized") return await self.mcp_client.call_tool( name=name, arguments=arguments, read_timeout_seconds=read_timeout_seconds, progress_callback=progress_callback, meta=meta, server_name=server_name, **kwargs, ) except Exception as e: logger.error("Error calling MCP tool '%s': %s", name, e) return self._create_error_result(f"Error calling MCP tool '{name}': {e}")
[docs] def read_resource( self, uri: Union[str, AnyUrl], server_name: Optional[str] = None, timeout: Optional[float] = None, **kwargs: Any, ) -> ReadResourceResult: """Read a resource with optional auto-routing via namespaced URIs. Args: uri: Resource URI. Can be namespaced as "server:uri" for auto-routing. URIs from list_resources() are already namespaced for convenience. server_name: Optional explicit server name. If provided, assumes that there is no namespace in the provided URI. timeout: Maximum seconds to wait. None means wait forever. Returns: ReadResourceResult containing the resource content. Returns empty result if client not initialized or timeout occurs. Examples: >>> # Auto-routing with namespaced URI (from list_resources()) >>> resources = client.list_resources().resources >>> result = client.read_resource(resources[0].uri) >>> # Explicit server (no namespace in URI) >>> result = client.read_resource("file:///path", server_name="filesystem") >>> # Manual namespacing >>> result = client.read_resource("filesystem:file:///path") """ if self.loop is None or self.mcp_client is None: return ReadResourceResult(contents=[]) future = asyncio.run_coroutine_threadsafe( self._read_resource_async(uri=uri, server_name=server_name, **kwargs), self.loop ) try: return future.result(timeout=timeout) except FuturesTimeoutError: logger.error("Read resource timed out after %s seconds", timeout) return ReadResourceResult(contents=[])
async def _read_resource_async( self, uri: Union[str, AnyUrl], server_name: Optional[str], **kwargs: Any ) -> ReadResourceResult: """Async implementation of read_resource (runs in background loop).""" if self.mcp_client is None: raise ValueError("MCP client not initialized") return await self.mcp_client.read_resource(uri=uri, server_name=server_name, **kwargs)
[docs] def get_prompt( self, name: str, arguments: Optional[Dict[str, str]] = None, server_name: Optional[str] = None, timeout: Optional[float] = None, **kwargs: Any, ) -> GetPromptResult: """Get a prompt by automatically routing to the appropriate server. Args: name: Name of the prompt to get. arguments: Optional arguments for the prompt. server_name: Optional server name to explicitly specify which server to use. If not provided, the server will be automatically determined from the prompt name. timeout: Maximum seconds to wait. None means wait forever. Returns: GetPromptResult containing the prompt messages. Returns empty result if client not initialized or timeout occurs. Examples: >>> # Auto-routing (prompt name determines server) >>> result = client.get_prompt("greeting", arguments={"name": "World"}) >>> # Explicit server >>> result = client.get_prompt("greeting", server_name="prompts_server") """ if self.loop is None or self.mcp_client is None: return GetPromptResult(messages=[]) future = asyncio.run_coroutine_threadsafe( self._get_prompt_async(name=name, arguments=arguments, server_name=server_name, **kwargs), self.loop ) try: return future.result(timeout=timeout) except FuturesTimeoutError: logger.error("Get prompt '%s' timed out after %s seconds", name, timeout) return GetPromptResult(messages=[])
async def _get_prompt_async( self, name: str, arguments: Optional[Dict[str, str]], server_name: Optional[str], **kwargs: Any, ) -> GetPromptResult: """Async implementation of get_prompt (runs in background loop).""" if self.mcp_client is None: raise ValueError("MCP client not initialized") return await self.mcp_client.get_prompt(name, arguments=arguments, server_name=server_name, **kwargs)