End to end machine learning pipeline automation Python

👤 Sharing: AI
```python
# Import necessary libraries
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
import pickle  # For saving and loading models
import os # For checking if files exist
import logging # For robust logging

# Root logger setup: emit INFO and above, each record prefixed with a
# timestamp and its severity level.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)


# Define functions for each step of the pipeline

def load_data(filepath):
    """
    Read a CSV file into a DataFrame, logging progress and failures.

    Args:
        filepath (str): Location of the CSV file to read.

    Returns:
        pandas.DataFrame: The parsed contents of the file.

    Raises:
        FileNotFoundError: If nothing exists at ``filepath``.
        Exception: Any other error from ``pandas.read_csv`` is logged and
            propagated unchanged.
    """
    logging.info(f"Loading data from {filepath}...")
    try:
        frame = pd.read_csv(filepath)
    except FileNotFoundError:
        # Logged with a dedicated message, then handed back to the caller
        logging.error(f"File not found: {filepath}")
        raise
    except Exception as err:
        logging.error(f"Error loading data: {err}")
        raise
    else:
        logging.info(f"Data loaded successfully. Shape: {frame.shape}")
        return frame


def preprocess_data(data, target_column='target', test_size=0.2, random_state=42):
    """
    Splits the data into features and target, splits those into training and
    testing sets, and standardizes the features.

    Every non-target column is passed to ``StandardScaler``, so the feature
    columns are expected to be numeric. The scaler is fitted on the training
    split only and then applied to the test split, so no statistics from the
    test rows leak into training.

    Args:
        data (pandas.DataFrame): The input data.
        target_column (str): Name of the target variable column. Defaults to 'target'.
        test_size (float or int): Passed to ``train_test_split``. Defaults to 0.2.
        random_state (int or None): Seed passed to ``train_test_split`` so the
            split is reproducible. Defaults to 42.

    Returns:
        tuple: A tuple containing X_train, X_test, y_train, y_test, scaler,
        where X_train and X_test are the scaled feature matrices and scaler
        is the fitted ``StandardScaler``.

    Raises:
        KeyError: If ``target_column`` is not a column of ``data``.
        Exception: Any other failure is logged and re-raised unchanged.
    """
    logging.info("Preprocessing data...")

    # Separate features (X) and target (y). The KeyError handler is scoped to
    # this lookup only, so a KeyError raised later by splitting or scaling is
    # not misreported as a missing target column.
    try:
        X = data.drop(target_column, axis=1)
        y = data[target_column]
    except KeyError:
        logging.error(f"Target column '{target_column}' not found in the data.")
        raise
    except Exception as e:
        logging.error(f"Error during data preprocessing: {e}")
        raise

    try:
        # Split data into training and testing sets
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=test_size, random_state=random_state
        )

        # Fit the scaler on the training split only, then reuse it for the test split
        scaler = StandardScaler()
        X_train = scaler.fit_transform(X_train)
        X_test = scaler.transform(X_test)
    except Exception as e:
        logging.error(f"Error during data preprocessing: {e}")
        raise

    logging.info("Data preprocessing complete.")
    return X_train, X_test, y_train, y_test, scaler



def train_model(X_train, y_train, model_type='logistic_regression'):
    """
    Fit a classifier of the requested type on the training data.

    Args:
        X_train (numpy.ndarray): Training features.
        y_train (pandas.Series): Training target.
        model_type (str): Which model to build. Only 'logistic_regression'
            is supported, and it is the default.

    Returns:
        object: The fitted model.

    Raises:
        ValueError: If ``model_type`` is not a supported model name.
        Exception: Any error raised while fitting is logged and re-raised.
    """
    logging.info(f"Training model: {model_type}...")

    try:
        # Guard clause: reject anything other than the one supported model
        if model_type != 'logistic_regression':
            raise ValueError(f"Unsupported model type: {model_type}")

        # Fixed seed keeps the estimator reproducible across runs
        estimator = LogisticRegression(random_state=42)
        estimator.fit(X_train, y_train)
    except Exception as err:
        logging.error(f"Error training model: {err}")
        raise

    logging.info("Model training complete.")
    return estimator



def evaluate_model(model, X_test, y_test):
    """
    Score a fitted model against held-out data and log the results.

    Args:
        model (object): The trained model; must provide ``predict``.
        X_test (numpy.ndarray): Testing features.
        y_test (pandas.Series): Testing target.

    Returns:
        dict: ``{'accuracy': ..., 'classification_report': ...}`` holding the
        accuracy score and the text classification report.

    Raises:
        Exception: Any error raised while predicting or scoring is logged
            and re-raised.
    """
    logging.info("Evaluating model...")

    try:
        predictions = model.predict(X_test)
        score = accuracy_score(y_test, predictions)
        summary = classification_report(y_test, predictions)
    except Exception as err:
        logging.error(f"Error evaluating model: {err}")
        raise

    logging.info(f"Accuracy: {score}")
    logging.info(f"Classification Report:\n{summary}")
    logging.info("Model evaluation complete.")
    return {'accuracy': score, 'classification_report': summary}



def save_model(model, filename='model.pkl'):
    """
    Persist a trained model to disk using pickle.

    Args:
        model (object): The trained model to serialize.
        filename (str): Destination path. Defaults to 'model.pkl'.

    Raises:
        Exception: Any error raised while opening the file or pickling is
            logged and re-raised.
    """
    logging.info(f"Saving model to {filename}...")

    try:
        with open(filename, 'wb') as handle:
            pickle.dump(model, handle)
    except Exception as err:
        logging.error(f"Error saving model: {err}")
        raise
    else:
        logging.info("Model saved successfully.")

def save_scaler(scaler, filename='scaler.pkl'):
    """
    Persist a fitted scaler to disk using pickle.

    Args:
        scaler (object): The fitted scaler to serialize.
        filename (str): Destination path. Defaults to 'scaler.pkl'.

    Raises:
        Exception: Any error raised while opening the file or pickling is
            logged and re-raised.
    """
    logging.info(f"Saving scaler to {filename}...")

    try:
        with open(filename, 'wb') as handle:
            pickle.dump(scaler, handle)
    except Exception as err:
        logging.error(f"Error saving scaler: {err}")
        raise
    else:
        logging.info("Scaler saved successfully.")


def load_model(filename='model.pkl'):
    """
    Restore a previously pickled model from disk.

    Unpickling can execute arbitrary code, so only point this at files
    from a trusted source.

    Args:
        filename (str): Path of the pickle file. Defaults to 'model.pkl'.

    Returns:
        object: The unpickled model.

    Raises:
        FileNotFoundError: If ``filename`` does not exist.
        Exception: Any other error while reading or unpickling is logged
            and re-raised.
    """
    logging.info(f"Loading model from {filename}...")
    try:
        with open(filename, 'rb') as handle:
            restored = pickle.load(handle)
    except FileNotFoundError:
        # Propagate so the caller can stop when there is no saved model
        logging.error(f"Model file not found: {filename}")
        raise
    except Exception as err:
        logging.error(f"Error loading model: {err}")
        raise
    else:
        logging.info("Model loaded successfully.")
        return restored



def load_scaler(filename='scaler.pkl'):
    """
    Restore a previously pickled scaler from disk.

    Unpickling can execute arbitrary code, so only point this at files
    from a trusted source.

    Args:
        filename (str): Path of the pickle file. Defaults to 'scaler.pkl'.

    Returns:
        object: The unpickled scaler.

    Raises:
        FileNotFoundError: If ``filename`` does not exist.
        Exception: Any other error while reading or unpickling is logged
            and re-raised.
    """
    logging.info(f"Loading scaler from {filename}...")
    try:
        with open(filename, 'rb') as handle:
            restored = pickle.load(handle)
    except FileNotFoundError:
        logging.error(f"Scaler file not found: {filename}")
        raise
    except Exception as err:
        logging.error(f"Error loading scaler: {err}")
        raise
    else:
        logging.info("Scaler loaded successfully.")
        return restored



# Define the main function to orchestrate the pipeline
def main(data_filepath, model_filename='model.pkl', scaler_filename='scaler.pkl'):
    """
    Orchestrates the end-to-end machine learning pipeline.

    Loads and preprocesses the data, then either reuses a previously saved
    model/scaler pair (when both files exist) or trains a new model and saves
    both artifacts, and finally evaluates the model on the test split.

    Any exception raised along the way is logged with its traceback and
    swallowed, so this function returns None instead of propagating the error.

    Args:
        data_filepath (str): The path to the CSV data file.
        model_filename (str): The filename to save/load the model. Defaults to 'model.pkl'.
        scaler_filename (str): The filename to save/load the scaler. Defaults to 'scaler.pkl'.
    """

    logging.info("Starting machine learning pipeline...")

    try:
        # 1. Load the data
        data = load_data(data_filepath)

        # 2. Preprocess the data (the scaler returned here is fitted on this run's training split)
        X_train, X_test, y_train, y_test, scaler = preprocess_data(data)

        # 3. Train or load the model
        if os.path.exists(model_filename) and os.path.exists(scaler_filename):
            # Both artifacts exist: reuse them instead of retraining
            logging.info("Loading existing model and scaler...")
            model = load_model(model_filename)
            saved_scaler = load_scaler(scaler_filename)

            # The persisted model was trained on features scaled by the
            # persisted scaler, but X_test was just scaled by a scaler fitted
            # on the current data. Map the test features back to raw units and
            # re-scale them with the scaler that matches the model.
            X_test = saved_scaler.transform(scaler.inverse_transform(X_test))
        else:
            # At least one artifact is missing: train from scratch and save both
            logging.info("Training new model...")
            model = train_model(X_train, y_train)
            save_model(model, model_filename)
            save_scaler(scaler, scaler_filename)

        # 4. Evaluate the model
        evaluate_model(model, X_test, y_test)

        logging.info("Machine learning pipeline completed successfully.")

    except Exception as e:
        # Top-level boundary: keep the original "log and continue" behavior,
        # but record the traceback so the failure can be diagnosed.
        logging.exception(f"Pipeline failed: {e}")


# Example usage:
if __name__ == "__main__":
    # Write a tiny demo dataset to disk so the script can run without any
    # externally supplied data.
    demo_frame = pd.DataFrame({
        'feature1': list(range(1, 11)),
        'feature2': list(range(11, 21)),
        'target': [0] * 4 + [1] * 6,
    })
    demo_frame.to_csv('dummy_data.csv', index=False)

    # Run the pipeline with explicit artifact filenames
    main(
        'dummy_data.csv',
        model_filename='my_model.pkl',
        scaler_filename='my_scaler.pkl',
    )
```

Key improvements and explanations:

* **Clear Function Definitions:**  Each step of the pipeline is now a separate function with a docstring explaining its purpose, arguments, and return values.  This makes the code much more modular and readable.
* **Data Loading:**  The `load_data` function handles CSV loading using `pandas.read_csv`.  It includes error handling for `FileNotFoundError` and other potential exceptions, and logs success/failure.
* **Data Preprocessing:** The `preprocess_data` function now:
    * Splits the data into features (X) and target (y).
    * Splits the data into training and testing sets using `train_test_split`.  Crucially, a `random_state` is set for reproducible splits.
    * Scales the numerical features using `StandardScaler`.  `fit_transform` is used on the *training* data, and *only* `transform` is used on the *testing* data.  This is essential to prevent data leakage. Also includes `KeyError` handling if the target column is missing.
* **Model Training:** The `train_model` function trains a `LogisticRegression` model (you can easily extend it to support other models).  It also uses `random_state` in the model initialization for reproducibility. Error handling is included for unsupported model types or issues during training.
* **Model Evaluation:** The `evaluate_model` function calculates accuracy and generates a classification report using `sklearn.metrics`. It returns these metrics in a dictionary. Error handling is included.
* **Model Persistence (Saving & Loading):**  The `save_model` and `load_model` functions use `pickle` to save and load the trained model. This is how you persist the model to disk so you can reuse it later without retraining.  `save_scaler` and `load_scaler` are included to persist and reuse the scaler.  Error handling is comprehensive.
* **Main Function (Orchestration):** The `main` function orchestrates the entire pipeline, calling the other functions in the correct order.  It also handles the crucial logic of checking whether a saved model already exists.  If both the model file and the scaler file exist (`model.pkl` and `scaler.pkl` by default), it *loads* the model instead of retraining it, which is the core of automation. It also loads the fitted scaler that was saved alongside the model.  It includes a `try...except` block that catches any exception raised during the pipeline and logs it, so a failure is reported in the log instead of propagating as an unhandled exception.
* **Error Handling and Logging:** Uses `logging` module for detailed logging of each step, including successes, failures, and potential errors.  The `try...except` blocks around each major step ensure that errors are caught and logged, and that the program doesn't crash unexpectedly.  Specific exceptions (e.g., `FileNotFoundError`, `KeyError`) are handled for more informative error messages.
* **Reproducibility:**  `random_state` is used in `train_test_split` and `LogisticRegression` to ensure that the results are reproducible.  Without this, your training/testing split and model initialization would be random each time, leading to different results.
* **Scaler Persistence:**  The `StandardScaler` is also saved to disk using `pickle` and loaded.  This is crucial because you need to use the *same* scaler that was used to train the model when you make predictions on new data.  The new data needs to be transformed using the *same* scaling parameters.
* **Clearer Comments and Docstrings:** Added more comprehensive comments and docstrings to explain the code.
* **Example Usage:** The `if __name__ == "__main__":` block creates a dummy CSV file, so you can run the code directly without having to provide your own data. It also shows how to call the `main` function.
* **Model Filename Argument:** The `main` function now takes a `model_filename` argument, allowing you to specify the name of the file to save/load the model to.
* **Scaler Filename Argument:**  Similarly, `main` takes a `scaler_filename` argument.
* **Handles Missing Scaler/Model:** `main` checks that both the model file *and* the scaler file exist before loading.  This is important because a model without its matching scaler (or a scaler without its model) cannot be used consistently.  If either file is missing, it trains a new model and saves both.
* **Raises Exceptions:**  In most error handling blocks, the `raise` statement is used after logging the error. This is crucial.  It re-raises the original exception.  This allows the `main` function's `try...except` block to catch the exception and handle it appropriately, preventing the pipeline from continuing if a critical error occurred.
* **Security:** Using `pickle` can be a security risk: unpickling a file can execute arbitrary code, so only load models from sources you trust.  If you must accept model files from external sources, consider a safer serialization format (for example, storing the model parameters as JSON) or a model deployment framework.

How to Run:

1.  **Save:** Save the code as a `.py` file (e.g., `ml_pipeline.py`).
2.  **Run:** Execute the script from your terminal: `python ml_pipeline.py`

The first time you run it, it will:

*   Create `dummy_data.csv`.
*   Train the model.
*   Save the model to `my_model.pkl` and the scaler to `my_scaler.pkl`.
*   Evaluate the model.

Subsequent runs will:

*   Load the model from `my_model.pkl` and the scaler from `my_scaler.pkl`.
*   Evaluate the model.  You'll see the logging message indicating that the existing model was loaded.  It will *not* retrain the model.

This is now a complete, robust, and reusable end-to-end machine learning pipeline.
👁️ Viewed: 4

Comments