import logging
from typing import Any, Dict, List

from fastapi import WebSocket, WebSocketDisconnect

logger = logging.getLogger(__name__)


class ConnectionManager:
    """Tracks active websocket connections and broadcasts events."""

    def __init__(self) -> None:
        # Sockets that have been accepted and not yet unregistered.
        self.active_connections: List[WebSocket] = []

    async def register(self, websocket: WebSocket) -> None:
        """Accept the websocket handshake and start tracking the connection.

        Args:
            websocket: The incoming connection to accept and track.
        """
        await websocket.accept()
        self.active_connections.append(websocket)
        logger.info("WebSocket connected. Total clients: %s", len(self.active_connections))

    def unregister(self, websocket: WebSocket) -> None:
        """Stop tracking a connection.

        Calling this for a socket that is not tracked (for example one that
        was already removed) is a no-op, so it is safe to call more than once.

        Args:
            websocket: The connection to remove.
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)
            logger.info("WebSocket disconnected. Total clients: %s", len(self.active_connections))

    async def broadcast_event(self, event_type: str, data: Dict[str, Any]) -> None:
        """Broadcast a generic JSON event to all clients.

        The payload is ``data`` plus a ``"type"`` entry set to ``event_type``.
        NOTE: because ``data`` is unpacked after ``"type"``, a ``"type"`` key
        present in ``data`` takes precedence over ``event_type``.

        Connections whose send fails are logged and unregistered; a failure on
        one client does not stop delivery to the others.

        Args:
            event_type: Value for the payload's ``"type"`` field.
            data: Additional fields merged into the payload.
        """
        payload = {
            "type": event_type,
            **data,
        }
        stale_connections: List[WebSocket] = []
        # Iterate over a snapshot: each ``await`` below yields to the event
        # loop, during which other coroutines may register/unregister clients.
        # Mutating the live list mid-iteration would silently skip entries.
        for connection in list(self.active_connections):
            try:
                await connection.send_json(payload)
            except Exception as exc:  # pragma: no cover
                logger.warning("Failed sending payload to websocket: %s", exc)
                stale_connections.append(connection)
        for connection in stale_connections:
            self.unregister(connection)

    async def handle_connection(self, websocket: WebSocket) -> None:
        """Accept and keep the websocket connection alive.

        Registers the socket, then reads (and discards) client text messages
        until the client disconnects. The socket is always unregistered on the
        way out, including when the receive loop fails with an unexpected
        error or the task is cancelled; such errors still propagate.

        Args:
            websocket: The connection to serve.
        """
        await self.register(websocket)
        try:
            while True:
                # We keep the websocket open; we do not expect client messages yet.
                await websocket.receive_text()
        except WebSocketDisconnect:
            # Normal client disconnect; cleanup happens in ``finally``.
            pass
        finally:
            # Runs for every exit path so a failed or cancelled receive loop
            # cannot leave a dead socket in ``active_connections``.
            self.unregister(websocket)