# Import required libraries
from pathlib import Path
import pandas as pd
import numpy as np
import joblib
# Scikit-learn imports for building ML pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score
# Raw strings so the Windows backslashes are not treated as escape sequences
DATA_PATH = Path(r"C:\Users\wissa\Downloads\data\stroke-flask-docker\data\healthcare-dataset-stroke-data.csv")
OUT_PATH = Path(r"C:\Users\wissa\Downloads\data\stroke-flask-docker\model\stroke_pipeline.joblib")
OUT_PATH.parent.mkdir(parents=True, exist_ok=True) # Make sure output folder exists
# Define feature groups
CATEGORICAL = ["gender","ever_married","work_type","Residence_type","smoking_status"]
NUMERIC = ["age","avg_glucose_level","bmi"]
BINARY_INT = ["hypertension","heart_disease"] # Already numeric (0/1), but treated separately
def load_real_or_synthetic():
    if DATA_PATH.exists():
        # Load dataset from CSV
        df = pd.read_csv(DATA_PATH)
        # Define which columns we MUST have
        must_have = ["gender","age","hypertension","heart_disease","ever_married",
                     "work_type","Residence_type","avg_glucose_level","bmi",
                     "smoking_status","stroke"]
        # Check if any required columns are missing
        missing = set(must_have) - set(df.columns)
        if missing:
            raise ValueError(f"Dataset is missing columns: {missing}")
        # Drop extra columns like "id" if present, keep only required ones
        df = df[[c for c in df.columns if c in must_have]]
        return df
    else:
        # If dataset file is not found, generate synthetic (random but realistic) data
        rng = np.random.RandomState(42)  # Random seed for reproducibility
        N = 2000  # number of synthetic rows
        # Generate random values for each feature
        df = pd.DataFrame({
            "gender": rng.choice(["Male","Female","Other"], size=N, p=[0.49,0.50,0.01]),
            "age": rng.randint(1, 90, size=N),
            "hypertension": rng.binomial(1, 0.15, size=N),   # 15% chance of hypertension
            "heart_disease": rng.binomial(1, 0.08, size=N),  # 8% chance of heart disease
            "ever_married": rng.choice(["Yes","No"], size=N, p=[0.7,0.3]),
            "work_type": rng.choice(["Private","Self-employed","Govt_job","children","Never_worked"],
                                    size=N, p=[0.6,0.2,0.18,0.01,0.01]),
            "Residence_type": rng.choice(["Urban","Rural"], size=N, p=[0.55,0.45]),
            "avg_glucose_level": rng.normal(100, 30, size=N).clip(50, 300),  # realistic range
            "bmi": rng.normal(28, 6, size=N).clip(10, 60),
            "smoking_status": rng.choice(["formerly smoked","never smoked","smokes","Unknown"],
                                         size=N, p=[0.2,0.6,0.15,0.05]),
        })
        # Define a "logit" (linear combination of features) that influences stroke probability
        logit = (
            0.03*df["age"] +
            0.02*(df["avg_glucose_level"]-100) +
            0.05*(df["bmi"]-28) +
            0.8*df["hypertension"] +
            0.9*df["heart_disease"] +
            0.3*(df["ever_married"]=="Yes").astype(int)
        )
        # Convert logit to probability using sigmoid function
        prob = 1/(1+np.exp(-(logit-4.0)))  # shift so stroke is rare (imbalanced dataset)
        # Assign stroke label (1 = stroke, 0 = no stroke) based on probability
        df["stroke"] = (rng.rand(len(df)) < prob).astype(int)
        return df
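# Note (sketch, not in the original script): the stroke label is heavily imbalanced, so a
# quick sanity check such as df["stroke"].value_counts(normalize=True) on the returned
# frame can confirm the positive-class rate before training.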
def build_pipeline():
    # For categorical features: fill missing with most frequent, then one-hot encode
    cat_proc = Pipeline(steps=[
        ("impute", SimpleImputer(strategy="most_frequent")),
        ("ohe", OneHotEncoder(handle_unknown="ignore"))
    ])

    # For numeric features: fill missing with median, then scale to mean=0, std=1
    num_proc = Pipeline(steps=[
        ("impute", SimpleImputer(strategy="median")),
        ("scale", StandardScaler())
    ])

    # For binary integer features: impute, then scale (optional but safe for pipeline)
    bin_proc = Pipeline(steps=[
        ("impute", SimpleImputer(strategy="most_frequent")),
        ("scale", StandardScaler(with_mean=False))  # keep sparse-friendly format
    ])

    # Combine all processors into one column transformer
    pre = ColumnTransformer(transformers=[
        ("cat", cat_proc, CATEGORICAL),
        ("num", num_proc, NUMERIC),
        ("bin", bin_proc, BINARY_INT),
    ])

    # Define classifier (logistic regression for binary classification)
    clf = LogisticRegression(max_iter=1000, n_jobs=None)
    # Final pipeline: preprocessing -> model
    pipeline = Pipeline([("pre", pre), ("clf", clf)])
    return pipeline
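# Note (assumption, not part of the original design): because stroke cases are rare,
# class_weight="balanced" could be passed to LogisticRegression as an alternative to the
# lowered decision threshold used in main(); this script keeps the default weights.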
def main():
    df = load_real_or_synthetic()
    # Split into features (X) and target (y = stroke)
    X = df.drop(columns=["stroke"])
    y = df["stroke"].astype(int)
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=42, stratify=y
    )

    pipeline = build_pipeline()
    pipeline.fit(X_train, y_train)

    y_prob = pipeline.predict_proba(X_test)[:, 1]  # probability of stroke
    y_pred = (y_prob >= 0.3).astype(int)           # classify as 1 if prob >= 0.3
    print("AUC:", roc_auc_score(y_test, y_prob))   # area under ROC curve
    print("Report:\n", classification_report(y_test, y_pred))  # precision/recall/F1

    joblib.dump(pipeline, OUT_PATH)
    print(f"Saved pipeline to {OUT_PATH.resolve()}")

if __name__ == "__main__":
    main()
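# Example usage of the saved artifact (a minimal sketch; the sample values below are
# hypothetical, and the Flask app is assumed to load the same joblib file):
#
#   pipeline = joblib.load(OUT_PATH)
#   sample = pd.DataFrame([{
#       "gender": "Female", "age": 67, "hypertension": 0, "heart_disease": 1,
#       "ever_married": "Yes", "work_type": "Private", "Residence_type": "Urban",
#       "avg_glucose_level": 228.69, "bmi": 36.6, "smoking_status": "formerly smoked",
#   }])
#   print(pipeline.predict_proba(sample)[:, 1])  # predicted probability of stroke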