Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Task API computation flow #4561

Merged
merged 1 commit into from
Aug 12, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions golem/task/taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from dataclasses import dataclass
from golem_messages.message.tasks import ComputeTaskDef, TaskHeader
from golem_task_api import ProviderAppClient
from golem_task_api import ProviderAppClient, constants as task_api_constants
from pydispatch import dispatcher
from twisted.internet import defer

Expand Down Expand Up @@ -118,6 +118,13 @@ def support_direct_computation(self) -> bool:
def support_direct_computation(self, value: bool) -> None:
self._old_computer.support_direct_computation = value

def get_task_resources_dir(self) -> Path:
if not self._new_computer.has_assigned_task():
raise ValueError(
'Task resources directory only available when a task-api task '
'is assigned')
return self._new_computer.get_task_resources_dir()

def start_computation(self) -> None:
if self._new_computer.has_assigned_task():
task_id = self.assigned_task_id
Expand All @@ -143,7 +150,7 @@ def _handle_computation_results(
self._task_server.send_results(
subtask_id=subtask_id,
task_id=task_id,
result=[output_file],
task_api_result=output_file,
)
except Exception as e: # pylint: disable=broad-except
self._task_server.send_task_failed(
Expand Down Expand Up @@ -265,6 +272,9 @@ def assigned_subtask_id(self) -> Optional[str]:
return None
return self._assigned_task.subtask_id

def get_task_resources_dir(self) -> Path:
return self._get_task_dir() / task_api_constants.NETWORK_RESOURCES_DIR

def _is_computing(self) -> bool:
return self._computation is not None

Expand Down
48 changes: 37 additions & 11 deletions golem/task/taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from golem_messages import message
from golem_messages.datastructures import tasks as dt_tasks
from pydispatch import dispatcher
from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks, Deferred, \
TimeoutError as DeferredTimeoutError

Expand Down Expand Up @@ -436,12 +437,25 @@ def task_given(
return False

self.task_computer.task_given(msg.compute_task_def)
self.request_resource(
msg.task_id,
msg.subtask_id,
msg.compute_task_def['resources'],
msg.resources_options,
)
if msg.want_to_compute_task.task_header.environment_prerequisites:
deferreds = []
for resource_id in msg.compute_task_def['resources']:
deferreds.append(self.new_resource_manager.download(
resource_id,
self.task_computer.get_task_resources_dir(),
msg.resources_options,
))
defer.gatherResults(deferreds).addBoth(
lambda _: self.resource_collected(msg.task_id),
lambda e: self.resource_failure(msg.task_id, e),
)
else:
self.request_resource(
msg.task_id,
msg.subtask_id,
msg.compute_task_def['resources'],
msg.resources_options,
)
self.requested_tasks.clear()
update_requestor_assigned_sum(msg.requestor_id, msg.price)
dispatcher.send(
Expand Down Expand Up @@ -473,9 +487,15 @@ def resource_failure(self, task_id: str, reason: str) -> None:
f'Error downloading resources: {reason}',
)

def send_results(self, subtask_id: str, task_id: str, result: List[Path]):
if not result:
raise ValueError('Not results to send')
def send_results(
self,
subtask_id: str,
task_id: str,
result: Optional[List[Path]] = None,
task_api_result: Optional[Path] = None,
) -> None:
if not result and not task_api_result:
raise ValueError('No results to send')

if subtask_id in self.results_to_send:
raise RuntimeError("Incorrect subtask_id: {}".format(subtask_id))
Expand All @@ -495,12 +515,18 @@ def send_results(self, subtask_id: str, task_id: str, result: List[Path]):
wtr = WaitingTaskResult(
task_id=task_id,
subtask_id=subtask_id,
result=result,
result=result or task_api_result,
last_sending_trial=last_sending_trial,
delay_time=delay_time,
owner=header.task_owner)

self._create_and_set_result_package(wtr)
if result:
self._create_and_set_result_package(wtr)
else:
resource_id = \
sync_wait(self.new_resource_manager.share(task_api_result))
wtr.result_hash = resource_id

self.results_to_send[subtask_id] = wtr

Trust.REQUESTED.increase(header.task_owner.key)
Expand Down
21 changes: 20 additions & 1 deletion tests/golem/task/test_taskcomputeradapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def test_ok(self):
self.task_server.send_results.assert_called_once_with(
task_id='test_task',
subtask_id='test_subtask',
result=[output_file],
task_api_result=output_file,
)

@defer.inlineCallbacks
Expand Down Expand Up @@ -360,6 +360,25 @@ def test_both_computers_reconfigured(self):
)


class TestTaskResourcesDir(TaskComputerAdapterTestBase):
def test_old_assigned(self):
self.new_computer.has_assigned_task.return_value = False
self.old_computer.has_assigned_task.return_value = True
with self.assertRaisesRegex(
ValueError,
'Task resources directory only available when a task-api task'
' is assigned'): # pylint: disable=bad-continuation
self.adapter.get_task_resources_dir()

def test_new_assigned(self):
self.new_computer.has_assigned_task.return_value = True
self.old_computer.has_assigned_task.return_value = False
self.assertEqual(
self.new_computer.get_task_resources_dir.return_value,
self.adapter.get_task_resources_dir(),
)


class TestQuit(TaskComputerAdapterTestBase):

def test_quit(self):
Expand Down
43 changes: 43 additions & 0 deletions tests/golem/task/test_taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from golem.envs.docker.cpu import DockerCPUEnvironment
from golem.network.hyperdrive.client import HyperdriveClientOptions, \
HyperdriveClient, to_hyperg_peer
from golem.resource import resourcemanager
from golem.resource.dirmanager import DirManager
from golem.resource.hyperdrive.resource import ResourceError
from golem.resource.hyperdrive.resourcesmanager import HyperdriveResourceManager
Expand Down Expand Up @@ -1110,6 +1111,24 @@ def test_ok(self, trust):

trust.REQUESTED.increase.assert_called_once_with(header.task_owner.key)

def test_task_api(self):
subtask_id = 'test_subtask_id'
task_id = 'test_task_id'
filepath = 'test_filepath'
self.ts.task_keeper.task_headers[task_id] = Mock()
self.ts.new_resource_manager = \
Mock(spec=resourcemanager.ResourceManager)

self.ts.send_results(subtask_id, task_id, task_api_result=filepath)

self.ts.new_resource_manager.share.assert_called_once_with(filepath)
wtr = self.ts.results_to_send[subtask_id]
self.assertEqual(
self.ts.new_resource_manager.share.return_value,
wtr.result_hash,
)
self.assertEqual(filepath, wtr.result)


@patch('golem.task.taskserver.TaskServer.request_resource')
@patch('golem.task.taskserver.update_requestor_assigned_sum')
Expand Down Expand Up @@ -1163,6 +1182,30 @@ def test_already_assigned(
dispatcher_mock.send.assert_not_called()
logger_mock.error.assert_called()

def test_task_api(
self, _logger_mock, _dispatcher_mock,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

one argument per line from the style guide

_update_requestor_assigned_sum, _request_resource):
self.ts.task_computer.has_assigned_task.return_value = False
ttc = msg_factories.tasks.TaskToComputeFactory()
ttc.want_to_compute_task.task_header.environment_prerequisites = Mock()
self.assertTrue(ttc.compute_task_def['resources']) # noqa pylint: disable=unsubscriptable-object
self.ts.new_resource_manager = \
Mock(spec=resourcemanager.ResourceManager)
self.ts.task_computer._new_computer = Mock()

self.ts.task_given(ttc)

for resource in ttc.compute_task_def['resources']: # noqa pylint: disable=unsubscriptable-object
self.ts.new_resource_manager.download.assert_any_call(
resource,
self.ts.task_computer.get_task_resources_dir.return_value,
ttc.resources_options,
)
self.assertEqual(
len(ttc.compute_task_def['resources']), # noqa pylint: disable=unsubscriptable-object
self.ts.new_resource_manager.download.call_count,
)


@patch('golem.task.taskserver.logger')
class TestResourceCollected(TaskServerTestBase):
Expand Down