# app.py — Yahoo Finance–based Monte Carlo Simulator with Weights & Table from __future__ import annotations import os import math from typing import Dict, List, Optional, Tuple import gradio as gr import yfinance as yf import numpy as np import pandas as pd import matplotlib.pyplot as plt TRADING_DAYS = 252 # ---------- Data helpers ---------- def clean_table(df: pd.DataFrame) -> List[Tuple[str, Optional[float]]]: """ Accepts a DataFrame with columns ["Ticker", "Weight"] and returns a cleaned list of (ticker, weight_or_None) rows, dropping blanks/NaNs in Ticker. """ if df is None or df.empty: return [] cols = [c.strip().lower() for c in df.columns.tolist()] colmap = {c: i for i, c in enumerate(cols)} tk_col = colmap.get("ticker", 0) wt_col = colmap.get("weight", 1) rows = [] for _, row in df.iterrows(): raw_tk = str(row.iloc[tk_col]).strip() if not raw_tk or raw_tk.lower() in ("nan", "none"): continue ticker = raw_tk.upper() weight = None try: w = row.iloc[wt_col] if pd.notna(w): weight = float(w) except (ValueError, IndexError): weight = None rows.append((ticker, weight)) return rows def normalize_weights(pairs: List[Tuple[str, Optional[float]]]) -> Tuple[List[str], np.ndarray]: """ Normalizes weights, defaulting to equal weights if none are provided. """ tickers = [t for t, _ in pairs] raw = np.array([w for _, w in pairs], dtype=object) if np.all(raw == None): return tickers, np.ones(len(tickers)) / max(1, len(tickers)) weights = np.array([0.0 if w is None else float(w) for _, w in pairs]) weights = np.clip(weights, 0, None) s = weights.sum() return tickers, weights / s if s > 0 else np.ones(len(tickers)) / max(1, len(tickers)) def fetch_price_series(ticker: str, period: str = "1y", interval: str = "1d") -> Optional[pd.Series]: """Fetches historical close prices for a ticker using yfinance.""" try: data = yf.Ticker(ticker) hist = data.history(period=period, interval=interval) return hist["Close"] if not hist.empty else None except Exception: return None def summarize_series(prices: pd.Series) -> Optional[Dict[str, float]]: """Computes daily log-return mu, sigma, and last price.""" if prices is None or len(prices) < 30: return None rets = np.log(prices / prices.shift(1)).dropna() if rets.empty: return None return { "mu": float(rets.mean()), "sigma": float(rets.std()), "last": float(prices.iloc[-1]), "n": len(rets) } def simulate_gbm(last_price: float, mu_d: float, sigma_d: float, days: int, paths: int, seed: Optional[int] = None) -> np.ndarray: """Simulates Geometric Brownian Motion paths.""" if seed is not None: np.random.seed(seed) drift = mu_d - 0.5 * sigma_d**2 shocks = np.random.normal(0.0, 1.0, size=(days, paths)) log_paths = np.cumsum(drift + sigma_d * shocks, axis=0) prices = last_price * np.exp(log_paths) return np.vstack([np.full((1, paths), last_price), prices]) def run_multi_ticker_mc(tickers: List[str], days: int, paths: int, period: str, seed: Optional[int] = None) -> Dict[str, Dict]: """Runs simulations for multiple tickers.""" results = {} for t in tickers: series = fetch_price_series(t, period=period) summary = summarize_series(series) if summary: sim = simulate_gbm(summary["last"], summary["mu"], summary["sigma"], days, paths, seed) results[t] = {"params": summary, "paths": sim} else: results[t] = {"params": None, "paths": None} return results def portfolio_index(sim_results: Dict[str, Dict], tickers: List[str], weights: np.ndarray) -> Optional[np.ndarray]: """Builds a weighted portfolio index from simulated paths.""" norm_paths = [] for i, t in enumerate(tickers): payload = sim_results.get(t, {}) if payload.get("paths") is not None and payload.get("params"): last = payload["params"]["last"] if last > 0: norm_paths.append((payload["paths"] / last) * weights[i]) return np.stack(norm_paths).sum(axis=0) if norm_paths else None # ---------- Presentation Helpers ---------- def assess_investment(p10: float, p50: float, p90: float, last_price: float, target_multiplier: float = 1.5) -> str: """Analyzes simulation results to classify an investment.""" if last_price <= 0: return 'NEUTRAL' target_price = last_price * target_multiplier median_check_pass = (p50 >= target_price) risk_check_pass = (p10 / last_price) >= 0.75 potential_loss = max(0, last_price - p10) potential_gain = max(0, p90 - last_price) if potential_loss == 0 and potential_gain > 0: risk_reward_pass = True elif potential_loss > 0: risk_reward_pass = (potential_gain / potential_loss) >= 2.0 else: risk_reward_pass = False if median_check_pass and risk_check_pass and risk_reward_pass: return 'GOOD' if not risk_check_pass or not median_check_pass: return 'POOR' return 'NEUTRAL' def make_plot(sim_results: Dict[str, Dict], port_paths: Optional[np.ndarray], port_json: Optional[Dict[str, float]]): """Plots paths and adds a visual assessment marker.""" fig, ax = plt.subplots(figsize=(9, 5)) for t, payload in sim_results.items(): if payload.get("paths") is not None: ax.plot(payload["paths"].mean(axis=1), label=t, alpha=0.6) if port_paths is not None: ax.plot(port_paths.mean(axis=1), label="Portfolio (weighted)", linewidth=3.0, color='black') ax.set_title("Monte Carlo Simulation – Expected Price Paths") ax.set_xlabel("Days Ahead") ax.set_ylabel("Price / Index") ax.legend() if port_json: assessment = assess_investment( port_json.get("portfolio_horizon_p10", 0), port_json.get("portfolio_horizon_p50", 0), port_json.get("portfolio_horizon_p90", 0), 1.0 # Portfolio index starts at 1.0 ) if assessment == 'GOOD': ax.text(0.95, 0.95, "✓", transform=ax.transAxes, fontsize=40, color='green', ha='right', va='top', bbox=dict(boxstyle='round,pad=0.2', fc='white', ec='green', alpha=0.8)) elif assessment == 'POOR': ax.text(0.95, 0.95, "✗", transform=ax.transAxes, fontsize=40, color='red', ha='right', va='top', bbox=dict(boxstyle='round,pad=0.2', fc='white', ec='red', alpha=0.8)) return fig def build_stats(sim_results: Dict[str, Dict], port_paths: Optional[np.ndarray]): """Builds statistical summaries from simulation results.""" rows, out = [], {} for t, payload in sim_results.items(): params, paths = payload.get("params"), payload.get("paths") if params and paths is not None: end_prices = paths[-1, :] p10, p50, p90 = np.percentile(end_prices, [10, 50, 90]) mu_annual = params["mu"] * TRADING_DAYS sigma_annual = params["sigma"] * math.sqrt(TRADING_DAYS) out[t] = { "last_close": params["last"], "daily_mu": params["mu"], "daily_sigma": params["sigma"], "annual_mu_approx": mu_annual, "annual_sigma_approx": sigma_annual, "horizon_p10": p10, "horizon_p50": p50, "horizon_p90": p90, "n_daily_obs": params["n"] } rows.append([t, params["last"], params["mu"], params["sigma"], mu_annual, sigma_annual, p10, p50, p90, params["n"]]) cols = ["Ticker", "Last Close", "Daily μ", "Daily σ", "Annual μ (≈)", "Annual σ (≈)", "Horizon P10", "Horizon P50", "Horizon P90", "# Daily Obs"] table = pd.DataFrame(rows, columns=cols) port_json = None if port_paths is not None: end_vals = port_paths[-1, :] p10, p50, p90 = np.percentile(end_vals, [10, 50, 90]) port_json = {"portfolio_horizon_p10": p10, "portfolio_horizon_p50": p50, "portfolio_horizon_p90": p90} return out, table, port_json # ---------- Gradio UI ---------- def handle_run(df_input: pd.DataFrame, horizon_days: int, paths: int, period: str, seed_text: str): """Main controller function for the UI.""" pairs = clean_table(df_input) if not pairs: return None, {}, pd.DataFrame(), {} tickers, weights = normalize_weights(pairs) seed = int(seed_text.strip()) if seed_text.strip().isdigit() else None sims = run_multi_ticker_mc(tickers, horizon_days, paths, period, seed) port_paths = portfolio_index(sims, tickers, weights) stats_json, stats_table, port_json = build_stats(sims, port_paths) fig = make_plot(sims, port_paths, port_json) port_summary = port_json or {} port_summary["normalized_weights"] = {t: w for t, w in zip(tickers, weights)} return fig, stats_json, stats_table, port_summary with gr.Blocks() as demo: gr.Markdown("# Monte Carlo Portfolio Simulator\n...") holdings_df = gr.Dataframe(headers=["Ticker", "Weight"], value=[["AAPL", 0.5], ["MSFT", 0.5]], row_count=(3, "dynamic"), col_count=(2, "fixed"), wrap=True, label="Holdings") with gr.Row(): horizon = gr.Slider(label="Horizon (days)", minimum=30, maximum=756, step=21, value=252) n_paths = gr.Slider(label="Number of paths", minimum=100, maximum=1000000, step=100, value=5000) period = gr.Dropdown(label="History window", choices=["6mo", "1y", "2y", "5y", "10y", "max"], value="1y") seed_box = gr.Textbox(label="Random seed (optional)", placeholder="e.g., 42") run_btn = gr.Button("Run Simulation", variant="primary") with gr.Row(): plot_output = gr.Plot(label="Expected Paths (Per-Ticker + Portfolio)") with gr.Row(): table_output = gr.Dataframe(label="Per-Ticker Stats") json_output = gr.JSON(label="JSON (Per-Ticker Stats)") json_port = gr.JSON(label="JSON (Portfolio Summary)") run_btn.click( handle_run, inputs=[holdings_df, horizon, n_paths, period, seed_box], outputs=[plot_output, json_output, table_output, json_port] ) if __name__ == "__main__": demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)))