# words2csv / logging_helper.py
# (last commit 081369c: "feat: add Gradio cache and Python cache directories to gitignore")
import os
from typing import Any, List, Mapping, Optional
from datetime import datetime
# Accumulated one-line, CSV-like summaries (timestamp, model name, duration, cost)
# appended by _log_model_response and read back by get_latest_model_log.
_MODEL_LOGS: List[str] = []
def log(message: str) -> None:
    """Emit *message* on stdout, tagged with the words2csv log prefix."""
    prefix = "[WORDS2CSV]"
    print(f"{prefix} {message}")
def log_debug(message: str) -> None:
    """Emit a debug-prefixed message, but only when the WORDS2CSV_DEBUG
    environment variable is set to a non-empty value."""
    if not os.getenv("WORDS2CSV_DEBUG"):
        return
    print(f"[WORDS2CSV-DEBUG] {message}")
def _extract_token_counts(usage: Any) -> Optional[tuple]:
    """Normalize provider-specific usage objects into float token counts.

    Supports attribute-style usage (e.g. OpenAI: ``prompt_tokens`` /
    ``completion_tokens``), Gemini GenerateContentResponseUsageMetadata-style
    attributes (``prompt_token_count`` / ``candidates_token_count``), and
    plain mappings using either naming convention.

    Returns:
        ``(prompt_tokens, completion_tokens, total_tokens)`` as floats, or
        ``None`` when the usage format is not recognized.
    """
    # Attribute-style access (e.g. OpenAI usage)
    if hasattr(usage, "prompt_tokens") and hasattr(usage, "completion_tokens"):
        prompt = float(usage.prompt_tokens)
        completion = float(usage.completion_tokens)
        # Fall back to prompt + completion when the total is not reported.
        total = float(getattr(usage, "total_tokens", prompt + completion))
        return prompt, completion, total
    # Gemini GenerateContentResponseUsageMetadata-style
    if hasattr(usage, "prompt_token_count") and hasattr(usage, "candidates_token_count"):
        prompt = float(usage.prompt_token_count)
        completion = float(usage.candidates_token_count)
        total = float(getattr(usage, "total_token_count", prompt + completion))
        return prompt, completion, total
    # dict-style access fallback
    if isinstance(usage, Mapping):
        if {"prompt_tokens", "completion_tokens"}.issubset(usage.keys()):
            prompt = float(usage["prompt_tokens"])
            completion = float(usage["completion_tokens"])
            total = float(usage.get("total_tokens", prompt + completion))
            return prompt, completion, total
        if {"prompt_token_count", "candidates_token_count"}.issubset(usage.keys()):
            prompt = float(usage["prompt_token_count"])
            completion = float(usage["candidates_token_count"])
            total = float(usage.get("total_token_count", prompt + completion))
            return prompt, completion, total
    return None


def _log_model_response(
    *,
    model_name: str,
    content: str,
    duration: float,
    usage: Optional[Any] = None,
    pricing: Optional[Mapping[str, Mapping[str, float]]] = None,
    default_pricing_key: str = "default",
) -> Optional[float]:
    """Log model usage, cost (if pricing and usage are provided), and response details.

    Args:
        model_name: Name of the model that produced the response.
        content: Raw response text (only logged in debug mode).
        duration: Wall-clock execution time in seconds.
        usage: Provider usage object/mapping; see _extract_token_counts for
            the accepted formats.
        pricing: Per-model pricing table mapping model name to a row with
            "input"/"output" USD cost per million tokens.
        default_pricing_key: Key of the fallback pricing row used when
            ``model_name`` has no entry in ``pricing``.

    Returns:
        The calculated cost if pricing and usage are provided (and the usage
        format is recognized), otherwise None.
    """
    cost: Optional[float] = None
    if usage is not None and pricing is not None:
        tokens = _extract_token_counts(usage)
        if tokens is not None:
            prompt_tokens, completion_tokens, total_tokens = tokens
            # Look up the fallback row lazily. The previous
            # pricing.get(model_name, pricing[default_pricing_key]) evaluated
            # the fallback eagerly and raised KeyError whenever the default
            # key was missing, even when model_name had its own entry.
            pricing_row = pricing.get(model_name)
            if pricing_row is None:
                pricing_row = pricing[default_pricing_key]
            input_cost = (prompt_tokens / 1_000_000) * pricing_row["input"]
            output_cost = (completion_tokens / 1_000_000) * pricing_row["output"]
            cost = input_cost + output_cost
            log(f"Model: {model_name}")
            log(
                "Token usage: Input={prompt}, Output={completion}, Total={total}".format(
                    prompt=int(prompt_tokens), completion=int(completion_tokens), total=int(total_tokens)
                )
            )
            log(f"Estimated cost: ${cost:.6f}")
            log(f"Execution time: {duration:.3f} seconds")
        else:
            # Usage provided but in an unknown format – still log basic info.
            log(f"Model: {model_name}")
            log(f"Execution time: {duration:.3f} seconds")
            log_debug(f"Unrecognized usage format: {usage!r}")
    else:
        log(f"Model: {model_name}")
        log(f"Execution time: {duration:.3f} seconds")
    log("Model response received")
    # Store latest model log in a simple CSV-like line: timestamp, model name, duration, cost.
    # NOTE: no `global` needed — _MODEL_LOGS is only mutated in place, never rebound.
    timestamp = datetime.now().isoformat(timespec="seconds")
    duration_s = float(duration)
    cost_value = float(cost) if cost is not None else 0.0
    line = f"{timestamp}, {model_name}, {duration_s:.3f} seconds, ${cost_value:.6f}"
    _MODEL_LOGS.append(line)
    log_debug(f"Response length: {len(content)} characters")
    log_debug(f"Result: {content}")
    log_debug("End of result")
    return cost
def get_latest_model_log() -> Optional[str]:
    """Join all recorded model-log lines into one newline-separated string.

    Returns None when no model responses have been logged yet.
    """
    return "\n".join(_MODEL_LOGS) if _MODEL_LOGS else None