Data-Science-Agent / src /progress_manager.py
Pulastya B
DEBUG: Log exceptions when queuing events to subscribers
b9c1a6b
"""
Global Progress Event Manager for Real-Time SSE Streaming
This module provides a singleton ProgressManager that captures all workflow progress
events and broadcasts them to connected SSE clients in real-time.
"""
import asyncio
import json
from typing import Dict, List, Any, Optional
from datetime import datetime
from collections import defaultdict
class ProgressManager:
"""
Manages progress events for active analysis sessions.
Features:
- Emit events to multiple subscribers simultaneously
- Store event history for late-joining clients
- Automatic cleanup of dead connections
- Thread-safe event broadcasting
"""
def __init__(self):
self._queues: Dict[str, List[asyncio.Queue]] = defaultdict(list)
self._history: Dict[str, List[Dict]] = defaultdict(list)
self._lock = asyncio.Lock()
def emit(self, session_id: str, event: Dict[str, Any]):
"""
Emit a progress event to all subscribers.
Args:
session_id: Session identifier
event: Event data (must include 'type' and 'message')
"""
print(f"[SSE] PROGRESS_MANAGER EMIT: session={session_id}, event_type={event.get('type')}, msg={event.get('message', '')[:50]}")
# Add timestamp
event['timestamp'] = datetime.now().isoformat()
# Store in history
self._history[session_id].append(event)
# Limit history size to prevent memory leaks
if len(self._history[session_id]) > 500:
self._history[session_id] = self._history[session_id][-500:]
print(f"[SSE] History stored, total events for {session_id}: {len(self._history[session_id])}")
# Send to all active subscribers
if session_id in self._queues:
print(f"[SSE] Found {len(self._queues[session_id])} subscribers for {session_id}")
dead_queues = []
for i, queue in enumerate(self._queues[session_id]):
try:
queue.put_nowait(event)
print(f"[SSE] Successfully queued event to subscriber {i+1}")
except asyncio.QueueFull:
print(f"[SSE] ERROR: Queue full for subscriber {i+1}")
dead_queues.append(queue)
except Exception as e:
print(f"[SSE] ERROR: Exception queuing event to subscriber {i+1}: {type(e).__name__}: {e}")
dead_queues.append(queue)
# Remove dead queues
for dead_queue in dead_queues:
if dead_queue in self._queues[session_id]:
self._queues[session_id].remove(dead_queue)
async def subscribe(self, session_id: str):
"""
Subscribe to progress events for a session.
Args:
session_id: Session identifier
Yields:
Progress events as they occur
"""
queue = asyncio.Queue(maxsize=100)
self._queues[session_id].append(queue)
try:
while True:
event = await queue.get()
print(f"[SSE] YIELDING event to client: type={event.get('type')}, msg={event.get('message', '')[:50]}")
yield event
except asyncio.CancelledError:
# Client disconnected
pass
finally:
# Cleanup
if session_id in self._queues and queue in self._queues[session_id]:
self._queues[session_id].remove(queue)
def get_history(self, session_id: str) -> List[Dict]:
"""
Get all past events for a session.
Args:
session_id: Session identifier
Returns:
List of past events
"""
return self._history.get(session_id, [])
def clear(self, session_id: str):
"""
Clear history and disconnect all subscribers for a session.
Args:
session_id: Session identifier
"""
if session_id in self._history:
del self._history[session_id]
if session_id in self._queues:
# Close all queues
for queue in self._queues[session_id]:
try:
queue.put_nowait({'type': 'session_cleared', 'message': 'Session ended'})
except:
pass
del self._queues[session_id]
def get_active_sessions(self) -> List[str]:
"""Get list of sessions with active subscribers."""
return [sid for sid, queues in self._queues.items() if len(queues) > 0]
def get_subscriber_count(self, session_id: str) -> int:
"""Get number of active subscribers for a session."""
return len(self._queues.get(session_id, []))
# Global singleton instance
progress_manager = ProgressManager()