# NOTE(review): the lines "Spaces / Sleeping" here were hosting-page status
# chrome captured during extraction, not part of the module; kept as a comment.
import os
from datetime import datetime
from typing import Any, List, Mapping, Optional
# Accumulated one-line summaries (timestamp, model, duration, cost) appended by
# _log_model_response and read back by get_latest_model_log.
_MODEL_LOGS: List[str] = []
def log(message: str) -> None:
    """Print *message* to stdout with the standard tool prefix."""
    prefix = "[WORDS2CSV]"
    print(f"{prefix} {message}")
def log_debug(message: str) -> None:
    """Print *message* with a debug prefix, but only when the
    WORDS2CSV_DEBUG environment variable is set to a truthy value."""
    if not os.getenv("WORDS2CSV_DEBUG"):
        return
    print(f"[WORDS2CSV-DEBUG] {message}")
def _normalize_usage(usage: Any) -> "tuple[float, float, float] | None":
    """Normalize provider-specific usage objects to token counts.

    Returns (prompt_tokens, completion_tokens, total_tokens) as floats, or
    None when the shape is not recognized. Supported shapes:
    OpenAI-style attributes, Gemini GenerateContentResponseUsageMetadata-style
    attributes, and the dict equivalents of both.
    """
    # Attribute-style access (e.g. OpenAI usage objects).
    if hasattr(usage, "prompt_tokens") and hasattr(usage, "completion_tokens"):
        prompt = float(usage.prompt_tokens)
        completion = float(usage.completion_tokens)
        return prompt, completion, float(getattr(usage, "total_tokens", prompt + completion))
    # Gemini GenerateContentResponseUsageMetadata-style attributes.
    if hasattr(usage, "prompt_token_count") and hasattr(usage, "candidates_token_count"):
        prompt = float(usage.prompt_token_count)
        completion = float(usage.candidates_token_count)
        return prompt, completion, float(getattr(usage, "total_token_count", prompt + completion))
    # dict-style access fallback for both providers.
    if isinstance(usage, Mapping):
        if {"prompt_tokens", "completion_tokens"}.issubset(usage.keys()):
            prompt = float(usage["prompt_tokens"])
            completion = float(usage["completion_tokens"])
            return prompt, completion, float(usage.get("total_tokens", prompt + completion))
        if {"prompt_token_count", "candidates_token_count"}.issubset(usage.keys()):
            prompt = float(usage["prompt_token_count"])
            completion = float(usage["candidates_token_count"])
            return prompt, completion, float(usage.get("total_token_count", prompt + completion))
    return None


def _log_model_response(
    *,
    model_name: str,
    content: str,
    duration: float,
    usage: Optional[Any] = None,
    pricing: Optional[Mapping[str, Mapping[str, float]]] = None,
    default_pricing_key: str = "default",
) -> Optional[float]:
    """Log model usage, cost (if pricing and usage are provided), and response details.

    Args:
        model_name: Provider model identifier; also the key into *pricing*.
        content: Raw model response text (logged only in debug mode).
        duration: Wall-clock seconds the model call took.
        usage: Provider usage object or dict; see _normalize_usage for shapes.
        pricing: Per-model rows with "input"/"output" dollar rates per million tokens.
        default_pricing_key: Fallback pricing row used when *model_name* has no entry.

    Returns the calculated cost if pricing and usage are provided, otherwise None.
    """
    cost: Optional[float] = None
    unrecognized_usage = False

    log(f"Model: {model_name}")
    if usage is not None and pricing is not None:
        tokens = _normalize_usage(usage)
        if tokens is None:
            # Usage provided but in an unknown format – still log basic info
            # (the debug note is emitted after the execution-time line below,
            # matching the established log order).
            unrecognized_usage = True
        else:
            prompt_tokens, completion_tokens, total_tokens = tokens
            # Fall back to the default row only when the model has no entry.
            # (The previous pricing.get(model_name, pricing[default_pricing_key])
            # evaluated the fallback eagerly and raised KeyError even when the
            # model-specific row existed.)
            pricing_row = pricing.get(model_name)
            if pricing_row is None:
                pricing_row = pricing[default_pricing_key]
            # Rates are dollars per million tokens.
            input_cost = (prompt_tokens / 1_000_000) * pricing_row["input"]
            output_cost = (completion_tokens / 1_000_000) * pricing_row["output"]
            cost = input_cost + output_cost
            log(
                "Token usage: Input={prompt}, Output={completion}, Total={total}".format(
                    prompt=int(prompt_tokens),
                    completion=int(completion_tokens),
                    total=int(total_tokens),
                )
            )
            log(f"Estimated cost: ${cost:.6f}")
    log(f"Execution time: {duration:.3f} seconds")
    if unrecognized_usage:
        log_debug(f"Unrecognized usage format: {usage!r}")
    log("Model response received")

    # Store latest model log in a simple CSV-like line: timestamp, model name, duration, cost
    timestamp = datetime.now().isoformat(timespec="seconds")
    cost_value = float(cost) if cost is not None else 0.0
    _MODEL_LOGS.append(
        f"{timestamp}, {model_name}, {float(duration):.3f} seconds, ${cost_value:.6f}"
    )

    log_debug(f"Response length: {len(content)} characters")
    log_debug(f"Result: {content}")
    log_debug("End of result")
    return cost
def get_latest_model_log() -> Optional[str]:
    """Join every accumulated model-log line into one string; None when nothing
    has been logged yet."""
    return "\n".join(_MODEL_LOGS) if _MODEL_LOGS else None