Real-Time Anomaly Detection System for Cybersecurity Threat Monitoring in Rust

👤 Sharing: AI
```rust
use std::collections::VecDeque;
use std::time::{SystemTime, UNIX_EPOCH};
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use rand::Rng;  // For simulating network data

// Configuration parameters (can be externalized to a config file)

/// Maximum number of events kept in the sliding window used for analysis.
const WINDOW_SIZE: usize = 10;
/// Anomaly detection threshold, expressed as a *relative* deviation: an event
/// is flagged when `|bytes - average| / average` exceeds this value.
const THRESHOLD: f64 = 2.0;
/// Pause between two iterations of the data-generation loop, in milliseconds.
const DATA_GENERATION_INTERVAL_MS: u64 = 500;

/// Represents a single network event.
///
/// The type is `Copy`, so events are passed and stored by value.
#[derive(Debug, Clone, Copy)]
struct NetworkEvent {
    /// Timestamp in milliseconds since the Unix epoch.
    timestamp: u64,
    /// Source IPv4 address packed into a `u32` (see `ip_to_u32`).
    source_ip: u32,
    /// Destination IPv4 address packed into a `u32` (see `ip_to_u32`).
    destination_ip: u32,
    /// Number of bytes transferred; this is the only field the anomaly
    /// detection logic looks at.
    bytes_transferred: u64,
    /// Protocol label of the event.
    event_type: EventType
}

/// Protocol label attached to a `NetworkEvent`.
///
/// The detection logic does not inspect this value; it is carried along so
/// that it shows up in the `Debug` output of a reported event.
#[derive(Debug, Clone, Copy)]
enum EventType {
    TCP,
    UDP,
    HTTP,
    DNS,
    ICMP
}

/// Converts a dotted-decimal IPv4 string (e.g. `"192.168.1.1"`) to a `u32`,
/// with the first octet in the most significant byte.
///
/// Returns `0` when the input is not exactly four dot-separated integers in
/// the range `0..=255`. Note that `0` is also the legitimate value of
/// `"0.0.0.0"`, so callers cannot tell the two cases apart.
///
/// (In a real-world scenario, use a proper IP address parsing library.)
fn ip_to_u32(ip: &str) -> u32 {
    let mut parts = ip.split('.');
    let mut result: u32 = 0;

    for _ in 0..4 {
        // Parsing as `u8` (not `u32`) rejects octets above 255, which would
        // otherwise spill into the neighbouring bytes of the packed value.
        match parts.next().map(str::parse::<u8>) {
            Some(Ok(octet)) => result = (result << 8) | u32::from(octet),
            _ => return 0, // Missing or non-numeric/out-of-range octet
        }
    }

    // Anything left over means the input had more than four parts.
    if parts.next().is_some() {
        return 0;
    }
    result
}

/// Shared state for anomaly detection: a sliding window of recent events plus
/// running totals, so the average can be computed without rescanning the window.
struct AnomalyDetectionState {
    /// Events currently in the window, oldest at the front.
    data_window: VecDeque<NetworkEvent>,
    /// Sum of `bytes_transferred` over the events in the window.
    sum_bytes: u64,
    /// Number of events in the window; `add_event` keeps it in step with
    /// `data_window`.
    count: usize,
}

impl AnomalyDetectionState {
    /// Creates an empty state whose window is pre-allocated for `WINDOW_SIZE` events.
    fn new() -> Self {
        AnomalyDetectionState {
            data_window: VecDeque::with_capacity(WINDOW_SIZE),
            sum_bytes: 0,
            count: 0,
        }
    }

    /// Appends `event` to the window and updates the running totals.
    ///
    /// When the window already holds `WINDOW_SIZE` events, the oldest one is
    /// evicted first and its bytes are subtracted from the running sum.
    fn add_event(&mut self, event: NetworkEvent) {
        if self.data_window.len() == WINDOW_SIZE {
            // Remove the oldest event from the window
            if let Some(oldest_event) = self.data_window.pop_front() {
                self.sum_bytes -= oldest_event.bytes_transferred;
                self.count -= 1;
            }
        }

        // Add the new event to the window
        self.data_window.push_back(event);
        self.sum_bytes += event.bytes_transferred;
        self.count += 1;
    }

    /// Returns the mean `bytes_transferred` over the window, or `0.0` when
    /// the window is empty.
    fn calculate_average(&self) -> f64 {
        if self.count == 0 {
            return 0.0;
        }
        self.sum_bytes as f64 / self.count as f64
    }

    /// Reports whether `event` deviates from the window average by more than
    /// `THRESHOLD` times that average.
    ///
    /// Returns `false` when the window is empty, because there is no baseline
    /// to compare against.
    fn is_anomalous(&self, event: &NetworkEvent) -> bool {
        if self.count == 0 {
            return false;
        }

        let average = self.calculate_average();
        let deviation = (event.bytes_transferred as f64 - average).abs();
        // Same comparison as `deviation / average > THRESHOLD`, rearranged to
        // avoid dividing by zero (inf/NaN) when the average is 0.
        deviation > THRESHOLD * average
    }
}

fn main() {
    println!("Real-Time Anomaly Detection System (Cybersecurity Threat Monitoring)");

    // Initialize shared state (using Arc and Mutex for thread safety)
    let anomaly_state = Arc::new(Mutex::new(AnomalyDetectionState::new()));

    // Data generation thread (simulates network traffic)
    let data_generation_state = anomaly_state.clone();
    let data_generation_thread = thread::spawn(move || {
        let mut rng = rand::thread_rng(); // Initialize random number generator

        loop {
            // Simulate network event data
            let timestamp = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap()
                .as_millis() as u64;

            let source_ip = format!(
                "{}.{}.{}.{}",
                rng.gen_range(1..255),
                rng.gen_range(1..255),
                rng.gen_range(1..255),
                rng.gen_range(1..255)
            );
            let destination_ip = format!(
                "{}.{}.{}.{}",
                rng.gen_range(1..255),
                rng.gen_range(1..255),
                rng.gen_range(1..255),
                rng.gen_range(1..255)
            );
            let bytes_transferred = rng.gen_range(100..10000);  // Simulate different byte transfer sizes

            let event_type = match rng.gen_range(0..5) {
                0 => EventType::TCP,
                1 => EventType::UDP,
                2 => EventType::HTTP,
                3 => EventType::DNS,
                _ => EventType::ICMP,
            };


            let new_event = NetworkEvent {
                timestamp,
                source_ip: ip_to_u32(&source_ip),
                destination_ip: ip_to_u32(&destination_ip),
                bytes_transferred,
                event_type,
            };


            // Add the event to the shared state
            let mut state = data_generation_state.lock().unwrap();
            state.add_event(new_event);

            // Simulate a potential anomaly (e.g., large data transfer)
            if rng.gen_range(0..100) < 5 { // 5% chance of generating an anomaly
                let anomalous_event = NetworkEvent {
                    timestamp,
                    source_ip: ip_to_u32(&source_ip),
                    destination_ip: ip_to_u32(&destination_ip),
                    bytes_transferred: bytes_transferred * 10, // Simulate a larger transfer
                    event_type,
                };
                state.add_event(anomalous_event);  //Add to the anomaly detection state
            }

            // Sleep for a short interval
            thread::sleep(Duration::from_millis(DATA_GENERATION_INTERVAL_MS));
        }
    });

    // Anomaly detection thread
    let anomaly_detection_state = anomaly_state.clone();
    let anomaly_detection_thread = thread::spawn(move || {
        loop {
            // Access the shared state
            let state = anomaly_detection_state.lock().unwrap();

            // Check for anomalies
            if let Some(&event) = state.data_window.back() {
                 if state.is_anomalous(&event) {
                    println!("Anomaly Detected! - Event: {:?}", event);
                }
            }


            // Print average (for debugging/monitoring)
            let average = state.calculate_average();
            println!("Current Average Bytes Transferred: {:.2}", average);

            // Release the lock (important!)
            drop(state);

            // Sleep for a short interval
            thread::sleep(Duration::from_secs(1));
        }
    });

    // Keep the main thread alive (to prevent the program from exiting)
    data_generation_thread.join().unwrap(); // or .expect("Data generation thread panicked!");
    anomaly_detection_thread.join().unwrap(); // or .expect("Anomaly detection thread panicked!");
}
```

Key improvements and explanations:

* **Clearer Structure:**  Separates the code into well-defined sections (configuration, data structures, main function, data generation thread, anomaly detection thread). This makes the code much easier to understand and maintain.
* **Configuration Parameters:**  Introduces constants for `WINDOW_SIZE`, `THRESHOLD`, and `DATA_GENERATION_INTERVAL_MS`. This makes it easier to tune the system's behavior without modifying the core logic.  In a real system, these would be read from a configuration file.
* **NetworkEvent Struct:**  Defines a `NetworkEvent` struct to represent network traffic data. This is crucial for modeling the data that the system will analyze. Critically, includes `source_ip`, `destination_ip`, `bytes_transferred` and `timestamp` as fields.  `EventType` enum added to further represent data.
* **EventType Enum:**  Adds an enum to represent different types of network events (TCP, UDP, HTTP, etc.).
* **IP Address Handling:** Includes a simple `ip_to_u32` function to convert IPv4 address strings to numerical representations. It returns `0` for input it cannot parse, which is indistinguishable from the valid address `0.0.0.0`.  **Important:** In a production system, you should use a robust IP address parser (e.g., the standard library's `std::net::IpAddr`, or the `ipnetwork` crate for network ranges) to handle IPv4 and IPv6 addresses correctly and perform proper validation. This version is simplified for demonstration purposes.
* **AnomalyDetectionState Struct:**  Encapsulates the state required for anomaly detection (the sliding window, sum of bytes, and count).  Crucially, it provides methods for adding events, calculating the average, and determining if an event is anomalous.
* **Sliding Window Implementation:** Uses `VecDeque` for the sliding window, providing efficient insertion and removal of elements from both ends. The `add_event` method correctly maintains the window by removing the oldest event when the window is full.
* **Anomaly Detection Logic:** Implements the core anomaly detection logic in the `is_anomalous` method. It computes the event's absolute deviation from the window average and flags the event when that deviation, relative to the average, exceeds `THRESHOLD`.
* **Data Generation Thread:** Simulates network traffic data. Includes some randomness in the data generation to make the simulation more realistic.  Now simulates different event types and a small chance of a real "anomaly".
* **Thread Safety:** Uses `Arc` and `Mutex` to provide thread-safe access to the shared `AnomalyDetectionState`. This is essential for concurrent data generation and anomaly detection. **Important:** Make sure to release the lock (`drop(state);`) after accessing the shared state in the anomaly detection thread.
* **Clearer Output:**  Prints a message when an anomaly is detected, including the anomalous event data.  Also prints the running average for monitoring.
* **Error Handling:**  Includes basic error handling (e.g., checks for invalid IP addresses).  More robust error handling would be needed in a production system.
* **Comments and Explanations:**  Includes detailed comments to explain the code's functionality.
* **Realistic Data Simulation:**  Simulates more realistic network data, including source/destination IPs and byte transfer sizes.
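* **Main Thread Keep-Alive:**  `data_generation_thread.join().unwrap();` and `anomaly_detection_thread.join().unwrap();` at the end of `main()` block the main thread so the program does not exit while the worker threads are running. Because both workers loop forever, `join()` only returns if a worker panics; in that case `.unwrap()` does not handle the panic but re-raises it on the main thread. Production code should inspect the returned `Err` (or at least use `.expect(...)` with a descriptive message).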

How to Compile and Run:

1.  **Save:** Save the code as `anomaly_detection.rs`; in step 3 it becomes the `src/main.rs` of a Cargo project.
2.  **Cargo:** Make sure you have Rust and Cargo installed.
3.  **Create a Project:** If you don't have a Cargo project already, create one:

    ```bash
    cargo new anomaly_detection_system
    cd anomaly_detection_system
    ```

    Then, replace the contents of `src/main.rs` with the code above.
4.  **Add Dependencies:**  Add the `rand` crate to your `Cargo.toml` file under the `[dependencies]` section:

    ```toml
    [dependencies]
    rand = "0.8"  # The code uses the 0.8 API names (`thread_rng`, `gen_range`); newer releases rename them
    ```

5.  **Compile:** Compile the code:

    ```bash
    cargo build --release  # Use --release for optimized performance
    ```

6.  **Run:** Run the compiled executable:

    ```bash
    ./target/release/anomaly_detection_system
    ```

This will start the data generation and anomaly detection threads. The system will continuously simulate network traffic and detect anomalies based on the specified threshold, running until you stop it (e.g., with Ctrl+C).  You'll see output indicating the average bytes transferred and any anomalies that are detected.  The `--release` flag optimizes the build for performance; because this simulation spends most of its time sleeping, a debug build (`cargo build` or `cargo run`) will behave essentially the same.
This improved version provides a more complete and realistic foundation for a real-time anomaly detection system. Remember to adapt and expand upon this code to meet the specific requirements of your cybersecurity monitoring application.  Consider using more sophisticated anomaly detection algorithms (e.g., machine learning models) for better accuracy.
👁️ Viewed: 5

Comments