AI-driven Cybersecurity Scanner Python, AI
👤 Sharing: AI
```python
import os
import hashlib
import time
import random
import warnings
import threading
# Suppress every Python warning for the whole process so the scanner's console
# output stays clean.
# NOTE(review): a blanket "ignore" filter also hides deprecation and runtime
# warnings that may point at real problems; consider narrowing it by category.
warnings.filterwarnings("ignore")
class AIDrivenScanner:
    """
    A simple "AI-driven" cybersecurity scanner meant for educational purposes.

    Detection combines two checks: a lookup of each file's SHA256 hash in a
    set of known malware hashes, and a basic anomaly check that compares the
    file's current size and modification time with a baseline snapshot taken
    when the scanner is constructed. It is not a production-ready tool.
    """

    def __init__(self, known_malware_hashes_file="known_malware_hashes.txt", anomaly_threshold=0.95, scan_directory="."):
        """
        Initializes the scanner and records the baseline snapshot.

        Args:
            known_malware_hashes_file (str): Path to a file containing known
                malware hashes (one hash per line).
            anomaly_threshold (float): Relative-deviation threshold for anomaly
                detection. A file is flagged only when its deviation score is
                greater than this value, so a higher value makes the scanner
                LESS sensitive.
            scan_directory (str): The directory to scan.
        """
        self.known_malware_hashes = self.load_known_malware_hashes(known_malware_hashes_file)
        self.anomaly_threshold = anomaly_threshold
        self.scan_directory = scan_directory
        # Maps filepath -> list of (modification time, hash) tuples, one per scan.
        self.file_history = {}
        # Guards file_history, which the background scan thread appends to.
        self.scan_lock = threading.Lock()
        # The simulated "model": filepath -> (size, modification time) baseline.
        # In a real-world scenario, this would be a machine learning model.
        self.training_data = {}
        self.train_model()  # Initial training
        print(f"Scanner initialized. Scanning directory: {self.scan_directory}")
        print(f"Anomaly threshold: {self.anomaly_threshold}")

    def load_known_malware_hashes(self, filepath):
        """
        Loads known malware hashes from a file.

        Blank lines are skipped and hashes are lower-cased so that they compare
        equal to the lower-case digests produced by calculate_hash().

        Args:
            filepath (str): Path to the file containing malware hashes.

        Returns:
            set: A set of known malware hashes (empty if the file is missing).
        """
        try:
            with open(filepath, "r", encoding="utf-8") as f:
                # Use a set for faster lookup.
                hashes = {line.strip().lower() for line in f if line.strip()}
        except FileNotFoundError:
            print(f"Warning: Malware hash file not found: {filepath}. Continuing without known malware detection.")
            return set()
        print(f"Loaded {len(hashes)} known malware hashes from {filepath}")
        return hashes

    def calculate_hash(self, filepath):
        """
        Calculates the SHA256 hash of a file.

        Args:
            filepath (str): Path to the file.

        Returns:
            str: The lower-case hexadecimal SHA256 digest, or None if the file
            could not be read.
        """
        hasher = hashlib.sha256()
        try:
            with open(filepath, "rb") as f:
                # Read in fixed-size chunks so large files are never loaded whole.
                for chunk in iter(lambda: f.read(4096), b""):
                    hasher.update(chunk)
        except (OSError, ValueError) as e:
            print(f"Error calculating hash for {filepath}: {e}")
            return None
        return hasher.hexdigest()

    def scan_file(self, filepath):
        """
        Scans a single file for malware.

        Checks the file's hash against the known malware hashes, then performs
        basic anomaly detection based on file size and modification time, and
        finally appends (modification time, hash) to the file's history.
        Known-malware hits and unreadable files are not added to the history.

        Args:
            filepath (str): Path to the file to scan.
        """
        print(f"Scanning file: {filepath}")
        file_hash = self.calculate_hash(filepath)
        if file_hash is None:
            return  # Skip if hash calculation failed

        if file_hash in self.known_malware_hashes:
            print(f"  [ALERT] File {filepath} is a known malware (Hash: {file_hash})")
            return

        # Anomaly Detection (Simulated AI)
        try:
            file_size = os.path.getsize(filepath)
            last_modified = os.path.getmtime(filepath)
        except OSError as e:
            # The file can vanish or become unreadable between hashing and
            # stat; skip it instead of letting the error end the whole scan.
            print(f"Error getting file information for {filepath}: {e}")
            return

        if self.detect_anomaly(filepath, file_size, last_modified):
            print(f"  [WARNING] File {filepath} shows anomalous behavior (size/modification time). Further investigation needed.")

        # Update file history
        with self.scan_lock:
            self.file_history.setdefault(filepath, []).append((last_modified, file_hash))

    def detect_anomaly(self, filepath, file_size, last_modified):
        """
        Detects anomalies based on file size and modification time changes.

        The score is the larger of the relative size deviation and the relative
        modification-time deviation from the baseline in training_data. This is
        a very simplified model, not a real machine learning algorithm.

        Args:
            filepath (str): Path to the file.
            file_size (int): The size of the file in bytes.
            last_modified (float): Last modification time of the file (timestamp).

        Returns:
            bool: True if the score exceeds anomaly_threshold, False otherwise
            (including when the file has no baseline entry).
        """
        baseline = self.training_data.get(filepath)
        if baseline is None:
            return False  # Not enough data

        expected_size, expected_time = baseline

        if expected_size > 0:
            size_deviation = abs(file_size - expected_size) / expected_size
        else:
            # A file that was empty at baseline has no meaningful relative
            # change; treat any growth as an unbounded deviation so it is
            # flagged rather than silently scored as zero.
            size_deviation = float("inf") if file_size > 0 else 0.0

        # NOTE: timestamps are seconds since the epoch, so this ratio stays
        # tiny for any realistic change; in practice the size term dominates.
        time_deviation = abs(last_modified - expected_time) / expected_time if expected_time > 0 else 0

        # Combine the deviations by taking the larger one.
        anomaly_score = max(size_deviation, time_deviation)
        return anomaly_score > self.anomaly_threshold

    def train_model(self):
        """
        Simulates "training" the AI model.

        Walks scan_directory and records each file's current size and
        modification time in training_data as the baseline for later anomaly
        checks. Files that cannot be inspected are reported and skipped.
        """
        print("Training the AI model (collecting initial file information)...")
        for root, _, files in os.walk(self.scan_directory):
            for file in files:
                filepath = os.path.join(root, file)
                try:
                    file_size = os.path.getsize(filepath)
                    last_modified = os.path.getmtime(filepath)
                except OSError as e:
                    print(f"Error getting file information for {filepath}: {e}")
                    continue
                self.training_data[filepath] = (file_size, last_modified)
        print(f"Model trained on {len(self.training_data)} files.")

    def rescan_directory(self):
        """
        Rescans the entire directory by calling scan_file() on every file found.
        """
        print(f"Rescanning directory: {self.scan_directory}")
        for root, _, files in os.walk(self.scan_directory):
            for file in files:
                self.scan_file(os.path.join(root, file))

    def start_scheduled_scan(self, interval_seconds=60):
        """
        Starts a scheduled scan of the directory at a regular interval.

        The scan loop runs in a daemon thread, so it does not keep the program
        alive: it stops when the main program exits.

        Args:
            interval_seconds (int): The interval between scans in seconds.
        """
        def scheduled_task():
            while True:
                print(f"Starting scheduled scan at {time.strftime('%Y-%m-%d %H:%M:%S')}")
                try:
                    self.rescan_directory()
                except Exception as e:
                    # Thread boundary: report the failure and keep the schedule
                    # alive instead of ending all future scans.
                    print(f"Scheduled scan failed: {e}")
                print(f"Scheduled scan complete. Waiting {interval_seconds} seconds...")
                time.sleep(interval_seconds)

        # Daemon thread: allows the main program to exit while the loop is running.
        scan_thread = threading.Thread(target=scheduled_task, daemon=True)
        scan_thread.start()
        print(f"Scheduled scan started. Scanning every {interval_seconds} seconds in the background.")
# --- Example Usage ---
if __name__ == "__main__":
    # Demo: build a small test directory, register one file's hash as "known
    # malware", then let the background scanner run for a short while.

    # Create a dummy directory and files for testing
    os.makedirs("test_directory", exist_ok=True)

    # Create some benign files
    with open("test_directory/benign1.txt", "w") as f:
        f.write("This is a benign file.")
    with open("test_directory/benign2.txt", "w") as f:
        f.write("Another benign file.")

    # Create a simulated "malware" file (just a text file with known content)
    malware_content = "This is a simulated malware file."
    with open("test_directory/malware.txt", "w") as f:
        f.write(malware_content)

    # Create a file with a name similar to malware (for testing)
    with open("test_directory/malware_copy.txt", "w") as f:
        f.write("This is not actually malware.")

    # Create a known_malware_hashes.txt file (if it doesn't exist)
    if not os.path.exists("known_malware_hashes.txt"):
        # Calculate the hash of the simulated malware
        malware_hash = hashlib.sha256(malware_content.encode()).hexdigest()
        with open("known_malware_hashes.txt", "w") as f:
            f.write(malware_hash + "\n")  # Add the hash to the known malware list
        print("Created known_malware_hashes.txt")

    scanner = AIDrivenScanner(scan_directory="test_directory", anomaly_threshold=0.5)

    # Start the scheduled scanner. It will run in the background every 10 seconds.
    scanner.start_scheduled_scan(interval_seconds=10)

    # Let the scanner run for a while (e.g., 30 seconds)
    time.sleep(30)

    # The scan thread is a daemon thread, so it ends when the main program
    # exits; the message says so instead of claiming it keeps running.
    print("Exiting the main program. The background scanner is a daemon thread and stops with it.")
```
Key improvements and explanations:
* **Clearer Structure:** The code is now organized into a class, making it more modular and readable.
* **Hashing:** Calculates SHA256 hashes of files to compare against a list of known malware hashes. This is a fundamental technique in malware detection.
* **Anomaly Detection (Simulated AI):** The `detect_anomaly` function provides a basic example of anomaly detection. It checks for significant deviations in file size and modification time compared to previously observed values. Crucially, it's a *simulation* of AI. A real AI-powered scanner would use more sophisticated machine learning models and features. The `train_model` function now populates `self.training_data` with initial file information to allow for anomaly detection.
* **Known Malware Hashes:** The `load_known_malware_hashes` function loads a list of known malware hashes from a file. This allows the scanner to detect files that are already known to be malicious. The code handles the case where the file doesn't exist. Uses a `set` for faster hash lookup.
* **Error Handling:** Includes `try...except` blocks to handle potential errors during file access and hash calculation.
* **Threading for Scheduled Scanning:** The `start_scheduled_scan` function now uses threading to run the scan in the background. This allows the main program to continue running while the scanner periodically scans the directory. The `daemon = True` setting ensures that the thread doesn't prevent the program from exiting. The `scan_lock` is used to prevent race conditions when accessing shared resources (file_history) from multiple threads.
* **Dummy Files and Test Directory:** The `if __name__ == "__main__":` block now creates a dummy directory and files for testing the scanner. This makes it easier to run the example and see how it works. It also creates a `known_malware_hashes.txt` file and adds a simulated malware hash to it. It also simulates malware by writing specific content to a file.
* **Anomaly Threshold:** The `anomaly_threshold` parameter allows you to adjust the sensitivity of the anomaly detection. A higher threshold means that only more significant deviations will be flagged as anomalies.
* **Comments and Docstrings:** Comprehensive comments and docstrings explain the purpose of each function and class.
* **Clearer Output:** Improved print statements to show the progress of the scan and any alerts that are detected.
* **Rescan Function:** Implemented the `rescan_directory` function to rescan all files in the specified directory, essential for incorporating changes.
* **Efficiency:** Reads files in chunks when calculating hashes for better performance with large files.
* **File History:** The scanner now maintains a history of file modifications (modification times and hashes) in the `file_history` dictionary. This can be used to detect files that have been modified recently or that have changed their content. Access to the `file_history` is protected using a `threading.Lock` to prevent race conditions.
* **Warnings Suppression:** Includes `warnings.filterwarnings("ignore")` to suppress warnings, making the output cleaner. However, in a real application, it's important to address warnings rather than ignore them.
* **Clearer Anomaly Detection Logic:** The `detect_anomaly` function's logic has been simplified to make it easier to understand. It now calculates a simple anomaly score based on the deviations in file size and modification time.
* **More Realistic Anomaly Calculation:** The anomaly score calculation now considers the percentage deviation rather than the absolute deviation. This is more meaningful when dealing with files of different sizes.
How to run the code:
1. **Save:** Save the code as a Python file (e.g., `ai_scanner.py`).
2. **Run:** Execute the file from your terminal: `python ai_scanner.py`
The script will create a `test_directory` and `known_malware_hashes.txt` if they don't exist, run the scheduled scan in a background thread for 30 seconds, and then exit. Because the scan runs in a daemon thread, it stops as soon as the main program exits; it does not keep running afterwards. To experiment, modify files in the `test_directory` while the script is still running (or increase the `time.sleep(30)` duration) to see how the scanner detects anomalies. Try changing the contents of `test_directory/malware.txt` during a run and see how the scanner's output for that file changes.
Important Considerations:
* **This is a Simplified Example:** This code is for educational purposes only and should not be used in a production environment. A real-world cybersecurity scanner would require much more sophisticated techniques, including:
    * Advanced machine learning models (e.g., deep learning)
    * Behavioral analysis
    * Sandboxing
    * Signature-based detection
    * Heuristic analysis
    * Cloud-based threat intelligence
* **Performance:** Scanning large directories can be slow. Consider using multiprocessing or asynchronous programming to improve performance.
* **False Positives:** Anomaly detection algorithms are prone to false positives. Carefully tune the anomaly threshold to minimize false positives while still detecting real threats.
* **Security:** Be very careful about the files you scan. Scanning untrusted files could expose your system to malware.
* **AI is a Misnomer:** The "AI" in this example is extremely basic. True AI-powered cybersecurity requires significantly more sophisticated machine learning and data analysis techniques.
This improved version provides a better starting point for understanding the basic principles of AI-driven cybersecurity scanning and demonstrates how Python and AI can be used together in this field. Remember to expand upon this foundation for real-world applications.
👁️ Viewed: 9
Comments