Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
prekucki committed Nov 25, 2019
1 parent c9c047e commit e5c1554
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 51 deletions.
57 changes: 31 additions & 26 deletions golem/task/taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ def compatible_tasks(self, candidate_tasks: Set[str]) -> Set[str]:
return self._old_computer.compatible_tasks(candidate_tasks)

def start_computation(
self, res_task_id: str, res_subtask_id: Optional[str] = None) -> bool:
self,
res_task_id: str,
res_subtask_id: Optional[str] = None
) -> bool:
if self._new_computer.has_assigned_task():
task_id = self._new_computer.assigned_task_id
subtask_id = self._new_computer.assigned_subtask_id
Expand Down Expand Up @@ -480,8 +483,8 @@ def quit(self):

@dataclass
class TaskComputation:
"""Represents single compuatation in TaskComputer.
There could be only one non-signle core computation or multiple single-core computations.
"""Represents single compuatation in TaskComputer. There could be only one
non-signle core computation or multiple single-core computations.
"""
task_computer: 'TaskComputer'
assigned_subtask: ComputeTaskDef
Expand All @@ -492,10 +495,10 @@ class TaskComputation:
@property
def assigned_subtask_id(self) -> str:
return self.assigned_subtask.get('subtask_id')

@property
def assigned_task_id(self) -> str:
return self.assigned_subtask.get('task_id')
return self.assigned_subtask.get('task_id')

@property
def computing(self) -> bool:
Expand All @@ -504,7 +507,7 @@ def computing(self) -> bool:
def check_timeout(self):
if self.counting_thread is not None:
self.counting_thread.check_timeout()

def task_interrupted(self) -> None:
assert self.assigned_subtask is not None
self._task_finished()
Expand Down Expand Up @@ -570,8 +573,8 @@ def task_computed(self, task_thread: TaskThread) -> None:
stats.increase_stat('tasks_with_errors')

task_server.send_task_failed(
subtask_id, subtask['task_id'], task_thread.error_msg,
reason)
subtask_id, subtask['task_id'], task_thread.error_msg,
reason)

elif task_thread.result and 'data' in task_thread.result:

Expand Down Expand Up @@ -601,7 +604,8 @@ def task_computed(self, task_thread: TaskThread) -> None:
"Wrong result format",
)

dispatcher.send(signal='golem.monitor', event='computation_time_spent',
dispatcher.send(
signal='golem.monitor', event='computation_time_spent',
success=was_success, value=work_time_to_be_paid)
self._task_finished()

Expand Down Expand Up @@ -652,7 +656,7 @@ def start_computation(self) -> None: # pylint: disable=too-many-locals
if docker_images:
docker_images = [DockerImage(**did) for did in docker_images]
dir_mapping = DockerTaskThread.generate_dir_mapping(
resource_dir, temp_dir)
resource_dir, temp_dir)
tt: TaskThread = DockerTaskThread(
docker_images, extra_data, dir_mapping, task_timeout, cpu_limit)
elif tc.support_direct_computation:
Expand Down Expand Up @@ -683,11 +687,12 @@ class TaskComputer: # pylint: disable=too-many-instance-attributes
dir_lock = Lock()

def __init__(
self,
task_server: 'TaskServer',
stats_keeper: Optional[IntStatsKeeper] = None,
use_docker_manager=True,
finished_cb=None) -> None:
self,
task_server: 'TaskServer',
stats_keeper: Optional[IntStatsKeeper] = None,
use_docker_manager=True,
finished_cb=None
) -> None:
self.task_server = task_server
self.dir_manager: DirManager = DirManager(
task_server.get_task_computer_root())
Expand Down Expand Up @@ -748,7 +753,7 @@ def assigned_subtask_id(self) -> Optional[str]:
return self.assigned_subtasks[0].assigned_subtask_id

def task_interrupted(
self,
self,
task_id: str,
subtask_id: Optional[str] = None
) -> None:
Expand Down Expand Up @@ -777,8 +782,8 @@ def _is_computing(self) -> bool:
def can_take_work(self) -> bool:
with self.lock:
if any([
computation for computation in self.assigned_subtasks
if not computation.single_core
computation for computation in self.assigned_subtasks
if not computation.single_core
]):
return False
return len(self.assigned_subtasks) < self.max_num_cores
Expand All @@ -790,8 +795,8 @@ def is_disabled(self):
def free_cores(self) -> int:
with self.lock:
if any([
computation for computation in self.assigned_subtasks
if not computation.single_core
computation for computation in self.assigned_subtasks
if not computation.single_core
]):
return 0
n = len(self.assigned_subtasks)
Expand All @@ -812,9 +817,9 @@ def get_environment(self):

@defer.inlineCallbacks
def change_config(
self,
config_desc: ClientConfigDescriptor,
in_background: bool = True
self,
config_desc: ClientConfigDescriptor,
in_background: bool = True
) -> defer.Deferred:

self.dir_manager = DirManager(self.task_server.get_task_computer_root())
Expand Down Expand Up @@ -846,7 +851,7 @@ def start_computation(
started = False
for computation in self.assigned_subtasks:
if computation.assigned_task_id == task_id and (
subtask_id is None or
subtask_id is None or
computation.assigned_subtask_id == subtask_id):
if not computation.computing:
started = True
Expand All @@ -873,8 +878,8 @@ def task_finished(self, computation: TaskComputation) -> None:
with self.lock:
task_id = ctd['task_id']
if not [
c for c in self.assigned_subtasks
if c.assigned_task_id == task_id
c for c in self.assigned_subtasks
if c.assigned_task_id == task_id
]:
self.task_server.task_keeper.task_ended(task_id)

Expand Down
7 changes: 3 additions & 4 deletions golem/task/taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,12 +560,12 @@ def task_given(
return True

def resource_collected(
self,
task_id: str,
self,
task_id: str,
subtask_id: Optional[str] = None
) -> bool:
return self.task_computer.start_computation(task_id, subtask_id)

def resource_failure(self, task_id: str, reason: str) -> None:
if task_id not in self.task_computer.assigned_task_ids:
logger.error("Resource failure for a wrong task, %s", task_id)
Expand All @@ -579,7 +579,6 @@ def resource_failure(self, task_id: str, reason: str) -> None:
else:
logger.error("Missing subtask info for task failure %s", task_id)


def send_results(
self,
subtask_id: str,
Expand Down
51 changes: 33 additions & 18 deletions tests/golem/task/test_taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ def test_init(self):
self.assertIsInstance(tc, TaskComputer)

def test_check_timeout(self):
cc = TaskComputation(task_computer=self.task_computer, assigned_subtask=mock.Mock())
cc = TaskComputation(
task_computer=self.task_computer,
assigned_subtask=mock.Mock())
cc.counting_thread = mock.Mock()
self.task_computer.assigned_subtasks.append(cc)
self.task_computer.check_timeout()
Expand Down Expand Up @@ -91,8 +93,9 @@ def test_computation(self): # pylint: disable=too-many-statements
self.assertFalse(tc.assigned_subtasks)
tc.task_given(ctd)
self.assertTrue(tc.assigned_subtasks)
self.assertLessEqual(tc.assigned_subtasks[-1].assigned_subtask['deadline'],
timeout_to_deadline(10))
self.assertLessEqual(
tc.assigned_subtasks[-1].assigned_subtask['deadline'],
timeout_to_deadline(10))

tc.start_computation(ctd['task_id'], ctd['subtask_id'])
assert not tc._is_computing()
Expand All @@ -107,8 +110,10 @@ def test_computation(self): # pylint: disable=too-many-statements
tc.task_given(ctd)
tc.start_computation(ctd['task_id'], None)
assert tc._is_computing()
self.assertGreater(tc.assigned_subtasks[-1].counting_thread.time_to_compute, 8)
self.assertLessEqual(tc.assigned_subtasks[-1].counting_thread.time_to_compute, 10)
self.assertGreater(
tc.assigned_subtasks[-1].counting_thread.time_to_compute, 8)
self.assertLessEqual(
tc.assigned_subtasks[-1].counting_thread.time_to_compute, 10)
mock_finished.assert_called_once_with()
mock_finished.reset_mock()
self.__wait_for_tasks(tc)
Expand All @@ -129,7 +134,7 @@ def test_computation(self): # pylint: disable=too-many-statements
ctd['extra_data']['src_code'] = "raise Exception('some exception')"
ctd['deadline'] = timeout_to_deadline(5)
tc.task_given(ctd)
[ comp ] = tc.assigned_subtasks
[comp] = tc.assigned_subtasks
self.assertEqual(comp.assigned_subtask, ctd)
self.assertLessEqual(comp.assigned_subtask['deadline'],
timeout_to_deadline(5))
Expand Down Expand Up @@ -163,8 +168,10 @@ def test_computation(self): # pylint: disable=too-many-statements
tc.task_given(ctd)
tc.start_computation(ctd['task_id'], ctd['subtask_id'])
self.assertTrue(tc._is_computing())
self.assertGreater(tc.assigned_subtasks[-1].counting_thread.time_to_compute, 10)
self.assertLessEqual(tc.assigned_subtasks[-1].counting_thread.time_to_compute, 20)
self.assertGreater(
tc.assigned_subtasks[-1].counting_thread.time_to_compute, 10)
self.assertLessEqual(
tc.assigned_subtasks[-1].counting_thread.time_to_compute, 20)
self.__wait_for_tasks(tc)

ctd['subtask_id'] = "xxyyzz2"
Expand Down Expand Up @@ -202,13 +209,13 @@ def test_compute_task(self, start):
task_computer.lock = Lock()
task_computer.dir_lock = Lock()

task_part = TaskComputation(task_computer=task_computer, assigned_subtask=ComputeTaskDef(
task_id=task_id,
subtask_id=subtask_id,
docker_images=[],
extra_data=mock.Mock(),
deadline=time.time() + 3600
))
task_part = TaskComputation(
task_computer=task_computer, assigned_subtask=ComputeTaskDef(
task_id=task_id,
subtask_id=subtask_id,
docker_images=[],
extra_data=mock.Mock(),
deadline=time.time() + 3600))
task_computer.task_server.task_keeper.task_headers = {
task_id: None
}
Expand Down Expand Up @@ -445,9 +452,12 @@ class TestTaskGiven(TestTaskComputerBase):
def test_ok(self, provider_timer):
ctd = mock.Mock()
self.task_computer.task_given(ctd)
self.assertEqual(self.task_computer.assigned_subtasks[-1].assigned_subtask, ctd)
self.assertEqual(
self.task_computer.assigned_subtasks[-1].assigned_subtask,
ctd)
provider_timer.start.assert_called_once_with()


class TestTaskInterrupted(TestTaskComputerBase):

def test_no_task_assigned(self):
Expand All @@ -471,7 +481,10 @@ class TestTaskFinished(TestTaskComputerBase):

def test_no_assigned_subtask(self):
with self.assertRaises(AssertionError):
self.task_computer.task_finished(TaskComputation(task_computer=self.task_computer, assigned_subtask=mock.Mock()))
self.task_computer.task_finished(
TaskComputation(
task_computer=self.task_computer,
assigned_subtask=mock.Mock()))

@mock.patch('golem.task.taskcomputer.dispatcher')
@mock.patch('golem.task.taskcomputer.ProviderTimer')
Expand All @@ -482,7 +495,9 @@ def test_ok(self, provider_timer, dispatcher):
performance=123
)

ast = TaskComputation(task_computer=self.task_computer, assigned_subtask=ctd, counting_thread=mock.Mock)
ast = TaskComputation(
task_computer=self.task_computer, assigned_subtask=ctd,
counting_thread=mock.Mock)
self.task_computer.assigned_subtasks.append(ast)
self.task_computer.finished_cb = mock.Mock()

Expand Down
1 change: 1 addition & 0 deletions tests/golem/task/test_taskcomputeradapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ def test_assigned_new_task(self, handle_results):
'test_subtask',
self.new_computer.compute())


class TestHandleComputationResults(TaskComputerAdapterTestBase):

@defer.inlineCallbacks
Expand Down
11 changes: 8 additions & 3 deletions tests/golem/task/test_taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -1417,7 +1417,12 @@ def test_already_assigned(
request_resource):

self.ts.task_computer.can_take_work.return_value = False
ttc = Mock(compute_task_def=dict(task_id='t1', subtask_id='st1', resources=[]), resources_options=dict())
ttc = Mock(
compute_task_def=dict(
task_id='t1',
subtask_id='st1',
resources=[]),
resources_options=dict())
result = self.ts.task_given(ttc)
self.assertEqual(result, False)

Expand Down Expand Up @@ -1471,7 +1476,8 @@ def test_ok(self, send_task_failed, logger_mock):
self.ts.task_computer.assigned_subtask_id = 'test_subtask'
self.ts.resource_failure('test_task', 'test_reason')
logger_mock.error.assert_not_called()
self.ts.task_computer.task_interrupted.assert_called_once_with('test_task')
self.ts.task_computer.task_interrupted.assert_called_once_with(
'test_task')
send_task_failed.assert_called_once_with(
'test_subtask',
'test_task',
Expand Down Expand Up @@ -1678,7 +1684,6 @@ def test_request_task_concent_enabled_but_not_required(self, *_):
self.ts.task_computer.free_cores = 1
print(f'free_cores={self.ts.task_computer.free_cores}')


env = Mock(spec=OldEnv)
env.get_benchmark_result.return_value = BenchmarkResult()
self._patch_ts_async('get_environment_by_id', return_value=env)
Expand Down

0 comments on commit e5c1554

Please sign in to comment.