Rajan Sharma
Update app.py
1ab293d verified
# app.py — Yahoo Finance–based Monte Carlo Simulator with Weights & Table
from __future__ import annotations
import os
import math
from typing import Dict, List, Optional, Tuple
import gradio as gr
import yfinance as yf
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
TRADING_DAYS = 252
# ---------- Data helpers ----------
def clean_table(df: pd.DataFrame) -> List[Tuple[str, Optional[float]]]:
"""
Accepts a DataFrame with columns ["Ticker", "Weight"] and returns a cleaned
list of (ticker, weight_or_None) rows, dropping blanks/NaNs in Ticker.
"""
if df is None or df.empty:
return []
cols = [c.strip().lower() for c in df.columns.tolist()]
colmap = {c: i for i, c in enumerate(cols)}
tk_col = colmap.get("ticker", 0)
wt_col = colmap.get("weight", 1)
rows = []
for _, row in df.iterrows():
raw_tk = str(row.iloc[tk_col]).strip()
if not raw_tk or raw_tk.lower() in ("nan", "none"):
continue
ticker = raw_tk.upper()
weight = None
try:
w = row.iloc[wt_col]
if pd.notna(w):
weight = float(w)
except (ValueError, IndexError):
weight = None
rows.append((ticker, weight))
return rows
def normalize_weights(pairs: List[Tuple[str, Optional[float]]]) -> Tuple[List[str], np.ndarray]:
"""
Normalizes weights, defaulting to equal weights if none are provided.
"""
tickers = [t for t, _ in pairs]
raw = np.array([w for _, w in pairs], dtype=object)
if np.all(raw == None):
return tickers, np.ones(len(tickers)) / max(1, len(tickers))
weights = np.array([0.0 if w is None else float(w) for _, w in pairs])
weights = np.clip(weights, 0, None)
s = weights.sum()
return tickers, weights / s if s > 0 else np.ones(len(tickers)) / max(1, len(tickers))
def fetch_price_series(ticker: str, period: str = "1y", interval: str = "1d") -> Optional[pd.Series]:
"""Fetches historical close prices for a ticker using yfinance."""
try:
data = yf.Ticker(ticker)
hist = data.history(period=period, interval=interval)
return hist["Close"] if not hist.empty else None
except Exception:
return None
def summarize_series(prices: pd.Series) -> Optional[Dict[str, float]]:
"""Computes daily log-return mu, sigma, and last price."""
if prices is None or len(prices) < 30:
return None
rets = np.log(prices / prices.shift(1)).dropna()
if rets.empty:
return None
return {
"mu": float(rets.mean()),
"sigma": float(rets.std()),
"last": float(prices.iloc[-1]),
"n": len(rets)
}
def simulate_gbm(last_price: float, mu_d: float, sigma_d: float, days: int, paths: int, seed: Optional[int] = None) -> np.ndarray:
"""Simulates Geometric Brownian Motion paths."""
if seed is not None:
np.random.seed(seed)
drift = mu_d - 0.5 * sigma_d**2
shocks = np.random.normal(0.0, 1.0, size=(days, paths))
log_paths = np.cumsum(drift + sigma_d * shocks, axis=0)
prices = last_price * np.exp(log_paths)
return np.vstack([np.full((1, paths), last_price), prices])
def run_multi_ticker_mc(tickers: List[str], days: int, paths: int, period: str, seed: Optional[int] = None) -> Dict[str, Dict]:
"""Runs simulations for multiple tickers."""
results = {}
for t in tickers:
series = fetch_price_series(t, period=period)
summary = summarize_series(series)
if summary:
sim = simulate_gbm(summary["last"], summary["mu"], summary["sigma"], days, paths, seed)
results[t] = {"params": summary, "paths": sim}
else:
results[t] = {"params": None, "paths": None}
return results
def portfolio_index(sim_results: Dict[str, Dict], tickers: List[str], weights: np.ndarray) -> Optional[np.ndarray]:
"""Builds a weighted portfolio index from simulated paths."""
norm_paths = []
for i, t in enumerate(tickers):
payload = sim_results.get(t, {})
if payload.get("paths") is not None and payload.get("params"):
last = payload["params"]["last"]
if last > 0:
norm_paths.append((payload["paths"] / last) * weights[i])
return np.stack(norm_paths).sum(axis=0) if norm_paths else None
# ---------- Presentation Helpers ----------
def assess_investment(p10: float, p50: float, p90: float, last_price: float, target_multiplier: float = 1.5) -> str:
"""Analyzes simulation results to classify an investment."""
if last_price <= 0:
return 'NEUTRAL'
target_price = last_price * target_multiplier
median_check_pass = (p50 >= target_price)
risk_check_pass = (p10 / last_price) >= 0.75
potential_loss = max(0, last_price - p10)
potential_gain = max(0, p90 - last_price)
if potential_loss == 0 and potential_gain > 0:
risk_reward_pass = True
elif potential_loss > 0:
risk_reward_pass = (potential_gain / potential_loss) >= 2.0
else:
risk_reward_pass = False
if median_check_pass and risk_check_pass and risk_reward_pass:
return 'GOOD'
if not risk_check_pass or not median_check_pass:
return 'POOR'
return 'NEUTRAL'
def make_plot(sim_results: Dict[str, Dict], port_paths: Optional[np.ndarray], port_json: Optional[Dict[str, float]]):
"""Plots paths and adds a visual assessment marker."""
fig, ax = plt.subplots(figsize=(9, 5))
for t, payload in sim_results.items():
if payload.get("paths") is not None:
ax.plot(payload["paths"].mean(axis=1), label=t, alpha=0.6)
if port_paths is not None:
ax.plot(port_paths.mean(axis=1), label="Portfolio (weighted)", linewidth=3.0, color='black')
ax.set_title("Monte Carlo Simulation – Expected Price Paths")
ax.set_xlabel("Days Ahead")
ax.set_ylabel("Price / Index")
ax.legend()
if port_json:
assessment = assess_investment(
port_json.get("portfolio_horizon_p10", 0),
port_json.get("portfolio_horizon_p50", 0),
port_json.get("portfolio_horizon_p90", 0),
1.0 # Portfolio index starts at 1.0
)
if assessment == 'GOOD':
ax.text(0.95, 0.95, "✓", transform=ax.transAxes, fontsize=40, color='green', ha='right', va='top', bbox=dict(boxstyle='round,pad=0.2', fc='white', ec='green', alpha=0.8))
elif assessment == 'POOR':
ax.text(0.95, 0.95, "✗", transform=ax.transAxes, fontsize=40, color='red', ha='right', va='top', bbox=dict(boxstyle='round,pad=0.2', fc='white', ec='red', alpha=0.8))
return fig
def build_stats(sim_results: Dict[str, Dict], port_paths: Optional[np.ndarray]):
"""Builds statistical summaries from simulation results."""
rows, out = [], {}
for t, payload in sim_results.items():
params, paths = payload.get("params"), payload.get("paths")
if params and paths is not None:
end_prices = paths[-1, :]
p10, p50, p90 = np.percentile(end_prices, [10, 50, 90])
mu_annual = params["mu"] * TRADING_DAYS
sigma_annual = params["sigma"] * math.sqrt(TRADING_DAYS)
out[t] = { "last_close": params["last"], "daily_mu": params["mu"], "daily_sigma": params["sigma"],
"annual_mu_approx": mu_annual, "annual_sigma_approx": sigma_annual,
"horizon_p10": p10, "horizon_p50": p50, "horizon_p90": p90, "n_daily_obs": params["n"] }
rows.append([t, params["last"], params["mu"], params["sigma"], mu_annual, sigma_annual, p10, p50, p90, params["n"]])
cols = ["Ticker", "Last Close", "Daily μ", "Daily σ", "Annual μ (≈)", "Annual σ (≈)", "Horizon P10", "Horizon P50", "Horizon P90", "# Daily Obs"]
table = pd.DataFrame(rows, columns=cols)
port_json = None
if port_paths is not None:
end_vals = port_paths[-1, :]
p10, p50, p90 = np.percentile(end_vals, [10, 50, 90])
port_json = {"portfolio_horizon_p10": p10, "portfolio_horizon_p50": p50, "portfolio_horizon_p90": p90}
return out, table, port_json
# ---------- Gradio UI ----------
def handle_run(df_input: pd.DataFrame, horizon_days: int, paths: int, period: str, seed_text: str):
"""Main controller function for the UI."""
pairs = clean_table(df_input)
if not pairs:
return None, {}, pd.DataFrame(), {}
tickers, weights = normalize_weights(pairs)
seed = int(seed_text.strip()) if seed_text.strip().isdigit() else None
sims = run_multi_ticker_mc(tickers, horizon_days, paths, period, seed)
port_paths = portfolio_index(sims, tickers, weights)
stats_json, stats_table, port_json = build_stats(sims, port_paths)
fig = make_plot(sims, port_paths, port_json)
port_summary = port_json or {}
port_summary["normalized_weights"] = {t: w for t, w in zip(tickers, weights)}
return fig, stats_json, stats_table, port_summary
with gr.Blocks() as demo:
gr.Markdown("# Monte Carlo Portfolio Simulator\n...")
holdings_df = gr.Dataframe(headers=["Ticker", "Weight"], value=[["AAPL", 0.5], ["MSFT", 0.5]], row_count=(3, "dynamic"), col_count=(2, "fixed"), wrap=True, label="Holdings")
with gr.Row():
horizon = gr.Slider(label="Horizon (days)", minimum=30, maximum=756, step=21, value=252)
n_paths = gr.Slider(label="Number of paths", minimum=100, maximum=1000000, step=100, value=5000)
period = gr.Dropdown(label="History window", choices=["6mo", "1y", "2y", "5y", "10y", "max"], value="1y")
seed_box = gr.Textbox(label="Random seed (optional)", placeholder="e.g., 42")
run_btn = gr.Button("Run Simulation", variant="primary")
with gr.Row():
plot_output = gr.Plot(label="Expected Paths (Per-Ticker + Portfolio)")
with gr.Row():
table_output = gr.Dataframe(label="Per-Ticker Stats")
json_output = gr.JSON(label="JSON (Per-Ticker Stats)")
json_port = gr.JSON(label="JSON (Portfolio Summary)")
run_btn.click(
handle_run,
inputs=[holdings_df, horizon, n_paths, period, seed_box],
outputs=[plot_output, json_output, table_output, json_port]
)
if __name__ == "__main__":
demo.launch(server_name="0.0.0.0", server_port=int(os.environ.get("PORT", 7860)))