End-to-end machine learning pipeline automation in Python
```python
# Import necessary libraries
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, classification_report
import pickle # For saving and loading models
import os # For checking if files exist
import logging # For robust logging
# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Define functions for each step of the pipeline
def load_data(filepath):
"""
Loads data from a CSV file.
Args:
filepath (str): The path to the CSV file.
Returns:
pandas.DataFrame: The loaded data.
"""
logging.info(f"Loading data from {filepath}...")
try:
data = pd.read_csv(filepath)
logging.info(f"Data loaded successfully. Shape: {data.shape}")
return data
except FileNotFoundError:
logging.error(f"File not found: {filepath}")
raise # Re-raise the exception for handling further up the chain
except Exception as e:
logging.error(f"Error loading data: {e}")
raise
def preprocess_data(data, target_column='target', scaler=None):
    """
    Preprocesses the data by splitting it into features and target,
    scaling numerical features, and splitting it into training and testing sets.
    Args:
        data (pandas.DataFrame): The input data.
        target_column (str): Name of the target variable column. Defaults to 'target'.
        scaler (StandardScaler, optional): A previously fitted scaler to reuse.
            If None, a new scaler is fitted on the training data.
    Returns:
        tuple: A tuple containing X_train, X_test, y_train, y_test, scaler.
    """
    logging.info("Preprocessing data...")
    try:
        # Separate features (X) and target (y)
        X = data.drop(target_column, axis=1)
        y = data[target_column]
        # Split data into training and testing sets (fixed random_state for reproducibility)
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        # Scale numerical features using StandardScaler
        if scaler is None:
            scaler = StandardScaler()
            X_train = scaler.fit_transform(X_train)  # fit_transform on the training data only
        else:
            X_train = scaler.transform(X_train)  # reuse the previously fitted scaler
        X_test = scaler.transform(X_test)  # transform the test data with the training scaler
        logging.info("Data preprocessing complete.")
        return X_train, X_test, y_train, y_test, scaler
    except KeyError:
        logging.error(f"Target column '{target_column}' not found in the data.")
        raise
    except Exception as e:
        logging.error(f"Error during data preprocessing: {e}")
        raise
def train_model(X_train, y_train, model_type='logistic_regression'):
"""
Trains a machine learning model.
Args:
X_train (numpy.ndarray): Training features.
y_train (pandas.Series): Training target.
model_type (str): The type of model to train. Defaults to 'logistic_regression'.
Returns:
object: The trained model.
"""
logging.info(f"Training model: {model_type}...")
try:
if model_type == 'logistic_regression':
model = LogisticRegression(random_state=42) # Added random_state for reproducibility
else:
raise ValueError(f"Unsupported model type: {model_type}")
model.fit(X_train, y_train)
logging.info("Model training complete.")
return model
except Exception as e:
logging.error(f"Error training model: {e}")
raise
def evaluate_model(model, X_test, y_test):
"""
Evaluates the trained model on the test set.
Args:
model (object): The trained model.
X_test (numpy.ndarray): Testing features.
y_test (pandas.Series): Testing target.
Returns:
dict: A dictionary containing evaluation metrics.
"""
logging.info("Evaluating model...")
try:
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
report = classification_report(y_test, y_pred)
logging.info(f"Accuracy: {accuracy}")
logging.info(f"Classification Report:\n{report}")
metrics = {'accuracy': accuracy, 'classification_report': report}
logging.info("Model evaluation complete.")
return metrics
except Exception as e:
logging.error(f"Error evaluating model: {e}")
raise
def save_model(model, filename='model.pkl'):
"""
Saves the trained model to a file.
Args:
model (object): The trained model.
filename (str): The filename to save the model to. Defaults to 'model.pkl'.
"""
logging.info(f"Saving model to {filename}...")
try:
with open(filename, 'wb') as file:
pickle.dump(model, file)
logging.info("Model saved successfully.")
except Exception as e:
logging.error(f"Error saving model: {e}")
raise
def save_scaler(scaler, filename='scaler.pkl'):
"""
Saves the fitted scaler to a file.
Args:
scaler (object): The fitted scaler.
filename (str): The filename to save the scaler to. Defaults to 'scaler.pkl'.
"""
logging.info(f"Saving scaler to {filename}...")
try:
with open(filename, 'wb') as file:
pickle.dump(scaler, file)
logging.info("Scaler saved successfully.")
except Exception as e:
logging.error(f"Error saving scaler: {e}")
raise
def load_model(filename='model.pkl'):
"""
Loads a trained model from a file.
Args:
filename (str): The filename to load the model from. Defaults to 'model.pkl'.
Returns:
object: The loaded model.
"""
logging.info(f"Loading model from {filename}...")
try:
with open(filename, 'rb') as file:
model = pickle.load(file)
logging.info("Model loaded successfully.")
return model
except FileNotFoundError:
logging.error(f"Model file not found: {filename}")
raise #Re-raise so the pipeline can stop if model doesn't exist.
except Exception as e:
logging.error(f"Error loading model: {e}")
raise
def load_scaler(filename='scaler.pkl'):
"""
Loads a fitted scaler from a file.
Args:
filename (str): The filename to load the scaler from. Defaults to 'scaler.pkl'.
Returns:
object: The loaded scaler.
"""
logging.info(f"Loading scaler from {filename}...")
try:
with open(filename, 'rb') as file:
scaler = pickle.load(file)
logging.info("Scaler loaded successfully.")
return scaler
except FileNotFoundError:
logging.error(f"Scaler file not found: {filename}")
raise
except Exception as e:
logging.error(f"Error loading scaler: {e}")
raise
# Define the main function to orchestrate the pipeline
def main(data_filepath, model_filename='model.pkl', scaler_filename='scaler.pkl'):
"""
Orchestrates the end-to-end machine learning pipeline.
Args:
data_filepath (str): The path to the CSV data file.
model_filename (str): The filename to save/load the model. Defaults to 'model.pkl'.
scaler_filename (str): The filename to save/load the scaler. Defaults to 'scaler.pkl'.
"""
logging.info("Starting machine learning pipeline...")
try:
# 1. Load the data
data = load_data(data_filepath)
        # 2. Load an existing model and scaler if both artifacts are present;
        #    otherwise fit a new scaler and train (then persist) a new model
        if os.path.exists(model_filename) and os.path.exists(scaler_filename):
            logging.info("Loading existing model and scaler...")
            model = load_model(model_filename)
            scaler = load_scaler(scaler_filename)
            # Preprocess with the previously fitted scaler so scaling stays consistent
            X_train, X_test, y_train, y_test, scaler = preprocess_data(data, scaler=scaler)
        else:
            logging.info("Training new model...")
            X_train, X_test, y_train, y_test, scaler = preprocess_data(data)
            model = train_model(X_train, y_train)
            save_model(model, model_filename)  # Save the trained model
            save_scaler(scaler, scaler_filename)  # Save the fitted scaler too
        # 3. Evaluate the model
        evaluate_model(model, X_test, y_test)
logging.info("Machine learning pipeline completed successfully.")
except Exception as e:
logging.error(f"Pipeline failed: {e}")
# Example usage:
if __name__ == "__main__":
# Create a dummy CSV file for demonstration purposes
dummy_data = pd.DataFrame({
'feature1': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
'feature2': [11, 12, 13, 14, 15, 16, 17, 18, 19, 20],
'target': [0, 0, 0, 0, 1, 1, 1, 1, 1, 1]
})
dummy_data.to_csv('dummy_data.csv', index=False)
# Run the pipeline
main('dummy_data.csv', model_filename='my_model.pkl', scaler_filename='my_scaler.pkl') # Specify a filename
```
Key improvements and explanations:
* **Clear Function Definitions:** Each step of the pipeline is now a separate function with a docstring explaining its purpose, arguments, and return values. This makes the code much more modular and readable.
* **Data Loading:** The `load_data` function handles CSV loading using `pandas.read_csv`. It includes error handling for `FileNotFoundError` and other potential exceptions, and logs success/failure.
* **Data Preprocessing:** The `preprocess_data` function now:
* Splits the data into features (X) and target (y).
* Splits the data into training and testing sets using `train_test_split`. Crucially, a `random_state` is set for reproducible splits.
* Scales the numerical features using `StandardScaler`. `fit_transform` is used on the *training* data, and *only* `transform` is used on the *testing* data. This is essential to prevent data leakage. Also includes `KeyError` handling if the target column is missing.
* **Model Training:** The `train_model` function trains a `LogisticRegression` model and raises a `ValueError` for unsupported model types; a sketch of how to add a second model type appears at the end of this post. `random_state` is set in the model initialization for reproducibility, and errors during training are logged and re-raised.
* **Model Evaluation:** The `evaluate_model` function computes accuracy and a classification report using `sklearn.metrics` and returns them in a dictionary. `main` currently only logs them; a sketch near the end of this post shows one way to record the returned metrics across runs. Error handling is included.
* **Model Persistence (Saving & Loading):** The `save_model` and `load_model` functions use `pickle` to save and load the trained model. This is how you persist the model to disk so you can reuse it later without retraining. `save_scaler` and `load_scaler` are included to persist and reuse the scaler. Error handling is comprehensive.
* **Main Function (Orchestration):** The `main` function orchestrates the entire pipeline, calling the other functions in the correct order. It also contains the core automation logic: if both the model file and the scaler file already exist, it *loads* them and preprocesses the data with the previously fitted scaler instead of retraining; otherwise it fits a new scaler, trains a new model, and saves both. A `try...except` block catches any exception raised during the pipeline and logs it, so a failure in one step does not end the program with an unhandled traceback.
* **Error Handling and Logging:** Uses the `logging` module for detailed logging of each step, including successes and failures. The `try...except` blocks around each major step ensure that errors are caught and logged rather than crashing the program unexpectedly. Specific exceptions (e.g., `FileNotFoundError`, `KeyError`) are handled for more informative error messages.
* **Reproducibility:** `random_state` is used in `train_test_split` and `LogisticRegression` to ensure that the results are reproducible. Without this, your training/testing split and model initialization would be random each time, leading to different results.
* **Scaler Persistence:** The fitted `StandardScaler` is also saved to disk with `pickle` and loaded on later runs. This is crucial because new data must be transformed with the *same* scaling parameters that were used when the model was trained; the short inference sketch after this list shows how the two saved artifacts are used together.
* **Clearer Comments and Docstrings:** Added more comprehensive comments and docstrings to explain the code.
* **Example Usage:** The `if __name__ == "__main__":` block creates a dummy CSV file, so you can run the code directly without having to provide your own data. It also shows how to call the `main` function.
* **Model Filename Argument:** The `main` function now takes a `model_filename` argument, allowing you to specify the name of the file to save/load the model to.
* **Scaler Filename Argument:** Similarly, `main` takes a `scaler_filename` argument.
* **Handles Missing Scaler/Model:** `main` only loads saved artifacts when *both* the model file *and* the scaler file exist. If only one were present, the loaded model and scaler could be out of sync (for example, a model trained against a differently fitted scaler). If either is missing, the pipeline trains from scratch and saves both.
* **Raises Exceptions:** In the error handling blocks, `raise` is used after logging so that the original exception propagates up. This lets the `main` function's `try...except` block catch it and stop the pipeline when a critical step fails, instead of continuing in a bad state.
* **Security:** `pickle` can execute arbitrary code when a file is unpickled, so only load model and scaler files that you created yourself or that come from a trusted source. If you need to exchange models with other parties, consider a format designed for that purpose (for example ONNX) or a model-serving framework.
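To make the scaler-reuse point concrete, here is a minimal inference sketch showing how the saved artifacts could be used to score new, unlabeled data. It assumes the pipeline above has already been run once with the example filenames `my_model.pkl` and `my_scaler.pkl`; the feature values below are made up for illustration, and you could equally call the script's `load_model()`/`load_scaler()` helpers instead of using `pickle` directly.
```python
import pickle

import pandas as pd

# Load the persisted artifacts (assumes the pipeline above has already been run once)
with open('my_model.pkl', 'rb') as f:
    model = pickle.load(f)
with open('my_scaler.pkl', 'rb') as f:
    scaler = pickle.load(f)

# New, unlabeled rows with the same feature columns the model was trained on
new_data = pd.DataFrame({
    'feature1': [2.5, 7.5],
    'feature2': [12.5, 18.0],
})

# Apply the *same* scaling that was fitted during training, then predict
new_data_scaled = scaler.transform(new_data)
predictions = model.predict(new_data_scaled)
print(predictions)  # an array of 0/1 class labels for the dummy target
```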
How to Run:
1. **Install dependencies:** The script needs `pandas` and `scikit-learn` (for example, `pip install pandas scikit-learn`); `pickle`, `os`, and `logging` are part of the Python standard library.
2. **Save:** Save the code as a `.py` file (e.g., `ml_pipeline.py`).
3. **Run:** Execute the script from your terminal: `python ml_pipeline.py`
The first time you run it, it will:
* Create `dummy_data.csv`.
* Train the model.
* Save the model to `my_model.pkl` and the scaler to `my_scaler.pkl`.
* Evaluate the model.
Subsequent runs will:
* Load the model from `my_model.pkl` and the scaler from `my_scaler.pkl`.
* Evaluate the model. You'll see the logging message indicating that the existing model was loaded. It will *not* retrain the model.
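Because the model is re-evaluated on every run, a natural next step for automation is to record each run's metrics so they can be compared over time. Here is a minimal sketch, assuming a JSON-lines log file named `metrics_log.jsonl`; the `log_metrics` helper is illustrative and not part of the script above.
```python
import json
from datetime import datetime, timezone

def log_metrics(metrics, filename='metrics_log.jsonl'):
    """Append the accuracy from one pipeline run to a JSON-lines log file."""
    record = {
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'accuracy': float(metrics['accuracy']),
    }
    with open(filename, 'a') as f:
        f.write(json.dumps(record) + '\n')

# In main(), the return value of evaluate_model() could then be captured and logged:
# metrics = evaluate_model(model, X_test, y_test)
# log_metrics(metrics)
```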
This is now a complete, robust, and reusable end-to-end machine learning pipeline.
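Finally, as mentioned in the model-training bullet above, `train_model` is easy to extend to other estimators. Here is a hedged sketch of one way to do it; the `build_model` helper and the random-forest hyperparameters below are illustrative and not part of the script above.
```python
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression

def build_model(model_type='logistic_regression'):
    """Return an unfitted estimator for the requested model type."""
    if model_type == 'logistic_regression':
        return LogisticRegression(random_state=42)
    if model_type == 'random_forest':
        # Illustrative hyperparameters; tune these for your own data
        return RandomForestClassifier(n_estimators=100, random_state=42)
    raise ValueError(f"Unsupported model type: {model_type}")

# train_model() could then call build_model(model_type) and fit the returned estimator:
# model = build_model(model_type)
# model.fit(X_train, y_train)
```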