Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
multi core on wasm
Browse files Browse the repository at this point in the history
  • Loading branch information
prekucki committed Nov 25, 2019
1 parent e18581b commit 3a915d5
Show file tree
Hide file tree
Showing 10 changed files with 198 additions and 173 deletions.
4 changes: 4 additions & 0 deletions apps/wasm/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ class WasmTaskEnvironment(DockerEnvironment):
DOCKER_TAG = "0.5.4"
ENV_ID = "WASM"
SHORT_DESCRIPTION = "WASM Sandbox"

@classmethod
def is_single_core(cls):
return True
5 changes: 5 additions & 0 deletions golem/environments/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ def check_support(self) -> SupportStatus:
"""
return SupportStatus.ok()

@classmethod
def is_single_core(cls) -> bool:
""" Returns true if task runs on single cpu core """
return False

def is_accepted(self) -> bool:
""" Check if user wants to compute tasks from this environment """
return self.accept_tasks
Expand Down
77 changes: 63 additions & 14 deletions golem/task/taskcomputer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# flake8: noqa

import asyncio
import logging
from pathlib import Path
from typing import Optional, TYPE_CHECKING, Callable, Any
from typing import Optional, TYPE_CHECKING, Callable, Any, List, Set

import os
import time
Expand Down Expand Up @@ -105,6 +107,12 @@ def has_assigned_task(self) -> bool:
return self._new_computer.has_assigned_task() \
or self._old_computer.has_assigned_task()

@property
def assigned_task_ids(self) -> Set[str]:
if self._new_computer.has_assigned_task():
return {self._new_computer.assigned_task_id}
return self._old_computer.assigned_task_ids

@property
def assigned_task_id(self) -> Optional[str]:
return self._new_computer.assigned_task_id \
Expand All @@ -130,16 +138,28 @@ def get_subtask_inputs_dir(self) -> Path:
'is assigned')
return self._new_computer.get_subtask_inputs_dir()

def start_computation(self) -> None:
def compatible_tasks(self, candidate_tasks: Set[str]) -> Set[str]:
"""finds compatible tasks subset"""
assert not self._new_computer.has_assigned_task()
return self._old_computer.compatible_tasks(candidate_tasks)

def start_computation(
self, res_task_id: str, res_subtask_id: Optional[str] = None) -> bool:
if self._new_computer.has_assigned_task():
task_id = self.assigned_task_id
subtask_id = self.assigned_subtask_id
task_id = self._new_computer.assigned_task_id
subtask_id = self._new_computer.assigned_subtask_id
if task_id != res_task_id:
logger.error(
"Resource collected for a wrong task, %s", res_task_id)
return False
computation = self._new_computer.compute()
self._task_server.task_keeper.task_started(task_id)
# Fire and forget because it resolves when computation ends
self._handle_computation_results(task_id, subtask_id, computation)
return True
elif self._old_computer.has_assigned_task():
self._old_computer.start_computation()
return self._old_computer.start_computation(
res_task_id, res_subtask_id)
else:
raise RuntimeError('start_computation: No task assigned.')

Expand Down Expand Up @@ -170,14 +190,19 @@ def _handle_computation_results(
self._task_server.task_keeper.task_ended(task_id)
self._finished_cb()

def task_interrupted(self) -> None:
def task_interrupted(self, task_id: str) -> None:
if self._new_computer.has_assigned_task():
self._new_computer.task_interrupted()
elif self._old_computer.has_assigned_task():
self._old_computer.task_interrupted()
self._old_computer.task_interrupted(task_id)
else:
raise RuntimeError('task_interrupted: No task assigned.')

def can_take_work(self) -> bool:
if self._old_computer.has_assigned_task():
return self._old_computer.can_take_work()
return not self._new_computer.has_assigned_task()

def check_timeout(self) -> None:
# No active timeout checking is needed for the new computer
if self._old_computer.has_assigned_task():
Expand Down Expand Up @@ -572,8 +597,8 @@ def task_computed(self, task_thread: TaskThread) -> None:
was_success = True

else:
self.stats.increase_stat('tasks_with_errors')
self.task_server.send_task_failed(
stats.increase_stat('tasks_with_errors')
task_server.send_task_failed(
subtask_id,
subtask['task_id'],
"Wrong result format",
Expand Down Expand Up @@ -738,16 +763,40 @@ def _task_finished(self) -> None:
subtask_id=ctd['subtask_id'],
min_performance=ctd['performance'],
)

with self.lock:
self.counting_thread = None
self.task_server.task_keeper.task_ended(ctd['task_id'])
task_id = ctd['task_id']
if not [
c for c in self.assigned_subtasks
if c.assigned_task_id == task_id
]:
self.task_server.task_keeper.task_ended(task_id)

if self.finished_cb:
self.finished_cb()

def compatible_tasks(self, candidate_tasks: Set[str]) -> Set[str]:
"""Finds subset of candidate tasks that can be executed with current
running tasks.
:param candidate_tasks:
:return:
"""
if not self.assigned_subtasks:
return candidate_tasks
tasks = candidate_tasks
for c in self.assigned_subtasks:
if not c.single_core:
return set()
tasks = tasks - {c.assigned_task_id}
return {
task_id for task_id in candidate_tasks
if self._is_single_core_task(task_id)
}

def quit(self):
if self.counting_thread is not None:
self.counting_thread.end_comp()
for computation in self.assigned_subtasks:
if computation.counting_thread is not None:
computation.counting_thread.end_comp()


class PyTaskThread(TaskThread):
Expand Down
72 changes: 40 additions & 32 deletions golem/task/taskserver.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# -*- coding: utf-8 -*-
# pylint: disable=too-many-instance-attributes,too-many-public-methods,
# pylint: disable=too-many-lines

import asyncio
import functools
import itertools
Expand All @@ -7,7 +9,6 @@
import shutil
import time
import weakref
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import (
Expand All @@ -20,6 +21,7 @@
Tuple,
TYPE_CHECKING,
)
from dataclasses import dataclass, field

from golem_messages import exceptions as msg_exceptions
from golem_messages import message
Expand Down Expand Up @@ -70,6 +72,7 @@
update_requestor_assigned_sum,
update_requestor_efficiency,
)
from golem.resource.resourcehandshake import ResourceHandshake
from golem.resource.resourcemanager import ResourceManager
from golem.rpc import utils as rpc_utils
from golem.task import helpers as task_helpers
Expand Down Expand Up @@ -207,21 +210,21 @@ def __init__(self,
self.max_trust = 1.0
self.min_trust = 0.0

self.last_messages = []
self.last_messages: List[Any] = []

self.results_to_send = {}
self.failures_to_send = {}
self.results_to_send: Dict[str, Any] = {}
self.failures_to_send: Dict[str, Any] = {}

self.use_ipv6 = use_ipv6

self.forwarded_session_request_timeout = \
config_desc.waiting_for_task_session_timeout
self.forwarded_session_requests = {}
self.forwarded_session_requests: Dict[str, Any] = {}
self.acl = get_acl(
self.client, max_times=config_desc.disallow_id_max_times)
self.acl_ip = DenyAcl(
self.client, max_times=config_desc.disallow_ip_max_times)
self.resource_handshakes = {}
self.resource_handshakes: Dict[str, ResourceHandshake] = {}
self.requested_tasks: Set[str] = set()
self._last_task_request_time: float = time.time()

Expand Down Expand Up @@ -334,12 +337,18 @@ def _request_random_task(self) -> None:
< self.config_desc.task_request_interval:
return

if self.task_computer.has_assigned_task() \
or (not self.task_computer.compute_tasks) \
if (not self.task_computer.compute_tasks) \
or (not self.task_computer.runnable):
return

task_header = self.task_keeper.get_task(self.requested_tasks)
if not self.task_computer.can_take_work():
return

compatibile_tasks = self.task_computer.compatible_tasks(
set(self.task_keeper.supported_tasks))

task_header = self.task_keeper.get_task(
exclude=self.requested_tasks, supported_tasks=compatibile_tasks)
if task_header is None:
return

Expand All @@ -357,6 +366,7 @@ def _request_task_error(e):
deferred.addErrback(_request_task_error) # pylint: disable=no-member

@inlineCallbacks
# pylint: disable=too-many-return-statements,too-many-branches
def _request_task(self, theader: dt_tasks.TaskHeader) -> Deferred:
try:
supported = self.should_accept_requestor(theader.task_owner.key)
Expand Down Expand Up @@ -398,11 +408,16 @@ def _request_task(self, theader: dt_tasks.TaskHeader) -> Deferred:
)
return None

num_subtasks = 1
# Check performance
if isinstance(env, OldEnv):
benchmark_result = env.get_benchmark_result()
benchmark_score = benchmark_result.performance
benchmark_cpu_usage = benchmark_result.cpu_usage
if env.is_single_core():
num_subtasks = self.task_computer.free_cores
if num_subtasks == 0:
return None
else: # NewEnv
try:
future = asyncio.run_coroutine_threadsafe(
Expand Down Expand Up @@ -452,10 +467,7 @@ def _request_task(self, theader: dt_tasks.TaskHeader) -> Deferred:
ProviderPricing(
price_per_wallclock_h=self.config_desc.min_price,
price_per_cpu_h=self.config_desc.price_per_cpu_h,
),
theader.max_price,
theader.task_owner.key
)
), theader.max_price, theader.task_owner.key)
self.task_manager.add_comp_task_request(
theader=theader, price=price,
performance=benchmark_score
Expand Down Expand Up @@ -483,7 +495,7 @@ def _request_task(self, theader: dt_tasks.TaskHeader) -> Deferred:
return theader.task_id
except Exception as err: # pylint: disable=broad-except
logger.warning("Cannot send request for task: %s", err)
logger.debug("Detailed traceback", exc_info=True)
logger.warning("Detailed traceback", exc_info=True)
self.remove_task_header(theader.task_id)

return None
Expand All @@ -492,7 +504,7 @@ def task_given(
self,
msg: message.tasks.TaskToCompute,
) -> bool:
if self.task_computer.has_assigned_task():
if not self.task_computer.can_take_work():
logger.error("Trying to assign a task, when it's already assigned")
return False

Expand All @@ -518,7 +530,8 @@ def task_given(

defer.gatherResults(deferred_list, consumeErrors=True)\
.addCallbacks(
lambda _: self.resource_collected(msg.task_id),
lambda _: self.resource_collected(msg.task_id,
msg.subtask_id),
lambda e: self.resource_failure(msg.task_id, e))
else:
self.request_resource(
Expand Down Expand Up @@ -573,7 +586,7 @@ def send_results(
raise RuntimeError("Incorrect subtask_id: {}".format(subtask_id))

# this is purely for tests
if self.config_desc.overwrite_results:
if self.config_desc.overwrite_results and result is not None:
for file_path in result:
shutil.copyfile(
src=self.config_desc.overwrite_results,
Expand All @@ -584,11 +597,15 @@ def send_results(
delay_time = 0.0
last_sending_trial = 0
stats = stats or {}
if result is None:
task_result: Tuple = (str(task_api_result),)
else:
task_result = tuple(result)

wtr = WaitingTaskResult(
task_id=task_id,
subtask_id=subtask_id,
result=result or (str(task_api_result),),
result=task_result,
last_sending_trial=last_sending_trial,
delay_time=delay_time,
owner=header.task_owner,
Expand Down Expand Up @@ -620,8 +637,7 @@ def on_result_share_error(err):
task_api_result,
client_options)
deferred.addCallbacks( # pylint: disable=no-member
on_result_share_success,
on_result_share_error)
on_result_share_success, on_result_share_error)

def _create_and_set_result_package(self, wtr):
task_result_manager = self.task_manager.task_result_manager
Expand Down Expand Up @@ -911,7 +927,6 @@ def finished_subtask_listener(self, # pylint: disable=too-many-arguments
keeper = self.task_manager.comp_task_keeper

try:

task_id = keeper.get_task_id_for_subtask(subtask_id)
header = keeper.get_task_header(task_id)
performance = keeper.active_tasks[task_id].performance
Expand Down Expand Up @@ -1007,12 +1022,8 @@ class RejectedReason(Enum):
not_accepted = 'not accepted'

def should_accept_provider( # pylint: disable=too-many-return-statements
self,
node_id: str,
ip_addr: str,
task_id: str,
provider_perf: float,
max_memory_size: int,
self, node_id: str, ip_addr: str, task_id: str,
provider_perf: float, max_memory_size: int,
offer_hash: str) -> bool:

node_name_id = short_node_id(node_id)
Expand Down Expand Up @@ -1200,10 +1211,7 @@ def _sync_forwarded_session_requests(self):
del self.forwarded_session_requests[key_id]
self.final_conn_failure(data['conn_id'])

def _get_factory(self):
return self.factory(self)

def _listening_established(self, port, **kwargs):
def _listening_established(self, port: int) -> None:
logger.debug('_listening_established(%r)', port)
self.cur_port = port
logger.info(" Port {} opened - listening".format(self.cur_port))
Expand Down
6 changes: 3 additions & 3 deletions golem/task/tasksession.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@
from twisted.internet.protocol import Protocol

from .requestedtaskmanager import RequestedTaskManager
from .taskcomputer import TaskComputer
from .taskcomputer import TaskComputerAdapter
from .taskmanager import TaskManager
from .taskserver import TaskServer
from golem.network.concent.client import ConcentClientService
Expand Down Expand Up @@ -144,7 +144,7 @@ def requested_task_manager(self) -> 'RequestedTaskManager':
return self.task_server.requested_task_manager

@property
def task_computer(self) -> 'TaskComputer':
def task_computer(self) -> 'TaskComputerAdapter':
return self.task_server.task_computer

@property
Expand Down Expand Up @@ -606,7 +606,7 @@ def _cannot_compute(reason):

reasons = message.tasks.CannotComputeTask.REASON

if self.task_computer.has_assigned_task():
if not self.task_computer.can_take_work():
_cannot_compute(reasons.OfferCancelled)
return

Expand Down
Loading

0 comments on commit 3a915d5

Please sign in to comment.