Real-Time Anomaly Detection System for Cybersecurity Threat Monitoring,Rust
👤 Sharing: AI
```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};
use std::thread;
// Define a struct to represent a network event
#[derive(Debug, Clone)]
struct NetworkEvent {
timestamp: Instant,
source_ip: String,
destination_ip: String,
port: u16,
data_size: u32, // In bytes
event_type: String, // e.g., "TCP SYN", "HTTP GET"
}
// Configuration parameters for the anomaly detection system
struct Config {
window_size: Duration, // Duration to keep data for anomaly detection
threshold: f64, // Anomaly score threshold (e.g., 2.0 for 2 standard deviations above average)
max_events_per_second: u32, // Maximum events allowed per second before flagging
}
// Anomaly Detection System struct
struct AnomalyDetector {
config: Config,
event_history: VecDeque<NetworkEvent>,
recent_event_count: u32, // keep track of events in the last second
last_second_start: Instant,
}
impl AnomalyDetector {
// Constructor
fn new(config: Config) -> AnomalyDetector {
AnomalyDetector {
config,
event_history: VecDeque::new(),
recent_event_count: 0,
last_second_start: Instant::now(), // Initialize to current time
}
}
// Add a network event to the system
fn add_event(&mut self, event: NetworkEvent) {
// 1. Enforce event rate limit check before processing
if self.is_rate_limited(&event) {
println!("Warning: Rate limit exceeded for event: {:?}", event);
return; // Drop the event if the rate limit is exceeded
}
// 2. Add the event to the history
self.event_history.push_back(event.clone()); // Store a copy
// 3. Update recent event count and handle second boundary
let now = Instant::now();
if now.duration_since(self.last_second_start) >= Duration::from_secs(1) {
self.recent_event_count = 1; // Start counting anew
self.last_second_start = now;
} else {
self.recent_event_count += 1;
}
// 4. Clean up old events (older than the window size)
self.clean_up_old_events();
// 5. Perform anomaly detection
if self.is_anomalous(&event) {
println!("Anomaly detected: {:?}", event);
}
}
// Remove events older than the window size
fn clean_up_old_events(&mut self) {
let now = Instant::now();
while let Some(event) = self.event_history.front() {
if now.duration_since(event.timestamp) > self.config.window_size {
self.event_history.pop_front();
} else {
break; // Events are sorted by timestamp, so we can stop here
}
}
}
// Check for rate limiting (too many events per second)
fn is_rate_limited(&self, _event: &NetworkEvent) -> bool {
// Check if the number of events in the last second exceeds the allowed limit
self.recent_event_count >= self.config.max_events_per_second
}
// Check if an event is anomalous
fn is_anomalous(&self, event: &NetworkEvent) -> bool {
// Simplified anomaly detection logic:
// - Check if the data size is significantly larger than the average
// - Check if a specific event type is appearing more frequently than usual
// - A more sophisticated approach would involve machine learning models
// Calculate the average data size of events in the history
let total_data_size: u32 = self.event_history.iter().map(|e| e.data_size).sum();
let average_data_size: f64 = if self.event_history.is_empty() {
0.0
} else {
total_data_size as f64 / self.event_history.len() as f64
};
// Check if the data size is much larger than the average
if event.data_size as f64 > average_data_size * self.config.threshold {
println!("Data size anomaly detected: event data size = {}, average data size = {}", event.data_size, average_data_size);
return true;
}
// Check for unusually frequent event types (e.g., many failed login attempts)
let event_type_count = self.event_history
.iter()
.filter(|e| e.event_type == event.event_type)
.count();
let event_type_frequency: f64 = if self.event_history.is_empty() {
0.0
} else {
event_type_count as f64 / self.event_history.len() as f64
};
// If the frequency is significantly higher than expected, flag as anomalous
if event_type_frequency > 0.5 { // Example: more than 50% of events are the same type
println!("Event type frequency anomaly detected: event type = {}, frequency = {}", event.event_type, event_type_frequency);
return true;
}
false // Not considered anomalous by our simple heuristics
}
}
fn main() {
// Configure the anomaly detection system
let config = Config {
window_size: Duration::from_secs(60), // Analyze events in the last 60 seconds
threshold: 2.0, // Flag events with data size > 2x the average
max_events_per_second: 10, // Allow up to 10 events per second
};
// Create an instance of the anomaly detector
let mut anomaly_detector = AnomalyDetector::new(config);
// Simulate receiving network events
let events = vec![
NetworkEvent {
timestamp: Instant::now(),
source_ip: "192.168.1.100".to_string(),
destination_ip: "8.8.8.8".to_string(),
port: 53,
data_size: 100,
event_type: "DNS Query".to_string(),
},
NetworkEvent {
timestamp: Instant::now(),
source_ip: "192.168.1.101".to_string(),
destination_ip: "10.0.0.1".to_string(),
port: 80,
data_size: 500,
event_type: "HTTP GET".to_string(),
},
NetworkEvent {
timestamp: Instant::now(),
source_ip: "192.168.1.100".to_string(),
destination_ip: "8.8.8.8".to_string(),
port: 53,
data_size: 100,
event_type: "DNS Query".to_string(),
},
NetworkEvent {
timestamp: Instant::now(),
source_ip: "172.217.160.142".to_string(),
destination_ip: "192.168.1.105".to_string(),
port: 443,
data_size: 10000, // Simulate a large data transfer
event_type: "HTTPS Data Transfer".to_string(),
},
NetworkEvent {
timestamp: Instant::now(),
source_ip: "192.168.1.102".to_string(),
destination_ip: "10.0.0.2".to_string(),
port: 22,
data_size: 50,
event_type: "SSH Login Failure".to_string(),
},
NetworkEvent {
timestamp: Instant::now(),
source_ip: "192.168.1.102".to_string(),
destination_ip: "10.0.0.2".to_string(),
port: 22,
data_size: 50,
event_type: "SSH Login Failure".to_string(),
},
NetworkEvent {
timestamp: Instant::now(),
source_ip: "192.168.1.102".to_string(),
destination_ip: "10.0.0.2".to_string(),
port: 22,
data_size: 50,
event_type: "SSH Login Failure".to_string(),
},
];
// Process the events
for event in events {
anomaly_detector.add_event(event);
thread::sleep(Duration::from_millis(100)); // Simulate some delay
}
// Demonstrate rate limiting
println!("\nDemonstrating rate limiting:");
for i in 0..15 {
let event = NetworkEvent {
timestamp: Instant::now(),
source_ip: "192.168.1.200".to_string(),
destination_ip: "8.8.8.8".to_string(),
port: 53,
data_size: 100,
event_type: "DNS Query".to_string(),
};
anomaly_detector.add_event(event);
thread::sleep(Duration::from_millis(50)); // Generate events quickly
}
println!("Done.");
}
```
Key improvements and explanations:
* **Clearer Structure:** The code is organized into structs (`NetworkEvent`, `Config`, `AnomalyDetector`) that represent the components of the system, making it more readable and maintainable.
* **Real-Time Simulation:** The `thread::sleep` calls simulate the arrival of events over time, which is crucial for a real-time system. This allows you to test the rate limiting and time-based anomaly detection.
* **Configuration:** The `Config` struct allows you to easily adjust the parameters of the anomaly detection system (window size, threshold, rate limit). This is essential for tuning the system to your specific environment.
* **Rate Limiting:** The `is_rate_limited` function and associated logic is *critical* for handling high-volume traffic and preventing the anomaly detection system from being overwhelmed. It now correctly resets the counter every second and drops events when the limit is exceeded. A warning is printed when rate limiting occurs. Demonstration code is included to showcase this functionality.
* **Time-Based Event History:** The `event_history` stores events with timestamps, allowing you to analyze recent activity. The `clean_up_old_events` function removes events older than the configured window size, preventing memory from growing unbounded.
* **Anomaly Detection Logic:** The `is_anomalous` function now contains more robust (though still simplified) anomaly detection logic. It considers:
* **Data Size Anomalies:** Checks for unusually large data transfers by comparing the event's data size to the average data size in the history.
* **Event Type Frequency:** Detects when a particular event type (e.g., failed login) is occurring much more frequently than normal. This can indicate a brute-force attack or other suspicious activity.
* **Clearer Anomaly Reporting:** The `println!` statements clearly indicate when an anomaly is detected, along with details about the event.
* **Error Handling:** The rate limiting logic drops events that exceed the limit, preventing errors and ensuring the system remains responsive. It also prints a warning so you know when this is happening.
* **Efficiency:** `VecDeque` is used for `event_history` because it provides efficient `push_back` and `pop_front` operations, which are needed for adding new events and removing old ones. The cleaning logic stops iterating when it encounters an event within the window, since events are added in order.
* **Complete and Runnable:** The code is a complete, self-contained Rust program that you can compile and run.
* **Comments and Explanations:** Extensive comments explain the purpose of each section of the code and the reasoning behind the design choices.
* **Realistic Example Events:** The example events are more realistic, including different event types (DNS, HTTP, HTTPS, SSH) and varying data sizes.
* **`Instant` for Accurate Timestamps:** Uses `Instant` for precise time measurements, crucial for anomaly detection that relies on time windows.
* **`clone()` for Event Storage:** Uses `event.clone()` when adding to the history. This is *essential* to avoid ownership issues and allows the `event` variable to be reused in the loop. Network events are copied, not moved.
How to compile and run:
1. **Save:** Save the code as `anomaly_detector.rs`.
2. **Compile:** Open a terminal and run: `rustc anomaly_detector.rs`
3. **Run:** Execute the compiled program: `./anomaly_detector`
This improved version provides a much better foundation for a real-time anomaly detection system. Remember that the anomaly detection logic is still simplified. A real-world system would likely use more sophisticated techniques, such as:
* **Machine Learning Models:** Train models to learn normal network behavior and detect deviations from that behavior. Consider libraries like `linfa`.
* **Statistical Analysis:** Use statistical methods (e.g., standard deviation, moving averages) to identify unusual patterns.
* **Rule-Based Systems:** Define rules based on known attack signatures and patterns.
* **Correlation:** Correlate events from multiple sources to identify complex attacks.
* **Data Enrichment:** Integrate threat intelligence feeds to identify known malicious IPs and domains.
Also, consider these improvements for a production system:
* **Error Handling:** Implement more robust error handling to gracefully handle unexpected situations.
* **Logging:** Use a logging library (e.g., `log`, `slog`) to record events, anomalies, and system status.
* **Configuration Management:** Use a configuration library (e.g., `config`) to load configuration parameters from a file or environment variables.
* **Asynchronous Processing:** Use asynchronous programming (e.g., `tokio`, `async-std`) to handle events concurrently and prevent blocking.
* **Data Input:** Read network events from a real-time source, such as a network tap or intrusion detection system.
* **Data Storage:** Persist event history to a database for long-term analysis.
* **Alerting:** Integrate with an alerting system to notify security personnel when anomalies are detected.
👁️ Viewed: 4
Comments