# Spaces: Sleeping
# app.py — Yahoo Finance–based Monte Carlo simulator with weights and a stats table
from __future__ import annotations

import logging
import math
import os
from typing import Dict, List, Optional, Tuple

import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import yfinance as yf
# Trading days per year; used to annualise the daily mu (x N) and sigma (x sqrt(N)).
TRADING_DAYS = 252
# ---------- Data helpers ----------
def clean_table(df: pd.DataFrame) -> List[Tuple[str, Optional[float]]]:
    """
    Convert the holdings table into a list of ``(ticker, weight_or_None)`` rows.

    Columns are located by case-insensitive header name ("ticker", "weight").
    When a header is absent, the first column is used for tickers and the
    second for weights.

    Args:
        df: Table of holdings; may be ``None`` or empty.

    Returns:
        One tuple per usable row, in table order. Tickers are stripped and
        upper-cased; rows whose ticker is blank or a missing-value marker are
        dropped. The weight is ``None`` when the cell is missing, cannot be
        converted to ``float``, is not finite, or the weight column does not
        exist.
    """
    if df is None or df.empty:
        return []

    # Column labels are not guaranteed to be strings (e.g. a default integer
    # index), so coerce them before normalising.
    labels = [str(label).strip().lower() for label in df.columns]
    positions = {label: idx for idx, label in enumerate(labels)}
    tk_col = positions.get("ticker", 0)
    wt_col = positions.get("weight", 1)

    rows: List[Tuple[str, Optional[float]]] = []
    for _, row in df.iterrows():
        raw_tk = str(row.iloc[tk_col]).strip()
        # str() turns missing cells into these marker strings.
        if not raw_tk or raw_tk.lower() in ("nan", "none", "<na>"):
            continue

        weight: Optional[float] = None
        try:
            raw_wt = row.iloc[wt_col]
            if pd.notna(raw_wt):
                candidate = float(raw_wt)
                # inf would later normalise to NaN weights; treat it as missing.
                if math.isfinite(candidate):
                    weight = candidate
        except (TypeError, ValueError, IndexError):
            # Non-numeric cell, or the table has no weight column at all.
            weight = None

        rows.append((raw_tk.upper(), weight))
    return rows
def normalize_weights(pairs: List[Tuple[str, Optional[float]]]) -> Tuple[List[str], np.ndarray]:
    """
    Turn raw ``(ticker, weight)`` pairs into tickers plus weights summing to 1.

    Args:
        pairs: ``(ticker, weight_or_None)`` rows, e.g. from ``clean_table``.

    Returns:
        ``(tickers, weights)`` where ``weights`` is a float array aligned with
        ``tickers``. Equal weights are used when no weight is supplied at all,
        or when the usable weights sum to zero. Otherwise missing, negative and
        non-finite weights count as zero and the rest are scaled to sum to 1.
        An empty input yields an empty list and an empty array.
    """
    tickers = [ticker for ticker, _ in pairs]
    count = len(tickers)
    equal = np.ones(count) / max(1, count)

    if all(weight is None for _, weight in pairs):
        return tickers, equal

    weights = np.array(
        [0.0 if weight is None else float(weight) for _, weight in pairs],
        dtype=float,
    )
    # NaN/inf would poison the sum and yield NaN weights for every ticker.
    weights[~np.isfinite(weights)] = 0.0
    weights = np.clip(weights, 0, None)

    total = weights.sum()
    if total > 0:
        return tickers, weights / total
    return tickers, equal
def fetch_price_series(ticker: str, period: str = "1y", interval: str = "1d") -> Optional[pd.Series]:
    """
    Fetch the historical "Close" column for a ticker via yfinance.

    Args:
        ticker: Symbol passed to ``yf.Ticker``.
        period: History window passed to ``Ticker.history``.
        interval: Bar interval passed to ``Ticker.history``.

    Returns:
        The "Close" series, or ``None`` when the download raises, the returned
        frame is empty, or it has no "Close" column.
    """
    try:
        hist = yf.Ticker(ticker).history(period=period, interval=interval)
    except Exception:
        # Best-effort by design: one bad ticker must not abort the whole run,
        # but record the failure instead of swallowing it silently.
        logging.getLogger(__name__).warning(
            "Price download failed for %s", ticker, exc_info=True
        )
        return None

    if hist is None or hist.empty or "Close" not in hist.columns:
        return None
    return hist["Close"]
def summarize_series(prices: pd.Series) -> Optional[Dict[str, float]]:
    """
    Compute daily log-return statistics and the last usable price.

    Missing and non-positive prices are discarded first, because they would
    turn the log-returns (and possibly the last price) into NaN or +/-inf.

    Args:
        prices: Price series in chronological order; may be ``None``.

    Returns:
        ``None`` when fewer than 30 usable prices remain or no return can be
        computed. Otherwise a dict with ``mu`` (mean log-return), ``sigma``
        (sample standard deviation of log-returns), ``last`` (last usable
        price) and ``n`` (number of log-returns).
    """
    if prices is None:
        return None

    usable = prices.dropna()
    usable = usable[usable > 0]
    if len(usable) < 30:
        return None

    rets = np.log(usable / usable.shift(1)).dropna()
    if rets.empty:
        return None

    return {
        "mu": float(rets.mean()),
        "sigma": float(rets.std()),
        "last": float(usable.iloc[-1]),
        "n": len(rets),
    }
def simulate_gbm(last_price: float, mu_d: float, sigma_d: float, days: int, paths: int, seed: Optional[int] = None) -> np.ndarray:
    """
    Simulate Geometric Brownian Motion price paths.

    Args:
        last_price: Starting price, placed in row 0 of every path.
        mu_d: Daily drift parameter.
        sigma_d: Daily volatility parameter.
        days: Number of simulated steps; integer-valued floats are accepted.
        paths: Number of simulated paths; integer-valued floats are accepted.
        seed: Optional seed for a reproducible draw.

    Returns:
        Array of shape ``(days + 1, paths)``; row ``k`` holds the prices after
        ``k`` steps.

    Raises:
        ValueError: If ``days`` or ``paths`` is negative.
    """
    # UI sliders can deliver floats, which NumPy rejects as shape entries.
    days = int(days)
    paths = int(paths)
    if days < 0 or paths < 0:
        raise ValueError("days and paths must be non-negative")

    # A private RandomState yields the same stream as np.random.seed(seed)
    # followed by np.random.normal, without clobbering the global RNG state.
    rng = np.random.RandomState(seed)

    # NOTE(review): if mu_d is the mean of daily *log*-returns (as the caller's
    # summary suggests), subtracting 0.5*sigma^2 applies the convexity
    # correction twice. Left unchanged pending confirmation of the convention.
    drift = mu_d - 0.5 * sigma_d**2
    shocks = rng.normal(0.0, 1.0, size=(days, paths))
    log_paths = np.cumsum(drift + sigma_d * shocks, axis=0)
    prices = last_price * np.exp(log_paths)
    return np.vstack([np.full((1, paths), last_price), prices])
def run_multi_ticker_mc(tickers: List[str], days: int, paths: int, period: str, seed: Optional[int] = None) -> Dict[str, Dict]:
    """
    Fetch history and run a GBM simulation for every ticker.

    Args:
        tickers: Symbols to simulate; repeated symbols are processed once.
        days: Simulation horizon in steps.
        paths: Number of simulated paths per ticker.
        period: History window used to estimate mu and sigma.
        seed: Optional base seed. Each ticker gets its own seed derived from
            this value and its position in ``tickers``.

    Returns:
        Dict keyed by ticker with ``{"params": summary, "paths": array}``, or
        ``{"params": None, "paths": None}`` when no usable history exists.
    """
    results: Dict[str, Dict] = {}
    for position, ticker in enumerate(tickers):
        if ticker in results:
            # Same symbol listed twice: reuse the result, skip the download.
            continue

        summary = summarize_series(fetch_price_series(ticker, period=period))
        if not summary:
            results[ticker] = {"params": None, "paths": None}
            continue

        # Re-using one seed for every ticker would hand all of them identical
        # shocks, i.e. perfectly correlated assets only when a seed is given.
        # Offsetting by position keeps runs reproducible but independent; the
        # modulo keeps the value inside NumPy's legacy 32-bit seed range.
        ticker_seed = None if seed is None else (seed + position) % (2**32)
        sim = simulate_gbm(
            summary["last"], summary["mu"], summary["sigma"], days, paths, ticker_seed
        )
        results[ticker] = {"params": summary, "paths": sim}
    return results
def portfolio_index(sim_results: Dict[str, Dict], tickers: List[str], weights: np.ndarray) -> Optional[np.ndarray]:
    """
    Build a weighted portfolio index from simulated price paths.

    Each ticker's paths are divided by its last price (so they start at 1.0)
    and combined using ``weights``. Tickers with no simulation or a
    non-positive last price are skipped, and the remaining weights are
    rescaled so the index still starts at 1.0.

    Args:
        sim_results: Per-ticker payloads with "params" and "paths" entries.
        tickers: Symbols aligned one-to-one with ``weights``.
        weights: Portfolio weight of each ticker.

    Returns:
        Array shaped like a single ticker's paths, or ``None`` when no ticker
        contributes or the contributing weights sum to zero.
    """
    combined: Optional[np.ndarray] = None
    used_weight = 0.0
    for ticker, weight in zip(tickers, weights):
        payload = sim_results.get(ticker, {})
        paths = payload.get("paths")
        params = payload.get("params")
        if paths is None or not params:
            continue
        last = params["last"]
        if not last > 0:
            continue

        contribution = (paths / last) * weight
        # Accumulate instead of stacking every ticker's array in memory.
        combined = contribution if combined is None else combined + contribution
        used_weight += float(weight)

    if combined is None or used_weight <= 0:
        return None
    # Without this rescale a skipped ticker leaves the index starting below
    # 1.0, which skews anything that compares it against a 1.0 baseline.
    return combined / used_weight
# ---------- Presentation helpers ----------
def assess_investment(
    p10: float,
    p50: float,
    p90: float,
    last_price: float,
    target_multiplier: float = 1.5,
    *,
    min_floor_ratio: float = 0.75,
    min_reward_risk: float = 2.0,
) -> str:
    """
    Classify simulated horizon percentiles as 'GOOD', 'POOR' or 'NEUTRAL'.

    Three checks are applied:
      * median: ``p50`` reaches ``last_price * target_multiplier``;
      * downside: ``p10 / last_price`` is at least ``min_floor_ratio``;
      * reward/risk: ``(p90 - last_price) / (last_price - p10)`` is at least
        ``min_reward_risk`` (it passes outright when there is upside and no
        downside, and fails when there is neither).

    Args:
        p10: 10th percentile of the horizon value.
        p50: 50th percentile of the horizon value.
        p90: 90th percentile of the horizon value.
        last_price: Starting value the percentiles are compared against.
        target_multiplier: Required growth multiple for the median check.
        min_floor_ratio: Minimum acceptable ``p10 / last_price``.
        min_reward_risk: Minimum acceptable upside-to-downside ratio.

    Returns:
        'GOOD' when all three checks pass, 'POOR' when the median or downside
        check fails, and 'NEUTRAL' otherwise. Also 'NEUTRAL' when
        ``last_price`` is not positive or any input is not a finite number.
    """
    # NaN compares False everywhere, which would silently read as 'POOR'.
    if not all(math.isfinite(value) for value in (p10, p50, p90, last_price)):
        return 'NEUTRAL'
    if last_price <= 0:
        return 'NEUTRAL'

    median_ok = p50 >= last_price * target_multiplier
    floor_ok = (p10 / last_price) >= min_floor_ratio

    downside = max(0, last_price - p10)
    upside = max(0, p90 - last_price)
    if downside > 0:
        reward_ok = (upside / downside) >= min_reward_risk
    else:
        # No downside: passes only if there is some upside to speak of.
        reward_ok = upside > 0

    if median_ok and floor_ok and reward_ok:
        return 'GOOD'
    if not (median_ok and floor_ok):
        return 'POOR'
    return 'NEUTRAL'
def make_plot(sim_results: Dict[str, Dict], port_paths: Optional[np.ndarray], port_json: Optional[Dict[str, float]]):
    """
    Plot the mean simulated path per ticker plus the weighted portfolio.

    Args:
        sim_results: Per-ticker payloads; entries whose "paths" is ``None``
            are skipped.
        port_paths: Portfolio index paths, or ``None`` to omit that line.
        port_json: Portfolio percentiles keyed "portfolio_horizon_p10/p50/p90".
            When given, the portfolio is assessed against a starting index of
            1.0 and a green tick ('GOOD') or red cross ('POOR') is drawn in
            the top-right corner.

    Returns:
        The matplotlib figure. It is detached from pyplot's figure registry
        before being returned.
    """
    fig, ax = plt.subplots(figsize=(9, 5))

    for ticker, payload in sim_results.items():
        paths = payload.get("paths")
        if paths is not None:
            ax.plot(paths.mean(axis=1), label=ticker, alpha=0.6)

    if port_paths is not None:
        ax.plot(port_paths.mean(axis=1), label="Portfolio (weighted)", linewidth=3.0, color='black')

    ax.set_title("Monte Carlo Simulation – Expected Price Paths")
    ax.set_xlabel("Days Ahead")
    ax.set_ylabel("Price / Index")

    # legend() warns when nothing labelled was drawn (e.g. every ticker failed).
    handles, _ = ax.get_legend_handles_labels()
    if handles:
        ax.legend()

    if port_json:
        assessment = assess_investment(
            port_json.get("portfolio_horizon_p10", 0),
            port_json.get("portfolio_horizon_p50", 0),
            port_json.get("portfolio_horizon_p90", 0),
            1.0  # Portfolio index starts at 1.0
        )
        marker = {'GOOD': ("✓", 'green'), 'POOR': ("✗", 'red')}.get(assessment)
        if marker is not None:
            symbol, colour = marker
            ax.text(
                0.95, 0.95, symbol,
                transform=ax.transAxes, fontsize=40, color=colour,
                ha='right', va='top',
                bbox=dict(boxstyle='round,pad=0.2', fc='white', ec=colour, alpha=0.8),
            )

    # pyplot keeps every figure it creates alive until closed. In a
    # long-running app that leaks one figure per click; the returned Figure
    # object itself stays usable for rendering.
    plt.close(fig)
    return fig
def build_stats(sim_results: Dict[str, Dict], port_paths: Optional[np.ndarray]):
    """
    Summarise the simulation per ticker and for the portfolio.

    Returns a 3-tuple:
      * dict keyed by ticker with last close, daily and annualised mu/sigma,
        horizon P10/P50/P90 and the number of daily observations;
      * DataFrame with one row per ticker holding the same figures;
      * dict of portfolio horizon percentiles, or ``None`` when ``port_paths``
        is ``None``.

    Tickers lacking params or paths are left out of both the dict and the table.
    """
    columns = ["Ticker", "Last Close", "Daily μ", "Daily σ", "Annual μ (≈)", "Annual σ (≈)", "Horizon P10", "Horizon P50", "Horizon P90", "# Daily Obs"]
    per_ticker = {}
    records = []

    for ticker, payload in sim_results.items():
        params = payload.get("params")
        paths = payload.get("paths")
        if not params or paths is None:
            continue

        # Distribution of prices on the final simulated day.
        low, mid, high = np.percentile(paths[-1, :], [10, 50, 90])
        annual_mu = params["mu"] * TRADING_DAYS
        annual_sigma = params["sigma"] * math.sqrt(TRADING_DAYS)

        per_ticker[ticker] = {
            "last_close": params["last"],
            "daily_mu": params["mu"],
            "daily_sigma": params["sigma"],
            "annual_mu_approx": annual_mu,
            "annual_sigma_approx": annual_sigma,
            "horizon_p10": low,
            "horizon_p50": mid,
            "horizon_p90": high,
            "n_daily_obs": params["n"],
        }
        records.append([
            ticker, params["last"], params["mu"], params["sigma"],
            annual_mu, annual_sigma, low, mid, high, params["n"],
        ])

    table = pd.DataFrame(records, columns=columns)

    if port_paths is None:
        return per_ticker, table, None

    low, mid, high = np.percentile(port_paths[-1, :], [10, 50, 90])
    portfolio = {
        "portfolio_horizon_p10": low,
        "portfolio_horizon_p50": mid,
        "portfolio_horizon_p90": high,
    }
    return per_ticker, table, portfolio
# ---------- Gradio UI ----------
def handle_run(df_input: pd.DataFrame, horizon_days: int, paths: int, period: str, seed_text: str):
    """
    Run the full pipeline for one click of the "Run Simulation" button.

    Args:
        df_input: Holdings table with ticker and weight columns.
        horizon_days: Simulation horizon; integer-valued floats are accepted.
        paths: Number of simulated paths; integer-valued floats are accepted.
        period: History window used for parameter estimation.
        seed_text: Optional seed as typed by the user. Anything other than a
            plain run of ASCII digits means "no seed".

    Returns:
        ``(figure, per_ticker_stats, stats_table, portfolio_summary)``. When
        the table holds no usable ticker the result is
        ``(None, {}, empty DataFrame, {})``.
    """
    pairs = clean_table(df_input)
    if not pairs:
        return None, {}, pd.DataFrame(), {}
    tickers, weights = normalize_weights(pairs)

    # The textbox may hand over None; str.isdigit() alone also accepts
    # characters such as "²" that int() then rejects.
    seed_raw = "" if seed_text is None else str(seed_text).strip()
    seed: Optional[int] = None
    if seed_raw.isascii() and seed_raw.isdigit():
        # NumPy's legacy seeding only accepts values below 2**32.
        seed = int(seed_raw) % (2**32)

    sims = run_multi_ticker_mc(tickers, int(horizon_days), int(paths), period, seed)
    port_paths = portfolio_index(sims, tickers, weights)
    stats_json, stats_table, port_json = build_stats(sims, port_paths)
    fig = make_plot(sims, port_paths, port_json)

    # Copy so the dict produced by build_stats is not mutated behind its back.
    port_summary = dict(port_json or {})
    normalized: Dict[str, float] = {}
    for ticker, weight in zip(tickers, weights):
        # A ticker listed on several rows contributes the sum of its weights.
        normalized[ticker] = normalized.get(ticker, 0.0) + float(weight)
    port_summary["normalized_weights"] = normalized
    return fig, stats_json, stats_table, port_summary
# Build the Gradio interface: a holdings table and simulation controls on top,
# a run button, then the plot, stats table and two JSON panels.
# NOTE(review): indentation was lost in the source this was recovered from, so
# the extent of each `with gr.Row():` below is a reconstruction — confirm the
# intended layout (in particular whether the JSON panels belong in the last row).
with gr.Blocks() as demo:
    gr.Markdown("# Monte Carlo Portfolio Simulator\n...")
    # Editable holdings table, pre-filled with a 50/50 example.
    holdings_df = gr.Dataframe(headers=["Ticker", "Weight"], value=[["AAPL", 0.5], ["MSFT", 0.5]], row_count=(3, "dynamic"), col_count=(2, "fixed"), wrap=True, label="Holdings")
    with gr.Row():
        horizon = gr.Slider(label="Horizon (days)", minimum=30, maximum=756, step=21, value=252)
        # NOTE(review): every path stores horizon + 1 float64 values per ticker,
        # so the upper end of this range looks very memory-hungry — confirm.
        n_paths = gr.Slider(label="Number of paths", minimum=100, maximum=1000000, step=100, value=5000)
        period = gr.Dropdown(label="History window", choices=["6mo", "1y", "2y", "5y", "10y", "max"], value="1y")
        seed_box = gr.Textbox(label="Random seed (optional)", placeholder="e.g., 42")
    run_btn = gr.Button("Run Simulation", variant="primary")
    with gr.Row():
        plot_output = gr.Plot(label="Expected Paths (Per-Ticker + Portfolio)")
    with gr.Row():
        table_output = gr.Dataframe(label="Per-Ticker Stats")
    json_output = gr.JSON(label="JSON (Per-Ticker Stats)")
    json_port = gr.JSON(label="JSON (Portfolio Summary)")
    # handle_run returns (figure, per-ticker stats, stats table, portfolio
    # summary); the outputs list is in that same order.
    run_btn.click(
        handle_run,
        inputs=[holdings_df, horizon, n_paths, period, seed_box],
        outputs=[plot_output, json_output, table_output, json_port]
    )
if __name__ == "__main__":
    # Listen on all interfaces; take the port from $PORT when set, else 7860.
    listen_port = int(os.environ.get("PORT", 7860))
    demo.launch(server_name="0.0.0.0", server_port=listen_port)