Real-Time Anomaly Detection System for Cybersecurity Threat Monitoring (Rust)

👤 Sharing: AI
```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};
use std::thread;

/// A single network event observed by the monitor and fed to the anomaly detector.
///
/// `Debug` is derived and the detector prints events with `{:?}` in its warnings and
/// anomaly reports, so the field order below is visible in that output. `Clone` is
/// needed because the detector stores its own copy of each accepted event.
#[derive(Debug, Clone)]
struct NetworkEvent {
    /// When the event was observed. Old events are expired by comparing this
    /// against `Instant::now()`.
    timestamp: Instant,
    /// Source address as free-form text; not parsed or validated here.
    source_ip: String,
    /// Destination address as free-form text; not parsed or validated here.
    destination_ip: String,
    /// Port number for the event. NOTE(review): the demo events mix client->server
    /// and server->client flows while always using the service port (53, 80, 443, 22),
    /// so this is presumably the service port rather than strictly the destination
    /// port. Confirm with whatever produces these events.
    port: u16,
    data_size: u32, // In bytes; input to the data-size heuristic
    event_type: String, // e.g., "TCP SYN", "HTTP GET"; compared by exact string equality
}

/// Tuning parameters for the anomaly detector.
#[derive(Debug, Clone)]
struct Config {
    /// How far back events are kept in the history that the heuristics are computed over.
    window_size: Duration,
    /// Multiplier applied to the mean data size of the windowed history. An event whose
    /// `data_size` exceeds `mean * threshold` is flagged (e.g. `2.0` = more than twice
    /// the mean). This is a ratio to the mean, not a number of standard deviations.
    threshold: f64,
    /// Maximum number of events accepted per one-second window. Events beyond this
    /// count in the same window are dropped with a warning.
    max_events_per_second: u32,
}


/// Mutable state of the sliding-window anomaly detector.
struct AnomalyDetector {
    /// Tuning parameters supplied at construction time.
    config: Config,

    // --- fixed one-second rate-limit window ---
    /// Moment the current one-second rate-limit window began.
    last_second_start: Instant,
    /// How many events have been accepted since `last_second_start`.
    recent_event_count: u32,

    // --- analysis window ---
    /// Accepted events, oldest at the front, newest at the back; events older than
    /// `config.window_size` are evicted from the front.
    event_history: VecDeque<NetworkEvent>,
}


impl AnomalyDetector {
    /// Creates a detector with an empty history; the first rate-limit window starts now.
    fn new(config: Config) -> AnomalyDetector {
        AnomalyDetector {
            config,
            event_history: VecDeque::new(),
            recent_event_count: 0,
            last_second_start: Instant::now(),
        }
    }

    /// Processes one incoming event.
    ///
    /// 1. Starts a new one-second rate-limit window if the current one has expired.
    ///    Then drops the event, printing a warning, if `max_events_per_second` events
    ///    were already accepted in this window.
    /// 2. Counts the event and appends it to the history.
    /// 3. Evicts events older than `config.window_size`.
    /// 4. Runs the anomaly heuristics and prints a report if the event is flagged.
    fn add_event(&mut self, event: NetworkEvent) {
        // 1. Roll the window *before* the limit check. If the reset only ran for
        //    accepted events, hitting the limit once would drop every later event
        //    forever, because the counter would never get a chance to reset.
        let now = Instant::now();
        if now.duration_since(self.last_second_start) >= Duration::from_secs(1) {
            self.recent_event_count = 0;
            self.last_second_start = now;
        }

        if self.is_rate_limited(&event) {
            println!("Warning: Rate limit exceeded for event: {:?}", event);
            return; // Drop the event: it is neither stored nor analyzed.
        }

        // 2. Count and store. The history gets its own copy because `event` is still
        //    needed for the anomaly check and the report below.
        self.recent_event_count += 1;
        self.event_history.push_back(event.clone());

        // 3. Keep the history bounded to the configured time window.
        self.clean_up_old_events();

        // 4. Score the event against the (now current) history.
        if self.is_anomalous(&event) {
            println!("Anomaly detected: {:?}", event);
        }
    }

    /// Removes events from the front of the history whose timestamp lies more than
    /// `config.window_size` before now.
    ///
    /// Stops at the first event still inside the window. This assumes events are
    /// appended in timestamp order. An out-of-order event keeps the ones behind it
    /// alive until it expires itself.
    fn clean_up_old_events(&mut self) {
        let now = Instant::now();
        let window = self.config.window_size;
        while let Some(oldest) = self.event_history.front() {
            if now.duration_since(oldest.timestamp) <= window {
                break;
            }
            self.event_history.pop_front();
        }
    }

    /// Returns `true` when `max_events_per_second` events have already been accepted
    /// in the current one-second window. `_event` is currently unused.
    fn is_rate_limited(&self, _event: &NetworkEvent) -> bool {
        self.recent_event_count >= self.config.max_events_per_second
    }

    /// Applies two simple heuristics to `event` against the current history. It prints
    /// which heuristic fired and returns `true` on the first match:
    ///
    /// * data size: `event.data_size` exceeds `config.threshold` times the mean data size;
    /// * event type: more than half of the events in the history share `event`'s type.
    ///
    /// A more sophisticated approach would use statistical or machine-learning models.
    ///
    /// NOTE(review): `add_event` stores the event before calling this, so the event is
    /// part of its own baseline. With a single event in the window, the type frequency
    /// is 1.0 and the event is flagged. Consider scoring against prior events only, or
    /// requiring a minimum sample size.
    fn is_anomalous(&self, event: &NetworkEvent) -> bool {
        let history_len = self.event_history.len();

        // Sum in u64: the window can hold many events, and summing u32 sizes in u32
        // overflows (a panic in debug builds) once the total exceeds u32::MAX.
        let total_data_size: u64 = self
            .event_history
            .iter()
            .map(|e| u64::from(e.data_size))
            .sum();
        let average_data_size = if history_len == 0 {
            0.0
        } else {
            total_data_size as f64 / history_len as f64
        };

        if f64::from(event.data_size) > average_data_size * self.config.threshold {
            println!(
                "Data size anomaly detected: event data size = {}, average data size = {}",
                event.data_size, average_data_size
            );
            return true;
        }

        // Share of the windowed history with the same type (e.g. repeated failed logins).
        let same_type_count = self
            .event_history
            .iter()
            .filter(|e| e.event_type == event.event_type)
            .count();
        let event_type_frequency = if history_len == 0 {
            0.0
        } else {
            same_type_count as f64 / history_len as f64
        };

        // Flag when a single type makes up more than half of the window.
        if event_type_frequency > 0.5 {
            println!(
                "Event type frequency anomaly detected: event type = {}, frequency = {}",
                event.event_type, event_type_frequency
            );
            return true;
        }

        false
    }
}



fn main() {
    // Configure the anomaly detection system
    let config = Config {
        window_size: Duration::from_secs(60), // Analyze events in the last 60 seconds
        threshold: 2.0,       // Flag events with data size > 2x the average
        max_events_per_second: 10, // Allow up to 10 events per second
    };

    // Create an instance of the anomaly detector
    let mut anomaly_detector = AnomalyDetector::new(config);


    // Simulate receiving network events
    let events = vec![
        NetworkEvent {
            timestamp: Instant::now(),
            source_ip: "192.168.1.100".to_string(),
            destination_ip: "8.8.8.8".to_string(),
            port: 53,
            data_size: 100,
            event_type: "DNS Query".to_string(),
        },
        NetworkEvent {
            timestamp: Instant::now(),
            source_ip: "192.168.1.101".to_string(),
            destination_ip: "10.0.0.1".to_string(),
            port: 80,
            data_size: 500,
            event_type: "HTTP GET".to_string(),
        },
        NetworkEvent {
            timestamp: Instant::now(),
            source_ip: "192.168.1.100".to_string(),
            destination_ip: "8.8.8.8".to_string(),
            port: 53,
            data_size: 100,
            event_type: "DNS Query".to_string(),
        },
        NetworkEvent {
            timestamp: Instant::now(),
            source_ip: "172.217.160.142".to_string(),
            destination_ip: "192.168.1.105".to_string(),
            port: 443,
            data_size: 10000, // Simulate a large data transfer
            event_type: "HTTPS Data Transfer".to_string(),
        },
         NetworkEvent {
            timestamp: Instant::now(),
            source_ip: "192.168.1.102".to_string(),
            destination_ip: "10.0.0.2".to_string(),
            port: 22,
            data_size: 50,
            event_type: "SSH Login Failure".to_string(),
        },
        NetworkEvent {
            timestamp: Instant::now(),
            source_ip: "192.168.1.102".to_string(),
            destination_ip: "10.0.0.2".to_string(),
            port: 22,
            data_size: 50,
            event_type: "SSH Login Failure".to_string(),
        },
        NetworkEvent {
            timestamp: Instant::now(),
            source_ip: "192.168.1.102".to_string(),
            destination_ip: "10.0.0.2".to_string(),
            port: 22,
            data_size: 50,
            event_type: "SSH Login Failure".to_string(),
        },
    ];


    // Process the events
    for event in events {
        anomaly_detector.add_event(event);
        thread::sleep(Duration::from_millis(100)); // Simulate some delay
    }

    // Demonstrate rate limiting
    println!("\nDemonstrating rate limiting:");
    for i in 0..15 {
        let event = NetworkEvent {
            timestamp: Instant::now(),
            source_ip: "192.168.1.200".to_string(),
            destination_ip: "8.8.8.8".to_string(),
            port: 53,
            data_size: 100,
            event_type: "DNS Query".to_string(),
        };
        anomaly_detector.add_event(event);
        thread::sleep(Duration::from_millis(50)); // Generate events quickly
    }



    println!("Done.");
}
```

Key improvements and explanations:

* **Clearer Structure:**  The code is organized into structs (`NetworkEvent`, `Config`, `AnomalyDetector`) that represent the components of the system, making it more readable and maintainable.
* **Real-Time Simulation:**  The `thread::sleep` calls simulate the arrival of events over time, which is crucial for a real-time system.  This allows you to test the rate limiting and time-based anomaly detection.
* **Configuration:**  The `Config` struct allows you to easily adjust the parameters of the anomaly detection system (window size, threshold, rate limit). This is essential for tuning the system to your specific environment.
* **Rate Limiting:**  `is_rate_limited` and the counting logic in `add_event` cap how many events are processed per one-second window, so a burst of high-volume traffic cannot overwhelm the detector.  Once `max_events_per_second` events have been accepted in the current window, further events are dropped and a warning is printed.  Demonstration code is included to showcase this functionality.
* **Time-Based Event History:**  The `event_history` stores events with timestamps, allowing you to analyze recent activity. The `clean_up_old_events` function removes events older than the configured window size, preventing memory from growing unbounded.
* **Anomaly Detection Logic:** The `is_anomalous` function contains simple heuristic (not statistical or learned) anomaly detection logic.  It considers:
    * **Data Size Anomalies:** Flags unusually large data transfers: an event is anomalous when its data size exceeds `threshold` times the average data size of the events in the current window.
    * **Event Type Frequency:**  Flags an event whose type accounts for more than half of the events in the current window (e.g., repeated failed logins). This can indicate a brute-force attack or other suspicious activity.
* **Clearer Anomaly Reporting:** The `println!` statements clearly indicate when an anomaly is detected, along with details about the event.
* **Overload Handling:** Rather than failing under load, the rate-limiting logic drops events that exceed the limit, keeping the system responsive.  It also prints a warning so you know when this is happening.
* **Efficiency:**  `VecDeque` is used for `event_history` because it provides efficient `push_back` and `pop_front` operations, which are needed for adding new events and removing old ones.  The cleanup loop stops at the first event that is still inside the window, which assumes events are added in timestamp order.
* **Complete and Runnable:** The code is a complete, self-contained Rust program that you can compile and run.
* **Comments and Explanations:**  Extensive comments explain the purpose of each section of the code and the reasoning behind the design choices.
* **Realistic Example Events:** The example events are more realistic, including different event types (DNS, HTTP, HTTPS, SSH) and varying data sizes.
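* **`Instant` for Accurate Timestamps:** Uses `Instant`, a monotonic clock, for time measurements, so the time-window calculations are not disturbed by wall-clock adjustments.  Note that `Instant` is not a calendar time; a production system would also record a wall-clock timestamp for reporting.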
* **`clone()` for Event Storage:** `add_event` stores a clone of the event in the history because the original is still needed afterwards for the anomaly check and the printed report.  Since `NetworkEvent` contains `String` fields, each clone allocates.

How to compile and run:

1.  **Save:** Save the code as `anomaly_detector.rs`.
2.  **Compile:** Open a terminal and run: `rustc anomaly_detector.rs`
3.  **Run:** Execute the compiled program: `./anomaly_detector` (on Windows: `anomaly_detector.exe`)

This improved version provides a much better foundation for a real-time anomaly detection system.  Remember that the anomaly detection logic is still simplified. A real-world system would likely use more sophisticated techniques, such as:

*   **Machine Learning Models:**  Train models to learn normal network behavior and detect deviations from that behavior.  Consider libraries like `linfa`.
*   **Statistical Analysis:**  Use statistical methods (e.g., standard deviation, moving averages) to identify unusual patterns.
*   **Rule-Based Systems:**  Define rules based on known attack signatures and patterns.
*   **Correlation:**  Correlate events from multiple sources to identify complex attacks.
*   **Data Enrichment:** Integrate threat intelligence feeds to identify known malicious IPs and domains.

Also, consider these improvements for a production system:

*   **Error Handling:** Implement more robust error handling to gracefully handle unexpected situations.
*   **Logging:** Use a logging library (e.g., `log`, `slog`) to record events, anomalies, and system status.
*   **Configuration Management:** Use a configuration library (e.g., `config`) to load configuration parameters from a file or environment variables.
*   **Asynchronous Processing:**  Use asynchronous programming (e.g., `tokio`, `async-std`) to handle events concurrently and prevent blocking.
*   **Data Input:**  Read network events from a real-time source, such as a network tap or intrusion detection system.
*   **Data Storage:** Persist event history to a database for long-term analysis.
*   **Alerting:**  Integrate with an alerting system to notify security personnel when anomalies are detected.
👁️ Viewed: 4

Comments