From 6c907e0b2740360dc8229ce584c6fd0c6cc1e21d Mon Sep 17 00:00:00 2001 From: Igor Adamski Date: Thu, 25 Jul 2019 17:25:50 +0200 Subject: [PATCH] Task API computation flow --- golem/task/taskcomputer.py | 14 +++++- golem/task/taskserver.py | 48 +++++++++++++++----- tests/golem/task/test_taskcomputeradapter.py | 21 ++++++++- tests/golem/task/test_taskserver.py | 43 ++++++++++++++++++ 4 files changed, 112 insertions(+), 14 deletions(-) diff --git a/golem/task/taskcomputer.py b/golem/task/taskcomputer.py index 44f6a56745..3fee9d7e67 100644 --- a/golem/task/taskcomputer.py +++ b/golem/task/taskcomputer.py @@ -10,7 +10,7 @@ from dataclasses import dataclass from golem_messages.message.tasks import ComputeTaskDef, TaskHeader -from golem_task_api import ProviderAppClient +from golem_task_api import ProviderAppClient, constants as task_api_constants from pydispatch import dispatcher from twisted.internet import defer @@ -118,6 +118,13 @@ def support_direct_computation(self) -> bool: def support_direct_computation(self, value: bool) -> None: self._old_computer.support_direct_computation = value + def get_task_resources_dir(self) -> Path: + if not self._new_computer.has_assigned_task(): + raise ValueError( + 'Task resources directory only available when a task-api task ' + 'is assigned') + return self._new_computer.get_task_resources_dir() + def start_computation(self) -> None: if self._new_computer.has_assigned_task(): task_id = self.assigned_task_id @@ -143,7 +150,7 @@ def _handle_computation_results( self._task_server.send_results( subtask_id=subtask_id, task_id=task_id, - result=[output_file], + task_api_result=output_file, ) except Exception as e: # pylint: disable=broad-except self._task_server.send_task_failed( @@ -265,6 +272,9 @@ def assigned_subtask_id(self) -> Optional[str]: return None return self._assigned_task.subtask_id + def get_task_resources_dir(self) -> Path: + return self._get_task_dir() / task_api_constants.NETWORK_RESOURCES_DIR + def _is_computing(self) 
-> bool: return self._computation is not None diff --git a/golem/task/taskserver.py b/golem/task/taskserver.py index b13abc4012..211bbbf331 100644 --- a/golem/task/taskserver.py +++ b/golem/task/taskserver.py @@ -21,6 +21,7 @@ from golem_messages import message from golem_messages.datastructures import tasks as dt_tasks from pydispatch import dispatcher +from twisted.internet import defer from twisted.internet.defer import inlineCallbacks, Deferred, \ TimeoutError as DeferredTimeoutError @@ -436,12 +437,25 @@ def task_given( return False self.task_computer.task_given(msg.compute_task_def) - self.request_resource( - msg.task_id, - msg.subtask_id, - msg.compute_task_def['resources'], - msg.resources_options, - ) + if msg.want_to_compute_task.task_header.environment_prerequisites: + deferreds = [] + for resource_id in msg.compute_task_def['resources']: + deferreds.append(self.new_resource_manager.download( + resource_id, + self.task_computer.get_task_resources_dir(), + msg.resources_options, + )) + defer.gatherResults(deferreds, consumeErrors=True).addCallbacks( + lambda _: self.resource_collected(msg.task_id), + lambda e: self.resource_failure(msg.task_id, e), + ) + else: + self.request_resource( + msg.task_id, + msg.subtask_id, + msg.compute_task_def['resources'], + msg.resources_options, + ) self.requested_tasks.clear() update_requestor_assigned_sum(msg.requestor_id, msg.price) dispatcher.send( @@ -473,9 +487,15 @@ def resource_failure(self, task_id: str, reason: str) -> None: f'Error downloading resources: {reason}', ) - def send_results(self, subtask_id: str, task_id: str, result: List[Path]): - if not result: - raise ValueError('Not results to send') + def send_results( + self, + subtask_id: str, + task_id: str, + result: Optional[List[Path]] = None, + task_api_result: Optional[Path] = None, + ) -> None: + if not result and not task_api_result: + raise ValueError('No results to send') if subtask_id in self.results_to_send: raise RuntimeError("Incorrect subtask_id: 
{}".format(subtask_id)) @@ -495,12 +515,18 @@ def send_results(self, subtask_id: str, task_id: str, result: List[Path]): wtr = WaitingTaskResult( task_id=task_id, subtask_id=subtask_id, - result=result, + result=result or task_api_result, last_sending_trial=last_sending_trial, delay_time=delay_time, owner=header.task_owner) - self._create_and_set_result_package(wtr) + if result: + self._create_and_set_result_package(wtr) + else: + resource_id = \ + sync_wait(self.new_resource_manager.share(task_api_result)) + wtr.result_hash = resource_id + self.results_to_send[subtask_id] = wtr Trust.REQUESTED.increase(header.task_owner.key) diff --git a/tests/golem/task/test_taskcomputeradapter.py b/tests/golem/task/test_taskcomputeradapter.py index c1d6c778b5..3c634280a5 100644 --- a/tests/golem/task/test_taskcomputeradapter.py +++ b/tests/golem/task/test_taskcomputeradapter.py @@ -196,7 +196,7 @@ def test_ok(self): self.task_server.send_results.assert_called_once_with( task_id='test_task', subtask_id='test_subtask', - result=[output_file], + task_api_result=output_file, ) @defer.inlineCallbacks @@ -360,6 +360,25 @@ def test_both_computers_reconfigured(self): ) +class TestTaskResourcesDir(TaskComputerAdapterTestBase): + def test_old_assigned(self): + self.new_computer.has_assigned_task.return_value = False + self.old_computer.has_assigned_task.return_value = True + with self.assertRaisesRegex( + ValueError, + 'Task resources directory only available when a task-api task' + ' is assigned'): # pylint: disable=bad-continuation + self.adapter.get_task_resources_dir() + + def test_new_assigned(self): + self.new_computer.has_assigned_task.return_value = True + self.old_computer.has_assigned_task.return_value = False + self.assertEqual( + self.new_computer.get_task_resources_dir.return_value, + self.adapter.get_task_resources_dir(), + ) + + class TestQuit(TaskComputerAdapterTestBase): def test_quit(self): diff --git a/tests/golem/task/test_taskserver.py 
b/tests/golem/task/test_taskserver.py index 0a56890a63..f11fac177e 100644 --- a/tests/golem/task/test_taskserver.py +++ b/tests/golem/task/test_taskserver.py @@ -35,6 +35,7 @@ from golem.envs.docker.cpu import DockerCPUEnvironment from golem.network.hyperdrive.client import HyperdriveClientOptions, \ HyperdriveClient, to_hyperg_peer +from golem.resource import resourcemanager from golem.resource.dirmanager import DirManager from golem.resource.hyperdrive.resource import ResourceError from golem.resource.hyperdrive.resourcesmanager import HyperdriveResourceManager @@ -1110,6 +1111,24 @@ def test_ok(self, trust): trust.REQUESTED.increase.assert_called_once_with(header.task_owner.key) + def test_task_api(self): + subtask_id = 'test_subtask_id' + task_id = 'test_task_id' + filepath = 'test_filepath' + self.ts.task_keeper.task_headers[task_id] = Mock() + self.ts.new_resource_manager = \ + Mock(spec=resourcemanager.ResourceManager) + + self.ts.send_results(subtask_id, task_id, task_api_result=filepath) + + self.ts.new_resource_manager.share.assert_called_once_with(filepath) + wtr = self.ts.results_to_send[subtask_id] + self.assertEqual( + self.ts.new_resource_manager.share.return_value, + wtr.result_hash, + ) + self.assertEqual(filepath, wtr.result) + @patch('golem.task.taskserver.TaskServer.request_resource') @patch('golem.task.taskserver.update_requestor_assigned_sum') @@ -1163,6 +1182,30 @@ def test_already_assigned( dispatcher_mock.send.assert_not_called() logger_mock.error.assert_called() + def test_task_api( + self, _logger_mock, _dispatcher_mock, + _update_requestor_assigned_sum, _request_resource): + self.ts.task_computer.has_assigned_task.return_value = False + ttc = msg_factories.tasks.TaskToComputeFactory() + ttc.want_to_compute_task.task_header.environment_prerequisites = Mock() + self.assertTrue(ttc.compute_task_def['resources']) # noqa pylint: disable=unsubscriptable-object + self.ts.new_resource_manager = \ + Mock(spec=resourcemanager.ResourceManager) + 
self.ts.task_computer._new_computer = Mock() + + self.ts.task_given(ttc) + + for resource in ttc.compute_task_def['resources']: # noqa pylint: disable=unsubscriptable-object + self.ts.new_resource_manager.download.assert_any_call( + resource, + self.ts.task_computer.get_task_resources_dir.return_value, + ttc.resources_options, + ) + self.assertEqual( + len(ttc.compute_task_def['resources']), # noqa pylint: disable=unsubscriptable-object + self.ts.new_resource_manager.download.call_count, + ) + @patch('golem.task.taskserver.logger') class TestResourceCollected(TaskServerTestBase):