""" State Management Module Manages global state for MCP connections. Supports multiple simultaneous server connections. === BIDIRECTIONAL COMMUNICATION PROTOCOL === This module implements a custom protocol extension for Client → Server state broadcasting. This enables MCP Servers to monitor Client state and react proactively. Protocol Definition: Method: notifications/echo/client_state Params: - state: One of PROTOCOL_STATES - reason: Human-readable reason (optional) - context: Additional context data (optional) - timestamp: ISO format timestamp Example: { "jsonrpc": "2.0", "method": "notifications/echo/client_state", "params": { "state": "blocking", "reason": "user_confirmation_required", "context": {"action": "rm -rf /tmp"}, "timestamp": "2025-11-30T12:00:00" } } Use Case: - AgentBell MCP Server can listen for "blocking" state - When received, it can play a voice notification BEFORE user needs to act - This is proactive, not reactive """ from typing import Optional, Tuple, Dict, Any from datetime import datetime from enum import Enum from core.mcp_client import create_connection_from_config, MCPConnection # ============================================================================= # BIDIRECTIONAL PROTOCOL: Client State Definitions # ============================================================================= class ClientState(Enum): """ Protocol-defined client states. Any MCP Client implementing this protocol should use these states. 
""" IDLE = "idle" # Client is idle, waiting for user input PROCESSING = "processing" # Client is processing (LLM thinking) EXECUTING = "executing" # Client is executing a tool BLOCKING = "blocking" # Client is blocked, waiting for user confirmation ERROR = "error" # Client encountered an error # Current client state _current_state: ClientState = ClientState.IDLE _state_context: Dict[str, Any] = {} def get_current_state() -> Tuple[ClientState, Dict[str, Any]]: """Returns the current client state and context.""" return _current_state, _state_context def broadcast_client_state( state: ClientState, reason: str = "", context: Dict[str, Any] = None ) -> None: """ Broadcast client state change to all connected MCP servers. This is the core of our bidirectional communication protocol. Servers can listen for these notifications to react proactively. Args: state: The new client state reason: Human-readable reason for the state change context: Additional context (e.g., tool name, action details) """ global _current_state, _state_context _current_state = state _state_context = context or {} # Build the notification payload notification = { "jsonrpc": "2.0", "method": "notifications/echo/client_state", "params": { "state": state.value, "reason": reason, "context": _state_context, "timestamp": datetime.now().isoformat() } } # Log to Inspector (this is how we visualize bidirectional communication) add_inspector_log( direction="OUT", method="echo/client_state", payload=f"state={state.value}, reason={reason}" ) # Note: Actually sending to MCP servers would require the SDK to support # sending arbitrary notifications. For now, we log to Inspector as proof. # In a full implementation, we would iterate over _connections and send. 
# =============================================================================
# CONNECTION MANAGEMENT
# =============================================================================

# Active connections, keyed by server name.
_connections: Dict[str, "MCPConnection"] = {}


def _error_message(exc: BaseException) -> str:
    """Return ``str(exc)``, or the exception class name when that is empty."""
    return str(exc) or type(exc).__name__


def get_connection(server_name: Optional[str] = None) -> Optional["MCPConnection"]:
    """
    Return a connection by server name.

    Args:
        server_name: Name of the server to look up. If omitted (or empty),
            the first registered connection is returned instead, for
            backwards compatibility.

    Returns:
        The matching connection, or ``None`` if there is no such connection
        (or no connections at all when no name was given).
    """
    if server_name:
        return _connections.get(server_name)

    # No name given: fall back to the first connection, if any exist.
    return next(iter(_connections.values()), None)


def get_all_connections() -> Dict[str, "MCPConnection"]:
    """Return a shallow copy of the active connections, keyed by server name."""
    return _connections.copy()


def is_connected(server_name: str) -> bool:
    """Check whether a connection is registered under ``server_name``."""
    return server_name in _connections


async def connect_server(server_name: str) -> Tuple[bool, str]:
    """
    Connect to a specific MCP server.

    Args:
        server_name: Name of the server, passed to
            ``create_connection_from_config``.

    Returns:
        ``(success, message)``. ``(True, "Already connected")`` if a
        connection is already registered, ``(True, "Connected")`` after a
        successful connect, otherwise ``(False, <error text>)``. The error
        text is never empty: it falls back to the exception class name.
    """
    # Already connected?
    if server_name in _connections:
        return True, "Already connected"

    # Failures are reported through the return value rather than raised, so
    # any exception from creating or opening the connection is caught here.
    try:
        connection = create_connection_from_config(server_name)
        await connection.connect()
    except Exception as e:
        return False, _error_message(e)

    _connections[server_name] = connection
    return True, "Connected"


async def disconnect_server(server_name: str) -> Tuple[bool, str]:
    """
    Disconnect from a specific MCP server.

    The connection is removed from the registry whether or not its
    ``disconnect()`` call succeeds.

    Args:
        server_name: Name of the server to disconnect.

    Returns:
        ``(success, message)``. ``(False, "Not connected")`` if no connection
        is registered, ``(True, "Disconnected")`` on success, otherwise
        ``(False, <error text>)``.
    """
    if server_name not in _connections:
        return False, "Not connected"

    connection = _connections[server_name]
    try:
        await connection.disconnect()
    except Exception as e:
        return False, _error_message(e)
    else:
        return True, "Disconnected"
    finally:
        # Still remove from the dict even if disconnect fails. pop() with a
        # default tolerates the entry having been removed in the meantime.
        _connections.pop(server_name, None)


async def disconnect_all() -> None:
    """Disconnect from all servers, one after another."""
    # Iterate over a snapshot: disconnect_server() mutates the registry.
    for server_name in list(_connections):
        await disconnect_server(server_name)


# Legacy function for backwards compatibility
async def ensure_connection(server_name: str = "agentbell-voice") -> Tuple[bool, str]:
    """
    Legacy function - ensures a connection exists.

    Now just calls connect_server and returns its ``(success, message)``.
    """
    return await connect_server(server_name)


# Inspector Logs

# Maximum number of inspector entries retained; older entries are dropped.
_MAX_INSPECTOR_LOGS = 100

_inspector_logs: list = []


def add_inspector_log(direction: str, method: str, payload: str) -> None:
    """
    Add a log entry to the inspector.

    Args:
        direction: "IN" or "OUT".
        method: Name of the method being logged.
        payload: Text describing the message content.

    The entry is stamped with the local wall-clock time as ``HH:MM:SS``.
    Only the most recent ``_MAX_INSPECTOR_LOGS`` entries are kept.
    """
    _inspector_logs.append({
        "direction": direction,  # "IN" or "OUT"
        "method": method,
        "payload": payload,
        "timestamp": datetime.now().strftime("%H:%M:%S"),
    })

    # Keep only the last _MAX_INSPECTOR_LOGS entries.
    if len(_inspector_logs) > _MAX_INSPECTOR_LOGS:
        del _inspector_logs[:-_MAX_INSPECTOR_LOGS]


def get_inspector_logs() -> list:
    """Return a shallow copy of the inspector log entries, oldest first."""
    return _inspector_logs.copy()


def clear_inspector_logs() -> None:
    """Remove all inspector log entries."""
    _inspector_logs.clear()