| | """ |
| | Portfolio financial calculations module. |
| | |
| | Handles: |
| | - Fetching historical price data from yfinance |
| | - Calculating portfolio weights |
| | - Calculating log returns |
| | - Computing covariance matrix |
| | - Calculating portfolio variance and volatility |
| | - Generating variance breakdown for detailed formulas |
| | """ |
| |
|
| | from typing import Dict, List, Tuple, Optional |
| | import numpy as np |
| | import pandas as pd |
| | import yfinance as yf |
| | import streamlit as st |
| | from concurrent.futures import ProcessPoolExecutor, as_completed |
| | import logging |
| | from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type |
| | from ratelimit import limits, sleep_and_retry |
| |
|
| | |
# Module-wide logger. basicConfig is a no-op if the hosting app has
# already installed logging handlers.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# Factor for annualizing daily return statistics (≈ US trading days/year).
TRADING_DAYS_PER_YEAR = 252
# Minimum number of daily observations needed for a usable covariance estimate.
MIN_DATA_POINTS = 30
# Upper bound on portfolio size. NOTE(review): not enforced anywhere in this
# module's visible code — confirm callers apply it.
MAX_TICKERS = 100
| |
|
| |
|
| | |
@sleep_and_retry
@limits(calls=5, period=10)  # throttle: at most 5 calls per 10 s (state is per-process)
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=60),
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),
    reraise=True
)
def _fetch_with_retry(ticker_obj, period: str) -> pd.DataFrame:
    """
    Internal function with retry logic for fetching data.

    Retries up to 3 times on ConnectionError/TimeoutError with exponential
    backoff (2s–60s waits) and re-raises the last error if all attempts fail.

    NOTE(review): @retry is the innermost decorator, so tenacity's internal
    retry attempts do NOT pass back through the @limits rate limiter — only
    the initial invocation is throttled. Confirm this ordering is intentional.

    IMPORTANT: Only uses 'period' parameter (not start/end dates) to avoid
    yfinance timezone request that incorrectly handles 429 errors.

    Args:
        ticker_obj: yfinance Ticker object
        period: Time period (e.g., '1y', '6mo', '3mo', '1mo')

    Returns:
        DataFrame of historical OHLCV data (may be empty when the ticker
        has no data for the requested period).
    """
    return ticker_obj.history(period=period)
| |
|
| |
|
def fetch_single_ticker(ticker: str, period: str = "1y") -> Tuple[str, Optional[pd.Series], Optional[str]]:
    """
    Fetch historical closing prices for one ticker, with rate limiting
    and exponential backoff.

    Designed to run inside a worker process for parallel execution.
    Delegates throttling/backoff to `_fetch_with_retry` (tenacity +
    ratelimit), and falls back through progressively shorter periods
    when the requested one yields nothing.

    IMPORTANT: Only uses 'period' parameter (not start/end dates) to avoid
    yfinance timezone request that incorrectly handles 429 errors.

    Args:
        ticker: Ticker symbol
        period: Time period for historical data (default: '1y')

    Returns:
        Tuple of (ticker, price_series, error_message); exactly one of
        price_series / error_message is None.
    """
    try:
        ticker_obj = yf.Ticker(ticker)

        # Requested window first, then progressively shorter fallbacks.
        fallback_periods = [period, '6mo', '3mo', '1mo']

        for attempt, candidate in enumerate(fallback_periods, 1):
            try:
                hist = _fetch_with_retry(ticker_obj, candidate)
                if not hist.empty and len(hist) > 0:
                    logger.info(f"✅ {ticker}: Fetched {len(hist)} days (period: {candidate})")
                    return ticker, hist['Close'], None
            except Exception as e:
                logger.warning(f"⚠️ {ticker}: Period '{candidate}' failed - {str(e)}")
                if attempt == len(fallback_periods):
                    # Last fallback also raised: give up with the final error.
                    logger.error(f"❌ {ticker}: All periods exhausted")
                    return ticker, None, f"All periods failed. Last error: {str(e)}"

        # Every period returned an empty frame without raising.
        logger.error(f"❌ {ticker}: No data available after trying all periods")
        return ticker, None, "No data available after all retry attempts"

    except Exception as e:
        # Anything outside the per-period loop (e.g. Ticker construction).
        logger.error(f"❌ {ticker}: Fatal error - {str(e)}")
        return ticker, None, str(e)
| |
|
| |
|
@st.cache_data(ttl=3600)
def fetch_historical_data(
    tickers: Tuple[str, ...],
    period: str = "1y"
) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
    """
    Fetch historical price data using yfinance.

    Results are cached by Streamlit for one hour (ttl=3600); `tickers`
    must be a tuple so the cache key is hashable.

    Uses fallback strategy:
    1. Try downloading all tickers together in one batch
    2. If that fails (or yields too few rows), download one by one in a
       process pool and combine

    NOTE(review): the batch path reads the 'Adj Close' column while the
    per-ticker path (`fetch_single_ticker`) returns 'Close' from
    `Ticker.history` — confirm both are adjusted consistently for the
    yfinance version in use.

    Args:
        tickers: Tuple of ticker symbols (e.g., ('AAPL', 'GOOGL', 'MSFT'))
        period: Time period for historical data (default: '1y')

    Returns:
        Tuple of (prices_dataframe, error_message)
        - If successful: (DataFrame, None)
        - If failed: (None, error_message)
    """
    ticker_list = list(tickers)

    # --- Strategy 1: single batched download for all tickers ---
    try:
        data = yf.download(
            ticker_list,
            period=period,
            progress=False,
            threads=False,   # avoid yfinance's own threading; we parallelize below
            ignore_tz=True
        )

        if not data.empty:
            # yf.download returns flat columns for one ticker but a
            # column MultiIndex (field, ticker) for several.
            if len(ticker_list) == 1:
                prices = data[['Adj Close']].copy()
                prices.columns = ticker_list
            else:
                prices = data['Adj Close'].copy()

            # Keep only dates where every ticker has a price.
            prices = prices.dropna()

            if len(prices) >= MIN_DATA_POINTS:
                return prices, None
            # Too few rows: fall through to per-ticker downloads below.

    except Exception as e:
        # Batch failure is non-fatal — fall back to per-ticker fetching.
        logger.warning(f"Batch download failed: {str(e)}, trying individual downloads...")

    # --- Strategy 2: per-ticker downloads in a process pool ---
    st.info(f"📥 Fetching data for {len(ticker_list)} tickers in parallel...")

    individual_prices = {}   # ticker -> close-price Series
    failed_tickers = []

    # Cap the worker count; each worker is a separate OS process.
    max_workers = min(len(ticker_list), 4)

    try:
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            # Map each submitted future back to its ticker for reporting.
            future_to_ticker = {
                executor.submit(fetch_single_ticker, ticker, period): ticker
                for ticker in ticker_list
            }

            completed = 0
            for future in as_completed(future_to_ticker):
                ticker = future_to_ticker[future]
                completed += 1

                try:
                    ticker_symbol, price_series, error = future.result(timeout=30)

                    if price_series is not None and not price_series.empty:
                        individual_prices[ticker_symbol] = price_series
                        st.success(f"✅ {ticker_symbol}: {len(price_series)} days ({completed}/{len(ticker_list)})")
                    else:
                        failed_tickers.append(ticker_symbol)
                        st.error(f"❌ {ticker_symbol}: {error or 'No data'} ({completed}/{len(ticker_list)})")

                except Exception as e:
                    # Worker crashed or result retrieval failed; record and continue.
                    failed_tickers.append(ticker)
                    st.error(f"❌ {ticker}: {str(e)} ({completed}/{len(ticker_list)})")

    except Exception as e:
        # Pool-level failure (e.g. process spawn error). Deliberately
        # best-effort: continue with whatever results were collected.
        st.error(f"Parallel processing error: {str(e)}")
        pass

    # Nothing usable at all -> hard failure with a user-facing hint.
    if not individual_prices:
        return None, f"Could not fetch data for any tickers. Failed: {', '.join(failed_tickers)}\n\nTip: Try using the JSON editor to enter a smaller portfolio first, or try again in a few minutes."

    # Combine the per-ticker series and align them on common dates.
    prices_df = pd.DataFrame(individual_prices)

    prices_df = prices_df.dropna()

    if len(prices_df) < MIN_DATA_POINTS:
        return None, f"Insufficient data: only {len(prices_df)} days available (minimum {MIN_DATA_POINTS} required)"

    # Partial success: warn about the tickers that had to be dropped.
    if failed_tickers:
        st.warning(f"⚠️ Could not fetch data for: {', '.join(failed_tickers)}")

    return prices_df, None
| |
|
| |
|
def calculate_log_returns(prices: pd.DataFrame) -> pd.DataFrame:
    """
    Compute daily log returns from a price history.

    Formula: r_t = ln(P_t / P_{t-1})

    Args:
        prices: DataFrame of historical prices (columns = tickers, index = dates)

    Returns:
        DataFrame of log returns; the first row (which has no prior price
        and is therefore NaN) is removed.
    """
    # Ratio of each price to the previous day's price, then natural log.
    ratios = prices / prices.shift(1)
    return np.log(ratios).dropna()
| |
|
| |
|
def calculate_portfolio_weights(amounts: Dict[str, float]) -> Dict[str, float]:
    """
    Derive portfolio weights from dollar position sizes.

    Formula: w_i = amount_i / sum(amounts)

    Args:
        amounts: Dictionary mapping tickers to dollar amounts

    Returns:
        Dictionary mapping tickers to weights (decimals summing to 1.0)

    Raises:
        ValueError: If the total of all amounts is not positive.
    """
    total = sum(amounts.values())

    if total <= 0:
        raise ValueError("Total portfolio amount must be positive")

    weights = {}
    for ticker, amount in amounts.items():
        weights[ticker] = amount / total

    # Defensive renormalization in case floating-point error pushed the
    # sum away from exactly 1.0.
    weight_sum = sum(weights.values())
    if not np.isclose(weight_sum, 1.0, atol=1e-6):
        for ticker in weights:
            weights[ticker] = weights[ticker] / weight_sum

    return weights
| |
|
| |
|
def calculate_covariance_matrix(returns: pd.DataFrame, annualized: bool = False) -> pd.DataFrame:
    """
    Compute the sample covariance matrix of the return series.

    Args:
        returns: DataFrame of log returns
        annualized: If True, scale by TRADING_DAYS_PER_YEAR (default: False)

    Returns:
        DataFrame of covariances (tickers × tickers)
    """
    daily_cov = returns.cov()

    if not annualized:
        return daily_cov

    # Covariance of daily returns scales linearly with the horizon.
    return daily_cov * TRADING_DAYS_PER_YEAR
| |
|
| |
|
def calculate_portfolio_variance(
    weights: Dict[str, float],
    cov_matrix: pd.DataFrame,
    annualized: bool = True
) -> float:
    """
    Compute portfolio variance via the quadratic form.

    Formula: σ²_p = w^T × Σ × w

    Where:
    - w = vector of weights
    - Σ = covariance matrix (annualized when annualized=True)

    Args:
        weights: Dictionary of portfolio weights
        cov_matrix: Covariance matrix (daily, will be annualized if annualized=True)
        annualized: If True, annualize the covariance matrix (default: True)

    Returns:
        Portfolio variance (annualized if annualized=True)
    """
    # Fix an ordering so the weight vector and covariance rows/columns line up.
    order = list(weights.keys())
    weight_vec = np.array([weights[t] for t in order])

    sigma = cov_matrix.loc[order, order].values
    if annualized:
        sigma = sigma * TRADING_DAYS_PER_YEAR

    # Quadratic form w^T Σ w.
    return float(weight_vec.dot(sigma).dot(weight_vec))
| |
|
| |
|
def calculate_portfolio_volatility(variance: float) -> float:
    """
    Convert a portfolio variance to volatility (standard deviation).

    Formula: σ_p = √(σ²_p)

    Args:
        variance: Portfolio variance

    Returns:
        Portfolio volatility (standard deviation)
    """
    # np.sqrt (not **0.5) keeps NaN-on-negative behavior instead of
    # producing a complex result.
    volatility = np.sqrt(variance)
    return float(volatility)
| |
|
| |
|
def get_variance_breakdown(
    weights: Dict[str, float],
    cov_matrix: pd.DataFrame,
    annualized: bool = True
) -> List[Tuple[str, str, float, float, float, float]]:
    """
    Generate detailed breakdown of variance calculation.

    Enumerates every ordered pair of tickers and its contribution to the
    portfolio variance, for the detailed formula expansion:

        σ²_p = Σ_i Σ_j w_i × w_j × cov_ij

    Args:
        weights: Dictionary of portfolio weights
        cov_matrix: Covariance matrix (daily)
        annualized: If True, use annualized covariance (default: True)

    Returns:
        List of tuples: (ticker_i, ticker_j, w_i, w_j, cov_ij, contribution)
        where contribution = w_i × w_j × cov_ij. Contains len(weights)**2
        entries; their sum equals the portfolio variance.
    """
    tickers = list(weights.keys())

    # Hoist the annualization decision out of the O(n²) loop.
    scale = TRADING_DAYS_PER_YEAR if annualized else 1

    breakdown = []
    for ticker_i in tickers:
        for ticker_j in tickers:
            w_i = weights[ticker_i]
            w_j = weights[ticker_j]
            cov_ij = cov_matrix.loc[ticker_i, ticker_j] * scale
            contribution = w_i * w_j * cov_ij
            breakdown.append((ticker_i, ticker_j, w_i, w_j, cov_ij, contribution))

    return breakdown
| |
|
| |
|
def get_portfolio_metrics(
    amounts: Dict[str, float],
    period: str = "1y"
) -> Tuple[Optional[Dict], Optional[str]]:
    """
    Calculate all portfolio metrics in one go.

    Convenience orchestrator: weights -> price fetch -> log returns ->
    covariance -> variance/volatility -> per-pair variance breakdown.

    Args:
        amounts: Dictionary of {ticker: amount}
        period: Historical data period (default: '1y')

    Returns:
        Tuple of (metrics_dict, error_message)

        metrics_dict contains:
        - weights: Dict[str, float]
        - prices: pd.DataFrame
        - returns: pd.DataFrame
        - cov_matrix: pd.DataFrame (daily, not annualized)
        - variance: float (annualized)
        - volatility: float (annualized)
        - variance_breakdown: List[Tuple] (annualized)
    """
    try:
        # Weights are derived first so invalid amounts fail fast,
        # before any network fetch.
        weights = calculate_portfolio_weights(amounts)

        prices, fetch_error = fetch_historical_data(tuple(amounts.keys()), period)
        if fetch_error:
            return None, fetch_error

        returns = calculate_log_returns(prices)

        # Daily covariance here; annualization happens inside the
        # variance/breakdown calculations.
        cov_matrix = calculate_covariance_matrix(returns, annualized=False)
        variance = calculate_portfolio_variance(weights, cov_matrix, annualized=True)

        metrics = {
            'weights': weights,
            'prices': prices,
            'returns': returns,
            'cov_matrix': cov_matrix,
            'variance': variance,
            'volatility': calculate_portfolio_volatility(variance),
            'variance_breakdown': get_variance_breakdown(weights, cov_matrix, annualized=True),
        }
        return metrics, None

    except Exception as e:
        # Surface any failure as an (None, message) result rather than raising.
        return None, f"Error calculating portfolio metrics: {str(e)}"
| |
|