Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Security enhancements and code optimizations. #6

Merged
merged 1 commit into from
Aug 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions src/integrity.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
import hashlib
import logging

logger = logging.getLogger(__name__)

def calculate_file_hash(file_path):
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
try:
with open(file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256_hash.update(byte_block)
return sha256_hash.hexdigest()
except IOError:
logger.error(f"Unable to read file: {file_path}")
return None

def check_file_integrity(file_path):
known_good_hashes = {
Expand All @@ -15,12 +21,15 @@ def check_file_integrity(file_path):
}

calculated_hash = calculate_file_hash(file_path)
logging.info(f"Calculated hash for {file_path}: {calculated_hash}")
if calculated_hash is None:
return "integrity", False

logger.info(f"Calculated hash for {file_path}: {calculated_hash}")

for version, known_hash in known_good_hashes.items():
if calculated_hash == known_hash:
logging.info(f"File integrity check passed for version {version}")
return True
logger.info(f"File integrity check passed for version {version}")
return "integrity", True

logging.warning("File integrity check failed")
return False
logger.warning("File integrity check failed")
return "integrity", False
16 changes: 9 additions & 7 deletions src/memory_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import re
import logging

logger = logging.getLogger(__name__)

def scan_process_memory(process_name):
suspicious_patterns = [
rb'CVE-2023-38831',
Expand All @@ -21,15 +23,15 @@ def scan_process_memory(process_name):
content = process.memory_maps()[0].read(mem.addr, mem.size)
for pattern in suspicious_patterns:
if re.search(pattern, content):
logging.warning(f"Suspicious pattern found in process memory: {pattern}")
return True
logger.warning(f"Suspicious pattern found in process memory: {pattern}")
return "memory", True
except psutil.AccessDenied:
logging.warning(f"Access denied to memory region of process {pid}")
logger.warning(f"Access denied to memory region of process {pid}")
except Exception as e:
logging.error(f"Error scanning memory: {str(e)}")
logger.error(f"Error scanning memory: {str(e)}")
except psutil.NoSuchProcess:
logging.error(f"Process {pid} no longer exists")
logger.error(f"Process {pid} no longer exists")
except Exception as e:
logging.error(f"Error accessing process {pid}: {str(e)}")
logger.error(f"Error accessing process {pid}: {str(e)}")

return False
return "memory", False
11 changes: 8 additions & 3 deletions src/network_analyzer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
import scapy.all as scapy
import logging
from scapy.error import Scapy_Exception

logger = logging.getLogger(__name__)

def analyze_network_traffic(interface, duration=60):
suspicious_patterns = [
Expand All @@ -13,13 +16,15 @@ def packet_callback(packet):
payload = packet[scapy.Raw].load
for pattern in suspicious_patterns:
if pattern in payload:
logging.warning(f"Suspicious network traffic detected: {pattern}")
logger.warning(f"Suspicious network traffic detected: {pattern}")
return True
return False

try:
scapy.sniff(iface=interface, prn=packet_callback, timeout=duration)
except Scapy_Exception as e:
logger.error(f"Scapy error during network traffic analysis: {str(e)}")
except Exception as e:
logging.error(f"Error during network traffic analysis: {str(e)}")
logger.error(f"Error during network traffic analysis: {str(e)}")

return False
return "network", False
21 changes: 12 additions & 9 deletions src/sandbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,37 +4,40 @@
import shutil
import logging

logger = logging.getLogger(__name__)

class Sandbox:
def __init__(self):
self.temp_dir = None

def __enter__(self):
self.temp_dir = tempfile.mkdtemp()
logging.info(f"Created sandbox environment in {self.temp_dir}")
logger.info(f"Created sandbox environment in {self.temp_dir}")
return self

def __exit__(self, exc_type, exc_val, exc_tb):
if self.temp_dir:
shutil.rmtree(self.temp_dir)
logging.info(f"Removed sandbox environment {self.temp_dir}")
logger.info(f"Removed sandbox environment {self.temp_dir}")

def run_command(self, command):
try:
result = subprocess.run(command, cwd=self.temp_dir, capture_output=True, text=True, timeout=30)
logging.info(f"Command executed in sandbox: {command}")
logger.info(f"Command executed in sandbox: {command}")
return result.returncode, result.stdout, result.stderr
except subprocess.TimeoutExpired:
logging.warning(f"Command timed out in sandbox: {command}")
logger.warning(f"Command timed out in sandbox: {command}")
return -1, "", "Timeout"
except Exception as e:
logging.error(f"Error running command in sandbox: {str(e)}")
logger.error(f"Error running command in sandbox: {str(e)}")
return -1, "", str(e)

def copy_file_to_sandbox(self, file_path):
try:
shutil.copy(file_path, self.temp_dir)
logging.info(f"Copied {file_path} to sandbox")
return os.path.join(self.temp_dir, os.path.basename(file_path))
dest_path = os.path.join(self.temp_dir, os.path.basename(file_path))
shutil.copy2(file_path, dest_path)
logger.info(f"Copied {file_path} to sandbox")
return dest_path
except Exception as e:
logging.error(f"Error copying file to sandbox: {str(e)}")
logger.error(f"Error copying file to sandbox: {str(e)}")
return None
59 changes: 37 additions & 22 deletions src/scanner.py
Original file line number Diff line number Diff line change
@@ -1,62 +1,77 @@
import os
import winreg
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from .utils import get_winrar_path
from .report import generate_report
from .integrity import check_file_integrity
from .memory_scanner import scan_process_memory
from .network_analyzer import analyze_network_traffic
from .sandbox import Sandbox
from .database import Database

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def scan_for_cve_2023_38831():
winrar_path = get_winrar_path()
if not winrar_path:
logging.warning("WinRAR not found on the system.")
logger.warning("WinRAR not found on the system.")
return False, "WinRAR not found on the system."

vulnerable_version = False
version = ""

try:
key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\WinRAR\Version")
version = winreg.QueryValueEx(key, "Version")[0]
winreg.CloseKey(key)
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\WinRAR\Version") as key:
version = winreg.QueryValueEx(key, "Version")[0]

major, minor, build = map(int, version.split('.'))
if major < 6 or (major == 6 and minor == 0 and build <= 2):
vulnerable_version = True
except WindowsError:
logging.error("Unable to determine WinRAR version.")
logger.error("Unable to determine WinRAR version.")
return False, "Unable to determine WinRAR version."

# Check file integrity
integrity_status = check_file_integrity(winrar_path)
if not integrity_status:
logging.warning(f"File integrity check failed for {winrar_path}")
return vulnerable_version, f"{version} (Integrity check failed)"
with ThreadPoolExecutor(max_workers=3) as executor:
futures = []
futures.append(executor.submit(check_file_integrity, winrar_path))
futures.append(executor.submit(scan_process_memory, "WinRAR.exe"))
futures.append(executor.submit(analyze_network_traffic, "eth0", duration=30))

# Scan process memory
if scan_process_memory("WinRAR.exe"):
logging.warning("Suspicious patterns found in WinRAR process memory")
vulnerable_version = True
for future in as_completed(futures):
result = future.result()
if isinstance(result, tuple):
operation, status = result
if operation == "integrity" and not status:
logger.warning(f"File integrity check failed for {winrar_path}")
vulnerable_version = True
elif operation == "memory" and status:
logger.warning("Suspicious patterns found in WinRAR process memory")
vulnerable_version = True
elif operation == "network" and status:
logger.warning("Suspicious network traffic detected")
vulnerable_version = True

# Analyze network traffic
if analyze_network_traffic("eth0", duration=30):
logging.warning("Suspicious network traffic detected")
vulnerable_version = True

# Run in sandbox
with Sandbox() as sandbox:
sandbox_winrar_path = sandbox.copy_file_to_sandbox(winrar_path)
if sandbox_winrar_path:
returncode, stdout, stderr = sandbox.run_command([sandbox_winrar_path, "--version"])
if returncode != 0:
logging.warning(f"Unexpected behavior in sandbox: {stderr}")
logger.warning(f"Unexpected behavior in sandbox: {stderr}")
vulnerable_version = True

logging.info(f"Scan completed. WinRAR version: {version}, Vulnerable: {vulnerable_version}")
db = Database()
db.insert_scan_result(
vulnerable_version,
version,
"Integrity check passed" if not vulnerable_version else "Integrity check failed",
"No suspicious patterns in memory" if not vulnerable_version else "Suspicious patterns in memory",
"No suspicious network traffic" if not vulnerable_version else "Suspicious network traffic detected",
"No issues in sandbox environment" if not vulnerable_version else "Issues detected in sandbox environment"
)

logger.info(f"Scan completed. WinRAR version: {version}, Vulnerable: {vulnerable_version}")
return vulnerable_version, version

def main():
Expand Down
9 changes: 6 additions & 3 deletions src/utils.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import os
import winreg
import logging

logger = logging.getLogger(__name__)

def get_winrar_path():
try:
key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\WinRAR")
path = winreg.QueryValueEx(key, "exe64")[0]
winreg.CloseKey(key)
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, r"SOFTWARE\WinRAR") as key:
path = winreg.QueryValueEx(key, "exe64")[0]
return path if os.path.exists(path) else None
except WindowsError:
logger.error("Unable to find WinRAR installation path")
return None