# financial_analyst / portfolio_calculator.py
# Author: Dmitry Beresnev
# Commit 0f98bc8: "fix max tickers limit"
"""
Portfolio financial calculations module.
Handles:
- Fetching historical price data from yfinance
- Calculating portfolio weights
- Calculating log returns
- Computing covariance matrix
- Calculating portfolio variance and volatility
- Generating variance breakdown for detailed formulas
"""
from typing import Dict, List, Tuple, Optional
import numpy as np
import pandas as pd
import yfinance as yf
import streamlit as st
from concurrent.futures import ProcessPoolExecutor, as_completed
import logging
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from ratelimit import limits, sleep_and_retry
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Constants
TRADING_DAYS_PER_YEAR = 252
MIN_DATA_POINTS = 30
MAX_TICKERS = 100
# Rate limiter: max 5 calls per 10 seconds (conservative for Yahoo Finance).
# NOTE(review): the limiter and retry state live in the current Python process;
# when this runs inside ProcessPoolExecutor workers, each worker gets its own
# 5-calls/10s budget, so the effective global rate is workers × 5 per 10s —
# confirm that is the intended throttle.
@sleep_and_retry
@limits(calls=5, period=10)
@retry(
    stop=stop_after_attempt(3),  # Max 3 retries
    wait=wait_exponential(multiplier=1, min=2, max=60),  # Exponential backoff: 2s, 4s, 8s, ... max 60s
    retry=retry_if_exception_type((ConnectionError, TimeoutError)),  # Only retry on network errors
    reraise=True  # After the last attempt, re-raise the original exception
)
def _fetch_with_retry(ticker_obj, period: str) -> pd.DataFrame:
    """
    Fetch one period of historical data with retry logic and rate limiting.

    IMPORTANT: Only uses 'period' parameter (not start/end dates) to avoid
    yfinance timezone request that incorrectly handles 429 errors.

    Args:
        ticker_obj: yfinance Ticker object
        period: Time period (e.g., '1y', '6mo', '3mo', '1mo')

    Returns:
        DataFrame of historical data; may be empty when the ticker has no
        data for the period, but is never None (callers must check .empty).
    """
    return ticker_obj.history(period=period)
def fetch_single_ticker(ticker: str, period: str = "1y") -> Tuple[str, Optional[pd.Series], Optional[str]]:
    """
    Fetch historical data for a single ticker with rate limiting and exponential backoff.

    This function runs in a separate process for parallel execution.
    Uses tenacity for exponential backoff and ratelimit for request throttling.

    IMPORTANT: Only uses 'period' parameter (not start/end dates) to avoid
    yfinance timezone request that incorrectly handles 429 errors.

    Args:
        ticker: Ticker symbol
        period: Time period for historical data (default: '1y')

    Returns:
        Tuple of (ticker, price_series, error_message)
    """
    try:
        # Create fresh Ticker object in this process
        ticker_obj = yf.Ticker(ticker)

        # Fallback periods from longest to shortest; dict.fromkeys removes
        # duplicates while preserving order, so the requested period is not
        # fetched a second time when it is already one of the fallbacks
        # (e.g. period='6mo'). Avoid 'max' as it may trigger a timezone request.
        periods_to_try = list(dict.fromkeys([period, '6mo', '3mo', '1mo']))

        last_error: Optional[str] = None
        for try_period in periods_to_try:
            try:
                hist = _fetch_with_retry(ticker_obj, try_period)
            except Exception as e:
                last_error = str(e)
                logger.warning(f"⚠️ {ticker}: Period '{try_period}' failed - {last_error}")
                continue  # Try the next (shorter) period
            if not hist.empty:
                logger.info(f"✅ {ticker}: Fetched {len(hist)} days (period: {try_period})")
                return ticker, hist['Close'], None

        # Every period was tried and none produced data
        if last_error is not None:
            logger.error(f"❌ {ticker}: All periods exhausted")
            return ticker, None, f"All periods failed. Last error: {last_error}"
        logger.error(f"❌ {ticker}: No data available after trying all periods")
        return ticker, None, "No data available after all retry attempts"
    except Exception as e:
        # Fatal setup error (e.g. Ticker construction failed)
        logger.error(f"❌ {ticker}: Fatal error - {str(e)}")
        return ticker, None, str(e)
@st.cache_data(ttl=3600)  # Cache for 1 hour to limit Yahoo Finance traffic
def fetch_historical_data(
    tickers: Tuple[str, ...],  # Tuple for hashability (caching requirement)
    period: str = "1y"
) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
    """
    Fetch historical price data using yfinance.

    Uses fallback strategy:
    1. Try downloading all tickers together in one batch request
    2. If that fails, download tickers individually in parallel worker processes

    Args:
        tickers: Tuple of ticker symbols (e.g., ('AAPL', 'GOOGL', 'MSFT'))
        period: Time period for historical data (default: '1y')

    Returns:
        Tuple of (prices_dataframe, error_message)
        - If successful: (DataFrame, None)
        - If failed: (None, error_message)
    """
    ticker_list = list(tickers)

    # --- Strategy 1: single batch download --------------------------------
    try:
        data = yf.download(
            ticker_list,
            period=period,
            progress=False,
            threads=False,  # Disable yfinance's own threading for reliability
            ignore_tz=True
        )
        if not data.empty:
            # BUGFIX: newer yfinance versions default to auto_adjust=True and
            # return no 'Adj Close' column ('Close' is already adjusted there).
            # Select whichever price field is actually present instead of
            # unconditionally indexing 'Adj Close' (which raised KeyError).
            if len(ticker_list) == 1:
                field = 'Adj Close' if 'Adj Close' in data.columns else 'Close'
                prices = data[[field]].copy()
                prices.columns = ticker_list
            else:
                # Multi-ticker downloads use a (field, ticker) column MultiIndex
                field = 'Adj Close' if 'Adj Close' in data.columns.get_level_values(0) else 'Close'
                prices = data[field].copy()
            # Keep only dates where every ticker has a price
            prices = prices.dropna()
            if len(prices) >= MIN_DATA_POINTS:
                return prices, None
            # Too few rows: fall through to the per-ticker strategy below
    except Exception as e:
        # Log the error but continue to fallback strategy
        logger.warning(f"Batch download failed: {str(e)}, trying individual downloads...")

    # --- Strategy 2: parallel download via ProcessPoolExecutor ------------
    # NOTE(review): the fallback path uses Ticker.history()'s 'Close' column
    # while the batch path prefers 'Adj Close' — verify mixing adjusted and
    # unadjusted prices across strategies is acceptable.
    st.info(f"📥 Fetching data for {len(ticker_list)} tickers in parallel...")
    individual_prices = {}
    failed_tickers = []

    # Cap workers at 4 to avoid overwhelming the API
    max_workers = min(len(ticker_list), 4)

    try:
        # ProcessPoolExecutor gives true parallelism (no GIL contention)
        with ProcessPoolExecutor(max_workers=max_workers) as executor:
            # Submit all ticker fetch jobs
            future_to_ticker = {
                executor.submit(fetch_single_ticker, ticker, period): ticker
                for ticker in ticker_list
            }
            # Process results as they complete
            completed = 0
            for future in as_completed(future_to_ticker):
                ticker = future_to_ticker[future]
                completed += 1
                try:
                    ticker_symbol, price_series, error = future.result(timeout=30)
                    if price_series is not None and not price_series.empty:
                        individual_prices[ticker_symbol] = price_series
                        st.success(f"✅ {ticker_symbol}: {len(price_series)} days ({completed}/{len(ticker_list)})")
                    else:
                        failed_tickers.append(ticker_symbol)
                        st.error(f"❌ {ticker_symbol}: {error or 'No data'} ({completed}/{len(ticker_list)})")
                except Exception as e:
                    failed_tickers.append(ticker)
                    st.error(f"❌ {ticker}: {str(e)} ({completed}/{len(ticker_list)})")
    except Exception as e:
        # Executor-level failure: report it and continue with whatever
        # results (possibly none) were collected before the failure.
        st.error(f"Parallel processing error: {str(e)}")

    # Check if we got any data at all
    if not individual_prices:
        return None, f"Could not fetch data for any tickers. Failed: {', '.join(failed_tickers)}\n\nTip: Try using the JSON editor to enter a smaller portfolio first, or try again in a few minutes."

    # Combine per-ticker series; keep only dates shared by every ticker
    prices_df = pd.DataFrame(individual_prices)
    prices_df = prices_df.dropna()

    # Check we have enough data points for meaningful statistics
    if len(prices_df) < MIN_DATA_POINTS:
        return None, f"Insufficient data: only {len(prices_df)} days available (minimum {MIN_DATA_POINTS} required)"

    # Surface partially-failed tickers to the user without aborting
    if failed_tickers:
        st.warning(f"⚠️ Could not fetch data for: {', '.join(failed_tickers)}")

    return prices_df, None
def calculate_log_returns(prices: pd.DataFrame) -> pd.DataFrame:
    """
    Compute daily log returns from a price history.

    Formula: r_t = ln(P_t / P_{t-1})

    Args:
        prices: DataFrame of historical prices (columns = tickers, index = dates)

    Returns:
        DataFrame of log returns; one row shorter than the input because the
        first date has no previous price.
    """
    previous = prices.shift(1)
    # The first row divides by NaN, so drop incomplete rows before returning.
    return np.log(prices / previous).dropna()
def calculate_portfolio_weights(amounts: Dict[str, float]) -> Dict[str, float]:
    """
    Derive portfolio weights from dollar position sizes.

    Formula: w_i = amount_i / sum(amounts)

    Args:
        amounts: Dictionary mapping tickers to dollar amounts

    Returns:
        Dictionary mapping tickers to weights that sum to 1.0

    Raises:
        ValueError: If the total portfolio amount is not positive.
    """
    total = sum(amounts.values())
    if total <= 0:
        raise ValueError("Total portfolio amount must be positive")

    weights = {}
    for ticker, amount in amounts.items():
        weights[ticker] = amount / total

    # Renormalize if floating-point error pushed the sum away from 1.0
    weight_sum = sum(weights.values())
    if not np.isclose(weight_sum, 1.0, atol=1e-6):
        for ticker in weights:
            weights[ticker] = weights[ticker] / weight_sum

    return weights
def calculate_covariance_matrix(returns: pd.DataFrame, annualized: bool = False) -> pd.DataFrame:
    """
    Compute the covariance matrix of the return series.

    Args:
        returns: DataFrame of log returns
        annualized: If True, multiply by TRADING_DAYS_PER_YEAR (default: False)

    Returns:
        DataFrame of covariance matrix (tickers × tickers)
    """
    daily_cov = returns.cov()
    # Scale daily covariances to annual terms only when asked to.
    return daily_cov * TRADING_DAYS_PER_YEAR if annualized else daily_cov
def calculate_portfolio_variance(
    weights: Dict[str, float],
    cov_matrix: pd.DataFrame,
    annualized: bool = True
) -> float:
    """
    Compute the portfolio variance.

    Formula: σ²_p = w^T × Σ × w

    Where:
    - w = vector of weights
    - Σ = covariance matrix (annualized when requested)

    Args:
        weights: Dictionary of portfolio weights
        cov_matrix: Covariance matrix (daily, annualized here if annualized=True)
        annualized: If True, annualize the covariance matrix (default: True)

    Returns:
        Portfolio variance (annualized if annualized=True)
    """
    # Fix a consistent ticker ordering for both the weight vector and Σ.
    ordering = list(weights)
    w = np.fromiter((weights[t] for t in ordering), dtype=float)
    sigma = cov_matrix.loc[ordering, ordering].to_numpy()

    if annualized:
        sigma = sigma * TRADING_DAYS_PER_YEAR

    # Quadratic form w^T Σ w
    return float(w.dot(sigma).dot(w))
def calculate_portfolio_volatility(variance: float) -> float:
    """
    Calculate portfolio volatility (standard deviation).

    Formula: σ_p = √(σ²_p)

    Args:
        variance: Portfolio variance

    Returns:
        Portfolio volatility (standard deviation). Variance is clamped at 0
        before the square root: floating-point round-off in the w^T Σ w
        quadratic form can yield a tiny negative value, which would otherwise
        produce NaN.
    """
    return float(np.sqrt(max(variance, 0.0)))
def get_variance_breakdown(
    weights: Dict[str, float],
    cov_matrix: pd.DataFrame,
    annualized: bool = True
) -> List[Tuple[str, str, float, float, float, float]]:
    """
    Generate detailed breakdown of variance calculation.

    Returns a list of all variance components for the detailed formula expansion.

    Args:
        weights: Dictionary of portfolio weights
        cov_matrix: Covariance matrix (daily)
        annualized: If True, use annualized covariance (default: True)

    Returns:
        List of tuples: (ticker_i, ticker_j, w_i, w_j, cov_ij, contribution)
        where contribution = w_i × w_j × cov_ij. Contains one entry per
        ordered (i, j) pair, so the contributions sum to the portfolio variance.
    """
    tickers = list(weights.keys())
    breakdown = []

    # Hoist the annualization factor out of the O(n²) loop; the unused local
    # `n` from the original version has been dropped.
    factor = TRADING_DAYS_PER_YEAR if annualized else 1

    for ticker_i in tickers:
        w_i = weights[ticker_i]
        for ticker_j in tickers:
            w_j = weights[ticker_j]
            cov_ij = cov_matrix.loc[ticker_i, ticker_j] * factor
            # Contribution of this pair to the total variance
            contribution = w_i * w_j * cov_ij
            breakdown.append((ticker_i, ticker_j, w_i, w_j, cov_ij, contribution))

    return breakdown
def get_portfolio_metrics(
    amounts: Dict[str, float],
    period: str = "1y"
) -> Tuple[Optional[Dict], Optional[str]]:
    """
    Calculate all portfolio metrics in one go.

    Convenience orchestrator: weights → historical prices → log returns →
    covariance → variance/volatility → per-pair variance breakdown.

    Args:
        amounts: Dictionary of {ticker: amount}
        period: Historical data period (default: '1y')

    Returns:
        Tuple of (metrics_dict, error_message)
        metrics_dict contains:
        - weights: Dict[str, float]
        - prices: pd.DataFrame
        - returns: pd.DataFrame
        - cov_matrix: pd.DataFrame
        - variance: float
        - volatility: float
        - variance_breakdown: List[Tuple]
    """
    try:
        # Weights first — an invalid total fails fast before any network I/O.
        weights = calculate_portfolio_weights(amounts)

        # Historical prices (tuple argument keeps the cache key hashable).
        prices, error = fetch_historical_data(tuple(amounts.keys()), period)
        if error:
            return None, error

        returns = calculate_log_returns(prices)

        # Keep the covariance matrix daily here; downstream calls annualize
        # it themselves via their annualized=True defaults.
        cov_matrix = calculate_covariance_matrix(returns, annualized=False)
        variance = calculate_portfolio_variance(weights, cov_matrix, annualized=True)

        metrics = {
            'weights': weights,
            'prices': prices,
            'returns': returns,
            'cov_matrix': cov_matrix,
            'variance': variance,
            'volatility': calculate_portfolio_volatility(variance),
            'variance_breakdown': get_variance_breakdown(weights, cov_matrix, annualized=True),
        }
        return metrics, None
    except Exception as e:
        return None, f"Error calculating portfolio metrics: {str(e)}"