# Import required libraries from pathlib import Path import pandas as pd import numpy as np import joblib # Scikit-learn imports for building ML pipeline from sklearn.compose import ColumnTransformer from sklearn.preprocessing import OneHotEncoder, StandardScaler from sklearn.impute import SimpleImputer from sklearn.linear_model import LogisticRegression from sklearn.pipeline import Pipeline from sklearn.model_selection import train_test_split from sklearn.metrics import classification_report, roc_auc_score DATA_PATH = Path("C:\Users\wissa\Downloads\data\stroke-flask-docker\data\healthcare-dataset-stroke-data.csv") OUT_PATH = Path("C:\Users\wissa\Downloads\data\stroke-flask-docker\model\stroke_pipeline.joblib") OUT_PATH.parent.mkdir(parents=True, exist_ok=True) # Make sure output folder exists # Define feature groups CATEGORICAL = ["gender","ever_married","work_type","Residence_type","smoking_status"] NUMERIC = ["age","avg_glucose_level","bmi"] BINARY_INT = ["hypertension","heart_disease"] # Already numeric (0/1), but treated separately def load_real_or_synthetic(): if DATA_PATH.exists(): # Load dataset from CSV df = pd.read_csv(DATA_PATH) # Define which columns we MUST have must_have = ["gender","age","hypertension","heart_disease","ever_married", "work_type","Residence_type","avg_glucose_level","bmi", "smoking_status","stroke"] # Check if any required columns are missing missing = set(must_have) - set(df.columns) if missing: raise ValueError(f"Dataset is missing columns: {missing}") # Drop extra columns like "id" if present, keep only required ones df = df[[c for c in df.columns if c in must_have]] return df else: # If dataset file is not found, generate synthetic (random but realistic) data rng = np.random.RandomState(42) # Random seed for reproducibility N = 2000 # number of synthetic rows # Generate random values for each feature df = pd.DataFrame({ "gender": rng.choice(["Male","Female","Other"], size=N, p=[0.49,0.50,0.01]), "age": rng.randint(1, 90, size=N), "hypertension": rng.binomial(1, 0.15, size=N), # 15% chance of hypertension "heart_disease": rng.binomial(1, 0.08, size=N), # 8% chance of heart disease "ever_married": rng.choice(["Yes","No"], size=N, p=[0.7,0.3]), "work_type": rng.choice(["Private","Self-employed","Govt_job","children","Never_worked"], size=N, p=[0.6,0.2,0.18,0.01,0.01]), "Residence_type": rng.choice(["Urban","Rural"], size=N, p=[0.55,0.45]), "avg_glucose_level": rng.normal(100, 30, size=N).clip(50, 300), # realistic range "bmi": rng.normal(28, 6, size=N).clip(10, 60), "smoking_status": rng.choice(["formerly smoked","never smoked","smokes","Unknown"], size=N, p=[0.2,0.6,0.15,0.05]), }) # Define a "logit" (linear combination of features) that influences stroke probability logit = ( 0.03*df["age"] + 0.02*(df["avg_glucose_level"]-100) + 0.05*(df["bmi"]-28) + 0.8*df["hypertension"] + 0.9*df["heart_disease"] + 0.3*(df["ever_married"]=="Yes").astype(int) ) # Convert logit to probability using sigmoid function prob = 1/(1+np.exp(- (logit-4.0))) # shift so stroke is rare (imbalanced dataset) # Assign stroke label (1 = stroke, 0 = no stroke) based on probability df["stroke"] = (rng.rand(len(df)) < prob).astype(int) return df def build_pipeline(): # For categorical features: fill missing with most frequent, then one-hot encode cat_proc = Pipeline(steps=[ ("impute", SimpleImputer(strategy="most_frequent")), ("ohe", OneHotEncoder(handle_unknown="ignore")) ]) # For numeric features: fill missing with median, then scale to mean=0, std=1 num_proc = Pipeline(steps=[ ("impute", SimpleImputer(strategy="median")), ("scale", StandardScaler()) ]) # For binary integer features: impute, then scale (optional but safe for pipeline) bin_proc = Pipeline(steps=[ ("impute", SimpleImputer(strategy="most_frequent")), ("scale", StandardScaler(with_mean=False)) # keep sparse-friendly format ]) # Combine all processors into one column transformer pre = ColumnTransformer(transformers=[ ("cat", cat_proc, CATEGORICAL), ("num", num_proc, NUMERIC), ("bin", bin_proc, BINARY_INT), ]) # Define classifier (logistic regression for binary classification) clf = LogisticRegression(max_iter=1000, n_jobs=None) # Final pipeline: preprocessing → model pipeline = Pipeline([("pre", pre), ("clf", clf)]) return pipeline def main(): df = load_real_or_synthetic() # Split into features (X) and target (y = stroke) X = df.drop(columns=["stroke"]) y = df["stroke"].astype(int) X_train, X_test, y_train, y_test = train_test_split( X, y, test_size=0.2, random_state=42, stratify=y ) pipeline = build_pipeline() pipeline.fit(X_train, y_train) y_prob = pipeline.predict_proba(X_test)[:,1] # probability of stroke y_pred = (y_prob >= 0.3).astype(int) # classify as 1 if prob ≥ 0.3 print("AUC:", roc_auc_score(y_test, y_prob)) # area under ROC curve print("Report:\n", classification_report(y_test, y_pred)) # precision/recall/F1 joblib.dump(pipeline, OUT_PATH) print(f"Saved pipeline to {OUT_PATH.resolve()}") if __name__ == "__main__": main()