# agents/expert_agent.py
from llama_index.core.tools import FunctionTool
from agents.nebius_simple import create_nebius_llm
import datetime
import logging
import re
import concurrent.futures

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _utc_now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string with a trailing 'Z'.

    Replacement for the deprecated ``datetime.datetime.utcnow()`` (deprecated
    since Python 3.12); produces the same naive-ISO + 'Z' format the old code
    emitted so downstream consumers see identical timestamps.
    """
    now = datetime.datetime.now(datetime.timezone.utc)
    return now.replace(tzinfo=None).isoformat() + "Z"


def _unwrap_movie_data(movie_data: dict) -> dict:
    """Unwrap a single-key envelope dict ({"some_id": {...actual movie...}}).

    If ``movie_data`` already looks like a movie record (has a "title") or is
    empty / not a dict, it is returned unchanged.  The original code tested
    membership before the type check, which raised TypeError for non-dict
    input and StopIteration for an empty dict.
    """
    if isinstance(movie_data, dict) and movie_data and "title" not in movie_data:
        return dict(next(iter(movie_data.values())))
    return movie_data


def _create_top3_selection_tool() -> FunctionTool:
    """Top-3 movie selection tool (ported from the legacy module)."""

    def select_top3_movies(evaluated_movies: list, **kwargs) -> dict:
        """Select top 3 movies based on comprehensive scores.

        Returns a dict with the ranked ``top3_movies`` plus selection
        metadata; on failure falls back to the first three inputs.
        """
        try:
            # Rank by the weighted final score, best first.
            sorted_movies = sorted(
                evaluated_movies,
                key=lambda x: x.get('final_score', 0),
                reverse=True
            )
            top3 = sorted_movies[:3]

            trimmed_for_log = [
                ExpertAgent.trim_movie_data(m.get("movie_data", {}))
                for m in top3
            ]
            logger.info("TOP-3 (trimmed): %s", trimmed_for_log)

            return {
                "top3_movies": top3,
                "selection_criteria": "Comprehensive weighted scoring",
                "total_evaluated": len(evaluated_movies),
                "score_range": {
                    "highest": top3[0].get('final_score', 0) if top3 else 0,
                    "lowest": top3[-1].get('final_score', 0) if top3 else 0
                },
                "selected_at": _utc_now_iso()
            }
        except Exception as e:
            # Best-effort fallback: keep the pipeline alive with raw order.
            return {
                "error": str(e),
                "top3_movies": evaluated_movies[:3] if evaluated_movies else []
            }

    return FunctionTool.from_defaults(
        fn=select_top3_movies,
        name="select_top3",
        description="Choose top 3 movies from evaluated list"
    )


class ExpertAgent:
    """Expert film-analysis agent: scores candidates, picks a top-3 and
    generates per-movie justifications using a Nebius-hosted LLM."""

    def __init__(self, nebius_api_key: str):
        self.llm = create_nebius_llm(
            api_key=nebius_api_key,
            model="meta-llama/Llama-3.3-70B-Instruct-fast",
            temperature=0.2
        )

        # IMPORTANT: attach the system prompt so the LLM knows its role
        # in every request it serves.
        self.llm.system_prompt = self._get_system_prompt()

        # Tool initialisation.
        self.evaluation_tool = self._create_comprehensive_evaluation_tool()
        self.selection_tool = _create_top3_selection_tool()
        self.justification_tool = self._create_justification_tool()

        # Tools map kept for backward compatibility with existing callers.
        self.tools_map = {
            "comprehensive_evaluation": self.evaluation_tool,
            "select_top3": self.selection_tool,
            "create_justification": self.justification_tool
        }

    @staticmethod
    def _get_system_prompt() -> str:
        """Original system prompt (runtime string — do not reword)."""
        return """You are an Expert Film Analysis Agent specializing in movie recommendation and analysis.

Your responsibilities:
1. Analyze the semantic and narrative similarity between user queries and movies
2. Select the 3 most relevant movies from search results
3. Provide detailed justifications for your choices
4. Highlight specific plot, thematic, and structural similarities

Use the Thought-Action-Observation cycle:
- Think about the key elements of the user's query (themes, plot structure, genre, tone)
- Analyze each movie candidate for multiple types of relevance
- Score and rank movies based on comprehensive factors
- Generate compelling explanations that demonstrate deep understanding

Focus on: narrative structure, thematic resonance, character dynamics, emotional tone, genre elements, and plot mechanics.

IMPORTANT:
- Each justification **must be unique**; compare it with previously generated ones and re-write if too similar.
- Avoid generic phrases like "strong alignment". Provide concrete plot or structural overlaps.
"""

    @staticmethod
    def _trim_movie_data(movie_data: dict) -> dict:
        """Project a movie record down to the keys useful for logging."""
        wanted = {"id", "title", "narrative_features"}
        return {k: movie_data.get(k) for k in wanted}

    @classmethod
    def trim_movie_data(cls, movie_data: dict) -> dict:
        """Public alias for :meth:`_trim_movie_data`."""
        return cls._trim_movie_data(movie_data)

    def analyze_and_recommend(self, user_query: str, search_results: list[dict]) -> dict:
        """Run the full pipeline: parallel scoring -> top-3 -> justifications.

        :param user_query: the user's plot description.
        :param search_results: candidate movies with pre-computed semantic /
            narrative similarity scores.
        :returns: dict with ``selected_movies``, joined ``explanations`` and
            processing metadata.
        """
        # Take the top-10 candidates for deep analysis.
        candidates_to_analyze = search_results[:10]

        logger.info("⏳ Starting parallel analysis for %d candidates...",
                    len(candidates_to_analyze))

        # 1. PARALLEL scoring.
        evaluated = []
        eval_fn = self.evaluation_tool.fn

        # ThreadPool with max_workers=5: each evaluation issues 2 LLM calls
        # (genre + title), so 10 movies -> 20 requests; 5 threads is a safe
        # balance against rate limits.
        with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
            future_to_movie = {}
            for item in candidates_to_analyze:
                # Copy so we don't mutate the caller's dict.
                movie_data = dict(item.get("movie_data", {}))
                movie_data["semantic_score"] = item.get("semantic_score", 0)
                movie_data["narrative_similarity"] = item.get("narrative_similarity", 0)

                future = executor.submit(eval_fn, user_query, movie_data)
                future_to_movie[future] = movie_data

            for future in concurrent.futures.as_completed(future_to_movie):
                md = future_to_movie[future]
                try:
                    result = future.result()
                    # Re-attach movie_data for the next pipeline stage.
                    result["movie_data"] = md
                    evaluated.append(result)
                except Exception as e:
                    logger.error("Error evaluating movie %s: %s", md.get('title'), e)

        # 2. Top-3 selection (local, no LLM).
        top3_res = self.selection_tool.fn(evaluated)
        top3 = top3_res.get("top3_movies", [])

        logger.info("🏆 TOP-3 chosen: %s",
                    [m.get("movie_title", "Unknown") for m in top3])

        # 3. PARALLEL justification generation.
        just_fn = self.justification_tool.fn
        cards = [None] * len(top3)
        top3_details = [None] * len(top3)

        with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
            future_to_idx = {}
            for idx, ev in enumerate(top3):
                md = ev.get("movie_data", {})
                future = executor.submit(just_fn, user_query, md, ev)
                future_to_idx[future] = (idx, md, ev)

            for future in concurrent.futures.as_completed(future_to_idx):
                idx, md, ev = future_to_idx[future]
                try:
                    just_res = future.result()
                    justification = just_res.get("justification", "Error")

                    card = self._format_movie_card(md, justification, idx + 1)
                    cards[idx] = card
                    top3_details[idx] = {
                        "rank": idx + 1,
                        "movie_data": md,
                        "evaluation": ev,
                        "justification": justification,
                    }
                except Exception as e:
                    logger.error("Error generating justification for rank %d: %s",
                                 idx + 1, e)
                    cards[idx] = f"Error generating details for movie {idx + 1}"

        # 4. Final response (drop any slots that stayed empty).
        cards = [c for c in cards if c]
        top3_details = [d for d in top3_details if d]

        return {
            "selected_movies": top3_details,
            "explanations": "\n\n---\n\n".join(cards),
            "analysis_complete": True,
            "methodology": "Parallel optimized processing (Original prompts)",
            "evaluated_at": _utc_now_iso(),
        }

    # ===============================================================
    # TOOLS

    def _create_comprehensive_evaluation_tool(self) -> FunctionTool:
        """Comprehensive movie-scoring tool using the weighted formula."""
        llm = self.llm  # llm already carries the system prompt
        from typing import Annotated

        def evaluate_movie_comprehensive(
                user_query: Annotated[str, "Original user plot"],
                movie_data: Annotated[dict, "Full JSON of ONE movie"]) -> dict:
            movie_data = _unwrap_movie_data(movie_data)
            try:
                movie_title = movie_data.get('title', 'Unknown')
                genres = movie_data.get('genres', '')
                vote_average = float(movie_data.get('vote_average', 0))
                imdb_rating = float(movie_data.get('imdb_rating', 0))
                semantic_score = float(movie_data.get('semantic_score', 0))
                narrative_similarity = float(movie_data.get('narrative_similarity', 0))

                # 1. Genre alignment score (original prompt).
                genre_prompt = f"""
Evaluate genre alignment between user query and movie (0.0-1.0):

User Query: "{user_query}"
Movie Genres: "{genres}"

IGNORE specific names, locations, characters. Focus on thematic content.
Return only a number between 0.0 and 1.0.
"""
                genre_response = llm.complete(genre_prompt)
                try:
                    match = re.search(r'[0-9]*\.?[0-9]+', genre_response.text)
                    genre_alignment = float(match.group()) if match else 0.5
                    genre_alignment = max(0.0, min(1.0, genre_alignment))
                except Exception:
                    genre_alignment = 0.5

                # 2. Title relevance score (original prompt).
                title_prompt = f"""
Evaluate title relevance to user query (0.0-1.0):

User Query: "{user_query}"
Movie Title: "{movie_title}"

IGNORE exact name matches. Focus on thematic and conceptual relevance.
Return only a number between 0.0 and 1.0.
"""
                title_response = llm.complete(title_prompt)
                try:
                    match = re.search(r'[0-9]*\.?[0-9]+', title_response.text)
                    title_relevance = float(match.group()) if match else 0.3
                    title_relevance = max(0.0, min(1.0, title_relevance))
                except Exception:
                    title_relevance = 0.3

                # Normalisation: missing ratings default to a neutral 0.5.
                normalized_vote_avg = vote_average / 10.0 if vote_average > 0 else 0.5
                normalized_imdb = imdb_rating / 10.0 if imdb_rating > 0 else 0.5

                # Weighted formula (weights sum to 1.0).
                final_score = (
                    semantic_score * 0.65 +
                    narrative_similarity * 0.15 +
                    genre_alignment * 0.04 +
                    title_relevance * 0.04 +
                    normalized_vote_avg * 0.02 +
                    normalized_imdb * 0.10
                )

                return {
                    "movie_title": movie_title,
                    "final_score": round(final_score, 4),
                    # movie_data is re-attached by the caller wrapper
                }
            except Exception as e:
                logger.error("Error evaluating %s: %s",
                             movie_data.get('title', 'Unknown'), e)
                return {"movie_title": "Error", "final_score": 0}

        return FunctionTool.from_defaults(
            fn=evaluate_movie_comprehensive,
            name="comprehensive_evaluation",
            description="Evaluate one movie using comprehensive weighted formula"
        )

    def _create_justification_tool(self) -> FunctionTool:
        """Justification-generation tool."""
        llm = self.llm

        def create_detailed_justification(user_query: str,
                                          movie_data: dict,
                                          evaluation_data: dict,
                                          **kwargs) -> dict:
            try:
                movie_data = _unwrap_movie_data(movie_data)

                movie_title = movie_data.get("title", "Unknown")
                overview = movie_data.get("overview", "")[:220]
                genres = movie_data.get("genres", "Unknown")
                vote_average = movie_data.get('vote_average', 0)
                imdb_rating = movie_data.get('imdb_rating', 0)

                # Original prompt (runtime string — do not reword).
                justification_prompt = f"""
You are a seasoned film critic.
Write an ENGLISH explanation (exactly 4-5 sentences, one blank line, then a signature line).

USER QUERY: "{user_query}"

MOVIE DATA
Title          : {movie_title}
Genres         : {genres}
Overview (cut) : {overview}
TMDB / IMDb    : {vote_average}/10 • {imdb_rating}/10
RelevanceScore : {evaluation_data.get('final_score', 0)}

WRITING RULES
1. Output **only the finished justification**.
2. NO planning words like "Next", "Then", "Need to", "Make sure", etc.
3. NO meta-instructions or bullet lists.
4. 1st-4th sentences must cover:
   • direct plot / theme overlap
   • genre & narrative alignment
   • one unique shared element
   • (optionally) quality note via rating
5. After a single blank line add EXACTLY:
   "The relevance level of the film {movie_title} to your description is {evaluation_data.get('final_score', 0)}"
"""
                response = llm.complete(justification_prompt)
                justification_text = response.text.strip()

                return {
                    "movie_title": movie_title,
                    "justification": justification_text,
                    "evaluation_score": evaluation_data.get('final_score', 0),
                    "created_at": _utc_now_iso()
                }
            except Exception as e:
                return {
                    "movie_title": movie_data.get('title', 'Unknown'),
                    "justification": f"Error creating justification: {str(e)}",
                    "error": str(e)
                }

        return FunctionTool.from_defaults(
            fn=create_detailed_justification,
            name="create_justification",
            description="Create detailed justification for movie recommendation"
        )

    # --- Helper methods (ported verbatim from the old version) ---

    @staticmethod
    def _extract_quality_notes(vote_average, imdb_rating):
        """Collect human-readable quality notes for high ratings."""
        notes = []
        if vote_average >= 8.0:
            notes.append("Высокий рейтинг TMDB")
        if imdb_rating >= 8.0:
            notes.append("Высокий рейтинг IMDb")
        if vote_average >= 7.0 and imdb_rating >= 7.0:
            notes.append("Стабильно высокие оценки")
        return notes

    @staticmethod
    def _safe_year(release_date) -> str:
        """Extract a 4-digit year from a date object or ISO-like string."""
        # datetime.datetime subclasses datetime.date, so one check covers both.
        if isinstance(release_date, datetime.date):
            return str(release_date.year)
        if isinstance(release_date, str) and len(release_date) >= 4:
            return release_date[:4]
        return "Unknown"

    @staticmethod
    def _format_movie_card(movie_data: dict,
                           justification: str,
                           rank: int) -> str:
        """Render one ranked movie as a markdown card."""
        movie_data = _unwrap_movie_data(movie_data)

        title = movie_data.get('title', 'Unknown')
        original_title = movie_data.get('original_title', '')
        release_date = movie_data.get('release_date', 'Unknown')
        year = ExpertAgent._safe_year(release_date)
        overview = movie_data.get('overview', 'No overview available')
        genres = movie_data.get('genres', 'Unknown')
        tagline = movie_data.get('tagline', '')
        vote_average = movie_data.get('vote_average', 0)
        vote_count = movie_data.get('vote_count', 0)
        imdb_rating = movie_data.get('imdb_rating', 0)
        popularity = movie_data.get('popularity', 0)
        runtime = movie_data.get('runtime', 0)
        budget = movie_data.get('budget', 0)
        revenue = movie_data.get('revenue', 0)
        director = movie_data.get('director', 'Unknown')
        cast = movie_data.get('cast', 'Unknown')

        return f"""**{rank}. {title}** ({year})
*{original_title}* {f'• {tagline}' if tagline else ''}

**Genres:** {genres}

**Overview:** {overview}

**📊 Ratings:**
• ⭐ TMDB: {vote_average}/10 ({vote_count:,} голосов)
• 🎬 IMDb: {imdb_rating}/10
• 📈 Popularity: {popularity:.0f}

**🎥 Technical data:**
• ⏱️ Runtime: {runtime} мин
• 💰 Budget: ${budget:,} USD
• 💵 Revenue: ${revenue:,} USD

**👥 Cast:**
• 🎬 Director: {director}
• 🎭 Cast: {cast}

**🎯 Justification:**
{justification}"""