Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
prekucki committed Nov 25, 2019
1 parent c9c047e commit d568c03
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 41 deletions.
40 changes: 22 additions & 18 deletions golem/task/taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,10 @@ def compatible_tasks(self, candidate_tasks: Set[str]) -> Set[str]:
return self._old_computer.compatible_tasks(candidate_tasks)

def start_computation(
self, res_task_id: str, res_subtask_id: Optional[str] = None) -> bool:
self,
res_task_id: str,
res_subtask_id: Optional[str] = None
) -> bool:
if self._new_computer.has_assigned_task():
task_id = self._new_computer.assigned_task_id
subtask_id = self._new_computer.assigned_subtask_id
Expand Down Expand Up @@ -480,8 +483,8 @@ def quit(self):

@dataclass
class TaskComputation:
"""Represents single compuatation in TaskComputer.
There could be only one non-signle core computation or multiple single-core computations.
"""Represents single compuatation in TaskComputer. There could be only one
non-signle core computation or multiple single-core computations.
"""
task_computer: 'TaskComputer'
assigned_subtask: ComputeTaskDef
Expand Down Expand Up @@ -683,11 +686,12 @@ class TaskComputer: # pylint: disable=too-many-instance-attributes
dir_lock = Lock()

def __init__(
self,
task_server: 'TaskServer',
stats_keeper: Optional[IntStatsKeeper] = None,
use_docker_manager=True,
finished_cb=None) -> None:
self,
task_server: 'TaskServer',
stats_keeper: Optional[IntStatsKeeper] = None,
use_docker_manager=True,
finished_cb=None
) -> None:
self.task_server = task_server
self.dir_manager: DirManager = DirManager(
task_server.get_task_computer_root())
Expand Down Expand Up @@ -748,7 +752,7 @@ def assigned_subtask_id(self) -> Optional[str]:
return self.assigned_subtasks[0].assigned_subtask_id

def task_interrupted(
self,
self,
task_id: str,
subtask_id: Optional[str] = None
) -> None:
Expand Down Expand Up @@ -777,8 +781,8 @@ def _is_computing(self) -> bool:
def can_take_work(self) -> bool:
with self.lock:
if any([
computation for computation in self.assigned_subtasks
if not computation.single_core
computation for computation in self.assigned_subtasks
if not computation.single_core
]):
return False
return len(self.assigned_subtasks) < self.max_num_cores
Expand All @@ -790,8 +794,8 @@ def is_disabled(self):
def free_cores(self) -> int:
with self.lock:
if any([
computation for computation in self.assigned_subtasks
if not computation.single_core
computation for computation in self.assigned_subtasks
if not computation.single_core
]):
return 0
n = len(self.assigned_subtasks)
Expand All @@ -812,9 +816,9 @@ def get_environment(self):

@defer.inlineCallbacks
def change_config(
self,
config_desc: ClientConfigDescriptor,
in_background: bool = True
self,
config_desc: ClientConfigDescriptor,
in_background: bool = True
) -> defer.Deferred:

self.dir_manager = DirManager(self.task_server.get_task_computer_root())
Expand Down Expand Up @@ -873,8 +877,8 @@ def task_finished(self, computation: TaskComputation) -> None:
with self.lock:
task_id = ctd['task_id']
if not [
c for c in self.assigned_subtasks
if c.assigned_task_id == task_id
c for c in self.assigned_subtasks
if c.assigned_task_id == task_id
]:
self.task_server.task_keeper.task_ended(task_id)

Expand Down
6 changes: 3 additions & 3 deletions golem/task/taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -560,12 +560,12 @@ def task_given(
return True

def resource_collected(
self,
task_id: str,
self,
task_id: str,
subtask_id: Optional[str] = None
) -> bool:
return self.task_computer.start_computation(task_id, subtask_id)

def resource_failure(self, task_id: str, reason: str) -> None:
if task_id not in self.task_computer.assigned_task_ids:
logger.error("Resource failure for a wrong task, %s", task_id)
Expand Down
50 changes: 32 additions & 18 deletions tests/golem/task/test_taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ def test_init(self):
self.assertIsInstance(tc, TaskComputer)

def test_check_timeout(self):
cc = TaskComputation(task_computer=self.task_computer, assigned_subtask=mock.Mock())
cc = TaskComputation(
task_computer=self.task_computer,
assigned_subtask=mock.Mock())
cc.counting_thread = mock.Mock()
self.task_computer.assigned_subtasks.append(cc)
self.task_computer.check_timeout()
Expand Down Expand Up @@ -91,8 +93,9 @@ def test_computation(self): # pylint: disable=too-many-statements
self.assertFalse(tc.assigned_subtasks)
tc.task_given(ctd)
self.assertTrue(tc.assigned_subtasks)
self.assertLessEqual(tc.assigned_subtasks[-1].assigned_subtask['deadline'],
timeout_to_deadline(10))
self.assertLessEqual(
tc.assigned_subtasks[-1].assigned_subtask['deadline'],
timeout_to_deadline(10))

tc.start_computation(ctd['task_id'], ctd['subtask_id'])
assert not tc._is_computing()
Expand All @@ -107,8 +110,10 @@ def test_computation(self): # pylint: disable=too-many-statements
tc.task_given(ctd)
tc.start_computation(ctd['task_id'], None)
assert tc._is_computing()
self.assertGreater(tc.assigned_subtasks[-1].counting_thread.time_to_compute, 8)
self.assertLessEqual(tc.assigned_subtasks[-1].counting_thread.time_to_compute, 10)
self.assertGreater(
tc.assigned_subtasks[-1].counting_thread.time_to_compute, 8)
self.assertLessEqual(
tc.assigned_subtasks[-1].counting_thread.time_to_compute, 10)
mock_finished.assert_called_once_with()
mock_finished.reset_mock()
self.__wait_for_tasks(tc)
Expand All @@ -129,7 +134,7 @@ def test_computation(self): # pylint: disable=too-many-statements
ctd['extra_data']['src_code'] = "raise Exception('some exception')"
ctd['deadline'] = timeout_to_deadline(5)
tc.task_given(ctd)
[ comp ] = tc.assigned_subtasks
[comp] = tc.assigned_subtasks
self.assertEqual(comp.assigned_subtask, ctd)
self.assertLessEqual(comp.assigned_subtask['deadline'],
timeout_to_deadline(5))
Expand Down Expand Up @@ -163,8 +168,10 @@ def test_computation(self): # pylint: disable=too-many-statements
tc.task_given(ctd)
tc.start_computation(ctd['task_id'], ctd['subtask_id'])
self.assertTrue(tc._is_computing())
self.assertGreater(tc.assigned_subtasks[-1].counting_thread.time_to_compute, 10)
self.assertLessEqual(tc.assigned_subtasks[-1].counting_thread.time_to_compute, 20)
self.assertGreater(
tc.assigned_subtasks[-1].counting_thread.time_to_compute, 10)
self.assertLessEqual(
tc.assigned_subtasks[-1].counting_thread.time_to_compute, 20)
self.__wait_for_tasks(tc)

ctd['subtask_id'] = "xxyyzz2"
Expand Down Expand Up @@ -202,13 +209,13 @@ def test_compute_task(self, start):
task_computer.lock = Lock()
task_computer.dir_lock = Lock()

task_part = TaskComputation(task_computer=task_computer, assigned_subtask=ComputeTaskDef(
task_id=task_id,
subtask_id=subtask_id,
docker_images=[],
extra_data=mock.Mock(),
deadline=time.time() + 3600
))
task_part = TaskComputation(
task_computer=task_computer, assigned_subtask=ComputeTaskDef(
task_id=task_id,
subtask_id=subtask_id,
docker_images=[],
extra_data=mock.Mock(),
deadline=time.time() + 3600))
task_computer.task_server.task_keeper.task_headers = {
task_id: None
}
Expand Down Expand Up @@ -445,7 +452,9 @@ class TestTaskGiven(TestTaskComputerBase):
def test_ok(self, provider_timer):
ctd = mock.Mock()
self.task_computer.task_given(ctd)
self.assertEqual(self.task_computer.assigned_subtasks[-1].assigned_subtask, ctd)
self.assertEqual(
self.task_computer.assigned_subtasks[-1].assigned_subtask,
ctd)
provider_timer.start.assert_called_once_with()

class TestTaskInterrupted(TestTaskComputerBase):
Expand All @@ -471,7 +480,10 @@ class TestTaskFinished(TestTaskComputerBase):

def test_no_assigned_subtask(self):
with self.assertRaises(AssertionError):
self.task_computer.task_finished(TaskComputation(task_computer=self.task_computer, assigned_subtask=mock.Mock()))
self.task_computer.task_finished(
TaskComputation(
task_computer=self.task_computer,
assigned_subtask=mock.Mock()))

@mock.patch('golem.task.taskcomputer.dispatcher')
@mock.patch('golem.task.taskcomputer.ProviderTimer')
Expand All @@ -482,7 +494,9 @@ def test_ok(self, provider_timer, dispatcher):
performance=123
)

ast = TaskComputation(task_computer=self.task_computer, assigned_subtask=ctd, counting_thread=mock.Mock)
ast = TaskComputation(
task_computer=self.task_computer, assigned_subtask=ctd,
counting_thread=mock.Mock)
self.task_computer.assigned_subtasks.append(ast)
self.task_computer.finished_cb = mock.Mock()

Expand Down
10 changes: 8 additions & 2 deletions tests/golem/task/test_taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -1417,7 +1417,12 @@ def test_already_assigned(
request_resource):

self.ts.task_computer.can_take_work.return_value = False
ttc = Mock(compute_task_def=dict(task_id='t1', subtask_id='st1', resources=[]), resources_options=dict())
ttc = Mock(
compute_task_def=dict(
task_id='t1',
subtask_id='st1',
resources=[]),
resources_options=dict())
result = self.ts.task_given(ttc)
self.assertEqual(result, False)

Expand Down Expand Up @@ -1471,7 +1476,8 @@ def test_ok(self, send_task_failed, logger_mock):
self.ts.task_computer.assigned_subtask_id = 'test_subtask'
self.ts.resource_failure('test_task', 'test_reason')
logger_mock.error.assert_not_called()
self.ts.task_computer.task_interrupted.assert_called_once_with('test_task')
self.ts.task_computer.task_interrupted.assert_called_once_with(
'test_task')
send_task_failed.assert_called_once_with(
'test_subtask',
'test_task',
Expand Down

0 comments on commit d568c03

Please sign in to comment.