cinematch-ai / modal_app.py
dbadeev's picture
Upload 24 files
da524e0 verified
import modal
from modal import Image, App, Volume, Secret
import logging
# import modal
import faiss
import numpy as np
import pandas as pd
import pickle
from sentence_transformers import SentenceTransformer
import torch
import os
from agents.modal_agents import app as agents_app
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
image = (
Image.from_registry(
"nvidia/cuda:12.8.1-devel-ubuntu22.04",
add_python="3.11"
)
.apt_install(
"build-essential",
"python3-dev",
"gcc",
"g++",
"cmake",
"wget", # Добавляем wget
"unzip" # Добавляем unzip для распаковки
)
.pip_install_from_requirements("requirements_modal.txt")
.add_local_file(
local_path="setup_image.py",
remote_path="/root/setup_image.py",
copy=True
)
# ✅ ДОБАВЛЕНО: Добавляем скрипт извлечения punkt данных
.add_local_file(
local_path="setup_punkt_extraction.py",
remote_path="/root/setup_punkt_extraction.py",
copy=True
)
.run_commands("python /root/setup_image.py")
.run_commands("python -m spacy download en_core_web_lg")
.run_commands(
# Скачиваем ресурс punkt
"wget -O /tmp/punkt.zip https://raw.githubusercontent.com/nltk/nltk_data/gh-pages/packages/tokenizers/punkt.zip",
"unzip /tmp/punkt.zip -d /tmp",
# Создаем структуру директорий
"mkdir -p /root/nltk_data/tokenizers/punkt_tab/english",
# Копируем основные файлы
"cp /tmp/punkt/PY3/english.pickle /root/nltk_data/tokenizers/punkt_tab/english/",
"cp /tmp/punkt/README /root/nltk_data/tokenizers/punkt_tab/",
"cp -r /tmp/punkt/PY3 /root/nltk_data/tokenizers/punkt_tab/",
# ✅ КЛЮЧЕВОЕ ИЗМЕНЕНИЕ: Запускаем скрипт извлечения данных
"python /root/setup_punkt_extraction.py",
# Удаляем временные файлы
"rm -rf /tmp/punkt*"
)
.add_local_dir("modal_utils", remote_path="/root/modal_utils")
# .add_local_dir("local_utils", remote_path="/root/local_utils")
.add_local_dir("agents", remote_path="/root/agents")
.add_local_dir("evaluation", remote_path="/root/evaluation")
)
app = App(
name="tmdb-project",
image=image,
secrets=[
Secret.from_name("my-env"), # Для конфиденциальных данных
Secret.from_name("nebius-secret")
]
)
# Включаем все функции агентов в основной app
app.include(agents_app)
volume = Volume.from_name("tmdb-data", create_if_missing=True)
# ✅ ДОБАВЛЯЕМ СЛОВАРЬ ДЛЯ СЕМАФОРА
# Он будет хранить активные сессии: {session_id: timestamp}
active_users_dict = modal.Dict.from_name("cinematch-active-users", create_if_missing=True)
@app.function(
volumes={"/data": volume},
gpu="A10G",
timeout=3600
)
def process_movies():
"""Основная функция обработки фильмов"""
# Импорт внутри функции для работы с добавленными директориями
from modal_utils.cloud_operations import heavy_computation
return heavy_computation()
@app.function(volumes={"/data": volume})
def upload_file(data_str: str):
import shutil
import os
volume.listdir(path='/', recursive=True)
print(volume.listdir(path='/', recursive=True))
# local_file_path = 'temp_sample.csv' # Используйте временный файл
remote_file_path = '/data/input.csv' # Путь в Volume
print(1)
# Создаем директорию, если нужно
os.makedirs(os.path.dirname(remote_file_path), exist_ok=True)
# Записываем данные напрямую в файл
with open(remote_file_path, "w") as f:
f.write(data_str)
print(f"Данные успешно записаны в Volume: {remote_file_path}")
return remote_file_path
@app.function(
image=image,
gpu="A10G", # было any
volumes={"/data": volume},
timeout=120 # было 1800 == 30 минут на батч
)
def process_batch(batch: list[tuple]):
"""
Обрабатывает батч данных на GPU
Вход: список кортежей (processed_text, original_text)
Выход: список JSON-строк с признаками
"""
import spacy
from textacy.extract import keyterms
from textblob import TextBlob
import numpy as np
import json
import en_core_web_lg # Прямой импорт модели
import torch
from concurrent.futures import ThreadPoolExecutor
torch.set_num_threads(1) # Уменьшаем число CPU потоков
spacy.prefer_gpu() # Активирует GPU для spaCy
# Загружаем модель
nlp = en_core_web_lg.load()
# Добавляем sentencizer, если его нет в пайплайне
if "sentencizer" not in nlp.pipe_names:
nlp.add_pipe("sentencizer")
processed_texts = [item[0] for item in batch]
original_texts = [item[1] for item in batch]
# Обработка предобработанных текстов (Оптимизированная обработка spaCy)
# processed_docs = list(nlp.pipe(processed_texts, batch_size=128))
processed_docs = list(nlp.pipe(processed_texts, batch_size=4096)) # Увеличьте для GPU - было 400 для CPU
# Функция для параллельного вычисления эмоциональной вариативности (sentiment variance)
def compute_sentiment_variance(text):
if not text or len(text) < 20:
return 0.0
try:
blob = TextBlob(text)
if len(blob.sentences) > 1:
sentiments = [s.sentiment.polarity for s in blob.sentences]
return float(np.var(sentiments))
return 0.0
except:
return 0.0
# Параллельное вычисление для всего батча
with ThreadPoolExecutor(max_workers=16) as executor:
sentiment_variances = list(executor.map(compute_sentiment_variance, original_texts))
# Предварительно вычисляем plot_turns для всего батча
turn_keywords = {"but", "however", "though", "although", "nevertheless",
"suddenly", "unexpectedly", "surprisingly", "abruptly"}
# Векторизованный расчет plot_turns.
# Используем более эффективный метод
lower_texts = [text.lower() for text in original_texts]
plot_turns_counts = [
sum(text.count(kw) for kw in turn_keywords)
if text and len(text) >= 20 else 0
for text in lower_texts
]
results = []
for i, (processed_doc, original_text) in enumerate(zip(processed_docs, original_texts)):
features = {
"conflict_keywords": [],
"plot_turns": plot_turns_counts[i], # Используем предвычисленное значение (было 0, вычислялось позже)
"sentiment_variance": sentiment_variances[i], # Используем предвычисленное значение (было 0.0)
"action_density": 0.0
}
try:
# 1. Ключевые слова конфликта
if processed_texts[i] and len(processed_texts[i]) >= 20:
conflict_terms = [
term for term, score in keyterms.textrank(
processed_doc,
topn=5,
window_size=10,
edge_weighting="count",
position_bias=False
) if term and term.strip()
]
features["conflict_keywords"] = conflict_terms
# Плотность действий
if processed_doc and len(processed_doc) > 0:
action_verbs = sum(1 for token in processed_doc if token.pos_ == "VERB")
features["action_density"] = action_verbs / len(processed_doc)
except Exception as e:
print(f"Error processing item {i}: {str(e)[:100]}")
results.append(json.dumps(features))
return results
@app.function(
image=image,
volumes={"/data": volume},
# memory=6144, # Увеличиваем память до 6 ГБ
timeout=600 # 150 минут вместо 60 секунд
)
def load_data(max_rows: int = None):
"""Загружает данные из CSV на Volume"""
import pandas as pd
# file_path = "/data/data/output.csv"
file_path = "/data/data/output.parquet" # Теперь используем Parquet
print(f"Loading data from {file_path}...")
# Чтение данных с возможностью ограничения количества строк
if max_rows:
# df = pd.read_csv(file_path, nrows=max_rows)
df = pd.read_parquet(file_path, rows=max_rows)
else:
# Чтение всего файла
df = pd.read_parquet(file_path)
# df = pd.read_csv(file_path)
print(f"Loaded {len(df)} records")
# Проверка необходимых столбцов
required_columns = ['processed_overview', 'overview']
for col in required_columns:
if col not in df.columns:
raise ValueError(f"Column '{col}' not found in dataset")
print(f'Columns check finished')
# Заполнение пропущенных значений
df['processed_overview'] = df['processed_overview'].fillna('')
df['overview'] = df['overview'].fillna('')
print(f'Missing values filling is finished')
return df
@app.function(
image=image,
volumes={"/data": volume},
timeout=300 # 5 минут вместо 60 секунд
)
def save_results(df, output_path):
"""Сохраняет результаты на Volume"""
print(f"Saving results to {output_path}...")
# df.to_parquet(output_path, index=False)
df.to_parquet(output_path, index=False, engine='pyarrow') # или engine='fastparquet'
print(f"✅ Results saved to {output_path}")
@app.local_entrypoint()
def process_test_batch(batch_size: int = 1000):
"""Обрабатывает тестовый батч из Volume"""
import json
# Загрузка данных
df = load_data.remote(max_rows=batch_size)
# Формирование батча
batch_data = list(zip(
df['processed_overview'].astype(str),
df['overview'].astype(str)
))
# Обработка батча
print(f"Processing test batch ({len(batch_data)} records) on GPU...")
results = process_batch.remote(batch_data)
# Добавление результатов
df['narrative_features'] = results
df['features_decoded'] = df['narrative_features'].apply(json.loads)
# Сохранение результатов
output_path = f"/data/data/test_batch_results_{batch_size}.parquet"
save_results.remote(df, output_path)
# Вывод статистики
print("\nProcessing statistics:")
print(
f"Conflict_keywords (non-empty): {sum(1 for x in df['features_decoded'] if x['conflict_keywords'])}/{len(df)}")
print(f"Avg plot_turns: {df['features_decoded'].apply(lambda x: x['plot_turns']).mean():.2f}")
print(f"Avg sentiment_variance: {df['features_decoded'].apply(lambda x: x['sentiment_variance']).mean():.4f}")
print(f"Avg action_density: {df['features_decoded'].apply(lambda x: x['action_density']).mean():.2f}")
print("\n✅ Test batch processing complete!")
@app.local_entrypoint()
def show_sample_results(file_path: str = "/data/test_batch_results_1000.parquet"):
"""Показывает примеры результатов из файла на Volume"""
import json
# Загрузка результатов
@app.function(volumes={"/data": volume})
def load_results(path):
import pandas as pd
return pd.read_parquet(path)
df = load_results.remote(file_path)
# Добавление декодированных признаков
if 'narrative_features' in df.columns:
df['features_decoded'] = df['narrative_features'].apply(json.loads)
print(f"Results from {file_path} ({len(df)} records):")
# Вывод примеров
sample_size = min(3, len(df))
print(f"\nSample of {sample_size} records:")
for i, row in df.head(sample_size).iterrows():
print(f"\nRecord {i}:")
print(f"Processed: {row['processed_overview'][:100]}...")
print(f"Original: {row['overview'][:100]}...")
print("Features:")
features = row['features_decoded'] if 'features_decoded' in row else json.loads(row['narrative_features'])
for k, v in features.items():
print(f" {k}: {v}")
# Общая статистика
if 'features_decoded' in df.columns:
print("\nDataset statistics:")
print(f"Avg plot_turns: {df['features_decoded'].apply(lambda x: x['plot_turns']).mean():.2f}")
print(f"Avg sentiment_variance: {df['features_decoded'].apply(lambda x: x['sentiment_variance']).mean():.4f}")
print(f"Avg action_density: {df['features_decoded'].apply(lambda x: x['action_density']).mean():.2f}")
@app.function(
image=image,
volumes={"/data": volume},
timeout=3600, # 1 час на конвертацию
memory=8192 # 8 ГБ памяти
)
def convert_csv_to_parquet():
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.csv as pc
from pathlib import Path
import time
start_time = time.time()
input_path = "/data/data/output.csv"
output_path = "/data/data/output.parquet"
print(f"Starting conversion: {input_path} -> {output_path}")
# Создаем директорию если нужно
Path(output_path).parent.mkdir(parents=True, exist_ok=True)
# Читаем CSV с помощью PyArrow (оптимизировано для больших файлов)
reader = pc.open_csv(
input_path,
read_options=pc.ReadOptions(block_size=128 * 1024 * 1024), # 128MB блоки
parse_options=pc.ParseOptions(delimiter=",")
)
# Схема для записи Parquet
writer = None
# Обрабатываем данные порциями
batch_count = 0
while True:
try:
batch = reader.read_next_batch()
if not batch:
break
df = batch.to_pandas()
if writer is None:
# Инициализируем writer при первом батче
writer = pq.ParquetWriter(
output_path,
pa.Table.from_pandas(df).schema,
compression='SNAPPY'
)
# Конвертируем в pyarrow Table и записываем
table = pa.Table.from_pandas(df)
writer.write_table(table)
batch_count += 1
print(f"Processed batch {batch_count} ({df.shape[0]} rows)")
except StopIteration:
break
# Финализируем запись
if writer:
writer.close()
duration = time.time() - start_time
print(f"✅ Conversion complete! Saved to {output_path}")
print(f"Total batches: {batch_count}")
print(f"Total time: {duration:.2f} seconds")
return output_path
@app.function(
image=image,
volumes={"/data": volume},
timeout=3600
)
def rebuild_parquet_with_row_index():
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
input_path = "/data/data/output.parquet"
output_path = "/data/data/output_indexed.parquet"
# Читаем исходные данные
df = pd.read_parquet(input_path)
# Добавляем индекс строки
df.reset_index(inplace=True)
df.rename(columns={'index': 'row_id'}, inplace=True)
# Сохраняем с разбивкой по строкам
table = pa.Table.from_pandas(df)
pq.write_table(table, output_path, row_group_size=15000)
return output_path
@app.local_entrypoint()
def process_full_dataset(batch_size: int = 15000):
"""Обрабатывает и сохраняет результаты напрямую в Volume"""
from tqdm import tqdm
import math
# 1. Получаем только метаданные (количество строк)
total_records = get_row_count.remote()
print(f"Total records to process: {total_records}")
# 2. Рассчитываем количество батчей
num_batches = math.ceil(total_records / batch_size)
print(f"Processing in {num_batches} batches of {batch_size} records")
# 3. Создаем временную директорию для частичных результатов
partial_dir = "/data/partial_results"
# 4. Объединяем результаты
final_path = "/data/data/full_dataset_results.parquet"
combine_results.remote(partial_dir, final_path)
print("\n✅ Full dataset processing complete!")
print(f"Results saved to {final_path}")
@app.function(volumes={"/data": volume})
def init_partial_dir(partial_dir: str):
"""Создает директорию для частичных результатов"""
import os
os.makedirs(partial_dir, exist_ok=True)
return f"Created directory {partial_dir}"
@app.function(
image=image,
gpu="A10G",
volumes={"/data": volume},
timeout=300
)
def process_and_save_batch(start_row: int, end_row: int, batch_idx: int, partial_dir: str):
"""Обрабатывает батч и сохраняет результаты в отдельный файл"""
import pandas as pd
import pyarrow.parquet as pq
import os
# 0. Создаем директорию, если ее нет
os.makedirs(partial_dir, exist_ok=True)
# 1. Чтение данных
file_path = "/data/data/output.parquet"
# Альтернативный метод чтения без row groups
df = pd.read_parquet(file_path)
df = df.iloc[start_row:end_row]
# 2. Подготовка данных
df['processed_overview'] = df['processed_overview'].fillna('')
df['overview'] = df['overview'].fillna('')
# 3. Формирование батча
batch_data = list(zip(
df['processed_overview'].astype(str),
df['overview'].astype(str)
))
# 4. Обработка батча
results = process_batch.remote(batch_data)
# 5. Сохранение результатов
result_df = pd.DataFrame({'narrative_features': results})
output_path = os.path.join(partial_dir, f"batch_{batch_idx}.parquet")
result_df.to_parquet(output_path)
return f"Saved batch {batch_idx} to {output_path}"
@app.function(volumes={"/data": volume})
def combine_results(partial_dir: str, final_path: str):
"""Объединяет частичные результаты в финальный файл"""
import pandas as pd
import os
from glob import glob
# import pyarrow.parquet as pq
# 1. Сбор всех частичных файлов
# partial_files = glob(os.path.join(partial_dir, "*.parquet"))
partial_files = sorted(
glob(os.path.join(partial_dir, "*.parquet")),
key=lambda x: int(os.path.basename(x).split('_')[1].split('.')[0])
)
print(partial_files)
# 2. Чтение и объединение
full_results = []
for file_path in partial_files:
df = pd.read_parquet(file_path)
full_results.extend(df['narrative_features'].tolist())
print(f'len(full_results) = {len(full_results)}')
# 3. Чтение исходных данных
source_df = pd.read_parquet("/data/data/output.parquet")
print({source_df.info()})
# 4. Добавляем результаты
source_df['narrative_features'] = full_results
# 5. Сохранение финального результата
source_df.to_parquet(final_path)
# 6. Очистка временных файлов
for file_path in partial_files:
os.remove(file_path)
os.rmdir(partial_dir)
return f"Combined {len(partial_files)} batches into {final_path}"
@app.function(volumes={"/data": volume})
def get_row_count():
"""Возвращает общее количество строк в Parquet файле"""
import pyarrow.parquet as pq
file_path = "/data/data/output.parquet"
return pq.read_metadata(file_path).num_rows
@app.function(
image=image,
volumes={"/data": volume},
gpu="A10G",
timeout=3600,
memory=16384
)
def build_faiss_index():
"""
Построение FAISS индекса с учетом совместимости CUDA 12.8
Исправленная версия для эмбеддингов в формате строкового Python списка
"""
import ast
print("Проверка доступности CUDA...")
print(f"CUDA доступна: {torch.cuda.is_available()}")
if torch.cuda.is_available():
print(f"CUDA устройств: {torch.cuda.device_count()}")
print(f"Текущее устройство: {torch.cuda.current_device()}")
# Загрузка данных
df = pd.read_parquet("/data/data/full_dataset_results.parquet")
print(f"Загружено {len(df)} фильмов")
# Анализ формата первого эмбеддинга
sample_embedding = df['processed_overview_embedding'].iloc[0]
print(f"Пример эмбеддинга: {str(sample_embedding)[:100]}...")
print(f"Тип данных: {type(sample_embedding)}")
# Извлечение эмбеддингов
embeddings_list = []
valid_indices = []
parse_errors = 0
print("Начинаем обработку эмбеддингов...")
for idx, (_, row) in enumerate(df.iterrows()):
try:
embedding_data = row['processed_overview_embedding']
# Обработка различных форматов хранения эмбеддингов.
# А именно - парсинг строкового представления Python списка
if isinstance(embedding_data, str):
try:
# Безопасный парсинг с помощью ast.literal_eval
parsed_list = ast.literal_eval(embedding_data.strip())
if isinstance(parsed_list, list):
embedding = np.array(parsed_list, dtype=np.float32)
else:
parse_errors += 1
continue
except (ValueError, SyntaxError):
parse_errors += 1
continue
elif isinstance(embedding_data, list):
embedding = np.array(embedding_data, dtype=np.float32)
elif isinstance(embedding_data, np.ndarray):
embedding = embedding_data.astype(np.float32)
else:
parse_errors += 1
continue
# Проверка размерности (Размерность all-MiniLM-L6-v2 = 384)
if len(embedding) == 384:
embeddings_list.append(embedding.astype(np.float32))
valid_indices.append(idx)
else:
parse_errors += 1
except Exception as e:
parse_errors += 1
if parse_errors <= 5: # Выводим первые несколько ошибок
print(f"Ошибка обработки эмбеддинга {idx}: {e}")
continue
# Прогресс каждые 50000 записей
if (idx + 1) % 50000 == 0:
print(f"Обработано {idx + 1}/{len(df)} записей, валидных: {len(embeddings_list)}")
print(f"Успешно обработано {len(embeddings_list)} эмбеддингов из {len(df)}")
print(f"Ошибок парсинга: {parse_errors}")
print(f"Успешность обработки: {len(embeddings_list) / len(df) * 100:.2f}%")
if not embeddings_list:
raise ValueError(f"Не найдено валидных эмбеддингов. Всего ошибок: {parse_errors}")
# Создание матрицы эмбеддингов
embeddings_matrix = np.vstack(embeddings_list)
print(f"Подготовлено {len(embeddings_matrix)} эмбеддингов")
print(f"Создана матрица эмбеддингов: {embeddings_matrix.shape}")
# Нормализация для косинусного сходства
faiss.normalize_L2(embeddings_matrix)
print("Эмбеддинги нормализованы")
# Создание FAISS индекса с поддержкой GPU
dimension = embeddings_matrix.shape[1]
# Проверяем доступность GPU для FAISS
if faiss.get_num_gpus() > 0:
print("Используем GPU для построения FAISS индекса")
# GPU ресурсы
res = faiss.StandardGpuResources()
# CPU индекс
cpu_index = faiss.IndexFlatIP(dimension)
# Перенос на GPU
gpu_index = faiss.index_cpu_to_gpu(res, 0, cpu_index)
gpu_index.add(embeddings_matrix)
# Возврат на CPU для сохранения
index = faiss.index_gpu_to_cpu(gpu_index)
print("FAISS индекс построен на GPU и перенесен на CPU для сохранения")
else:
print("Используем CPU для FAISS")
index = faiss.IndexFlatIP(dimension)
index.add(embeddings_matrix)
# Сохранение результатов
print("Сохранение FAISS индекса...")
faiss.write_index(index, "/data/data/movie_embeddings.index")
# Сохранение метаданных
print("Сохранение метаданных фильмов...")
valid_movies_df = df.iloc[valid_indices].reset_index(drop=True)
valid_movies_df.to_parquet("/data/data/indexed_movies_metadata.parquet")
result = {
"status": "success",
"total_movies": len(valid_movies_df),
"original_dataset_size": len(df),
"index_size": index.ntotal,
"dimension": dimension,
"gpu_used": faiss.get_num_gpus() > 0,
"processing_success_rate": len(valid_indices) / len(df),
"parse_errors": parse_errors
}
print("=" * 50)
print("ПОСТРОЕНИЕ ИНДЕКСА ЗАВЕРШЕНО")
print(f"Обработано фильмов: {result['total_movies']} из {result['original_dataset_size']}")
print(f"Размерность векторов: {result['dimension']}")
print(f"Успешность: {result['processing_success_rate'] * 100:.2f}%")
print("=" * 50)
return result
@app.function(
image=image,
volumes={"/data": volume},
timeout=300
)
def test_embedding_parsing(num_samples=100):
"""
Тестирование парсинга эмбеддингов на небольшой выборке данных
"""
import ast
df = pd.read_parquet("/data/data/full_dataset_results.parquet")
print(f"Загружено {len(df)} фильмов для тестирования")
test_sample = df.head(num_samples)
successful_parses = 0
failed_parses = 0
print("Тестирование парсинга эмбеддингов...")
for idx, row in test_sample.iterrows():
embedding_data = row['processed_overview_embedding']
try:
if isinstance(embedding_data, str):
parsed_list = ast.literal_eval(embedding_data.strip())
if isinstance(parsed_list, list):
embedding = np.array(parsed_list, dtype=np.float32)
if len(embedding) == 384:
successful_parses += 1
else:
print(f"Неправильная размерность {len(embedding)} для индекса {idx}")
failed_parses += 1
else:
print(f"Парсинг не дал список для индекса {idx}: {type(parsed_list)}")
failed_parses += 1
else:
print(f"Неожиданный тип данных для индекса {idx}: {type(embedding_data)}")
failed_parses += 1
except Exception as e:
print(f"Ошибка парсинга для индекса {idx}: {e}")
failed_parses += 1
print(f"\nРезультаты тестирования:")
print(f"Успешных парсингов: {successful_parses}")
print(f"Неудачных парсингов: {failed_parses}")
print(f"Успешность: {successful_parses / (successful_parses + failed_parses) * 100:.2f}%")
return {
"successful_parses": successful_parses,
"failed_parses": failed_parses,
"success_rate": successful_parses / (successful_parses + failed_parses)
}
@app.function(
image=image,
gpu="A10G",
timeout=300,
max_containers=1 # Макс 3 одновременных кодирования
)
def encode_user_query(query_text: str, remove_entities: bool = True):
"""
Генерация эмбеддинга для пользовательского описания с опциональным удалением именованных сущностей
"""
import spacy
import tempfile
# Импорт внутри функции для работы с добавленными директориями
from modal_utils.cloud_operations import (clean_text, prepare_text_for_embedding,
encode_user_query_fallback, extract_narrative_features_consistent)
# Проверка входных данных
if not query_text or not query_text.strip():
raise ValueError("Пустой запрос не может быть обработан")
# Определение устройства
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Используется устройство: {device}")
# Инициализация spaCy модели (та же, что использовалась для обработки фильмов)
try:
try:
# Загрузка spaCy с проверкой GPU
if torch.cuda.is_available():
spacy.prefer_gpu()
import en_core_web_lg
nlp = en_core_web_lg.load()
# nlp = spacy.load("en_core_web_lg")
# Добавляем sentencizer, если его нет
if "sentencizer" not in nlp.pipe_names:
nlp.add_pipe("sentencizer")
print("SpaCy модель загружена успешно")
except Exception as e:
print(f"Ошибка загрузки spaCy: {e}")
# Fallback к простой обработке
return encode_user_query_fallback(query_text, device)
# Инициализация модели для кодирования
try:
# Определяем кэш-директорию в зависимости от ОС
cache_dir = os.path.join(tempfile.gettempdir(), "sentence_transformer_cache")
# Создаем директорию, если не существует
os.makedirs(cache_dir, exist_ok=True)
print(f"Using cache directory: {cache_dir}")
model = SentenceTransformer(
'all-MiniLM-L6-v2',
device=device,
cache_folder=cache_dir
)
# Оптимизация для GPU
if torch.cuda.is_available():
model = model.half()
print("Using half-precision model")
except Exception as e:
return {"error": f"model_init_error: {str(e)}"}
# Применяем тот же процесс обработки, что и для фильмов
# Опциональное удаление именованных сущностей для фокуса на сюжете
if remove_entities:
processed_query = prepare_text_for_embedding(query_text, nlp)
# Проверка, что после обработки остался текст
if not processed_query.strip():
print("Предупреждение: После удаления сущностей текст стал пустым, используем очищенную версию")
processed_query = clean_text(query_text)
else:
processed_query = clean_text(query_text)
# Финальная проверка
if not processed_query.strip():
processed_query = query_text.lower().strip()
print(f"Исходное описание: '{query_text}'")
print(f"Обработанное описание: '{processed_query}'")
print(f"Исходное описание: {query_text}")
print(f"Обработанное описание: {processed_query}")
# Генерация эмбеддинга
query_embedding = model.encode(
[processed_query],
convert_to_tensor=False,
batch_size=1,
show_progress_bar=False
)[0]
# Извлечение нарративных признаков, консистентных с базой данных
narrative_features = extract_narrative_features_consistent(query_text, processed_query, nlp)
return {
"original_query": query_text,
"processed_query": processed_query,
"embedding": query_embedding.tolist(),
"embedding_dimension": len(query_embedding),
"narrative_features": narrative_features,
"device_used": device,
"preprocessing_applied": remove_entities
}
except Exception as e:
print(f"Ошибка в основной обработке: {e}, переключаемся на fallback")
return encode_user_query_fallback(query_text, device)
@app.function(
image=image,
timeout=300
)
def test_text_processing_consistency():
"""
Тестирование консистентности обработки текста между фильмами и описанием пошьзователя
Запуск из командной строки на локальном комп-ре:
$ modal run modal_app.py::app.test_text_processing_consistency
"""
import spacy
# Импорт внутри функции для работы с добавленными директориями
from modal_utils.cloud_operations import clean_text, prepare_text_for_embedding, encode_user_query_fallback
nlp = spacy.load("en_core_web_lg")
# Тестовые примеры
test_texts = [
"A young wizard named Harry Potter discovers his magical heritage.",
"In New York City, a detective investigates a mysterious crime.",
"The story follows John Smith as he travels through time.",
"An epic adventure in the Star Wars universe with Luke Skywalker."
]
print("Тестирование обработки текста:")
print("=" * 60)
for text in test_texts:
processed = prepare_text_for_embedding(text, nlp)
print(f"Исходный: {text}")
print(f"Обработанный: {processed}")
print("-" * 40)
return {"test_completed": True, "samples_processed": len(test_texts)}
# Глобальная переменная для кэширования GPU индекса
_gpu_index_cache = None
_gpu_resources_cache = None
@app.function(
image=image,
volumes={"/data": volume},
gpu="A10G",
timeout=300,
min_containers=1, # Поддерживаем контейнер активным
max_containers=1 # Макс 3 одновременных кодирования
)
def search_similar_movies(
query_embedding: list,
query_narrative_features: dict,
top_k: int = 50,
rerank_top_n: int = 10):
"""
Поиск похожих фильмов с использованием FAISS и консистентных нарративных признаков
дополнительным ранжированием
по нарративным признакам. Оптимизированная версия с кэшированием GPU
индекса для избежания повторных переносов
"""
global _gpu_index_cache, _gpu_resources_cache
import time
from modal_utils.cloud_operations import (rerank_by_narrative_features,
calculate_narrative_similarity)
start_time = time.time()
search_index = None # Инициализируем переменную
# Загрузка FAISS индекса
movies_df = pd.read_parquet("/data/data/indexed_movies_metadata.parquet")
# Инициализация GPU индекса (только при первом вызове)
if _gpu_index_cache is None and faiss.get_num_gpus() > 0:
print("Первая инициализация GPU индекса...")
# Загрузка CPU индекса
cpu_index = faiss.read_index("/data/data/movie_embeddings.index")
load_time = time.time() - start_time
print(f"Загрузка данных: {load_time:.3f}s")
# Создание GPU ресурсов
_gpu_resources_cache = faiss.StandardGpuResources()
_gpu_resources_cache.setTempMemory(1024 * 1024 * 1024) # 1GB temp memory
# Перенос на GPU
_gpu_index_cache = faiss.index_cpu_to_gpu(_gpu_resources_cache, 0, cpu_index)
logger.info(f"GPU индекс кэширован и готов к использованию")
print("GPU индекс кэширован и готов к использованию")
using_gpu = True
elif _gpu_index_cache is not None:
logger.info(f"Используем кэшированный GPU индекс")
print("Используем кэшированный GPU индекс")
using_gpu = True
else:
logger.info(f"GPU недоступен, используем CPU")
print("GPU недоступен, используем CPU")
cpu_index = faiss.read_index("/data/data/movie_embeddings.index")
search_index = cpu_index
using_gpu = False
if using_gpu:
search_index = _gpu_index_cache
# Семантический поиск, Подготовка запроса
query_vector = np.array([query_embedding], dtype=np.float32)
faiss.normalize_L2(query_vector)
# Выполнение поиска ближайших соседей
search_start = time.time()
distances, indices = search_index.search(query_vector, top_k)
search_time = time.time() - search_start
logger.info(f"Время поиска ({'GPU' if using_gpu else 'CPU'}): {search_time:.3f}s")
print(f"Время поиска ({'GPU' if using_gpu else 'CPU'}): {search_time:.3f}s")
# Обработка результатов
process_start = time.time()
candidates = []
for i, (dist, idx) in enumerate(zip(distances[0], indices[0])):
if idx < len(movies_df):
movie = movies_df.iloc[idx]
# Вычисление нарративного сходства с исправленной функцией
narrative_similarity = calculate_narrative_similarity(
query_narrative_features,
movie.get('narrative_features', '{}')
)
candidates.append({
'index': idx,
'semantic_score': float(dist),
'narrative_similarity': narrative_similarity,
'movie_data': movie.to_dict()
})
# Дополнительное ранжирование с учетом нарративных признаков
reranked_candidates = rerank_by_narrative_features(candidates)
process_time = time.time() - process_start
total_time = time.time() - start_time
# Подготавливаем необходимые для инфо поля и выводим через logger
# filtered = {}
desired_movie_keys = {'id', 'title', 'narrative_features'}
if reranked_candidates: # список не пуст
first = reranked_candidates[0] # это dict
movie_info = first.get("movie_data", {}) # dict с данными фильма
filtered = {k: movie_info.get(k) for k in desired_movie_keys if k in movie_info}
logger.info(f"First re-ranked candidate (filtered): {filtered}")
else:
logger.warning("reranked_candidates is empty, nothing to log")
return {
"results": reranked_candidates[:rerank_top_n],
"performance_metrics": {
"using_gpu": using_gpu,
"search_time": search_time,
"process_time": process_time,
"total_time": total_time,
"cached_gpu_index": _gpu_index_cache is not None
}
}