-
Notifications
You must be signed in to change notification settings - Fork 0
/
malware_collector.py
100 lines (82 loc) · 3.5 KB
/
malware_collector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
import asyncio
import datetime
import logging
import os
import git
from typing import List, Optional, Tuple
log = logging.getLogger("malware_collector")
class MalwareCollector:
    """Base class for collectors that sync malware samples from a git repo,
    archive them under a dated directory tree, and feed them to a stoq
    scanning pipeline.

    Subclasses must implement :meth:`get` (fetch/scan new samples) and
    :meth:`cleanup`; :meth:`run` dispatches between them based on the
    ``CLEANUP`` environment variable.
    """

    def __init__(self, max_file_size=50_000_000):
        # Root of the on-disk sample archive; overridable via ARCHIVE_PATH.
        self.archive_prefix = os.environ.get("ARCHIVE_PATH", "/RAID")
        # BUG FIX: after .lower() the value can never equal "True", and the
        # int 1 never equals a string — both comparisons were dead. Accept
        # the intended truthy spellings instead.
        self._cleanup = os.environ.get("CLEANUP", "false").lower() in ("true", "1")
        # Refuse to load files larger than this many bytes into memory
        # (default 50 MB).
        self.max_file_size = max_file_size
        # Working directory for this collector; subclasses must set it
        # before run() is called (see the assert there).
        self.path = None
        self.loop = asyncio.get_event_loop()
        self.repo: Optional[git.Repo] = None

    def pull_from_repo(self, git_path, git_url) -> Tuple[bool, Optional[git.Commit]]:
        """Clone or update the checkout at ``git_path``.

        :param git_path: local checkout directory
        :param git_url: remote URL, used only when a fresh clone is needed
        :return: ``(changed, previous_head)`` — ``changed`` is True when new
            commits arrived (always True after a fresh clone);
            ``previous_head`` is HEAD before the pull, or None after a clone
            (there is no meaningful "previous" commit to diff against).
        """
        if not os.path.exists(git_path):
            # BUG FIX: os.mkdir fails when parent directories are missing;
            # makedirs creates the whole path.
            os.makedirs(git_path)
            git.Repo.clone_from(git_url, git_path)
            self.repo = git.Repo(git_path)
            return True, None
        try:
            self.repo = git.Repo(git_path)
        except git.exc.InvalidGitRepositoryError:
            # Directory exists but is not (yet) a git checkout — clone into it.
            git.Repo.clone_from(git_url, git_path)
            self.repo = git.Repo(git_path)
            return True, None
        current = self.repo.head.commit
        self.repo.remotes.origin.pull()
        return current != self.repo.head.commit, current

    def get_changed_files_between_pulls(self, first: git.Commit) -> List[str]:
        """Return the paths touched by all commits made after ``first``.

        :param first: the HEAD commit recorded before the last pull
            (the ``previous_head`` value from :meth:`pull_from_repo`)
        :return: de-duplicated list of repository-relative file paths

        BUG FIX: the original diffed the newest new commit against the
        oldest new commit, which drops the oldest commit's own changes and
        yields nothing when exactly one commit arrived. Diffing ``first``
        directly against the current HEAD covers the full range. It also
        read ``item.a_blob.path`` unguarded — a_blob is None for files
        added in the range (and b_blob is None for deletions).
        """
        changed_files: List[str] = []
        for item in first.diff(self.repo.head.commit):
            for blob in (item.a_blob, item.b_blob):
                if blob is not None and blob.path not in changed_files:
                    changed_files.append(blob.path)
        return changed_files

    def get(self):
        """Fetch and scan new samples. Subclasses must override."""
        # BUG FIX: `raise NotImplemented` raises TypeError at runtime
        # (NotImplemented is not an exception class).
        raise NotImplementedError

    def cleanup(self):
        """Remove stale local state. Subclasses must override."""
        # BUG FIX: same NotImplemented -> NotImplementedError fix as get().
        raise NotImplementedError

    def make_day_directory(self):
        """Create (if needed) and return today's archive directory,
        laid out as ``<self.path>/<year>/<month>/<day>`` in UTC."""
        timestamp = datetime.datetime.now(datetime.timezone.utc)
        daydirname = os.path.join(self.path,
                                  str(timestamp.year),
                                  str(timestamp.month),
                                  str(timestamp.day))
        # exist_ok avoids a race between the isdir check and makedirs.
        os.makedirs(daydirname, exist_ok=True)
        return daydirname

    def scan_single_file(self, stoq, metadata, target):
        """Read ``target`` and submit its bytes to the stoq pipeline.

        Files larger than ``self.max_file_size`` are skipped with an error
        log; read failures are logged rather than raised (best-effort).

        :param stoq: stoq instance exposing an async ``scan`` coroutine
        :param metadata: request metadata; its ``extra_data['file_path']``
            is pointed at ``target`` so downstream plugins see the origin
        :param target: path of the file to scan
        """
        log.info(f"scanning {target}")
        file_size = os.path.getsize(target)
        if file_size > self.max_file_size:
            log.error(f"File {target} too big to load for scanning: {file_size}")
            return
        if "file_path" not in metadata.extra_data or metadata.extra_data['file_path'] != target:
            metadata.extra_data['file_path'] = target
        try:
            # BUG FIX: the original opened the file outside the try (so an
            # OSError on open escaped the handler) and never closed the
            # handle; the context manager closes it on every path.
            with open(target, "rb") as filehandle:
                data = filehandle.read()
            self.loop.run_until_complete(stoq.scan(content=data, request_meta=metadata))
        except OSError:
            log.exception(f"error reading data from {target}")

    def scan_downloaded_files(self, stoq, metadata, files_downloaded):
        """Scan each path in ``files_downloaded`` via scan_single_file."""
        for file_path in files_downloaded:
            self.scan_single_file(stoq, metadata, file_path)

    def run(self):
        """Entry point: dispatch to cleanup() or get() per the CLEANUP env
        flag captured at construction time."""
        # Subclasses are responsible for setting self.path before run().
        assert self.path is not None
        if self._cleanup:
            return self.cleanup()
        else:
            return self.get()