Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Task API computation flow
Browse files Browse the repository at this point in the history
  • Loading branch information
Krigpl committed Jul 31, 2019
1 parent 1189656 commit a5bdfe0
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 14 deletions.
14 changes: 12 additions & 2 deletions golem/task/taskcomputer.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from dataclasses import dataclass
from golem_messages.message.tasks import ComputeTaskDef, TaskHeader
from golem_task_api import ProviderAppClient
from golem_task_api import ProviderAppClient, constants as task_api_constants
from pydispatch import dispatcher
from twisted.internet import defer

Expand Down Expand Up @@ -118,6 +118,13 @@ def support_direct_computation(self) -> bool:
def support_direct_computation(self, value: bool) -> None:
self._old_computer.support_direct_computation = value

def get_task_resources_dir(self) -> Path:
if not self._new_computer.has_assigned_task():
raise ValueError(
'Task resources directory only available when a task-api task '
'is assigned')
return self._new_computer.get_task_resources_dir()

def start_computation(self) -> None:
if self._new_computer.has_assigned_task():
task_id = self.assigned_task_id
Expand All @@ -143,7 +150,7 @@ def _handle_computation_results(
self._task_server.send_results(
subtask_id=subtask_id,
task_id=task_id,
result=[output_file],
task_api_result=output_file,
)
except Exception as e: # pylint: disable=broad-except
self._task_server.send_task_failed(
Expand Down Expand Up @@ -265,6 +272,9 @@ def assigned_subtask_id(self) -> Optional[str]:
return None
return self._assigned_task.subtask_id

def get_task_resources_dir(self) -> Path:
return self._get_task_dir() / task_api_constants.NETWORK_RESOURCES_DIR

def _is_computing(self) -> bool:
return self._computation is not None

Expand Down
48 changes: 37 additions & 11 deletions golem/task/taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from golem_messages import message
from golem_messages.datastructures import tasks as dt_tasks
from pydispatch import dispatcher
from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks, Deferred, \
TimeoutError as DeferredTimeoutError

Expand Down Expand Up @@ -436,12 +437,25 @@ def task_given(
return False

self.task_computer.task_given(msg.compute_task_def)
self.request_resource(
msg.task_id,
msg.subtask_id,
msg.compute_task_def['resources'],
msg.resources_options,
)
if msg.want_to_compute_task.task_header.environment_prerequisites:
deferreds = []
for resource_id in msg.compute_task_def['resources']:
deferreds.append(self.new_resource_manager.download(
resource_id,
self.task_computer.get_task_resources_dir(),
msg.resources_options,
))
defer.gatherResults(deferreds).addBoth(
lambda _: self.resource_collected(msg.task_id),
lambda e: self.resource_failure(msg.task_id, e),
)
else:
self.request_resource(
msg.task_id,
msg.subtask_id,
msg.compute_task_def['resources'],
msg.resources_options,
)
self.requested_tasks.clear()
update_requestor_assigned_sum(msg.requestor_id, msg.price)
dispatcher.send(
Expand Down Expand Up @@ -473,9 +487,15 @@ def resource_failure(self, task_id: str, reason: str) -> None:
f'Error downloading resources: {reason}',
)

def send_results(self, subtask_id: str, task_id: str, result: List[Path]):
if not result:
raise ValueError('Not results to send')
def send_results(
self,
subtask_id: str,
task_id: str,
result: Optional[List[Path]] = None,
task_api_result: Optional[Path] = None,
) -> None:
if not result and not task_api_result:
raise ValueError('No results to send')

if subtask_id in self.results_to_send:
raise RuntimeError("Incorrect subtask_id: {}".format(subtask_id))
Expand All @@ -495,12 +515,18 @@ def send_results(self, subtask_id: str, task_id: str, result: List[Path]):
wtr = WaitingTaskResult(
task_id=task_id,
subtask_id=subtask_id,
result=result,
result=result or task_api_result,
last_sending_trial=last_sending_trial,
delay_time=delay_time,
owner=header.task_owner)

self._create_and_set_result_package(wtr)
if result:
self._create_and_set_result_package(wtr)
else:
resource_id = \
sync_wait(self.new_resource_manager.share(task_api_result))
wtr.result_hash = resource_id

self.results_to_send[subtask_id] = wtr

Trust.REQUESTED.increase(header.task_owner.key)
Expand Down
21 changes: 20 additions & 1 deletion tests/golem/task/test_taskcomputeradapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def test_ok(self):
self.task_server.send_results.assert_called_once_with(
task_id='test_task',
subtask_id='test_subtask',
result=[output_file],
task_api_result=output_file,
)

@defer.inlineCallbacks
Expand Down Expand Up @@ -360,6 +360,25 @@ def test_both_computers_reconfigured(self):
)


class TestTaskResourcesDir(TaskComputerAdapterTestBase):
def test_old_assigned(self):
self.new_computer.has_assigned_task.return_value = False
self.old_computer.has_assigned_task.return_value = True
with self.assertRaisesRegex(
ValueError,
'Task resources directory only available when a task-api task'
' is assigned'):
self.adapter.get_task_resources_dir()

def test_new_assigned(self):
self.new_computer.has_assigned_task.return_value = True
self.old_computer.has_assigned_task.return_value = False
self.assertEqual(
self.new_computer.get_task_resources_dir.return_value,
self.adapter.get_task_resources_dir(),
)


class TestQuit(TaskComputerAdapterTestBase):

def test_quit(self):
Expand Down
43 changes: 43 additions & 0 deletions tests/golem/task/test_taskserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from golem.envs.docker.cpu import DockerCPUEnvironment
from golem.network.hyperdrive.client import HyperdriveClientOptions, \
HyperdriveClient, to_hyperg_peer
from golem.resource import resourcemanager
from golem.resource.dirmanager import DirManager
from golem.resource.hyperdrive.resource import ResourceError
from golem.resource.hyperdrive.resourcesmanager import HyperdriveResourceManager
Expand Down Expand Up @@ -1110,6 +1111,24 @@ def test_ok(self, trust):

trust.REQUESTED.increase.assert_called_once_with(header.task_owner.key)

def test_task_api(self):
subtask_id = 'test_subtask_id'
task_id = 'test_task_id'
filepath = 'test_filepath'
self.ts.task_keeper.task_headers[task_id] = Mock()
self.ts.new_resource_manager = \
Mock(spec=resourcemanager.ResourceManager)

self.ts.send_results(subtask_id, task_id, task_api_result=filepath)

self.ts.new_resource_manager.share.assert_called_once_with(filepath)
wtr = self.ts.results_to_send[subtask_id]
self.assertEqual(
self.ts.new_resource_manager.share.return_value,
wtr.result_hash,
)
self.assertEqual(filepath, wtr.result)


@patch('golem.task.taskserver.TaskServer.request_resource')
@patch('golem.task.taskserver.update_requestor_assigned_sum')
Expand Down Expand Up @@ -1163,6 +1182,30 @@ def test_already_assigned(
dispatcher_mock.send.assert_not_called()
logger_mock.error.assert_called()

def test_task_api(
self, _logger_mock, _dispatcher_mock,
_update_requestor_assigned_sum, _request_resource):
self.ts.task_computer.has_assigned_task.return_value = False
ttc = msg_factories.tasks.TaskToComputeFactory()
ttc.want_to_compute_task.task_header.environment_prerequisites = Mock()
self.assertTrue(ttc.compute_task_def['resources']) # noqa pylint: disable=unsubscriptable-object
self.ts.new_resource_manager = \
Mock(spec=resourcemanager.ResourceManager)
self.ts.task_computer._new_computer = Mock()

self.ts.task_given(ttc)

for resource in ttc.compute_task_def['resources']: # noqa pylint: disable=unsubscriptable-object
self.ts.new_resource_manager.download.assert_any_call(
resource,
self.ts.task_computer.get_task_resources_dir.return_value,
ttc.resources_options,
)
self.assertEqual(
len(ttc.compute_task_def['resources']), # noqa pylint: disable=unsubscriptable-object
self.ts.new_resource_manager.download.call_count,
)


@patch('golem.task.taskserver.logger')
class TestResourceCollected(TaskServerTestBase):
Expand Down

0 comments on commit a5bdfe0

Please sign in to comment.