Skip to content

Commit

Permalink
Further updates to scheduler, db and az machinery
Browse files Browse the repository at this point in the history
  • Loading branch information
cccs-kevin committed Jul 27, 2022
1 parent 90af111 commit bc4c40b
Show file tree
Hide file tree
Showing 8 changed files with 222 additions and 126 deletions.
32 changes: 29 additions & 3 deletions conf/az.conf
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,12 @@
# Specify the Azure Region (for example, CanadaCentral). This is case-sensitive
region_name = <region_name>

# Resource Group for Azure
resource_group = <resource_group>
# Resource Groups for Azure
# The service principal that controls the Azure resources should have READ
# access on the virtual network where it lives, which should be in a different resource
# group than where the sandbox resources live.
vnet_resource_group = <resource_group>
sandbox_resource_group = <resource_group>

# Subscription ID for Azure
subscription_id = <subscription_id>
Expand Down Expand Up @@ -37,6 +41,20 @@ total_machines_limit = 50
# Specify the machine's instance type(for example, Standard_F2s_v2, Standard_DS3_v2)
instance_type = <instance_type>

# This boolean flag is used to indicate if we want to programmatically determine how many cores are used
# per VM of the instance_type mentioned above.
# NOTE: If enabled, this is a long call that takes ~ 1 minute to complete. It takes place at
# the initialization of the machinery. If disabled, you need to specify the instance_type_cores below.
find_number_of_cores_for_sku = true

# The number of cores (vCPUs) that a VM of the instance_type mentioned above uses.
# If find_number_of_cores_for_sku is enabled, this value will be ignored
# and the core count will be determined programmatically (see note above).
# Otherwise, set it to a positive integer.
# For example for the instance_type Standard_F2s_v2, there are 2 cores per VM so the value for
# instance_type_cores should be 2.
instance_type_cores = 0

# Specify the IP of the Result Server, as your virtual machine sees it.
# It should be the nest IP address.
resultserver_ip = <resultserver_ip>
Expand All @@ -53,9 +71,13 @@ storage_account_type = <storage_account_type>
# Initial virtual machine pool size for each scale set
initial_pool_size = 1

# Reset pool size to initial_pool_size on CAPE restart
reset_pool_size = true

# Specify a comma-separated list of scale sets to be used, either available or to be created.
# For each specified ID you have to define a dedicated section containing the details
# about the respective scale set. (E.g. cuckoo1,cuckoo2,cuckoo3)
# NOTE: NO SPACES
scale_sets = cuckoo1

# A percentage to be used for overprovisioning a scale set. To disable overprovisioning, set to 0
Expand All @@ -69,6 +91,10 @@ wait_time_to_reimage = 15
# normal instances
spot_instances = false

# This boolean value is used to indicate if we want to wait for each VM to have its agent running before we
# start pulling tasks off the stack
wait_for_agent_before_starting = true

[cuckoo1]
# The gallery image name to use when creating the virtual machine scale set.
gallery_image_name = <gallery_image_name>
Expand All @@ -84,4 +110,4 @@ arch = x64

# A tag used to specify on which guest scale set a sample should be run. All
# virtual machines in this scale set will have this tag
tag = <tag>
pool_tag = <tag>
4 changes: 4 additions & 0 deletions conf/web.conf
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ hostname = https://127.0.0.1/
;hostname = https://www.capesandbox.com/
# Check if a config exists, or try to extract one, before accepting the task as static
check_config_exists = no
# Assign architecture to task to fetch correct VM type
dynamic_arch_determination = yes
# Assign platform to task to fetch correct VM type
dynamic_platform_determination = yes

# ratelimit for anon users
[ratelimit]
Expand Down
7 changes: 4 additions & 3 deletions lib/cuckoo/common/abstracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,19 +265,20 @@ def availables(self):
"""
return self.db.count_machines_available()

def acquire(self, machine_id=None, platform=None, tags=None):
def acquire(self, machine_id=None, platform=None, tags=None, arch=None):
"""Acquire a machine to start analysis.
@param machine_id: machine ID.
@param platform: machine platform.
@param tags: machine tags
@param arch: machine arch
@return: machine or None.
"""
if machine_id:
return self.db.lock_machine(label=machine_id)
elif platform:
return self.db.lock_machine(platform=platform, tags=tags)
return self.db.lock_machine(platform=platform, tags=tags, arch=arch)
else:
return self.db.lock_machine(tags=tags)
return self.db.lock_machine(tags=tags, arch=arch)

def release(self, label=None):
"""Release a machine.
Expand Down
9 changes: 6 additions & 3 deletions lib/cuckoo/common/web_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@

db = Database()

DYNAMIC_PLATFORM_DETERMINATION = web_cfg.general.dynamic_platform_determination

HAVE_DIST = False
# Distributed CAPE
if repconf.distributed.enabled:
Expand Down Expand Up @@ -655,7 +657,8 @@ def download_file(**kwargs):
if not kwargs.get("task_machines", []):
kwargs["task_machines"] = [None]

platform = get_platform(magic_type)
if DYNAMIC_PLATFORM_DETERMINATION:
platform = get_platform(magic_type)
if platform == "linux" and not linux_enabled and "Python" not in magic_type:
return "error", {"error": "Linux binaries analysis isn't enabled"}

Expand Down Expand Up @@ -1084,9 +1087,9 @@ def force_bool(value):
if not value:
return False

if value in ("False", "false", "FALSE"):
if value.lower() in ("false", "no", "off", "0"):
return False
elif value in ("True", "true", "TRUE"):
elif value.lower() in ("true", "yes", "on", "1"):
return True
else:
log.warning("Value of %s cannot be converted from string to bool", value)
Expand Down
119 changes: 70 additions & 49 deletions lib/cuckoo/core/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,10 @@
repconf = Config("reporting")
web_conf = Config("web")
LINUX_ENABLED = web_conf.linux.enabled
DYNAMIC_ARCH_DETERMINATION = web_conf.general.dynamic_arch_determination

if repconf.mongodb.enabled:
from dev_utils.mongodb import mongo_find, mongo_find_one
from dev_utils.mongodb import mongo_find
if repconf.elasticsearchdb.enabled:
from dev_utils.elasticsearchdb import elastic_handler, get_analysis_index

Expand Down Expand Up @@ -799,13 +800,27 @@ def set_task_vm(self, task_id, vmname, vm_id):
session.close()

@classlock
def fetch(self, machine, categories: list = [], need_VM: bool = True):
def is_relevant_machine_available(self, task: Task) -> bool:
"""Checks if a machine that is relevant to the given task is available
@return: boolean indicating if a relevant machine is available
"""
# Are there available machines that match up with a task?
task_arch = next((tag.name for tag in task.tags if tag.name in ["x86", "x64"]), "")
task_tags = [tag.name for tag in task.tags if tag.name != task_arch]
if len(self.list_machines(locked=False, platform=task.platform, tags=task_tags, arch=task_arch)) > 0:
# There are? Awesome!
self.set_status(task_id=task.id, status=TASK_RUNNING)
return True
else:
return False

@classlock
def fetch_task(self, categories: list = []):
"""Fetches a task waiting to be processed and locks it for running.
@return: None or task
"""
session = self.Session()
row = None
arch_cond = False
try:
row = (
session.query(Task)
Expand All @@ -815,23 +830,11 @@ def fetch(self, machine, categories: list = [], need_VM: bool = True):
.filter(not_(Task.options.contains("node=")))
)

if machine:
# set filter to get tasks with acceptable arch
if "x64" in machine.arch:
arch_cond = or_(*[Task.tags.any(name="x64"), Task.tags.any(name="x86")])
else:
arch_cond = or_(*[Task.tags.any(name=machine.arch)])
row = row.filter(arch_cond)

if categories:
row = row.filter(Task.category.in_(categories))
row = row.first()

if row:
if need_VM and row.machine and row.machine != machine.label:
log.debug("Task id %d - needs VM: %s. %s - %s", row.id, need_VM, row.machine, machine.label)
return None
else:
if not row:
return None

self.set_status(task_id=row.id, status=TASK_RUNNING)
Expand Down Expand Up @@ -942,33 +945,36 @@ def guest_stop(self, guest_id):
session.close()

@classlock
def list_machines(self, locked=False, platform="", tags=[]):
def list_machines(self, locked=False, platform="", tags=[], arch=""):
"""Lists virtual machines.
@return: list of virtual machines
"""
session = self.Session()
try:
machines = session.query(Machine).options(joinedload("tags"))
if locked:
machines = session.query(Machine).options(joinedload("tags")).filter_by(locked=True).all()
elif platform:
machines = session.query(Machine).options(joinedload("tags")).filter_by(platform=platform).all()
else:
machines = session.query(Machine).options(joinedload("tags")).all()
machines = machines.filter_by(locked=True)
if platform:
machines = machines.filter_by(platform=platform)
if arch:
machines = machines.filter_by(arch=arch)
if tags:
machines = [machine for tag in tags for machine in machines if tag in machine.to_dict()["tags"]]
return machines
for tag in tags:
machines = machines.filter(Machine.tags.any(name=tag))
return machines.all()
except SQLAlchemyError as e:
log.debug("Database error listing machines: %s", e)
return []
finally:
session.close()

@classlock
def lock_machine(self, label=None, platform=None, tags=None):
def lock_machine(self, label=None, platform=None, tags=None, arch=None):
"""Places a lock on a free virtual machine.
@param label: optional virtual machine label
@param platform: optional virtual machine platform
@param tags: optional tags required (list)
@param arch: optional virtual machine arch
@return: locked machine
"""
session = self.Session()
Expand All @@ -989,17 +995,23 @@ def lock_machine(self, label=None, platform=None, tags=None):
machines = session.query(Machine)
if label:
machines = machines.filter_by(label=label)
elif platform:
if platform:
machines = machines.filter_by(platform=platform)
elif tags:
if arch:
machines = machines.filter_by(arch=arch)
if tags:
for tag in tags:
machines = machines.filter(Machine.tags.any(name=tag.name))
machines = machines.filter(Machine.tags.any(name=tag))

# Check if there are any machines that satisfy the
# selection requirements.
if not machines.count():
session.close()
raise CuckooOperationalError("No machines match selection criteria")
raise CuckooOperationalError(
"No machines match selection criteria of label: '%s', platform: '%s', arch: '%s', tags: '%s'" % (
label, platform, arch, tags
)
)

# Get the first free machine.
machine = machines.filter_by(locked=False).first()
Expand Down Expand Up @@ -1078,7 +1090,7 @@ def get_available_machines(self):
"""
session = self.Session()
try:
machines = session.query(Machine).filter_by(locked=False).all()
machines = session.query(Machine).options(joinedload("tags")).filter_by(locked=False).all()
return machines
except SQLAlchemyError as e:
log.debug("Database error getting available machines: %s", e)
Expand Down Expand Up @@ -1290,26 +1302,27 @@ def add(
session.close()
return None

# Assign architecture to task to fetch correct VM type
# This isn't 100% foolproof
if "PE32+" in file_type or "64-bit" in file_type or package.endswith("_x64"):
if tags:
tags += ",x64"
if DYNAMIC_ARCH_DETERMINATION:
# Assign architecture to task to fetch correct VM type
# This isn't 100% foolproof
if "PE32+" in file_type or "64-bit" in file_type or package.endswith("_x64"):
if tags:
tags += ",x64"
else:
tags = "x64"
else:
tags = "x64"
else:
if LINUX_ENABLED:
linux_arch = _get_linux_vm_tag(file_type)
if linux_arch:
if LINUX_ENABLED:
linux_arch = _get_linux_vm_tag(file_type)
if linux_arch:
if tags:
tags += f",{linux_arch}"
else:
tags = linux_arch
else:
if tags:
tags += f",{linux_arch}"
tags += ",x86"
else:
tags = linux_arch
else:
if tags:
tags += ",x86"
else:
tags = "x86"
tags = "x86"
try:
task = Task(obj.file_path)
task.sample_id = sample.id
Expand Down Expand Up @@ -2001,6 +2014,7 @@ def list_tasks(
id_before=None,
id_after=None,
options_like=False,
options_not_like=False,
tags_tasks_like=False,
task_ids=False,
inclide_hashes=False,
Expand All @@ -2019,7 +2033,8 @@ def list_tasks(
@param added_before: tasks added before a specific timestamp
@param id_before: filter by tasks which is less than this value
@param id_after filter by tasks which is greater than this value
@param options_like: filter tasks by specific option inside of the options
@param options_like: filter tasks by specific option inside of the options
@param options_not_like: filter tasks by specific option not inside of the options
@param tags_tasks_like: filter tasks by specific tag
@param task_ids: list of task_id
@param inclide_hashes: return task+samples details
Expand Down Expand Up @@ -2053,13 +2068,19 @@ def list_tasks(
# Replace '*' wildcards with wildcard for sql
options_like = options_like.replace("*", "%")
search = search.filter(Task.options.like(f"%{options_like}%"))
if options_not_like:
# Replace '*' wildcards with wildcard for sql
options_not_like = options_not_like.replace("*", "%")
search = search.filter(Task.options.notlike(f"%{options_not_like}%"))
if tags_tasks_like:
search = search.filter(Task.tags_tasks.like(f"%{tags_tasks_like}%"))
if task_ids:
search = search.filter(Task.id.in_(task_ids))
if user_id:
search = search.filter(Task.user_id == user_id)
if order_by is not None:
if order_by is not None and isinstance(order_by, tuple):
search = search.order_by(*order_by)
elif order_by is not None:
search = search.order_by(order_by)
else:
search = search.order_by(Task.added_on.desc())
Expand Down
2 changes: 1 addition & 1 deletion lib/cuckoo/core/plugins.py
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ def stop(self):
except NotImplementedError:
pass
except Exception as e:
log.warning("Unable to stop auxiliary module: %s", e)
log.warning("Unable to stop auxiliary module: %s", e, exc_info=True)
else:
log.debug("Stopped auxiliary module: %s", module.__class__.__name__)

Expand Down
Loading

0 comments on commit bc4c40b

Please sign in to comment.