|
|
|
|
|
|
|
|
"""
|
|
|
Модуль 5: Customer Monitor
|
|
|
Формирует ежедневный план контактов на основе CRM parquet данных (через CRMReader)
|
|
|
и печатает удобную таблицу в консоль.
|
|
|
|
|
|
Примеры:
|
|
|
python customer_monitor.py --data-dir ./Data --out today.csv --print
|
|
|
python customer_monitor.py --data-dir ./Data --out today.parquet --out-format parquet --limits 3 3 3 3 3
|
|
|
"""
|
|
|
|
|
|
from __future__ import annotations
|
|
|
import argparse
|
|
|
import sys
|
|
|
from typing import Dict, List
|
|
|
|
|
|
import pandas as pd
|
|
|
|
|
|
from access_parquet import CRMReader, WON_STAGE
|
|
|
from llm_client import LLMClient
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _safe_to_utc(ts) -> pd.Timestamp | None:
|
|
|
if ts is None or pd.isna(ts):
|
|
|
return None
|
|
|
t = pd.to_datetime(ts, errors="coerce", utc=True)
|
|
|
return None if pd.isna(t) else t
|
|
|
|
|
|
|
|
|
def _fmt_dt(ts) -> str:
|
|
|
t = _safe_to_utc(ts)
|
|
|
return t.tz_convert("Europe/Amsterdam").strftime("%Y-%m-%d %H:%M") if t is not None else "—"
|
|
|
|
|
|
|
|
|
def _fmt_money(x) -> str:
|
|
|
if x is None or pd.isna(x):
|
|
|
return "—"
|
|
|
try:
|
|
|
return f"{float(x):,.0f}".replace(",", " ")
|
|
|
except Exception:
|
|
|
return str(x)
|
|
|
|
|
|
|
|
|
def _print_markdown_table(df: pd.DataFrame, title: str = "План дня"):
|
|
|
"""Печатает DataFrame как красивую markdown-таблицу."""
|
|
|
if df.empty:
|
|
|
print(f"\n**{title}**: результатов нет.")
|
|
|
return
|
|
|
|
|
|
printable = df.copy()
|
|
|
|
|
|
printable["last_deal_amount"] = printable["last_deal_amount"].map(_fmt_money)
|
|
|
printable["recent_interactions"] = printable["recent_interactions"].astype(int)
|
|
|
|
|
|
|
|
|
headers = {
|
|
|
"client": "Клиент",
|
|
|
"contact": "Контакт",
|
|
|
"recent_interactions": "Взаимодействия (30д)",
|
|
|
"last_deal_amount": "Сумма посл. сделки",
|
|
|
"last_stage": "Стадия",
|
|
|
"reason": "Приоритет",
|
|
|
"last_notes": "Заметки",
|
|
|
}
|
|
|
printable = printable.rename(columns=headers)
|
|
|
printable = printable[list(headers.values())]
|
|
|
|
|
|
|
|
|
md_table = printable.to_markdown(index=False, tablefmt="pipe")
|
|
|
|
|
|
|
|
|
output = f"\n### {title}\n\n{md_table}\n"
|
|
|
sys.stdout.buffer.write(output.encode("utf-8"))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def generate_plan_summary(llm: LLMClient, limits: dict) -> str:
|
|
|
"""Генерирует текстовое описание логики выборки клиентов с помощью LLM."""
|
|
|
if not llm:
|
|
|
return ""
|
|
|
|
|
|
criteria_prompt = f"""
|
|
|
Кратко, в 2-3 предложениях, опиши для менеджера по продажам, по какому принципу был сформирован план контактов на сегодня.
|
|
|
Используй дружелюбный, но деловой тон.
|
|
|
|
|
|
Критерии и лимиты выборки:
|
|
|
1. Давно не связывались (last_contact_at): {limits['long_no_contact']} клиента(ов).
|
|
|
Цель: предложить новые продукты.
|
|
|
2. Ближайший запланированный контакт (next_contact_at): {limits['upcoming_contact']} клиента(ов).
|
|
|
Цель: напомнить о встрече.
|
|
|
3. Направлено предложение (stage=proposal): {limits['proposal_stage']} клиента(ов).
|
|
|
Цель: получить обратную связь.
|
|
|
4. Недавняя успешная сделка (stage=won): {limits['recent_won']} клиента(ов).
|
|
|
Цель: предложить сопутствующие товары.
|
|
|
5. Высокая вероятность закрытия (probability > 80%): {limits['high_prob']} клиента(ов).
|
|
|
Цель: договориться о дате сделки.
|
|
|
"""
|
|
|
try:
|
|
|
response = llm.chat(
|
|
|
messages=[{"role": "user", "content": criteria_prompt}],
|
|
|
system_prompt="Ты — AI-ассистент, который помогает менеджеру по продажам понять его план работы."
|
|
|
)
|
|
|
return response.text.strip() if response.text else ""
|
|
|
except Exception as e:
|
|
|
print(f"[!] Не удалось сгенерировать сводку: {e}", file=sys.stderr)
|
|
|
return ""
|
|
|
|
|
|
|
|
|
class CustomerMonitor:
|
|
|
"""
|
|
|
Строит due-список по 5 правилам:
|
|
|
1) Давно не связывались — сорт. по last_contact_at ↑
|
|
|
2) Ближайший запланированный контакт — сорт. по next_contact_at ↑
|
|
|
3) Stage = proposal — ближайшие по next_contact_at (или last_contact_at)
|
|
|
4) Недавно won (30 дней) — предложить сопутствующие
|
|
|
5) Высокая вероятность > threshold — договориться о дате
|
|
|
"""
|
|
|
def __init__(self, reader: CRMReader, now: pd.Timestamp | None = None):
|
|
|
self.reader = reader
|
|
|
self.now = pd.Timestamp.now(tz="UTC") if now is None else pd.to_datetime(now, utc=True)
|
|
|
|
|
|
def _recent_interactions_count(self, client_id: int, days: int = 30) -> int:
|
|
|
cutoff = self.now - pd.Timedelta(days=days)
|
|
|
return int(self.reader.get_recent_interactions(client_id, since=cutoff).shape[0])
|
|
|
|
|
|
def _last_deal_info(self, client_id: int):
|
|
|
deals = self.reader.get_client_deals(client_id).sort_values("updated_at", ascending=False)
|
|
|
if deals.empty:
|
|
|
return None, None
|
|
|
d = deals.iloc[0]
|
|
|
return d.get("amount"), d.get("stage")
|
|
|
|
|
|
def _pack(self, row: pd.Series, reason: str) -> dict:
|
|
|
amount, stage = self._last_deal_info(int(row["client_id"]))
|
|
|
return {
|
|
|
"client_id": int(row["client_id"]),
|
|
|
"client": row.get("organisation") or row.get("name"),
|
|
|
"contact": row.get("contact_name"),
|
|
|
"recent_interactions": self._recent_interactions_count(int(row["client_id"])),
|
|
|
"last_notes": row.get("notes", ""),
|
|
|
"last_deal_amount": amount,
|
|
|
"last_stage": stage,
|
|
|
"reason": reason,
|
|
|
}
|
|
|
|
|
|
def build_due_list_df(
|
|
|
self,
|
|
|
limits: Dict[str, int] | None = None,
|
|
|
won_days: int = 30,
|
|
|
high_prob_threshold: float = 0.80
|
|
|
) -> pd.DataFrame:
|
|
|
|
|
|
limits = limits or {
|
|
|
"long_no_contact": 2,
|
|
|
"upcoming_contact": 2,
|
|
|
"proposal_stage": 2,
|
|
|
"recent_won": 2,
|
|
|
"high_prob": 2,
|
|
|
}
|
|
|
|
|
|
clients = self.reader.clients()
|
|
|
deals = self.reader.deals()
|
|
|
|
|
|
due_rows: List[dict] = []
|
|
|
|
|
|
|
|
|
if "last_contact_at" in clients.columns:
|
|
|
df = clients.sort_values("last_contact_at", ascending=True)
|
|
|
for _, row in df.head(limits["long_no_contact"]).iterrows():
|
|
|
due_rows.append(self._pack(row, "Давно не связывались"))
|
|
|
|
|
|
|
|
|
if "next_contact_at" in clients.columns:
|
|
|
df = clients.sort_values("next_contact_at", ascending=True)
|
|
|
for _, row in df.head(limits["upcoming_contact"]).iterrows():
|
|
|
due_rows.append(self._pack(row, "Напоминание о встрече"))
|
|
|
|
|
|
|
|
|
if not deals.empty and "stage" in deals.columns:
|
|
|
prop = deals[deals["stage"] == "proposal"]
|
|
|
|
|
|
cids = prop["client_id"].unique()
|
|
|
subset = clients[clients["client_id"].isin(cids)].copy()
|
|
|
if "next_contact_at" in subset.columns:
|
|
|
subset = subset.sort_values("next_contact_at", ascending=True)
|
|
|
elif "last_contact_at" in subset.columns:
|
|
|
subset = subset.sort_values("last_contact_at", ascending=True)
|
|
|
for _, row in subset.head(limits["proposal_stage"]).iterrows():
|
|
|
due_rows.append(self._pack(row, "Есть предложение — нужна обратная связь"))
|
|
|
|
|
|
|
|
|
if not deals.empty and {"stage", "updated_at"}.issubset(deals.columns):
|
|
|
cutoff = self.now - pd.Timedelta(days=won_days)
|
|
|
won_recent = deals[(deals["stage"] == WON_STAGE) & (deals["updated_at"] >= cutoff)]
|
|
|
cids = won_recent["client_id"].unique()
|
|
|
subset = clients[clients["client_id"].isin(cids)].copy()
|
|
|
|
|
|
if "last_contact_at" in subset.columns:
|
|
|
subset = subset.sort_values("last_contact_at", ascending=True)
|
|
|
for _, row in subset.head(limits["recent_won"]).iterrows():
|
|
|
due_rows.append(self._pack(row, "Недавно выигранная сделка — предложить сопутствующие"))
|
|
|
|
|
|
|
|
|
if not deals.empty and "probability" in deals.columns:
|
|
|
high = deals[deals["probability"] > high_prob_threshold]
|
|
|
|
|
|
high = high.sort_values("probability", ascending=False)
|
|
|
seen = set()
|
|
|
picked = 0
|
|
|
for _, d in high.iterrows():
|
|
|
cid = int(d["client_id"])
|
|
|
if cid in seen:
|
|
|
continue
|
|
|
seen.add(cid)
|
|
|
row = clients.loc[[cid]]
|
|
|
if row.empty:
|
|
|
continue
|
|
|
due_rows.append(self._pack(row.iloc[0], "Высокая вероятность сделки — назначить дату"))
|
|
|
picked += 1
|
|
|
if picked >= limits["high_prob"]:
|
|
|
break
|
|
|
|
|
|
|
|
|
df = pd.DataFrame(due_rows)
|
|
|
if df.empty:
|
|
|
return df
|
|
|
|
|
|
|
|
|
priority = {
|
|
|
"Напоминание о встрече": 1,
|
|
|
"Высокая вероятность сделки — назначить дату": 2,
|
|
|
"Есть предложение — нужна обратная связь": 3,
|
|
|
"Давно не связывались": 4,
|
|
|
"Недавно выигранная сделка — предложить сопутствующие": 5,
|
|
|
}
|
|
|
df["__prio"] = df["reason"].map(priority).fillna(9).astype(int)
|
|
|
|
|
|
df = df.sort_values(["__prio", "client_id"], ascending=[True, True])
|
|
|
df = df.drop_duplicates(subset="client_id", keep="first")
|
|
|
df = df.drop(columns="__prio")
|
|
|
|
|
|
|
|
|
cols = ["client_id", "client", "contact", "recent_interactions", "last_deal_amount", "last_stage", "reason", "last_notes"]
|
|
|
return df.reindex(columns=[c for c in cols if c in df.columns])
|
|
|
|
|
|
def build_due_list(self, limits: Dict[str, int] | None = None) -> str:
|
|
|
"""Строит форматированный строковый отчет для вывода пользователю."""
|
|
|
df = self.build_due_list_df(limits=limits)
|
|
|
if df.empty:
|
|
|
return "На сегодня приоритетных клиентов для контакта нет."
|
|
|
|
|
|
def format_group(title: str, data: pd.DataFrame) -> str:
|
|
|
if data.empty:
|
|
|
return ""
|
|
|
header = f"### {title}\n"
|
|
|
lines = [f"- {row['client']} ({row['contact']})" for _, row in data.iterrows()]
|
|
|
return header + "\n".join(lines) + "\n"
|
|
|
|
|
|
output_parts = []
|
|
|
reasons = {
|
|
|
"Давно не связывались": "Клиенты, с кем давно не было связи (для презентации новых продуктов):",
|
|
|
"Напоминание о встрече": "Клиенты с ближайшим запланированным контактом (напомнить о встрече):",
|
|
|
"Есть предложение — нужна обратная связь": "Клиенты, которым направлено предложение (получить обратную связь):",
|
|
|
"Недавно выигранная сделка — предложить сопутствующие": "Клиенты с недавней успешной сделкой (предложить сопутствующие товары):",
|
|
|
"Высокая вероятность сделки — назначить дату": "Клиенты с высокой вероятностью закрытия сделки (договориться о дате):",
|
|
|
}
|
|
|
|
|
|
for reason, title in reasons.items():
|
|
|
group_df = df[df["reason"] == reason]
|
|
|
output_parts.append(format_group(title, group_df))
|
|
|
|
|
|
return "\n".join(filter(None, output_parts))
|
|
|
|
|
|
|
|
|
|
|
|
def main():
|
|
|
ap = argparse.ArgumentParser()
|
|
|
ap.add_argument("--data-dir", default="Data", help="Папка с parquet-файлами")
|
|
|
ap.add_argument("--out", default="due_list.csv", help="Файл для сохранения (csv/parquet)")
|
|
|
ap.add_argument("--out-format", choices=["csv", "parquet"], default="csv")
|
|
|
ap.add_argument("--limits", nargs=5, type=int, metavar=("LNO","UP","PROP","WON","HP"),
|
|
|
help="Лимиты по правилам: long_no_contact, upcoming, proposal, won, high_prob")
|
|
|
ap.add_argument("--won-days", type=int, default=30, help="Окно для 'recent won', дней")
|
|
|
ap.add_argument("--high-prob", type=float, default=0.80, help="Порог вероятности для 'high probability'")
|
|
|
ap.add_argument("--no-print", dest="do_print", action="store_false", help="Не выводить таблицу в консоль")
|
|
|
ap.add_argument("--llm-summary", action="store_true", help="Добавить сводку от LLM")
|
|
|
args = ap.parse_args()
|
|
|
|
|
|
reader = CRMReader(data_dir=args.data_dir)
|
|
|
monitor = CustomerMonitor(reader)
|
|
|
llm = LLMClient() if args.llm_summary else None
|
|
|
|
|
|
limits_values = args.limits or [2, 2, 2, 2, 2]
|
|
|
limit_keys = ["long_no_contact", "upcoming_contact", "proposal_stage", "recent_won", "high_prob"]
|
|
|
limits = dict(zip(limit_keys, limits_values))
|
|
|
|
|
|
df = monitor.build_due_list(
|
|
|
limits=limits,
|
|
|
won_days=args.won_days,
|
|
|
high_prob_threshold=args.high_prob,
|
|
|
)
|
|
|
|
|
|
|
|
|
if args.do_print:
|
|
|
title = "План контактов на сегодня"
|
|
|
if llm:
|
|
|
summary = generate_plan_summary(llm, limits)
|
|
|
if summary:
|
|
|
sys.stdout.buffer.write(f"\n{summary}\n".encode("utf-8"))
|
|
|
|
|
|
_print_markdown_table(df, title=title)
|
|
|
|
|
|
|
|
|
if args.out:
|
|
|
if args.out_format == "csv":
|
|
|
df.to_csv(args.out, index=False, encoding="utf-8-sig")
|
|
|
else:
|
|
|
df.to_parquet(args.out, index=False)
|
|
|
print(f"\n[ok] Сохранено {len(df)} строк в {args.out}")
|
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
main()
|
|
|
|