File size: 4,470 Bytes
4ae3e44
081369c
 
 
 
 
4ae3e44
 
 
 
 
 
 
 
 
a8f90b0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
081369c
 
a8f90b0
 
 
081369c
 
 
 
a8f90b0
081369c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
a8f90b0
 
 
 
 
081369c
 
 
 
 
 
 
a8f90b0
 
 
 
 
081369c
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
import os
from typing import Any, List, Mapping, Optional
from datetime import datetime


# Accumulated model-call log lines, one CSV-like entry per logged response:
# "timestamp, model name, duration seconds, $cost" (see _log_model_response).
_MODEL_LOGS: List[str] = []


def log(message: str) -> None:
    """Print *message* to stdout with the standard ``[WORDS2CSV]`` prefix."""
    print("[WORDS2CSV]", message)


def log_debug(message: str) -> None:
    """Print a debug line, but only when the WORDS2CSV_DEBUG env var is set."""
    if not os.getenv("WORDS2CSV_DEBUG"):
        return
    print("[WORDS2CSV-DEBUG]", message)


def _normalize_usage(usage: Any) -> Optional[tuple]:
    """Normalize a provider-specific usage object to (prompt, completion, total) floats.

    Recognizes four shapes: OpenAI-style attributes (``prompt_tokens`` /
    ``completion_tokens``), Gemini usage-metadata attributes
    (``prompt_token_count`` / ``candidates_token_count``), and the mapping
    equivalents of both. Returns None when the format is unrecognized.
    """
    # Attribute-style access (e.g. OpenAI usage)
    if hasattr(usage, "prompt_tokens") and hasattr(usage, "completion_tokens"):
        prompt = float(usage.prompt_tokens)
        completion = float(usage.completion_tokens)
        total = float(getattr(usage, "total_tokens", prompt + completion))
        return prompt, completion, total

    # Gemini GenerateContentResponseUsageMetadata-style
    if hasattr(usage, "prompt_token_count") and hasattr(usage, "candidates_token_count"):
        prompt = float(usage.prompt_token_count)
        completion = float(usage.candidates_token_count)
        total = float(getattr(usage, "total_token_count", prompt + completion))
        return prompt, completion, total

    # dict-style access fallback
    if isinstance(usage, Mapping):
        if {"prompt_tokens", "completion_tokens"}.issubset(usage.keys()):
            prompt = float(usage["prompt_tokens"])
            completion = float(usage["completion_tokens"])
            total = float(usage.get("total_tokens", prompt + completion))
            return prompt, completion, total
        if {"prompt_token_count", "candidates_token_count"}.issubset(usage.keys()):
            prompt = float(usage["prompt_token_count"])
            completion = float(usage["candidates_token_count"])
            total = float(usage.get("total_token_count", prompt + completion))
            return prompt, completion, total

    return None


def _log_model_response(
    *,
    model_name: str,
    content: str,
    duration: float,
    usage: Optional[Any] = None,
    pricing: Optional[Mapping[str, Mapping[str, float]]] = None,
    default_pricing_key: str = "default",
) -> Optional[float]:
    """Log model usage, cost (if pricing and usage are provided), and response details.

    Args:
        model_name: Model identifier, used for log lines and pricing lookup.
        content: The model's text response (logged only in debug mode).
        duration: Wall-clock time of the call, in seconds.
        usage: Provider usage object/mapping; see ``_normalize_usage``.
        pricing: Per-model pricing rows mapping ``{"input": $, "output": $}``
            in dollars per million tokens.
        default_pricing_key: Pricing key used when ``model_name`` is absent.

    Returns:
        The calculated cost if pricing and usage are provided (and the usage
        format is recognized), otherwise None.

    Raises:
        KeyError: if ``model_name`` is not in ``pricing`` and the
            ``default_pricing_key`` row is also missing.
    """
    cost: Optional[float] = None

    tokens = None
    if usage is not None and pricing is not None:
        tokens = _normalize_usage(usage)

    if tokens is not None:
        prompt_tokens, completion_tokens, total_tokens = tokens
        # BUG FIX: pricing.get(model_name, pricing[default_pricing_key]) evaluated
        # the fallback subscript eagerly, raising KeyError whenever the default
        # row was missing — even if model_name WAS present. Fall back lazily.
        if model_name in pricing:
            pricing_row = pricing[model_name]
        else:
            pricing_row = pricing[default_pricing_key]
        # Pricing rows are expressed in dollars per million tokens.
        input_cost = (prompt_tokens / 1_000_000) * pricing_row["input"]
        output_cost = (completion_tokens / 1_000_000) * pricing_row["output"]
        cost = input_cost + output_cost

        log(f"Model: {model_name}")
        log(
            "Token usage: Input={prompt}, Output={completion}, Total={total}".format(
                prompt=int(prompt_tokens), completion=int(completion_tokens), total=int(total_tokens)
            )
        )
        log(f"Estimated cost: ${cost:.6f}")
        log(f"Execution time: {duration:.3f} seconds")
    else:
        log(f"Model: {model_name}")
        log(f"Execution time: {duration:.3f} seconds")
        if usage is not None and pricing is not None:
            # Usage provided but in an unknown format – still log basic info.
            log_debug(f"Unrecognized usage format: {usage!r}")

    log("Model response received")

    # Store latest model log in a simple CSV-like line: timestamp, model name, duration, cost
    # (no `global` needed: append mutates the list in place, never rebinds it).
    timestamp = datetime.now().isoformat(timespec="seconds")
    duration_s = float(duration)
    cost_value = float(cost) if cost is not None else 0.0
    line = f"{timestamp}, {model_name}, {duration_s:.3f} seconds, ${cost_value:.6f}"
    _MODEL_LOGS.append(line)
    log_debug(f"Response length: {len(content)} characters")
    log_debug(f"Result: {content}")
    log_debug("End of result")

    return cost


def get_latest_model_log() -> Optional[str]:
    """Return full accumulated model logs as a single string, or None if empty.

    NOTE(review): despite the "latest" in the name, this joins EVERY
    accumulated log line, not just the most recent one.
    """
    return "\n".join(_MODEL_LOGS) if _MODEL_LOGS else None