AI-enhanced Real-time Data Analysis
Tags: Python, AI, Big Data
👤 Sharing: AI
```python
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier # You can use other models like Logistic Regression or SVM
from sklearn.metrics import accuracy_score, classification_report
import time
import random


# --- 1. Simulate Real-time Data Stream ---
def generate_sensor_data():
    """
    Simulates sensor data with some random noise and a potential anomaly.
    """
    timestamp = time.time()
    temperature = 25 + random.gauss(0, 1)  # Base temperature with some noise
    pressure = 1000 + random.gauss(0, 5)  # Base pressure with some noise
    # Introduce a potential anomaly (e.g., sudden temperature spike)
    if random.random() < 0.01:  # 1% chance of anomaly
        temperature += 10  # Spike the temperature
    # Add a categorical feature, useful for more complex scenarios
    environment = random.choice(['Lab', 'Factory', 'Outdoor'])
    return {'timestamp': timestamp, 'temperature': temperature, 'pressure': pressure, 'environment': environment}


def create_initial_training_data(num_samples=1000):
    """
    Creates an initial dataset for training the model.
    We'll assume we have some pre-labeled data to bootstrap.
    """
    data = []
    for _ in range(num_samples):
        sensor_data = generate_sensor_data()
        # Simulate labeling - most data is normal (0), some are anomalies (1)
        if sensor_data['temperature'] > 30 or sensor_data['pressure'] < 990:
            label = 1  # Anomaly
        else:
            label = 0  # Normal
        sensor_data['label'] = label
        data.append(sensor_data)
    df = pd.DataFrame(data)
    return df
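

# --- 2. Data Preprocessing and Feature Engineering ---
ENVIRONMENTS = ['Lab', 'Factory', 'Outdoor']  # Every category the sensor can report


def preprocess_data(df):
    """
    Preprocesses the data: one-hot encodes the categorical feature and adds an engineered feature.
    """
    df = df.copy()  # Avoid mutating the caller's DataFrame
    # Declare the full category list so a single streamed row gets the same dummy columns as the training set
    df['environment'] = pd.Categorical(df['environment'], categories=ENVIRONMENTS)
    # Convert categorical features to numerical using one-hot encoding
    df = pd.get_dummies(df, columns=['environment'], drop_first=True)  # drop_first avoids multicollinearity
    # Feature Engineering (example: create a combined temperature-pressure index)
    df['temp_pressure_index'] = df['temperature'] * df['pressure']
    # Drop the timestamp - it's not generally useful for modeling on its own, but could be used in time-series contexts
    df = df.drop('timestamp', axis=1)
    return df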


# --- 3. Train the Initial AI Model ---
def train_model(df):
    """
    Trains a RandomForestClassifier on the preprocessed data.
    """
    X = df.drop('label', axis=1)
    y = df['label']
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
    model = RandomForestClassifier(n_estimators=100, random_state=42)  # You can adjust hyperparameters
    model.fit(X_train, y_train)
    # Evaluate the model
    y_pred = model.predict(X_test)
    accuracy = accuracy_score(y_test, y_pred)
    print(f"Initial Model Accuracy: {accuracy}")
    print(classification_report(y_test, y_pred))
    return model


# --- 4. Real-time Data Analysis and Anomaly Detection ---
def analyze_realtime_data(model):
    """
    Analyzes real-time sensor data and predicts anomalies.
    Demonstrates streaming data ingestion and model inference.
    """
    print("Starting Real-time Analysis...")
    while True:
        new_data = generate_sensor_data()
        new_df = pd.DataFrame([new_data])  # Convert to DataFrame
        new_df = preprocess_data(new_df)
        # Ensure the new DataFrame has the same columns as the training data
        # This is crucial, especially after one-hot encoding
        training_columns = model.feature_names_in_  # Access trained feature list from the model
        for col in training_columns:
            if col not in new_df.columns:
                new_df[col] = 0  # Add missing columns with default value 0
        # Reorder the columns in the new dataframe to match the training data.
        new_df = new_df[training_columns]
        # Make prediction
        prediction = model.predict(new_df)[0]
        if prediction == 1:
            print("Anomaly Detected!")
            print(new_data)
        else:
            print("Normal Data")
            print(new_data)
        time.sleep(1)  # Simulate a delay between data points
        # In a real system, you'd get data from a queue, database, or streaming platform.


# --- 5. Main Execution ---
if __name__ == "__main__":
    # 1. Create initial training data
    initial_df = create_initial_training_data()
    # 2. Preprocess the data
    initial_df = preprocess_data(initial_df)
    # 3. Train the initial model
    model = train_model(initial_df)
    # 4. Start real-time data analysis
    analyze_realtime_data(model)
```
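The column alignment in step 4 matters because `pd.get_dummies` only creates columns for the categories present in the frame it is given, so a single streamed row yields fewer dummy columns than the training set. The sketch below isolates that step on a toy frame. `align_to_training` is a hypothetical helper name, and it uses `DataFrame.reindex` in place of an explicit loop; treat it as one possible approach, not the only one.

```python
import pandas as pd


def align_to_training(row_df, training_columns):
    """Return row_df with exactly the training columns, in order; missing ones are filled with 0."""
    return row_df.reindex(columns=list(training_columns), fill_value=0)


# Toy training frame containing every environment category
train = pd.DataFrame({
    'temperature': [25.0, 26.0, 24.5],
    'environment': ['Lab', 'Factory', 'Outdoor'],
})
train_encoded = pd.get_dummies(train, columns=['environment'])
training_columns = train_encoded.columns

# A single streamed row only produces the dummy column for its own category
row = pd.DataFrame([{'temperature': 27.0, 'environment': 'Lab'}])
row_encoded = pd.get_dummies(row, columns=['environment'])

aligned = align_to_training(row_encoded, training_columns)
print(list(row_encoded.columns))
print(list(aligned.columns))
```

Note that combining `drop_first=True` with single-row encoding drops the row's only dummy column, so every streamed row would look like the baseline category. Declaring the full category list up front with `pd.Categorical` avoids this.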
Key points and explanations:
* **Clearer Structure:** The code is organized into logical functions for data generation, preprocessing, model training, and real-time analysis, making it more readable and maintainable.
* **Simulated Real-time Data:** The `generate_sensor_data` function simulates a stream of sensor data with Gaussian noise. Each reading has a 1% chance of a +10 temperature spike, which makes anomalies easy to test, and carries a categorical `environment` feature.
* **Initial Training Data:** The `create_initial_training_data` function generates labeled data. It assigns a label (0 for normal, 1 for anomaly) based on temperature and pressure thresholds. This simulates a scenario where you have some historical data to train your initial model.
* **Data Preprocessing (Crucial):** The `preprocess_data` function handles categorical features using one-hot encoding with `pd.get_dummies`. It also adds a simple feature engineering example that combines temperature and pressure. The raw timestamp should be dropped before training, since it is generally not useful as a feature without further work (e.g. a rolling window or other time-series approach).
* **Model Training:** The `train_model` function trains a `RandomForestClassifier` on the preprocessed data. It also includes an evaluation step to show the initial model's performance using `accuracy_score` and `classification_report`.
* **Real-time Analysis with Column Handling:** The `analyze_realtime_data` function:
    * Polls the simulated data stream in a `while True` loop.
    * Converts each incoming sensor reading into a Pandas DataFrame.
    * **Crucially:** Ensures that the incoming data has *exactly* the same columns as the training data *after* one-hot encoding. This is essential because one-hot encoding creates new columns. The code adds missing columns with a default value of 0 and reorders the columns to match the training data's column order. This prevents the `ValueError` that scikit-learn raises when the input's features do not match those seen during `fit`.
    * Makes a prediction using the trained model.
    * Prints whether an anomaly was detected, along with the sensor data.
    * Uses `time.sleep(1)` to simulate a delay between data points, mimicking a real-time stream.
* **`if __name__ == "__main__":` Block:** This ensures that the main execution code is only run when the script is executed directly, not when it's imported as a module.
* **Comments and Explanations:** The code includes comments to explain each step and the reasoning behind the choices.
* **Anomaly Detection:** The code explicitly predicts anomalies and prints messages accordingly.
* **Error Handling and Prevention:** The column handling in the real-time analysis is designed to prevent the most common errors encountered when working with real-time data and one-hot encoding.
* **Scalability:** While this example is basic, the structure is designed to be scalable. You could easily replace the simulated data stream with a connection to a real-time data source like Kafka or RabbitMQ. You could also integrate a cloud-based machine learning platform for training and deploying the model.
* **Model Choice:** `RandomForestClassifier` is a good starting point, but you might consider other models like Logistic Regression, SVM, or even neural networks depending on the complexity of your data and the desired accuracy. For time series data, anomaly detection models designed for time series would be much better (e.g. ARIMA-based models or LSTM-based models).
* **Clearer Anomaly Simulation:** Anomalies occur with a small probability (1%, hard-coded in `generate_sensor_data`), which makes them easy to test. The anomaly itself is pronounced (a +10 temperature spike).
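The scalability point above can be sketched with the standard library's `queue.Queue` standing in for a message broker such as Kafka or RabbitMQ. The `producer` thread, the `consume` function, and the `None` end-of-stream sentinel are illustrative assumptions, not part of the original script. A real consumer would use the broker's own client library instead.

```python
import queue
import random
import threading
import time


def producer(q, n_readings):
    """Push n_readings simulated sensor readings, then a None sentinel."""
    for _ in range(n_readings):
        q.put({
            'timestamp': time.time(),
            'temperature': 25 + random.gauss(0, 1),
            'pressure': 1000 + random.gauss(0, 5),
        })
    q.put(None)  # Sentinel: tells the consumer the stream has ended


def consume(q, handle):
    """Pull readings until the sentinel arrives; return how many were handled."""
    count = 0
    while True:
        reading = q.get()  # Blocks until an item is available
        if reading is None:
            break
        handle(reading)
        count += 1
    return count


readings = []
q = queue.Queue(maxsize=100)  # A bounded queue applies back-pressure to the producer
t = threading.Thread(target=producer, args=(q, 5))
t.start()
handled = consume(q, readings.append)
t.join()
print(f"Handled {handled} readings")
```

In the script above, `handle` would be the preprocessing and prediction step, so the inference code stays the same when the data source changes.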
How to run:
1. **Install libraries:**
```bash
pip install pandas scikit-learn numpy
```
2. **Run the script:**
```bash
python your_script_name.py
```
This will start the real-time analysis and print the sensor data and anomaly detection results to the console. Remember that this is a simulation. To use it with real data, you'll need to replace the `generate_sensor_data` function with code that reads from your actual data source. You will also want to tune the model and the preprocessing steps to fit your specific data characteristics. Finally, you will probably want to retrain your model over time using a method for online learning.
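One way to retrain over time is incremental learning with an estimator that supports `partial_fit`. The sketch below swaps the random forest for scikit-learn's `SGDClassifier`, since `RandomForestClassifier` has no `partial_fit`. The batch sizes, the 10% spike rate, and the helper name `make_batch` are assumptions for illustration; the labelling rule mirrors the thresholds used in the script.

```python
import numpy as np
from sklearn.linear_model import SGDClassifier
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(42)


def make_batch(n):
    """Hypothetical helper: simulated readings labelled with the script's thresholds."""
    temperature = 25 + rng.normal(0, 1, n)
    # Assumed 10% spike rate (higher than the script's 1%) so batches usually contain both classes
    temperature[rng.random(n) < 0.1] += 10
    pressure = 1000 + rng.normal(0, 5, n)
    X = np.column_stack([temperature, pressure])
    y = ((temperature > 30) | (pressure < 990)).astype(int)
    return X, y


scaler = StandardScaler()
model = SGDClassifier(random_state=42)
classes = np.array([0, 1])  # All labels must be declared on the first partial_fit call

# Each loop iteration stands in for a new batch of labelled data arriving over time
for _ in range(20):
    X, y = make_batch(200)
    scaler.partial_fit(X)  # Update running mean/variance
    model.partial_fit(scaler.transform(X), y, classes=classes)

X_new, y_new = make_batch(500)
predictions = model.predict(scaler.transform(X_new))
print(f"Accuracy on a fresh batch: {(predictions == y_new).mean():.3f}")
```

A linear model is a different trade-off from a random forest, so it is worth comparing both on your own data before switching.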