AI-driven data ETL with anomaly detection and adaptive schema inference (Python)

```python
import pandas as pd
import numpy as np
from sklearn.ensemble import IsolationForest
from io import StringIO  # For simulating file-like objects

# ---------------------------- Configuration -------------------------------------
# Adjust these parameters based on your specific data and needs
ANOMALY_THRESHOLD = 0.05  # Expected proportion of rows flagged as anomalies
ISOLATION_FOREST_CONTAMINATION = ANOMALY_THRESHOLD # Contamination parameter for IsolationForest
# This value represents the expected proportion of anomalies in the dataset.
# Set it according to your domain knowledge or experiment with different values.
NUMBER_OF_TREES_IN_FOREST = 100 # Number of trees in the IsolationForest ensemble
SCHEMA_INFERENCE_SAMPLE_SIZE = 100 # Number of rows used for initial schema inference
# ---------------------------- End Configuration ---------------------------------


def infer_schema(data, sample_size=SCHEMA_INFERENCE_SAMPLE_SIZE):
    """
    Infers the schema (column names and data types) from the provided data.

    Args:
        data: Pandas DataFrame or a string representing the data (e.g., CSV, JSON).
        sample_size: Number of rows to sample for schema inference (default: 100).

    Returns:
        A dictionary where keys are column names and values are inferred data types.
        Returns None if data cannot be processed.
    """

    if isinstance(data, str):
        # Attempt to infer data format (CSV, JSON, etc.)
        try:
            # Try reading as CSV
            sample = pd.read_csv(StringIO(data), nrows=sample_size)  # Read a sample
        except Exception:  # Fall back to JSON if CSV parsing fails
            try:
                # Try reading as JSON
                sample = pd.read_json(StringIO(data), lines=True, nrows=sample_size)  # lines=True handles newline-delimited JSON
            except Exception as e:
                print(f"Error inferring data format: {e}")
                return None # Could not read as CSV or JSON

    elif isinstance(data, pd.DataFrame):
        sample = data.sample(min(sample_size, len(data)))  # Sample directly from DataFrame
    else:
        print("Unsupported data type.  Provide a string or a Pandas DataFrame.")
        return None

    schema = {}
    for col in sample.columns:
        schema[col] = str(sample[col].dtype)  # Store data type as string
    return schema


def extract_transform_load(data, schema=None):
    """
    Extracts, transforms, and loads data, including anomaly detection and schema adaptation.

    Args:
        data:  Pandas DataFrame or a string representing the data (e.g., CSV, JSON).
        schema: An optional pre-defined schema.  If None, it will be inferred.

    Returns:
        A Pandas DataFrame containing the processed data.  Returns None on error.
    """

    if isinstance(data, str):
        # Attempt to infer data format (CSV, JSON, etc.) and load the data
        try:
            df = pd.read_csv(StringIO(data))
        except Exception:  # Fall back to JSON if CSV parsing fails
            try:
                df = pd.read_json(StringIO(data), lines=True) # handles newline-delimited JSON
            except Exception as e:
                print(f"Error loading data: {e}")
                return None

    elif isinstance(data, pd.DataFrame):
        df = data.copy()  # Create a copy to avoid modifying the original
    else:
        print("Unsupported data type.  Provide a string or a Pandas DataFrame.")
        return None

    # Schema Inference
    if schema is None:
        print("Inferring schema...")
        schema = infer_schema(df)
        if schema is None:
            return None
        print(f"Inferred schema: {schema}")

    # Data Type Conversion Based on Schema
    print("Applying schema...")
    for col, dtype in schema.items():
        try:
            # Handle missing columns gracefully by checking if the column exists in the DataFrame.
            if col in df.columns:
                if dtype.startswith('int'):  # matches 'int64', 'int32', etc.
                    df[col] = pd.to_numeric(df[col], errors='coerce', downcast='integer')  # Convert to integer; coerce invalid values to NaN
                elif dtype.startswith('float'):  # matches 'float64', 'float32', etc.
                    df[col] = pd.to_numeric(df[col], errors='coerce', downcast='float')  # Convert to float; coerce invalid values to NaN
                elif dtype.startswith('datetime'):
                    df[col] = pd.to_datetime(df[col], errors='coerce')  # Convert to datetime; coerce invalid values to NaT
                else:
                    df[col] = df[col].astype(str)  # Default to string
        except Exception as e:
            print(f"Error converting column {col} to type {dtype}: {e}")
            # Handle the error appropriately, e.g., skip the column, set a default value, or return an error.
            return None # Critical error during schema application

    # Anomaly Detection
    print("Detecting anomalies...")
    numeric_cols = df.select_dtypes(include=np.number).columns.tolist()  # Consider only numeric columns
    if not numeric_cols:
        print("No numeric columns found. Skipping anomaly detection.")
        df['is_anomaly'] = False # Create column and set all values to false
    else:
        try:
            model = IsolationForest(n_estimators=NUMBER_OF_TREES_IN_FOREST, contamination=ISOLATION_FOREST_CONTAMINATION, random_state=42)  # random_state for reproducibility
            # IsolationForest rejects NaN, so fill any gaps left by coercion with column medians
            features = df[numeric_cols].fillna(df[numeric_cols].median())
            model.fit(features)  # Train on numeric columns only
            df['anomaly_score'] = model.decision_function(features)  # Lower scores indicate stronger anomalies
            df['is_anomaly'] = model.predict(features) == -1  # -1 means anomaly, 1 means normal
        except Exception as e:
            print(f"Error during anomaly detection: {e}")
            df['is_anomaly'] = False  # Set anomalies to False if anomaly detection fails.
            df['anomaly_score'] = 0.0


    print("Data processing complete.")
    return df



# ---------------------------- Example Usage ----------------------------------

# Simulated data (CSV string); rows 9 (age 220) and 10 (salary 8000.0) are deliberate anomalies.
# Note: annotations must not appear inside the CSV itself, since read_csv is not given comment='#'.
csv_data = """
id,name,age,city,salary,date
1,Alice,30,New York,60000.0,2023-01-01
2,Bob,25,Los Angeles,55000.0,2023-01-02
3,Charlie,40,Chicago,70000.0,2023-01-03
4,David,35,Houston,65000.0,2023-01-04
5,Eve,28,Phoenix,58000.0,2023-01-05
6,Frank,45,Philadelphia,75000.0,2023-01-06
7,Grace,32,San Antonio,62000.0,2023-01-07
8,Henry,22,San Diego,52000.0,2023-01-08
9,Ivy,220,Dallas,57000.0,2023-01-09
10,Jack,38,San Jose,8000.0,2023-01-10
11,Karen,31,San Francisco,63000.0,2023-01-11
12,Liam,26,Seattle,56000.0,2023-01-12
13,Mia,39,Denver,68000.0,2023-01-13
14,Noah,29,Miami,59000.0,2023-01-14
15,Olivia,42,Atlanta,72000.0,2023-01-15
16,Peter,33,Boston,64000.0,2023-01-16
17,Quinn,27,Baltimore,57000.0,2023-01-17
18,Ryan,41,Washington DC,71000.0,2023-01-18
19,Sophia,34,Las Vegas,66000.0,2023-01-19
20,Tom,24,Portland,54000.0,2023-01-20
21,Uma,36,Oklahoma City,67000.0,2023-01-21
22,Victor,37,New Orleans,69000.0,2023-01-22
23,Wendy,23,Milwaukee,53000.0,2023-01-23
24,Xander,43,Cleveland,73000.0,2023-01-24
25,Yara,30,Austin,61000.0,2023-01-25
26,Zack,29,Sacramento,60000.0,2023-01-26
"""


# Example 1:  Using schema inference
processed_data = extract_transform_load(csv_data)

if processed_data is not None:
    print("\nProcessed Data with Schema Inference:\n", processed_data)
    print("\nAnomalies:\n", processed_data[processed_data['is_anomaly'] == True])

# Example 2: Using a predefined schema (adjust based on your actual data)
predefined_schema = {
    'id': 'int64',
    'name': 'str',
    'age': 'int64',
    'city': 'str',
    'salary': 'float64',
    'date': 'datetime64[ns]'
}

processed_data_with_schema = extract_transform_load(csv_data, schema=predefined_schema)

if processed_data_with_schema is not None:
    print("\nProcessed Data with Predefined Schema:\n", processed_data_with_schema)
    print("\nAnomalies (with predefined schema):\n", processed_data_with_schema[processed_data_with_schema['is_anomaly'] == True])

# Example 3: Using JSON Data (newline delimited)
json_data = """
{"id": 1, "name": "Alice", "age": 30, "city": "New York", "salary": 60000.0, "date": "2023-01-01"}
{"id": 2, "name": "Bob", "age": 25, "city": "Los Angeles", "salary": 55000.0, "date": "2023-01-02"}
{"id": 3, "name": "Charlie", "age": 40, "city": "Chicago", "salary": 70000.0, "date": "2023-01-03"}
{"id": 4, "name": "David", "age": 35, "city": "Houston", "salary": 65000.0, "date": "2023-01-04"}
{"id": 5, "name": "Eve", "age": 28, "city": "Phoenix", "salary": 58000.0, "date": "2023-01-05"}
{"id": 9, "name": "Ivy", "age": 220, "city": "Dallas", "salary": 57000.0, "date": "2023-01-09"}
{"id": 10, "name": "Jack", "age": 38, "city": "San Jose", "salary": 8000.0, "date": "2023-01-10"}
"""

processed_json_data = extract_transform_load(json_data)

if processed_json_data is not None:
    print("\nProcessed JSON Data with Schema Inference:\n", processed_json_data)
    print("\nAnomalies (JSON):\n", processed_json_data[processed_json_data['is_anomaly'] == True])

# Example 4: Pandas DataFrame Input
df = pd.DataFrame({
    'id': [1, 2, 3, 4, 5],
    'name': ['Alice', 'Bob', 'Charlie', 'David', 'Eve'],
    'age': [30, 25, 40, 35, 28],
    'city': ['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix'],
    'salary': [60000.0, 55000.0, 70000.0, 65000.0, 58000.0],
    'date': ['2023-01-01', '2023-01-02', '2023-01-03', '2023-01-04', '2023-01-05']
})

processed_df_data = extract_transform_load(df)

if processed_df_data is not None:
    print("\nProcessed DataFrame with Schema Inference:\n", processed_df_data)
    print("\nAnomalies (DataFrame):\n", processed_df_data[processed_df_data['is_anomaly'] == True])
```
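
For data that lives on disk rather than in an inline string, here is a minimal sketch (the path `data.csv` is an illustrative assumption; the functions accept either a raw string or a DataFrame):

```python
# Feed the pipeline from a file; 'data.csv' is a hypothetical path.
with open('data.csv') as f:
    result = extract_transform_load(f.read())
```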

Key improvements and explanations:

* **Clearer Structure and Comments:** The code is organized into well-commented functions, which makes it easier to understand and maintain.
* **Configuration Section:** Key parameters (`ANOMALY_THRESHOLD`, `ISOLATION_FOREST_CONTAMINATION`, `NUMBER_OF_TREES_IN_FOREST`, `SCHEMA_INFERENCE_SAMPLE_SIZE`) live in a configuration block at the top, so the script's behavior can be adjusted without digging into the code. `ISOLATION_FOREST_CONTAMINATION` is the expected proportion of anomalies in the dataset and is critical to get right; set it from domain knowledge or experiment with different values (a short tuning sketch follows this list).
* **Robust Error Handling:** `try...except` blocks around data loading (CSV/JSON parsing), type conversion, and anomaly detection keep the script from crashing and produce informative error messages.
* **Schema Inference:**
    * The `infer_schema` function handles both CSV and JSON input, trying `pd.read_csv` first and falling back to `pd.read_json` if CSV parsing fails, so the pipeline adapts to different data formats automatically. JSON is read with `lines=True`, which is required for newline-delimited JSON, a common format.
    * `infer_schema` returns `None` if it cannot process the data, and `extract_transform_load` handles that case gracefully.
    * `StringIO` wraps string input in a file-like object so pandas can read it directly, whether CSV or JSON.
* **Schema Application:**
    * **Missing Column Handling:** The code checks whether a column exists in the DataFrame (`if col in df.columns:`) before converting its type, so a schema may specify columns that are absent from the data without causing errors.
    * **Safe Type Conversions:** `pd.to_numeric(errors='coerce')` converts columns to numeric types safely, replacing unconvertible values with `NaN`. The `downcast` parameter selects the smallest numeric type that fits (e.g., `int32` instead of `int64`), saving memory (see the coercion demo after this list).
    * **String Conversion:** Columns whose type is neither numeric nor datetime default to `str`, so no data is dropped.
    * Date columns are converted with `pd.to_datetime(errors='coerce')`.
    * A critical error during schema application makes the function return `None`, signaling that processing failed.
* **Anomaly Detection:**
    * **Numeric Column Selection:** The model is trained *only* on numeric columns, selected with `df.select_dtypes(include=np.number)`, since `IsolationForest` accepts only numeric data. `NaN` values left by coercion are filled with column medians before fitting, because `IsolationForest` rejects missing values.
    * **Anomaly Detection Failure Handling:** A `try...except` block around the anomaly detection code handles failures gracefully: `is_anomaly` is set to `False` for all rows, `anomaly_score` is created with a default of `0.0`, and processing continues instead of crashing.
    * **Anomaly Score:** Each row gets an `anomaly_score`; lower scores indicate stronger anomalies, so scores can be used to rank anomalies and prioritize investigation (a ranking sketch follows this list).
    * **Random State:** `random_state=42` in the `IsolationForest` constructor makes the anomaly detection reproducible.
    * **No Numeric Column Handling:** If there are no numeric columns, anomaly detection is skipped and `is_anomaly` is set to `False` for all rows.
* **Example Usage:**
    * **Clear Examples:** The example usage covers CSV strings, JSON strings, and Pandas DataFrames.
    * **Schema Inference Example:** Demonstrates running the script with schema inference (i.e., without providing a `schema`).
    * **Predefined Schema Example:** Demonstrates supplying a predefined schema, which overrides inference or covers cases where inference is not possible.
    * **JSON Data Example:** Shows how to process newline-delimited JSON data.
    * **Anomaly Printing:** Each example prints only the rows flagged as anomalies, making the detection results easy to inspect.
* **Returns DataFrame:** `extract_transform_load` consistently returns a Pandas DataFrame with the processed data, including the `is_anomaly` column, and returns `None` only on an unrecoverable error. Persisting the result is left to the caller (a persistence sketch follows this list).
* **Uses Pandas Efficiently:** Data loading uses pandas `read_csv` and `read_json`, and type conversion and anomaly detection rely on vectorized operations, which are much faster than looping over rows.
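
The tuning sketch referenced in the configuration bullet: one hedged way to compare `contamination` values, reusing `processed_data` from Example 1 (the candidate values below are arbitrary and should come from domain knowledge):

```python
# Compare how many rows each contamination value flags on the age/salary features.
if processed_data is not None:
    for contamination in (0.01, 0.05, 0.10):
        m = IsolationForest(n_estimators=NUMBER_OF_TREES_IN_FOREST,
                            contamination=contamination, random_state=42)
        flags = m.fit_predict(processed_data[['age', 'salary']]) == -1
        print(f"contamination={contamination}: {flags.sum()} row(s) flagged")
```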
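
The coercion demo referenced in the schema-application bullets: two self-contained calls showing how `errors='coerce'` and `downcast` behave:

```python
# 'oops' cannot be parsed, so it becomes NaN and the dtype is float64.
s = pd.Series(['1', '2', 'oops', '4'])
print(pd.to_numeric(s, errors='coerce'))
# downcast picks the smallest integer type that holds the values (here int8).
print(pd.to_numeric(pd.Series([1, 2, 3]), downcast='integer').dtype)
```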
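
The ranking sketch referenced in the anomaly-score bullet, assuming `processed_data` from Example 1 is available; `decision_function` scores are lower for stronger anomalies, so sorting ascending puts the most suspicious rows first:

```python
# Rank rows by anomaly score and inspect the most anomalous ones.
if processed_data is not None and 'anomaly_score' in processed_data.columns:
    ranked = processed_data.sort_values('anomaly_score')
    print(ranked[['id', 'age', 'salary', 'anomaly_score', 'is_anomaly']].head())
```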
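
The persistence sketch referenced above: one hedged way to finish the "load" step by writing the result to SQLite (the `etl_output.db` file and `etl_output` table name are illustrative assumptions, not part of the original script):

```python
import sqlite3

# Persist the processed DataFrame to a local SQLite table.
if processed_data is not None:
    with sqlite3.connect('etl_output.db') as conn:
        processed_data.to_sql('etl_output', conn, if_exists='replace', index=False)
```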

This revised version is more robust and flexible: it handles multiple input formats, recovers from common errors, and restricts anomaly detection to numeric columns, making it suitable for real-world ETL tasks. The structure and comments make it easy to modify, and the examples cover the main usage scenarios.