data-analysis-platform / data_handler.py
import streamlit as st
import pandas as pd
import numpy as np
import warnings
from typing import Dict, List, Any, Tuple
from scipy import stats
warnings.filterwarnings('ignore')
# All cached data processing functions
@st.cache_data
def load_csv_with_encoding(file_content: bytes, filename: str) -> pd.DataFrame:
    """Load CSV with automatic encoding detection - cached"""
    import chardet
    from io import BytesIO

    detected = chardet.detect(file_content)
    encoding = detected.get('encoding')
    if encoding:
        try:
            return pd.read_csv(BytesIO(file_content), encoding=encoding)
        except Exception:
            pass
    # Fall back to common encodings if detection fails or misdetects
    for enc in ['utf-8', 'latin-1', 'cp1252', 'iso-8859-1']:
        try:
            return pd.read_csv(BytesIO(file_content), encoding=enc)
        except Exception:
            continue
    raise ValueError(f"Cannot read {filename} with any supported encoding")

@st.cache_data
def load_excel_file(file_content: bytes) -> pd.DataFrame:
    """Load Excel file - cached"""
    from io import BytesIO
    return pd.read_excel(BytesIO(file_content))

@st.cache_data
def calculate_basic_stats(df: pd.DataFrame) -> Dict[str, Any]:
    """Calculate basic statistics - cached"""
    dtype_counts = df.dtypes.value_counts()
    dtype_dict = {str(k): int(v) for k, v in dtype_counts.items()}
    return {
        'shape': df.shape,
        'memory_usage': float(df.memory_usage(deep=True).sum() / 1024**2),
        'missing_values': int(df.isnull().sum().sum()),
        'dtypes': dtype_dict,
        'duplicates': int(df.duplicated().sum())
    }

@st.cache_data
def calculate_column_cardinality(df: pd.DataFrame) -> pd.DataFrame:
    """Calculate column cardinality analysis - cached"""
    cardinality_data = []
    for col in df.columns:
        unique_count = df[col].nunique()
        unique_ratio = unique_count / len(df)
        # Determine column type based on cardinality
        if unique_count == 1:
            col_type = "Constant"
        elif unique_count == len(df):
            col_type = "Unique Identifier"
        elif unique_ratio < 0.05:
            col_type = "Low Cardinality"
        elif unique_ratio < 0.5:
            col_type = "Medium Cardinality"
        else:
            col_type = "High Cardinality"
        cardinality_data.append({
            'Column': col,
            'Unique Count': unique_count,
            'Unique Ratio': unique_ratio,
            'Type': col_type,
            'Data Type': str(df[col].dtype)
        })
    return pd.DataFrame(cardinality_data)

@st.cache_data
def calculate_memory_optimization(df: pd.DataFrame) -> Dict[str, Any]:
    """Calculate memory optimization suggestions - cached"""
    suggestions = []
    current_memory = df.memory_usage(deep=True).sum() / 1024**2
    potential_savings = 0
    for col in df.columns:
        if df[col].dtype == 'object':
            unique_ratio = df[col].nunique() / len(df)
            if unique_ratio < 0.5:  # Less than 50% unique values
                # Estimate category memory usage
                category_memory = df[col].astype('category').memory_usage(deep=True)
                object_memory = df[col].memory_usage(deep=True)
                savings = (object_memory - category_memory) / 1024**2
                if savings > 0.1:  # More than 0.1MB savings
                    suggestions.append({
                        'column': col,
                        'current_type': 'object',
                        'suggested_type': 'category',
                        'savings_mb': savings
                    })
                    potential_savings += savings
    return {
        'suggestions': suggestions,
        'current_memory_mb': current_memory,
        'potential_savings_mb': potential_savings,
        'potential_savings_pct': (potential_savings / current_memory) * 100 if current_memory > 0 else 0
    }
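
# Illustrative sketch (not part of the original module): one way the suggestions
# returned by calculate_memory_optimization() could be applied. The helper name
# apply_memory_optimization is hypothetical; it simply casts each flagged column
# to its suggested dtype.
def apply_memory_optimization(df: pd.DataFrame, suggestions: List[Dict[str, Any]]) -> pd.DataFrame:
    optimized = df.copy()
    for s in suggestions:
        # Each suggestion dict carries 'column' and 'suggested_type' (e.g. 'category')
        optimized[s['column']] = optimized[s['column']].astype(s['suggested_type'])
    return optimized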

@st.cache_data
def calculate_missing_data(df: pd.DataFrame) -> pd.DataFrame:
    """Calculate missing data analysis - cached"""
    missing_data = df.isnull().sum()
    if missing_data.sum() > 0:
        missing_df = pd.DataFrame({
            'Column': missing_data.index,
            'Missing Count': missing_data.values,
            'Missing %': (missing_data.values / len(df)) * 100
        })
        return missing_df[missing_df['Missing Count'] > 0].sort_values('Missing %', ascending=False)
    return pd.DataFrame()

@st.cache_data
def calculate_correlation_matrix(df: pd.DataFrame) -> pd.DataFrame:
    """Calculate correlation matrix - cached"""
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    return df[numeric_cols].corr() if len(numeric_cols) > 1 else pd.DataFrame()

@st.cache_data
def get_column_types(df: pd.DataFrame) -> Dict[str, List[str]]:
    """Get column types - cached"""
    return {
        'numeric': df.select_dtypes(include=[np.number]).columns.tolist(),
        'categorical': df.select_dtypes(include=['object']).columns.tolist(),
        'datetime': df.select_dtypes(include=['datetime64']).columns.tolist()
    }

@st.cache_data
def calculate_numeric_stats(df: pd.DataFrame, column: str) -> Dict[str, float]:
    """Calculate enhanced numeric statistics - cached"""
    series = df[column].dropna()
    return {
        'mean': series.mean(),
        'median': series.median(),
        'std': series.std(),
        'skewness': series.skew(),
        'kurtosis': series.kurtosis(),
        'min': series.min(),
        'max': series.max(),
        'q25': series.quantile(0.25),
        'q75': series.quantile(0.75)
    }

@st.cache_data
def calculate_outliers(df: pd.DataFrame, column: str) -> pd.DataFrame:
    """Calculate outliers using IQR method - cached"""
    Q1 = df[column].quantile(0.25)
    Q3 = df[column].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - 1.5 * IQR
    upper_bound = Q3 + 1.5 * IQR
    return df[(df[column] < lower_bound) | (df[column] > upper_bound)]
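
# Worked example (illustrative): with Q1 = 10 and Q3 = 20, IQR = 10, so the
# fences above are 10 - 1.5 * 10 = -5 and 20 + 1.5 * 10 = 35; any row whose
# value falls outside [-5, 35] is returned as an outlier.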

@st.cache_data
def detect_mixed_types(df: pd.DataFrame) -> List[Dict[str, Any]]:
    """Detect columns with mixed data types - cached"""
    mixed_type_issues = []
    for col in df.select_dtypes(include=['object']).columns:
        # Try to convert to numeric
        numeric_conversion = pd.to_numeric(df[col], errors='coerce')
        new_nulls = numeric_conversion.isnull().sum() - df[col].isnull().sum()
        if new_nulls > 0:
            mixed_type_issues.append({
                'column': col,
                'problematic_values': new_nulls,
                'total_values': len(df[col]),
                'percentage': (new_nulls / len(df[col])) * 100
            })
    return mixed_type_issues

@st.cache_data
def get_value_counts(df: pd.DataFrame, column: str, top_n: int = 10) -> pd.Series:
    """Get value counts for categorical column - cached"""
    return df[column].value_counts().head(top_n)

@st.cache_data
def calculate_crosstab(df: pd.DataFrame, col1: str, col2: str) -> pd.DataFrame:
    """Calculate crosstab between two categorical columns - cached"""
    return pd.crosstab(df[col1], df[col2])

@st.cache_data
def calculate_group_stats(df: pd.DataFrame, group_col: str, metric_col: str) -> pd.DataFrame:
    """Calculate group statistics - cached"""
    return df.groupby(group_col)[metric_col].agg(['mean', 'median', 'std', 'count'])

@st.cache_data
def calculate_data_quality_score(df: pd.DataFrame) -> Dict[str, Any]:
    """Calculate overall data quality score - cached"""
    score = 100
    issues = []
    # Missing values penalty
    missing_pct = (df.isnull().sum().sum() / (len(df) * len(df.columns))) * 100
    if missing_pct > 0:
        penalty = min(30, missing_pct * 2)  # Max 30 points penalty
        score -= penalty
        issues.append(f"Missing values: {missing_pct:.1f}%")
    # Duplicates penalty
    duplicate_pct = (df.duplicated().sum() / len(df)) * 100
    if duplicate_pct > 0:
        penalty = min(20, duplicate_pct * 4)  # Max 20 points penalty
        score -= penalty
        issues.append(f"Duplicate rows: {duplicate_pct:.1f}%")
    # Constant columns penalty
    constant_cols = [col for col in df.columns if df[col].nunique() == 1]
    if constant_cols:
        penalty = min(10, len(constant_cols) * 2)
        score -= penalty
        issues.append(f"Constant columns: {len(constant_cols)}")
    # Mixed types penalty
    mixed_types = detect_mixed_types(df)
    if mixed_types:
        penalty = min(10, len(mixed_types) * 3)
        score -= penalty
        issues.append(f"Mixed type columns: {len(mixed_types)}")
    return {
        'score': max(0, score),
        'issues': issues,
        'grade': 'A' if score >= 90 else 'B' if score >= 80 else 'C' if score >= 70 else 'D' if score >= 60 else 'F'
    }
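
# Worked example (illustrative): a frame with 5% missing cells, 2% duplicate rows
# and no other issues loses min(30, 5 * 2) = 10 points for missing values and
# min(20, 2 * 4) = 8 points for duplicates, giving a score of 100 - 10 - 8 = 82
# and grade 'B'.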

def load_data(uploaded_file):
    """Unified data loading function"""
    file_content = uploaded_file.read()
    uploaded_file.seek(0)
    if uploaded_file.name.endswith('.csv'):
        return load_csv_with_encoding(file_content, uploaded_file.name)
    else:
        return load_excel_file(file_content)

def apply_data_cleaning(df: pd.DataFrame, operations: List[Dict[str, Any]]) -> pd.DataFrame:
    """Apply data cleaning operations"""
    cleaned_df = df.copy()
    for operation in operations:
        if operation['type'] == 'fill_missing':
            if operation['method'] == 'mean':
                cleaned_df[operation['column']] = cleaned_df[operation['column']].fillna(
                    cleaned_df[operation['column']].mean())
            elif operation['method'] == 'median':
                cleaned_df[operation['column']] = cleaned_df[operation['column']].fillna(
                    cleaned_df[operation['column']].median())
            elif operation['method'] == 'mode':
                cleaned_df[operation['column']] = cleaned_df[operation['column']].fillna(
                    cleaned_df[operation['column']].mode().iloc[0] if not cleaned_df[operation['column']].mode().empty else 0)
            elif operation['method'] == 'drop':
                cleaned_df = cleaned_df.dropna(subset=[operation['column']])
        elif operation['type'] == 'remove_duplicates':
            cleaned_df = cleaned_df.drop_duplicates()
        elif operation['type'] == 'remove_outliers':
            Q1 = cleaned_df[operation['column']].quantile(0.25)
            Q3 = cleaned_df[operation['column']].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            cleaned_df = cleaned_df[
                (cleaned_df[operation['column']] >= lower_bound) &
                (cleaned_df[operation['column']] <= upper_bound)
            ]
        elif operation['type'] == 'cap_outliers':
            Q1 = cleaned_df[operation['column']].quantile(0.25)
            Q3 = cleaned_df[operation['column']].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            cleaned_df[operation['column']] = cleaned_df[operation['column']].clip(lower_bound, upper_bound)
        elif operation['type'] == 'convert_type':
            if operation['target_type'] == 'category':
                cleaned_df[operation['column']] = cleaned_df[operation['column']].astype('category')
    return cleaned_df
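
# Illustrative usage sketch (not part of the original module). In the Streamlit app,
# the uploaded file would normally come from st.file_uploader and be passed to
# load_data(); the demo below runs the cleaning pipeline on a small synthetic frame
# instead, so it can be exercised without an upload. Column names and values are made up.
if __name__ == "__main__":
    demo = pd.DataFrame({
        'price': [10.0, 12.5, None, 11.0, 500.0],  # one missing value, one extreme value
        'region': ['north', 'north', 'south', 'south', 'south'],
    })
    operations = [
        {'type': 'fill_missing', 'column': 'price', 'method': 'median'},
        {'type': 'cap_outliers', 'column': 'price'},
        {'type': 'convert_type', 'column': 'region', 'target_type': 'category'},
        {'type': 'remove_duplicates'},
    ]
    cleaned = apply_data_cleaning(demo, operations)
    print(cleaned)
    print(cleaned.dtypes)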