From e5c15542f8bb2b8b01e3ef4066ee0d17741e5099 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20K=2E=20Rekucki?= Date: Mon, 25 Nov 2019 21:39:05 +0100 Subject: [PATCH] lint --- golem/task/taskcomputer.py | 57 +++++++++++--------- golem/task/taskserver.py | 7 ++- tests/golem/task/test_taskcomputer.py | 51 +++++++++++------- tests/golem/task/test_taskcomputeradapter.py | 1 + tests/golem/task/test_taskserver.py | 11 ++-- 5 files changed, 76 insertions(+), 51 deletions(-) diff --git a/golem/task/taskcomputer.py b/golem/task/taskcomputer.py index 2619cc96c4..a617858c53 100644 --- a/golem/task/taskcomputer.py +++ b/golem/task/taskcomputer.py @@ -155,7 +155,10 @@ def compatible_tasks(self, candidate_tasks: Set[str]) -> Set[str]: return self._old_computer.compatible_tasks(candidate_tasks) def start_computation( - self, res_task_id: str, res_subtask_id: Optional[str] = None) -> bool: + self, + res_task_id: str, + res_subtask_id: Optional[str] = None + ) -> bool: if self._new_computer.has_assigned_task(): task_id = self._new_computer.assigned_task_id subtask_id = self._new_computer.assigned_subtask_id @@ -480,8 +483,8 @@ def quit(self): @dataclass class TaskComputation: - """Represents single compuatation in TaskComputer. - There could be only one non-signle core computation or multiple single-core computations. + """Represents single compuatation in TaskComputer. There could be only one + non-signle core computation or multiple single-core computations. """ task_computer: 'TaskComputer' assigned_subtask: ComputeTaskDef @@ -492,10 +495,10 @@ class TaskComputation: @property def assigned_subtask_id(self) -> str: return self.assigned_subtask.get('subtask_id') - + @property def assigned_task_id(self) -> str: - return self.assigned_subtask.get('task_id') + return self.assigned_subtask.get('task_id') @property def computing(self) -> bool: @@ -504,7 +507,7 @@ def computing(self) -> bool: def check_timeout(self): if self.counting_thread is not None: self.counting_thread.check_timeout() - + def task_interrupted(self) -> None: assert self.assigned_subtask is not None self._task_finished() @@ -570,8 +573,8 @@ def task_computed(self, task_thread: TaskThread) -> None: stats.increase_stat('tasks_with_errors') task_server.send_task_failed( - subtask_id, subtask['task_id'], task_thread.error_msg, - reason) + subtask_id, subtask['task_id'], task_thread.error_msg, + reason) elif task_thread.result and 'data' in task_thread.result: @@ -601,7 +604,8 @@ def task_computed(self, task_thread: TaskThread) -> None: "Wrong result format", ) - dispatcher.send(signal='golem.monitor', event='computation_time_spent', + dispatcher.send( + signal='golem.monitor', event='computation_time_spent', success=was_success, value=work_time_to_be_paid) self._task_finished() @@ -652,7 +656,7 @@ def start_computation(self) -> None: # pylint: disable=too-many-locals if docker_images: docker_images = [DockerImage(**did) for did in docker_images] dir_mapping = DockerTaskThread.generate_dir_mapping( - resource_dir, temp_dir) + resource_dir, temp_dir) tt: TaskThread = DockerTaskThread( docker_images, extra_data, dir_mapping, task_timeout, cpu_limit) elif tc.support_direct_computation: @@ -683,11 +687,12 @@ class TaskComputer: # pylint: disable=too-many-instance-attributes dir_lock = Lock() def __init__( - self, - task_server: 'TaskServer', - stats_keeper: Optional[IntStatsKeeper] = None, - use_docker_manager=True, - finished_cb=None) -> None: + self, + task_server: 'TaskServer', + stats_keeper: Optional[IntStatsKeeper] = None, + use_docker_manager=True, + finished_cb=None + ) -> None: self.task_server = task_server self.dir_manager: DirManager = DirManager( task_server.get_task_computer_root()) @@ -748,7 +753,7 @@ def assigned_subtask_id(self) -> Optional[str]: return self.assigned_subtasks[0].assigned_subtask_id def task_interrupted( - self, + self, task_id: str, subtask_id: Optional[str] = None ) -> None: @@ -777,8 +782,8 @@ def _is_computing(self) -> bool: def can_take_work(self) -> bool: with self.lock: if any([ - computation for computation in self.assigned_subtasks - if not computation.single_core + computation for computation in self.assigned_subtasks + if not computation.single_core ]): return False return len(self.assigned_subtasks) < self.max_num_cores @@ -790,8 +795,8 @@ def is_disabled(self): def free_cores(self) -> int: with self.lock: if any([ - computation for computation in self.assigned_subtasks - if not computation.single_core + computation for computation in self.assigned_subtasks + if not computation.single_core ]): return 0 n = len(self.assigned_subtasks) @@ -812,9 +817,9 @@ def get_environment(self): @defer.inlineCallbacks def change_config( - self, - config_desc: ClientConfigDescriptor, - in_background: bool = True + self, + config_desc: ClientConfigDescriptor, + in_background: bool = True ) -> defer.Deferred: self.dir_manager = DirManager(self.task_server.get_task_computer_root()) @@ -846,7 +851,7 @@ def start_computation( started = False for computation in self.assigned_subtasks: if computation.assigned_task_id == task_id and ( - subtask_id is None or + subtask_id is None or computation.assigned_subtask_id == subtask_id): if not computation.computing: started = True @@ -873,8 +878,8 @@ def task_finished(self, computation: TaskComputation) -> None: with self.lock: task_id = ctd['task_id'] if not [ - c for c in self.assigned_subtasks - if c.assigned_task_id == task_id + c for c in self.assigned_subtasks + if c.assigned_task_id == task_id ]: self.task_server.task_keeper.task_ended(task_id) diff --git a/golem/task/taskserver.py b/golem/task/taskserver.py index d8837dc621..5f41e1d522 100644 --- a/golem/task/taskserver.py +++ b/golem/task/taskserver.py @@ -560,12 +560,12 @@ def task_given( return True def resource_collected( - self, - task_id: str, + self, + task_id: str, subtask_id: Optional[str] = None ) -> bool: return self.task_computer.start_computation(task_id, subtask_id) - + def resource_failure(self, task_id: str, reason: str) -> None: if task_id not in self.task_computer.assigned_task_ids: logger.error("Resource failure for a wrong task, %s", task_id) @@ -579,7 +579,6 @@ def resource_failure(self, task_id: str, reason: str) -> None: else: logger.error("Missing subtask info for task failure %s", task_id) - def send_results( self, subtask_id: str, diff --git a/tests/golem/task/test_taskcomputer.py b/tests/golem/task/test_taskcomputer.py index 8e6e45f218..5b3a826e2a 100644 --- a/tests/golem/task/test_taskcomputer.py +++ b/tests/golem/task/test_taskcomputer.py @@ -50,7 +50,9 @@ def test_init(self): self.assertIsInstance(tc, TaskComputer) def test_check_timeout(self): - cc = TaskComputation(task_computer=self.task_computer, assigned_subtask=mock.Mock()) + cc = TaskComputation( + task_computer=self.task_computer, + assigned_subtask=mock.Mock()) cc.counting_thread = mock.Mock() self.task_computer.assigned_subtasks.append(cc) self.task_computer.check_timeout() @@ -91,8 +93,9 @@ def test_computation(self): # pylint: disable=too-many-statements self.assertFalse(tc.assigned_subtasks) tc.task_given(ctd) self.assertTrue(tc.assigned_subtasks) - self.assertLessEqual(tc.assigned_subtasks[-1].assigned_subtask['deadline'], - timeout_to_deadline(10)) + self.assertLessEqual( + tc.assigned_subtasks[-1].assigned_subtask['deadline'], + timeout_to_deadline(10)) tc.start_computation(ctd['task_id'], ctd['subtask_id']) assert not tc._is_computing() @@ -107,8 +110,10 @@ def test_computation(self): # pylint: disable=too-many-statements tc.task_given(ctd) tc.start_computation(ctd['task_id'], None) assert tc._is_computing() - self.assertGreater(tc.assigned_subtasks[-1].counting_thread.time_to_compute, 8) - self.assertLessEqual(tc.assigned_subtasks[-1].counting_thread.time_to_compute, 10) + self.assertGreater( + tc.assigned_subtasks[-1].counting_thread.time_to_compute, 8) + self.assertLessEqual( + tc.assigned_subtasks[-1].counting_thread.time_to_compute, 10) mock_finished.assert_called_once_with() mock_finished.reset_mock() self.__wait_for_tasks(tc) @@ -129,7 +134,7 @@ def test_computation(self): # pylint: disable=too-many-statements ctd['extra_data']['src_code'] = "raise Exception('some exception')" ctd['deadline'] = timeout_to_deadline(5) tc.task_given(ctd) - [ comp ] = tc.assigned_subtasks + [comp] = tc.assigned_subtasks self.assertEqual(comp.assigned_subtask, ctd) self.assertLessEqual(comp.assigned_subtask['deadline'], timeout_to_deadline(5)) @@ -163,8 +168,10 @@ def test_computation(self): # pylint: disable=too-many-statements tc.task_given(ctd) tc.start_computation(ctd['task_id'], ctd['subtask_id']) self.assertTrue(tc._is_computing()) - self.assertGreater(tc.assigned_subtasks[-1].counting_thread.time_to_compute, 10) - self.assertLessEqual(tc.assigned_subtasks[-1].counting_thread.time_to_compute, 20) + self.assertGreater( + tc.assigned_subtasks[-1].counting_thread.time_to_compute, 10) + self.assertLessEqual( + tc.assigned_subtasks[-1].counting_thread.time_to_compute, 20) self.__wait_for_tasks(tc) ctd['subtask_id'] = "xxyyzz2" @@ -202,13 +209,13 @@ def test_compute_task(self, start): task_computer.lock = Lock() task_computer.dir_lock = Lock() - task_part = TaskComputation(task_computer=task_computer, assigned_subtask=ComputeTaskDef( - task_id=task_id, - subtask_id=subtask_id, - docker_images=[], - extra_data=mock.Mock(), - deadline=time.time() + 3600 - )) + task_part = TaskComputation( + task_computer=task_computer, assigned_subtask=ComputeTaskDef( + task_id=task_id, + subtask_id=subtask_id, + docker_images=[], + extra_data=mock.Mock(), + deadline=time.time() + 3600)) task_computer.task_server.task_keeper.task_headers = { task_id: None } @@ -445,9 +452,12 @@ class TestTaskGiven(TestTaskComputerBase): def test_ok(self, provider_timer): ctd = mock.Mock() self.task_computer.task_given(ctd) - self.assertEqual(self.task_computer.assigned_subtasks[-1].assigned_subtask, ctd) + self.assertEqual( + self.task_computer.assigned_subtasks[-1].assigned_subtask, + ctd) provider_timer.start.assert_called_once_with() + class TestTaskInterrupted(TestTaskComputerBase): def test_no_task_assigned(self): @@ -471,7 +481,10 @@ class TestTaskFinished(TestTaskComputerBase): def test_no_assigned_subtask(self): with self.assertRaises(AssertionError): - self.task_computer.task_finished(TaskComputation(task_computer=self.task_computer, assigned_subtask=mock.Mock())) + self.task_computer.task_finished( + TaskComputation( + task_computer=self.task_computer, + assigned_subtask=mock.Mock())) @mock.patch('golem.task.taskcomputer.dispatcher') @mock.patch('golem.task.taskcomputer.ProviderTimer') @@ -482,7 +495,9 @@ def test_ok(self, provider_timer, dispatcher): performance=123 ) - ast = TaskComputation(task_computer=self.task_computer, assigned_subtask=ctd, counting_thread=mock.Mock) + ast = TaskComputation( + task_computer=self.task_computer, assigned_subtask=ctd, + counting_thread=mock.Mock) self.task_computer.assigned_subtasks.append(ast) self.task_computer.finished_cb = mock.Mock() diff --git a/tests/golem/task/test_taskcomputeradapter.py b/tests/golem/task/test_taskcomputeradapter.py index 907809780c..0778a768d5 100644 --- a/tests/golem/task/test_taskcomputeradapter.py +++ b/tests/golem/task/test_taskcomputeradapter.py @@ -166,6 +166,7 @@ def test_assigned_new_task(self, handle_results): 'test_subtask', self.new_computer.compute()) + class TestHandleComputationResults(TaskComputerAdapterTestBase): @defer.inlineCallbacks diff --git a/tests/golem/task/test_taskserver.py b/tests/golem/task/test_taskserver.py index b28e551e42..53cff9de6e 100644 --- a/tests/golem/task/test_taskserver.py +++ b/tests/golem/task/test_taskserver.py @@ -1417,7 +1417,12 @@ def test_already_assigned( request_resource): self.ts.task_computer.can_take_work.return_value = False - ttc = Mock(compute_task_def=dict(task_id='t1', subtask_id='st1', resources=[]), resources_options=dict()) + ttc = Mock( + compute_task_def=dict( + task_id='t1', + subtask_id='st1', + resources=[]), + resources_options=dict()) result = self.ts.task_given(ttc) self.assertEqual(result, False) @@ -1471,7 +1476,8 @@ def test_ok(self, send_task_failed, logger_mock): self.ts.task_computer.assigned_subtask_id = 'test_subtask' self.ts.resource_failure('test_task', 'test_reason') logger_mock.error.assert_not_called() - self.ts.task_computer.task_interrupted.assert_called_once_with('test_task') + self.ts.task_computer.task_interrupted.assert_called_once_with( + 'test_task') send_task_failed.assert_called_once_with( 'test_subtask', 'test_task', @@ -1678,7 +1684,6 @@ def test_request_task_concent_enabled_but_not_required(self, *_): self.ts.task_computer.free_cores = 1 print(f'free_cores={self.ts.task_computer.free_cores}') - env = Mock(spec=OldEnv) env.get_benchmark_result.return_value = BenchmarkResult() self._patch_ts_async('get_environment_by_id', return_value=env)