cinematch-ai / agents /modal_orchestrator.py
dbadeev's picture
Upload 25 files
16ce932 verified
# agents/modal_orchestrator.py
import logging
import modal
from memory.session_store import SessionStore
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ModalMovieSearchOrchestrator:
def __init__(self):
# 1. ПОДКЛЮЧЕНИЕ К УДАЛЕННЫМ АГЕНТАМ (Lookup)
# Мы ищем функции в задеплоенном приложении "tmdb-project"
try:
# Функции управления лимитами и слотами
self.fn_check_limit = modal.Function.from_name("tmdb-project", "check_daily_limit_remote")
self.fn_acquire_slot = modal.Function.from_name("tmdb-project", "try_acquire_slot")
self.fn_release_slot = modal.Function.from_name("tmdb-project", "release_slot")
# Функции координатора
self.fn_coordinator_check = modal.Function.from_name("tmdb-project", "process_coordinator_check")
self.fn_coordinator_suggestion = modal.Function.from_name("tmdb-project", "process_coordinator_suggestion")
# Функция полного пайплайна (Editor -> Critic -> Retriever -> Expert -> ShadowEval)
self.fn_full_pipeline = modal.Function.from_name("tmdb-project", "process_full_pipeline")
logger.info("✅ All Modal functions connected successfully via Lookup")
self.functions_available = True
except Exception as e:
logger.error(f"❌ Failed to connect to Modal functions: {e}")
self.functions_available = False
# 2. Инициализация хранилища сессий (локально на HF)
self.session_store = SessionStore()
self.sid = self.session_store.create_session()
def check_and_update_daily_limit(self, max_daily_requests=100) -> bool:
"""Проверяет лимит через удаленную функцию"""
if not self.functions_available: return True
try:
return self.fn_check_limit.remote(max_limit=max_daily_requests)
except Exception as e:
logger.error(f"Failed to check limit: {e}")
return True
async def process_user_input(self, user_text: str) -> dict:
"""
Главный цикл обработки.
"""
logger.info(f"Processing input: {user_text}")
# 1. ПРОВЕРКА ЛИМИТА
if not self.check_and_update_daily_limit(max_daily_requests=100):
return {
"status": "error",
"message": "⚠️ **Daily Demo Limit Reached**\nPlease come back tomorrow!"
}
self.session_store.add_user_input(self.sid, user_text)
session = self.session_store.get_session(self.sid)
# ------------------------------------------------------------------
# ЭТАП 1: Обработка ответа на предложение (1/0/2)
# ------------------------------------------------------------------
if session.get("suggested_plot"):
user_input_cleaned = user_text.strip().lower()
if user_input_cleaned == "1": # YES
user_text = session["suggested_plot"]
self.session_store.update_state(self.sid, "suggested_plot", None)
self.session_store.reset_attempts(self.sid)
self.session_store.set_custom_mode(self.sid, False)
elif user_input_cleaned == "0": # NO
self.session_store.update_state(self.sid, "suggested_plot", None)
self.session_store.increment_attempts(self.sid)
self.session_store.set_custom_mode(self.sid, False)
elif user_input_cleaned == "2": # CUSTOM
self.session_store.update_state(self.sid, "suggested_plot", None)
self.session_store.set_custom_mode(self.sid, True)
self.session_store.reset_attempts(self.sid)
msg = "**📝 Custom Plot Entry Mode**\nPlease provide your own movie plot description (min 50 words)."
self.session_store.add_history(self.sid, user_text, msg)
return {"status": "awaiting_custom_plot", "message": msg, "custom_mode": True}
else: # IMPLICIT AGREE/CUSTOM
self.session_store.update_state(self.sid, "suggested_plot", None)
self.session_store.set_custom_mode(self.sid, True) # Treat as custom input
self.session_store.reset_attempts(self.sid)
session = self.session_store.get_session(self.sid)
# ------------------------------------------------------------------
# ЭТАП 2: Логика предложений (Suggestions)
# ------------------------------------------------------------------
attempts = session.get("attempts", 0)
user_inputs = session.get("user_inputs", [])
if attempts == 2:
logger.info("Suggesting Romantic")
# Вызов удаленной функции
suggestion = await self.fn_coordinator_suggestion.remote.aio(user_inputs, "romantic")
self.session_store.update_state(self.sid, "suggested_plot", suggestion["suggested_story"])
msg = f"{suggestion['message']}\n\n**Proposed Plot (Romantic):**\n_{suggestion['suggested_story']}_\n\n{'='*60}\n**📍 Options:** (1) Accept, (0) Next Genre, (2) Custom Plot"
self.session_store.add_history(self.sid, user_text, msg)
return {"status": "suggestion", "message": msg, "is_suggestion": True}
if attempts == 3:
logger.info("Suggesting Humorous")
suggestion = await self.fn_coordinator_suggestion.remote.aio(user_inputs, "humorous")
self.session_store.update_state(self.sid, "suggested_plot", suggestion["suggested_story"])
msg = f"{suggestion['message']}\n\n**Proposed Plot (Humorous):**\n_{suggestion['suggested_story']}_\n\n{'='*60}\n**📍 Options:** (1) Accept, (0) End Session, (2) Custom Plot"
self.session_store.add_history(self.sid, user_text, msg)
return {"status": "suggestion", "message": msg, "is_suggestion": True}
if attempts >= 4:
msg = "**👋 Session Ended**\n\nIt seems we can't find the right story today."
self.session_store.add_history(self.sid, user_text, msg)
return {"status": "end_session", "message": msg, "end_session": True}
# ------------------------------------------------------------------
# ЭТАП 3: Строгая проверка (Custom Mode)
# ------------------------------------------------------------------
if session.get("custom_plot_mode", False):
word_count = len(user_text.split())
if word_count < 50:
msg = f"**❌ Too Short ({word_count} words)**\n\nRequired: 50+ words."
self.session_store.add_history(self.sid, user_text, msg)
self.session_store.set_custom_mode(self.sid, False)
return {"status": "custom_plot_too_short", "message": msg, "end_session": True}
try:
# Удаленный вызов
analysis = await self.fn_coordinator_check.remote.aio(user_text, attempts)
if analysis["status"] == "insufficient":
msg = f"**❌ Validation Failed**\n\n{analysis.get('message')}"
self.session_store.add_history(self.sid, user_text, msg)
self.session_store.set_custom_mode(self.sid, False)
self.session_store.reset_attempts(self.sid)
return {"status": "custom_plot_rejected", "message": msg, "end_session": True}
self.session_store.set_custom_mode(self.sid, False)
self.session_store.reset_attempts(self.sid)
except Exception as e:
logger.error(f"Error in custom mode: {e}")
self.session_store.set_custom_mode(self.sid, False)
return {"status": "error", "message": "System error", "end_session": True}
# ------------------------------------------------------------------
# ЭТАП 4: Стандартная проверка
# ------------------------------------------------------------------
else:
try:
# Удаленный вызов
analysis = await self.fn_coordinator_check.remote.aio(user_text, attempts)
if analysis["status"] == "insufficient":
self.session_store.increment_attempts(self.sid)
attempts_left = 2 - (attempts)
warning_msg = analysis.get("message", "Please add more details.")
if attempts_left > 0:
warning_msg += f"\n\n_You have {attempts_left} attempt(s) left before I suggest a plot._"
self.session_store.add_history(self.sid, user_text, warning_msg)
return {"status": "insufficient_length", "message": warning_msg, "original_plot": user_text}
self.session_store.reset_attempts(self.sid)
except Exception as e:
logger.error(f"Coordinator check failed: {e}")
# ------------------------------------------------------------------
# ЭТАП 5: Пайплайн Поиска (Тяжелая артиллерия)
# ------------------------------------------------------------------
# 1. ЗАНИМАЕМ СЛОТ (Удаленный вызов)
slot_result = await self.fn_acquire_slot.remote.aio(self.sid, 3)
if not slot_result["success"]:
return {"status": "error", "message": slot_result["message"]}
try:
# 2. ЗАПУСК ПОЛНОГО ПАЙПЛАЙНА (Удаленный вызов)
# Вся логика (Editor -> Critic -> Retriever -> Expert) теперь там
result = await self.fn_full_pipeline.remote.aio(
session_id=self.sid,
user_text=user_text,
session_data=session
)
# Обновляем локальное состояние результатами
self.session_store.update_state(self.sid, "original_plot", result.get("original_plot"))
self.session_store.update_state(self.sid, "improved_plot", result.get("improved_plot"))
self.session_store.update_state(self.sid, "movie_overview", result.get("movie_overview"))
self.session_store.update_state(self.sid, "final_recommendations", result.get("recommendations"))
return result
except Exception as e:
logger.error(f"Pipeline error: {e}")
return {"status": "error", "message": f"System error: {e}"}
finally:
# 3. ОСВОБОЖДАЕМ СЛОТ (Удаленный вызов - spawn)
self.fn_release_slot.spawn(self.sid)
def get_conversation_summary(self):
session = self.session_store.get_session(self.sid)
if not session: return {}
return {
"session_id": session.get("session_id"),
"current_step": "completed" if session.get("final_recommendations") else "processing",
"attempts": session.get("attempts", 0),
"has_plot": bool(session.get("original_plot")),
"has_recommendations": bool(session.get("final_recommendations")),
"total_search_results": len(session.get("search_results", []))
}
def reset_conversation(self):
self.session_store.clear_session_data(self.sid)