Automated Cybersecurity Threat Analyzer and Response System,Python

👤 Sharing: AI
```python
import time
import random
import logging
import json
import os

# --- Logging setup ---
# Configure the root logger once at import time: INFO and above, with every
# record rendered as "<timestamp> - <level name> - <message>".
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)

# --- Known threat signatures (example data) ---
# Each entry pairs a signature identifier with the severity assigned to it and
# a human-readable description. The severity strings are the keys used to pick
# a response in RESPONSE_ACTIONS.
THREAT_SIGNATURES = [
    {
        "signature": "malicious_payload_detected",
        "severity": "high",
        "description": "Potential malicious code execution detected.",
    },
    {
        "signature": "unusual_network_traffic",
        "severity": "medium",
        "description": "High volume of outbound traffic to unknown IP addresses.",
    },
    {
        "signature": "failed_login_attempts",
        "severity": "low",
        "description": "Multiple failed login attempts from a single IP address.",
    },
    {
        "signature": "SQL_injection_attempt",
        "severity": "high",
        "description": "Detected SQL injection pattern in user input.",
    },
    {
        "signature": "port_scan_detected",
        "severity": "medium",
        "description": "A port scan was detected on the network.",
    },
]

# --- Response playbook (example data) ---
# Maps a severity level to the ordered list of action names to run for it.
# The action names are interpreted by execute_action().
RESPONSE_ACTIONS = {
    "high": [
        "quarantine_system",
        "notify_security_team",
        "block_ip",
    ],
    "medium": [
        "monitor_system",
        "log_event",
        "alert_analyst",
    ],
    "low": [
        "log_event",
    ],
}


def generate_mock_event():
    """Build one fake security event from a randomly chosen threat signature.

    Returns:
        A dict with the keys ``timestamp`` (current ``time.time()`` value),
        ``source_ip`` (``192.168.1.x``), ``destination_ip`` (``10.0.0.x``),
        ``signature``, ``severity`` and ``description`` (copied from the chosen
        entry of ``THREAT_SIGNATURES``) and ``data`` (a fixed placeholder string).
        The host part ``x`` of each address is drawn from 1..254.
    """
    # Draw the random values in a fixed order: signature, source host, destination host.
    chosen = random.choice(THREAT_SIGNATURES)
    created_at = time.time()
    source_host = random.randint(1, 254)
    destination_host = random.randint(1, 254)

    copied_fields = {key: chosen[key] for key in ("signature", "severity", "description")}
    return {
        "timestamp": created_at,
        "source_ip": f"192.168.1.{source_host}",
        "destination_ip": f"10.0.0.{destination_host}",
        **copied_fields,
        "data": "Additional data related to the event. (e.g., log snippet, command executed, etc.)",
    }

def analyze_event(event):
    """Return the severity that THREAT_SIGNATURES assigns to an event.

    Args:
        event: Mapping with at least a ``"signature"`` key.

    Returns:
        The ``"severity"`` of the first entry in ``THREAT_SIGNATURES`` whose
        signature equals the event's, or the string ``"unknown"`` when no entry
        matches. The event's own ``"severity"`` field is not consulted.
    """
    wanted = event["signature"]
    logging.info(f"Analyzing event: {wanted}")

    # First matching entry wins; None signals that nothing matched.
    candidates = (entry for entry in THREAT_SIGNATURES if entry["signature"] == wanted)
    found = next(candidates, None)

    if not found:
        logging.warning(f"No matching signature found for event: {wanted}")
        return "unknown"

    logging.info(f"Matched signature: {found}")
    return found["severity"]

def respond_to_threat(severity):
    """Run every response action configured for the given severity level.

    Args:
        severity: Key into ``RESPONSE_ACTIONS`` (e.g. ``"high"``). A value with
            no entry there only produces a warning log; nothing is executed.
    """
    logging.info(f"Responding to threat with severity: {severity}")

    if severity not in RESPONSE_ACTIONS:
        logging.warning(f"No response actions defined for severity: {severity}")
        return

    planned = RESPONSE_ACTIONS[severity]
    logging.info(f"Executing actions: {planned}")
    # Actions run sequentially, in the order they are listed.
    for step in planned:
        execute_action(step)


def execute_action(action):
    """Carry out one named response action (all actions are simulated).

    Most actions just print a start message, sleep for a short delay and print
    a completion message. ``"notify_security_team"`` additionally calls
    ``send_email_notification``; ``"log_event"`` delegates to
    ``log_security_event``. Any other name is reported as unknown via both the
    log and stdout.

    Args:
        action: Name of the action to run.
    """
    logging.info(f"Executing action: {action}")

    # Placeholder address: the IP is fixed here and is not taken from any event.
    ip_address = "192.168.1.100"

    # (action name, start message, simulated delay in seconds, completion message)
    simulated_steps = (
        ("quarantine_system", "Simulating system quarantine...", 2, "System quarantined."),
        ("notify_security_team", "Simulating notification to security team...", 1, "Security team notified."),
        ("block_ip", f"Simulating blocking IP address: {ip_address}", 1, f"IP address {ip_address} blocked."),
        ("monitor_system", "Simulating system monitoring...", 0.5, "System monitoring initiated."),
        ("alert_analyst", "Simulating alerting security analyst...", 0.5, "Security analyst alerted."),
    )

    for name, start_text, pause, done_text in simulated_steps:
        if action == name:
            print(start_text)
            time.sleep(pause)
            print(done_text)
            if name == "notify_security_team":
                # The notification step also sends the (simulated) email alert.
                send_email_notification("High severity threat detected!")
            return

    if action == "log_event":
        log_security_event("Security event occurred.")
        return

    logging.warning(f"Unknown action: {action}")
    print(f"Unknown action: {action}")  # mirrored on stdout as well as the log


def send_email_notification(message):
    """Simulate an email alert; no mail is actually sent.

    The message is echoed to stdout and an INFO record is written to the log.
    A real implementation would replace the body with actual delivery code,
    for example using ``smtplib``.

    Args:
        message: Text of the notification.
    """
    # Stand-in for real delivery: just show what would have been sent.
    print(f"Simulating sending email: {message}")
    logging.info(f"Email notification sent: {message}")

def log_security_event(message):
    """Append a timestamped line for *message* to ``security_log.txt``.

    The file lives in the current working directory and is created if missing.
    Each line has the form ``YYYY-MM-DD HH:MM:SS - <message>``. Logging is
    best-effort: file-system and encoding errors are reported through the
    ``logging`` module and are not raised to the caller.

    Args:
        message: Text to record.
    """
    timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
    try:
        # Pin UTF-8 so non-ASCII messages are written the same way regardless
        # of the platform's locale-dependent default encoding.
        with open("security_log.txt", "a", encoding="utf-8") as log_file:
            log_file.write(f"{timestamp} - {message}\n")
    except (OSError, UnicodeError) as e:
        logging.error(f"Error logging event: {e}")
    else:
        # Only report success once the line has actually been written.
        logging.info(f"Event logged: {message}")

def load_config(config_file="config.json"):
    """Load configuration settings from a JSON file.

    Args:
        config_file: Path of the JSON file to read.

    Returns:
        The parsed settings as a dict. An empty dict (meaning "use defaults")
        is returned when the file does not exist, cannot be decoded as UTF-8
        JSON, or decodes to something other than a JSON object.

    Raises:
        OSError: For file errors other than "not found" (e.g. permission
            denied); these are not suppressed.
    """
    try:
        with open(config_file, "r", encoding="utf-8") as f:
            config = json.load(f)
    except FileNotFoundError:
        logging.warning(f"Configuration file not found: {config_file}. Using default settings.")
        return {}  # Empty dict -> callers fall back to their defaults
    except (json.JSONDecodeError, UnicodeDecodeError):
        logging.error(f"Error decoding JSON in {config_file}. Using default settings.")
        return {}

    # Callers use config.get(...); a top-level list, string or number would
    # crash there, so anything that is not a JSON object is rejected here.
    if not isinstance(config, dict):
        logging.error(
            f"Configuration in {config_file} must be a JSON object, "
            f"got {type(config).__name__}. Using default settings."
        )
        return {}

    logging.info(f"Configuration loaded from {config_file}")
    return config

def save_event_to_file(event, filename="security_events.json"):
    """Append *event* to the JSON array stored in *filename*.

    The file holds a single JSON list. It is read, the event is appended, and
    the whole list is written back. If the file is missing, a new list is
    started. If the file is empty, is not valid JSON, or holds a JSON value
    that is not a list, its contents are discarded (with a warning) and a new
    list is started.

    The new contents are serialized before anything is written and are put in
    place via a temporary file and ``os.replace``, so a failure (for example a
    non-JSON-serializable event) leaves the existing file untouched.

    Saving is best-effort: any error is logged and not raised.

    Args:
        event: JSON-serializable object to store.
        filename: Path of the JSON file to update.
    """
    try:
        try:
            with open(filename, "r", encoding="utf-8") as f:
                events = json.load(f)
        except FileNotFoundError:
            events = []
        except (json.JSONDecodeError, UnicodeDecodeError):
            logging.warning(f"{filename} is empty or not valid JSON; starting a new event list.")
            events = []

        # Valid JSON of the wrong shape (e.g. an object) cannot be appended to.
        if not isinstance(events, list):
            logging.warning(f"{filename} does not contain a JSON list; starting a new event list.")
            events = []

        events.append(event)

        # Serialize up front: if the event cannot be encoded, this raises
        # before any file has been opened for writing.
        payload = json.dumps(events, indent=4)  # indent for readability

        tmp_name = f"{filename}.tmp"
        try:
            with open(tmp_name, "w", encoding="utf-8") as f:
                f.write(payload)
            # Swap the finished file into place in one step.
            os.replace(tmp_name, filename)
        except OSError:
            # Do not leave a stale temporary file behind.
            if os.path.exists(tmp_name):
                os.remove(tmp_name)
            raise

        logging.info(f"Event saved to {filename}")

    except Exception as e:
        logging.error(f"Error saving event to file: {e}")


def main():
    """Run the threat analyzer and response simulation end to end.

    Reads optional settings via ``load_config()``:
    ``event_generation_interval`` (seconds to sleep after each event, default 5)
    and ``number_of_events`` (how many events to process, default 10). Each
    event is generated, saved to the events file, analyzed and responded to,
    in that order.
    """
    settings = load_config()  # empty dict when config.json is absent or unreadable
    pause_seconds = settings.get("event_generation_interval", 5)
    event_count = settings.get("number_of_events", 10)

    print("Starting Automated Cybersecurity Threat Analyzer and Response System...")
    logging.info("System started.")

    for _ in range(event_count):
        current = generate_mock_event()
        logging.info(f"Generated event: {current}")
        # Persist first so the event is on disk before any response runs.
        save_event_to_file(current)
        respond_to_threat(analyze_event(current))
        time.sleep(pause_seconds)  # pacing between events

    print("Threat analysis and response simulation complete.")
    logging.info("System stopped.")

# Run the simulation only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()
```

Key improvements and explanations:

* **Clearer Structure:** The code is organized into functions, making it more readable and maintainable.  Each function has a clear purpose.
* **Logging:**  Comprehensive logging using the `logging` module.  Logs are written to the console and can easily be configured to write to a file.  This is crucial for auditing and debugging a security system.  Includes logging for configuration loading, event generation, analysis, response, and action execution.
* **Configuration Loading:**  The `load_config` function attempts to load settings from a `config.json` file.  This allows you to change the system's behavior without modifying the code.  Handles `FileNotFoundError` and `JSONDecodeError` gracefully.  Uses `config.get()` with default values to handle missing configuration parameters.
* **Threat Signatures:** The `THREAT_SIGNATURES` list represents a database of known threats. Each entry includes a signature, severity, and description.  This is a simplified example; a real system would use a much larger and more sophisticated database.
* **Response Actions:** The `RESPONSE_ACTIONS` dictionary maps severity levels to lists of actions to take.  This provides flexibility in defining how the system responds to different types of threats.
* **Modular Response Execution:** The `execute_action` function simulates the execution of various response actions.  This is where you would integrate with real security tools (e.g., firewalls, intrusion detection systems).  Uses `time.sleep()` to simulate the time it takes to perform an action.
* **Email Simulation:**  The `send_email_notification` function is a placeholder for sending email alerts.  In a real application, you would use a library like `smtplib` to send emails.
* **Event Logging:** The `log_security_event` function logs security events to a file.
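* **JSON Event Persistence:** The `save_event_to_file` function saves security events to a JSON file.  If the file already exists, the stored list is loaded, the new event is appended, and the whole list is written back.  If the file doesn't exist, a new list is started.  Note that if the file is empty or contains invalid JSON, its previous contents are discarded and a new list is started.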
* **Error Handling:** Includes `try...except` blocks to handle potential errors, such as file not found or JSON decoding errors.
* **Realistic Simulation:** Uses `time.sleep` to simulate delays associated with real-world security operations.
* **Clear Comments:**  The code is well-commented, explaining the purpose of each section and function.
* **Main Function:** The `main` function orchestrates the entire process, making the code easier to understand and run.
* **Configurable Event Generation:**  The `event_generation_interval` and `number_of_events` can be set in the `config.json` file.
* **Handles Unknown Signatures:** The `analyze_event` function now returns "unknown" if a signature is not found, allowing the system to handle unexpected events.
* **Clearer Output:**  Uses `print()` calls to provide user-friendly output to the console, in addition to logging.
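* **Example IP Address Blocking:** Includes an example of blocking an IP address in the `execute_action` function.  The address is a hard-coded placeholder; a real system would take it from the event (e.g., its `source_ip`).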
* **Pythonic Style:**  Uses Pythonic idioms (e.g., f-strings, `with` context managers, `next` with a generator expression) to make the code more concise and readable.

How to run:

1.  **Save:** Save the code as a Python file (e.g., `threat_analyzer.py`).
2.  **Optional: Create `config.json`:** Create a file named `config.json` in the same directory with the following content (or modify it as needed):

```json
{
  "event_generation_interval": 2,
  "number_of_events": 5
}
```

3.  **Run:** Open a terminal or command prompt, navigate to the directory where you saved the file, and run the script using `python threat_analyzer.py`.

To make it a real system, you would need to replace the simulated actions with actual integrations with your security infrastructure (e.g., firewalls, SIEM systems, etc.).  You'd also need to implement more robust event processing, signature management, and reporting.
👁️ Viewed: 5

Comments