WissMah's picture
Update model/train_and_save.py
fbec100 verified
# Import required libraries
from pathlib import Path
import pandas as pd
import numpy as np
import joblib
# Scikit-learn imports for building ML pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score
DATA_PATH = Path("C:\Users\wissa\Downloads\data\stroke-flask-docker\data\healthcare-dataset-stroke-data.csv")
OUT_PATH = Path("C:\Users\wissa\Downloads\data\stroke-flask-docker\model\stroke_pipeline.joblib")
OUT_PATH.parent.mkdir(parents=True, exist_ok=True) # Make sure output folder exists
# Define feature groups
CATEGORICAL = ["gender","ever_married","work_type","Residence_type","smoking_status"]
NUMERIC = ["age","avg_glucose_level","bmi"]
BINARY_INT = ["hypertension","heart_disease"] # Already numeric (0/1), but treated separately
def load_real_or_synthetic():
if DATA_PATH.exists():
# Load dataset from CSV
df = pd.read_csv(DATA_PATH)
# Define which columns we MUST have
must_have = ["gender","age","hypertension","heart_disease","ever_married",
"work_type","Residence_type","avg_glucose_level","bmi",
"smoking_status","stroke"]
# Check if any required columns are missing
missing = set(must_have) - set(df.columns)
if missing:
raise ValueError(f"Dataset is missing columns: {missing}")
# Drop extra columns like "id" if present, keep only required ones
df = df[[c for c in df.columns if c in must_have]]
return df
else:
# If dataset file is not found, generate synthetic (random but realistic) data
rng = np.random.RandomState(42) # Random seed for reproducibility
N = 2000 # number of synthetic rows
# Generate random values for each feature
df = pd.DataFrame({
"gender": rng.choice(["Male","Female","Other"], size=N, p=[0.49,0.50,0.01]),
"age": rng.randint(1, 90, size=N),
"hypertension": rng.binomial(1, 0.15, size=N), # 15% chance of hypertension
"heart_disease": rng.binomial(1, 0.08, size=N), # 8% chance of heart disease
"ever_married": rng.choice(["Yes","No"], size=N, p=[0.7,0.3]),
"work_type": rng.choice(["Private","Self-employed","Govt_job","children","Never_worked"],
size=N, p=[0.6,0.2,0.18,0.01,0.01]),
"Residence_type": rng.choice(["Urban","Rural"], size=N, p=[0.55,0.45]),
"avg_glucose_level": rng.normal(100, 30, size=N).clip(50, 300), # realistic range
"bmi": rng.normal(28, 6, size=N).clip(10, 60),
"smoking_status": rng.choice(["formerly smoked","never smoked","smokes","Unknown"],
size=N, p=[0.2,0.6,0.15,0.05]),
})
# Define a "logit" (linear combination of features) that influences stroke probability
logit = (
0.03*df["age"] +
0.02*(df["avg_glucose_level"]-100) +
0.05*(df["bmi"]-28) +
0.8*df["hypertension"] +
0.9*df["heart_disease"] +
0.3*(df["ever_married"]=="Yes").astype(int)
)
# Convert logit to probability using sigmoid function
prob = 1/(1+np.exp(- (logit-4.0))) # shift so stroke is rare (imbalanced dataset)
# Assign stroke label (1 = stroke, 0 = no stroke) based on probability
df["stroke"] = (rng.rand(len(df)) < prob).astype(int)
return df
def build_pipeline():
# For categorical features: fill missing with most frequent, then one-hot encode
cat_proc = Pipeline(steps=[
("impute", SimpleImputer(strategy="most_frequent")),
("ohe", OneHotEncoder(handle_unknown="ignore"))
])
# For numeric features: fill missing with median, then scale to mean=0, std=1
num_proc = Pipeline(steps=[
("impute", SimpleImputer(strategy="median")),
("scale", StandardScaler())
])
# For binary integer features: impute, then scale (optional but safe for pipeline)
bin_proc = Pipeline(steps=[
("impute", SimpleImputer(strategy="most_frequent")),
("scale", StandardScaler(with_mean=False)) # keep sparse-friendly format
])
# Combine all processors into one column transformer
pre = ColumnTransformer(transformers=[
("cat", cat_proc, CATEGORICAL),
("num", num_proc, NUMERIC),
("bin", bin_proc, BINARY_INT),
])
# Define classifier (logistic regression for binary classification)
clf = LogisticRegression(max_iter=1000, n_jobs=None)
# Final pipeline: preprocessing β†’ model
pipeline = Pipeline([("pre", pre), ("clf", clf)])
return pipeline
def main():
df = load_real_or_synthetic()
# Split into features (X) and target (y = stroke)
X = df.drop(columns=["stroke"])
y = df["stroke"].astype(int)
X_train, X_test, y_train, y_test = train_test_split(
X, y, test_size=0.2, random_state=42, stratify=y
)
pipeline = build_pipeline()
pipeline.fit(X_train, y_train)
y_prob = pipeline.predict_proba(X_test)[:,1] # probability of stroke
y_pred = (y_prob >= 0.3).astype(int) # classify as 1 if prob β‰₯ 0.3
print("AUC:", roc_auc_score(y_test, y_prob)) # area under ROC curve
print("Report:\n", classification_report(y_test, y_pred)) # precision/recall/F1
joblib.dump(pipeline, OUT_PATH)
print(f"Saved pipeline to {OUT_PATH.resolve()}")
if __name__ == "__main__":
main()