SaleAgent / customer_monitor.py
NomosRUS's picture
Upload 20 files
e1b7616 verified
# customer_monitor.py
# -*- coding: utf-8 -*-
"""
Модуль 5: Customer Monitor
Формирует ежедневный план контактов на основе CRM parquet данных (через CRMReader)
и печатает удобную таблицу в консоль.
Примеры:
python customer_monitor.py --data-dir ./Data --out today.csv --print
python customer_monitor.py --data-dir ./Data --out today.parquet --out-format parquet --limits 3 3 3 3 3
"""
from __future__ import annotations
import argparse
import sys
from typing import Dict, List
import pandas as pd
from access_parquet import CRMReader, WON_STAGE
from llm_client import LLMClient # Добавлено для генерации сводки
# --------------------------- ВСПОМОГАТЕЛЬНОЕ ---------------------------------
def _safe_to_utc(ts) -> pd.Timestamp | None:
if ts is None or pd.isna(ts):
return None
t = pd.to_datetime(ts, errors="coerce", utc=True)
return None if pd.isna(t) else t
def _fmt_dt(ts) -> str:
t = _safe_to_utc(ts)
return t.tz_convert("Europe/Amsterdam").strftime("%Y-%m-%d %H:%M") if t is not None else "—"
def _fmt_money(x) -> str:
if x is None or pd.isna(x):
return "—"
try:
return f"{float(x):,.0f}".replace(",", " ")
except Exception:
return str(x)
def _print_markdown_table(df: pd.DataFrame, title: str = "План дня"):
"""Печатает DataFrame как красивую markdown-таблицу."""
if df.empty:
print(f"\n**{title}**: результатов нет.")
return
printable = df.copy()
# Форматирование для вывода
printable["last_deal_amount"] = printable["last_deal_amount"].map(_fmt_money)
printable["recent_interactions"] = printable["recent_interactions"].astype(int)
# Заголовки таблицы
headers = {
"client": "Клиент",
"contact": "Контакт",
"recent_interactions": "Взаимодействия (30д)",
"last_deal_amount": "Сумма посл. сделки",
"last_stage": "Стадия",
"reason": "Приоритет",
"last_notes": "Заметки",
}
printable = printable.rename(columns=headers)
printable = printable[list(headers.values())]
# Конвертация в markdown
md_table = printable.to_markdown(index=False, tablefmt="pipe")
# Вывод в stdout
output = f"\n### {title}\n\n{md_table}\n"
sys.stdout.buffer.write(output.encode("utf-8"))
# ------------------------------ МОНИТОР --------------------------------------
def generate_plan_summary(llm: LLMClient, limits: dict) -> str:
"""Генерирует текстовое описание логики выборки клиентов с помощью LLM."""
if not llm:
return ""
criteria_prompt = f"""
Кратко, в 2-3 предложениях, опиши для менеджера по продажам, по какому принципу был сформирован план контактов на сегодня.
Используй дружелюбный, но деловой тон.
Критерии и лимиты выборки:
1. Давно не связывались (last_contact_at): {limits['long_no_contact']} клиента(ов).
Цель: предложить новые продукты.
2. Ближайший запланированный контакт (next_contact_at): {limits['upcoming_contact']} клиента(ов).
Цель: напомнить о встрече.
3. Направлено предложение (stage=proposal): {limits['proposal_stage']} клиента(ов).
Цель: получить обратную связь.
4. Недавняя успешная сделка (stage=won): {limits['recent_won']} клиента(ов).
Цель: предложить сопутствующие товары.
5. Высокая вероятность закрытия (probability > 80%): {limits['high_prob']} клиента(ов).
Цель: договориться о дате сделки.
"""
try:
response = llm.chat(
messages=[{"role": "user", "content": criteria_prompt}],
system_prompt="Ты — AI-ассистент, который помогает менеджеру по продажам понять его план работы."
)
return response.text.strip() if response.text else ""
except Exception as e:
print(f"[!] Не удалось сгенерировать сводку: {e}", file=sys.stderr)
return ""
class CustomerMonitor:
"""
Строит due-список по 5 правилам:
1) Давно не связывались — сорт. по last_contact_at ↑
2) Ближайший запланированный контакт — сорт. по next_contact_at ↑
3) Stage = proposal — ближайшие по next_contact_at (или last_contact_at)
4) Недавно won (30 дней) — предложить сопутствующие
5) Высокая вероятность > threshold — договориться о дате
"""
def __init__(self, reader: CRMReader, now: pd.Timestamp | None = None):
self.reader = reader
self.now = pd.Timestamp.now(tz="UTC") if now is None else pd.to_datetime(now, utc=True)
def _recent_interactions_count(self, client_id: int, days: int = 30) -> int:
cutoff = self.now - pd.Timedelta(days=days)
return int(self.reader.get_recent_interactions(client_id, since=cutoff).shape[0])
def _last_deal_info(self, client_id: int):
deals = self.reader.get_client_deals(client_id).sort_values("updated_at", ascending=False)
if deals.empty:
return None, None
d = deals.iloc[0]
return d.get("amount"), d.get("stage")
def _pack(self, row: pd.Series, reason: str) -> dict:
amount, stage = self._last_deal_info(int(row["client_id"]))
return {
"client_id": int(row["client_id"]),
"client": row.get("organisation") or row.get("name"),
"contact": row.get("contact_name"),
"recent_interactions": self._recent_interactions_count(int(row["client_id"])),
"last_notes": row.get("notes", ""),
"last_deal_amount": amount,
"last_stage": stage,
"reason": reason,
}
def build_due_list_df(
self,
limits: Dict[str, int] | None = None,
won_days: int = 30,
high_prob_threshold: float = 0.80
) -> pd.DataFrame:
limits = limits or {
"long_no_contact": 2,
"upcoming_contact": 2,
"proposal_stage": 2,
"recent_won": 2,
"high_prob": 2,
}
clients = self.reader.clients()
deals = self.reader.deals()
due_rows: List[dict] = []
# 1) Давно не связывались
if "last_contact_at" in clients.columns:
df = clients.sort_values("last_contact_at", ascending=True)
for _, row in df.head(limits["long_no_contact"]).iterrows():
due_rows.append(self._pack(row, "Давно не связывались"))
# 2) Ближайший запланированный контакт
if "next_contact_at" in clients.columns:
df = clients.sort_values("next_contact_at", ascending=True)
for _, row in df.head(limits["upcoming_contact"]).iterrows():
due_rows.append(self._pack(row, "Напоминание о встрече"))
# 3) Stage = proposal
if not deals.empty and "stage" in deals.columns:
prop = deals[deals["stage"] == "proposal"]
# упорядочим по ближайшему событию (если есть), иначе по последнему контакту
cids = prop["client_id"].unique()
subset = clients[clients["client_id"].isin(cids)].copy()
if "next_contact_at" in subset.columns:
subset = subset.sort_values("next_contact_at", ascending=True)
elif "last_contact_at" in subset.columns:
subset = subset.sort_values("last_contact_at", ascending=True)
for _, row in subset.head(limits["proposal_stage"]).iterrows():
due_rows.append(self._pack(row, "Есть предложение — нужна обратная связь"))
# 4) Недавно won (за won_days)
if not deals.empty and {"stage", "updated_at"}.issubset(deals.columns):
cutoff = self.now - pd.Timedelta(days=won_days)
won_recent = deals[(deals["stage"] == WON_STAGE) & (deals["updated_at"] >= cutoff)]
cids = won_recent["client_id"].unique()
subset = clients[clients["client_id"].isin(cids)].copy()
# чтобы не предлагать тех, кто уже был в первых двух списках, позже будет dedup
if "last_contact_at" in subset.columns:
subset = subset.sort_values("last_contact_at", ascending=True)
for _, row in subset.head(limits["recent_won"]).iterrows():
due_rows.append(self._pack(row, "Недавно выигранная сделка — предложить сопутствующие"))
# 5) Высокая вероятность > threshold
if not deals.empty and "probability" in deals.columns:
high = deals[deals["probability"] > high_prob_threshold]
# отдаём приоритет самой высокой вероятности
high = high.sort_values("probability", ascending=False)
seen = set()
picked = 0
for _, d in high.iterrows():
cid = int(d["client_id"])
if cid in seen:
continue
seen.add(cid)
row = clients.loc[[cid]]
if row.empty:
continue
due_rows.append(self._pack(row.iloc[0], "Высокая вероятность сделки — назначить дату"))
picked += 1
if picked >= limits["high_prob"]:
break
# Объединение, удаление дублей клиентов, сохранение первой по приоритету причины
df = pd.DataFrame(due_rows)
if df.empty:
return df
# Порядок правил = приоритету: 2 (встречи), 5 (high prob), 3 (proposal), 1 (давно), 4 (won)
priority = {
"Напоминание о встрече": 1,
"Высокая вероятность сделки — назначить дату": 2,
"Есть предложение — нужна обратная связь": 3,
"Давно не связывались": 4,
"Недавно выигранная сделка — предложить сопутствующие": 5,
}
df["__prio"] = df["reason"].map(priority).fillna(9).astype(int)
# keep first by priority
df = df.sort_values(["__prio", "client_id"], ascending=[True, True])
df = df.drop_duplicates(subset="client_id", keep="first")
df = df.drop(columns="__prio")
# Удобный порядок колонок
cols = ["client_id", "client", "contact", "recent_interactions", "last_deal_amount", "last_stage", "reason", "last_notes"]
return df.reindex(columns=[c for c in cols if c in df.columns])
def build_due_list(self, limits: Dict[str, int] | None = None) -> str:
"""Строит форматированный строковый отчет для вывода пользователю."""
df = self.build_due_list_df(limits=limits)
if df.empty:
return "На сегодня приоритетных клиентов для контакта нет."
def format_group(title: str, data: pd.DataFrame) -> str:
if data.empty:
return ""
header = f"### {title}\n"
lines = [f"- {row['client']} ({row['contact']})" for _, row in data.iterrows()]
return header + "\n".join(lines) + "\n"
output_parts = []
reasons = {
"Давно не связывались": "Клиенты, с кем давно не было связи (для презентации новых продуктов):",
"Напоминание о встрече": "Клиенты с ближайшим запланированным контактом (напомнить о встрече):",
"Есть предложение — нужна обратная связь": "Клиенты, которым направлено предложение (получить обратную связь):",
"Недавно выигранная сделка — предложить сопутствующие": "Клиенты с недавней успешной сделкой (предложить сопутствующие товары):",
"Высокая вероятность сделки — назначить дату": "Клиенты с высокой вероятностью закрытия сделки (договориться о дате):",
}
for reason, title in reasons.items():
group_df = df[df["reason"] == reason]
output_parts.append(format_group(title, group_df))
return "\n".join(filter(None, output_parts))
# --------------------------------- CLI ---------------------------------------
def main():
ap = argparse.ArgumentParser()
ap.add_argument("--data-dir", default="Data", help="Папка с parquet-файлами")
ap.add_argument("--out", default="due_list.csv", help="Файл для сохранения (csv/parquet)")
ap.add_argument("--out-format", choices=["csv", "parquet"], default="csv")
ap.add_argument("--limits", nargs=5, type=int, metavar=("LNO","UP","PROP","WON","HP"),
help="Лимиты по правилам: long_no_contact, upcoming, proposal, won, high_prob")
ap.add_argument("--won-days", type=int, default=30, help="Окно для 'recent won', дней")
ap.add_argument("--high-prob", type=float, default=0.80, help="Порог вероятности для 'high probability'")
ap.add_argument("--no-print", dest="do_print", action="store_false", help="Не выводить таблицу в консоль")
ap.add_argument("--llm-summary", action="store_true", help="Добавить сводку от LLM")
args = ap.parse_args()
reader = CRMReader(data_dir=args.data_dir)
monitor = CustomerMonitor(reader)
llm = LLMClient() if args.llm_summary else None
limits_values = args.limits or [2, 2, 2, 2, 2]
limit_keys = ["long_no_contact", "upcoming_contact", "proposal_stage", "recent_won", "high_prob"]
limits = dict(zip(limit_keys, limits_values))
df = monitor.build_due_list(
limits=limits,
won_days=args.won_days,
high_prob_threshold=args.high_prob,
)
# Печать
if args.do_print:
title = "План контактов на сегодня"
if llm:
summary = generate_plan_summary(llm, limits)
if summary:
sys.stdout.buffer.write(f"\n{summary}\n".encode("utf-8"))
_print_markdown_table(df, title=title)
# Сохранение
if args.out:
if args.out_format == "csv":
df.to_csv(args.out, index=False, encoding="utf-8-sig")
else:
df.to_parquet(args.out, index=False)
print(f"\n[ok] Сохранено {len(df)} строк в {args.out}")
if __name__ == "__main__":
main()