Skip to content
This repository has been archived by the owner on Oct 31, 2023. It is now read-only.

Commit

Permalink
Merge pull request #4770 from golemfactory/task_api_dirutils
Browse files Browse the repository at this point in the history
 Upgrade to Task API v0.20.0
  • Loading branch information
Wiezzel authored Sep 26, 2019
2 parents fd680a7 + bd7eab3 commit 3b4dfad
Show file tree
Hide file tree
Showing 9 changed files with 94 additions and 103 deletions.
123 changes: 56 additions & 67 deletions golem/task/requestedtaskmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@

from dataclasses import dataclass
from golem_messages import idgenerator
from golem_task_api import constants
from golem_task_api.dirutils import RequestorDir, RequestorTaskDir
from golem_task_api.enums import VerifyResult
from golem_task_api.client import RequestorAppClient
from peewee import fn

from golem.app_manager import AppManager
from golem.app_manager import AppManager, AppId
from golem.model import (
ComputingNode,
default_now,
Expand All @@ -25,7 +26,6 @@
from golem.task.task_api import EnvironmentTaskApiService
from golem.task.timer import ProviderComputeTimers


logger = logging.getLogger(__name__)

TaskId = str
Expand Down Expand Up @@ -59,60 +59,55 @@ class SubtaskDefinition:
deadline: int


class DirManager:
def __init__(self, root_path: Path):
self._root_path = root_path

def get_app_dir(self, app_id: str) -> Path:
app_dir = self._root_path / app_id
app_dir.mkdir(exist_ok=True)
return app_dir

def prepare_task_dir(self, app_id: str, task_id: TaskId) -> Path:
task_dir = self._get_task_dir(app_id, task_id)
task_dir.mkdir()
task_inputs_dir = self.get_task_inputs_dir(app_id, task_id)
task_inputs_dir.mkdir()
subtask_inputs_dir = self.get_subtask_inputs_dir(app_id, task_id)
subtask_inputs_dir.mkdir()
task_outputs_dir = task_dir / constants.TASK_OUTPUTS_DIR
task_outputs_dir.mkdir()
subtask_outputs_dir = self.get_subtask_outputs_dir(app_id, task_id)
subtask_outputs_dir.mkdir()
return task_inputs_dir

def get_task_inputs_dir(self, app_id: str, task_id: TaskId) -> Path:
task_dir = self._get_task_dir(app_id, task_id)
return task_dir / constants.TASK_INPUTS_DIR

def get_subtask_inputs_dir(self, app_id: str, task_id: TaskId) -> Path:
task_dir = self._get_task_dir(app_id, task_id)
return task_dir / constants.SUBTASK_INPUTS_DIR

def get_subtask_outputs_dir(self, app_id: str, task_id: TaskId) -> Path:
task_dir = self._get_task_dir(app_id, task_id)
return task_dir / constants.SUBTASK_OUTPUTS_DIR

def _get_task_dir(self, app_id: str, task_id: TaskId) -> Path:
return self.get_app_dir(app_id) / task_id


class RequestedTaskManager:

def __init__(
self,
env_manager: EnvironmentManager,
app_manager: AppManager,
public_key: bytes,
root_path: Path,
):
) -> None:
logger.debug('RequestedTaskManager(public_key=%r, root_path=%r)',
public_key, root_path)
self._dir_manager = DirManager(root_path)
self._root_path = root_path
self._env_manager = env_manager
self._app_manager = app_manager
self._public_key: bytes = public_key
self._app_clients: Dict[EnvId, RequestorAppClient] = {}

def _app_dir(self, app_id: AppId) -> RequestorDir:
app_dir = RequestorDir(self._root_path / app_id)
app_dir.mkdir(exist_ok=True)
return app_dir

def _task_dir(self, task_id: TaskId) -> RequestorTaskDir:
task = RequestedTask.get(RequestedTask.task_id == task_id)
return self._app_dir(task.app_id).task_dir(task_id)

def get_task_inputs_dir(self, task_id: TaskId) -> Path:
""" Return a path to the directory where task resources should be
placed. """
return self._task_dir(task_id).task_inputs_dir

def get_task_outputs_dir(self, task_id: TaskId) -> Path:
""" Return a path to the directory where task results should be
placed. """
return self._task_dir(task_id).task_outputs_dir

def get_subtask_inputs_dir(self, task_id: TaskId) -> Path:
""" Return a path to the directory of the task network resources. """
return self._task_dir(task_id).subtask_inputs_dir

def get_subtask_outputs_dir(
self,
task_id: TaskId,
subtask_id: SubtaskId
) -> Path:
""" Return a path to the directory where subtasks outputs should be
placed. """
return self._task_dir(task_id).subtask_outputs_dir(subtask_id)

def create_task(
self,
golem_params: CreateTaskParams,
Expand Down Expand Up @@ -152,12 +147,11 @@ def create_task(
task.task_id,
task.app_id,
)
task_inputs_dir = self._dir_manager.prepare_task_dir(
task.app_id,
task.task_id)
task_dir = self._task_dir(task.task_id)
task_dir.prepare()
# Move resources to task_inputs_dir
for resource in golem_params.resources:
shutil.copy2(resource, task_inputs_dir)
shutil.copy2(resource, task_dir.task_inputs_dir)
logger.info(
"Creating task. id=%s, app=%r",
task.task_id,
Expand Down Expand Up @@ -225,17 +219,6 @@ def is_task_finished(task_id: TaskId) -> bool:
task = RequestedTask.get(RequestedTask.task_id == task_id)
return task.status.is_completed()

def get_subtask_inputs_dir(self, task_id: TaskId) -> Path:
""" Return a path to the directory of the task network resources. """
task = RequestedTask.get(RequestedTask.task_id == task_id)
return self._dir_manager.get_subtask_inputs_dir(task.app_id, task_id)

def get_subtask_outputs_dir(self, task_id: TaskId) -> Path:
""" Return a path to the directory where subtasks outputs should be
placed. """
task = RequestedTask.get(RequestedTask.task_id == task_id)
return self._dir_manager.get_subtask_outputs_dir(task.app_id, task_id)

async def has_pending_subtasks(self, task_id: TaskId) -> bool:
""" Return True is there are pending subtasks waiting for
computation at the given moment. If there are the next call to
Expand Down Expand Up @@ -333,32 +316,35 @@ async def verify(self, task_id: TaskId, subtask_id: SubtaskId) -> bool:
subtask.status = SubtaskStatus.verifying
subtask.save()
try:
result = await app_client.verify(task_id, subtask_id)
result, _ = await app_client.verify(task_id, subtask_id)
except Exception as e:
logger.warning(
"Verification failed. subtask=%s, task=%s, exception=%r",
subtask_id,
task_id,
e
)
result = False
result, _ = VerifyResult.FAILURE, str(e)

ProviderComputeTimers.finish(subtask_id)
if result:
if result is VerifyResult.SUCCESS:
subtask.status = SubtaskStatus.finished
else:
elif result is VerifyResult.FAILURE:
subtask.status = SubtaskStatus.failure
else:
# TODO: Handle other results
raise NotImplementedError(f"Unexpected verify result: {result}")
subtask.save()

if result:
if result is VerifyResult.SUCCESS:
# Check if task completed
if not await self.has_pending_subtasks(task_id):
if not self._get_pending_subtasks(task_id):
task.status = TaskStatus.finished
task.save()
await self._shutdown_app_client(task.app_id)

return result
return result is VerifyResult.SUCCESS

async def abort_task(self, task_id):
task = RequestedTask.get(RequestedTask.task_id == task_id)
Expand Down Expand Up @@ -391,7 +377,7 @@ async def restart_task(self, task_id: TaskId) -> None:

async def duplicate_task(self, task_id: TaskId, output_dir: Path) -> TaskId:
task = RequestedTask.get(RequestedTask.task_id == task_id)
inputs_dir = self._dir_manager.get_task_inputs_dir(task.app_id, task_id)
inputs_dir = self.get_task_inputs_dir(task_id)
resources = list(map(lambda f: inputs_dir / f, os.listdir(inputs_dir)))
golem_params = CreateTaskParams(
app_id=task.app_id,
Expand All @@ -417,7 +403,10 @@ async def discard_subtasks(
for subtask in RequestedSubtask.select().where(
RequestedSubtask.subtask_id.in_(subtask_ids)):
assert subtask.task_id == task_id
discarded_subtask_ids = await app_client.discard_subtasks(subtask_ids)
discarded_subtask_ids = await app_client.discard_subtasks(
task_id,
subtask_ids
)
for subtask in RequestedSubtask.select().where(
RequestedSubtask.subtask_id.in_(discarded_subtask_ids)):
subtask.status = SubtaskStatus.cancelled
Expand Down Expand Up @@ -474,7 +463,7 @@ def _get_task_api_service(
env = self._env_manager.environment(env_id)
payload_builder = self._env_manager.payload_builder(env_id)
prereq = env.parse_prerequisites(app.requestor_prereq)
shared_dir = self._dir_manager.get_app_dir(app_id)
shared_dir = self._app_dir(app_id)

return EnvironmentTaskApiService(
env=env,
Expand Down
31 changes: 17 additions & 14 deletions golem/task/server/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ def computed_task_reported(
fgtr = message.concents.ForceGetTaskResult(
report_computed_task=report_computed_task
)
subtask_id = report_computed_task.subtask_id
concent_service.submit_task_message(
report_computed_task.subtask_id,
subtask_id,
fgtr,
msg_helpers.maximum_download_time(
report_computed_task.size,
Expand All @@ -45,7 +46,7 @@ def on_success(extracted_pkg, *_args, **_kwargs):
logger.debug("Task result extracted %r", extracted_pkg.__dict__)

concent_service.cancel_task_message(
report_computed_task.subtask_id,
subtask_id,
'ForceGetTaskResult',
)
task_server.verify_results(
Expand All @@ -57,7 +58,7 @@ def on_success(extracted_pkg, *_args, **_kwargs):
def on_error(exc, *_args, **_kwargs):
logger.warning(
"Task result error: %s (%s)",
report_computed_task.subtask_id,
subtask_id,
exc or "unspecified",
)

Expand All @@ -66,7 +67,7 @@ def on_error(exc, *_args, **_kwargs):
# to obtain the task results
logger.debug('[CONCENT] sending ForceGetTaskResult: %s', fgtr)
concent_service.submit_task_message(
report_computed_task.subtask_id,
subtask_id,
fgtr,
)
after_error()
Expand All @@ -76,12 +77,14 @@ def on_error(exc, *_args, **_kwargs):
report_computed_task.options
)

requested_task_manager = task_server.requested_task_manager
if requested_task_manager.task_exists(task_id):
rtm = task_server.requested_task_manager
if rtm.task_exists(task_id):
download_dir = rtm.get_subtask_outputs_dir(task_id, subtask_id)
download_dir.mkdir()
deferred = task_server.new_resource_manager.download(
report_computed_task.multihash,
requested_task_manager.get_subtask_outputs_dir(task_id),
client_options,
resource_id=report_computed_task.multihash,
directory=download_dir,
client_options=client_options,
)
deferred.addCallback(on_success)
deferred.addErrback(on_error)
Expand All @@ -90,12 +93,12 @@ def on_error(exc, *_args, **_kwargs):
task = task_manager.tasks.get(task_id, None)
output_dir = task.tmp_dir if hasattr(task, 'tmp_dir') else None
# Request results
task_manager.task_result_incoming(report_computed_task.subtask_id)
task_manager.task_result_incoming(subtask_id)
task_manager.task_result_manager.pull_package(
report_computed_task.multihash,
report_computed_task.task_id,
report_computed_task.subtask_id,
report_computed_task.secret,
content_hash=report_computed_task.multihash,
task_id=task_id,
subtask_id=subtask_id,
key_or_secret=report_computed_task.secret,
success=on_success,
error=on_error,
client_options=client_options,
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ eventlet==0.24.1
fs==2.4.4
Golem-Messages==3.11.0
Golem-Smart-Contracts-Interface==1.10.3
golem_task_api==0.18.3
golem_task_api==0.20.0
greenlet==0.4.15
h2==3.0.1
hpack==3.0.0
Expand Down
2 changes: 1 addition & 1 deletion requirements_to-freeze.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ eth-utils==1.0.3
ethereum==1.6.1
Golem-Messages==3.11.0
Golem-Smart-Contracts-Interface==1.10.3
golem_task_api==0.18.3
golem_task_api==0.20.0
html2text==2018.1.9
humanize==0.5.1
incremental==17.5.0
Expand Down
2 changes: 1 addition & 1 deletion scripts/task_api_tests/basic_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ async def test_task(
result_path = await task_computer.compute()
shutil.copy2(
result_path,
rtm.get_subtask_outputs_dir(task_id),
rtm.get_subtask_outputs_dir(task_id, subtask_def.subtask_id),
)
print('Starting verification')
verdict = await deferred_from_future(rtm.verify(
Expand Down
5 changes: 3 additions & 2 deletions tests/golem/envs/localhost.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import dill
from dataclasses import dataclass, asdict
from golem_task_api import RequestorAppHandler, ProviderAppHandler, entrypoint
from golem_task_api.enums import VerifyResult
from golem_task_api.structs import Subtask, Task
from twisted.internet import defer, threads

Expand Down Expand Up @@ -54,7 +55,7 @@ class LocalhostPrerequisites(Prerequisites):
create_task: Callable[[], Awaitable[Task]] = _not_implemented
next_subtask: Callable[[], Awaitable[Optional[Subtask]]] = _not_implemented
has_pending_subtasks: Callable[[], Awaitable[bool]] = _not_implemented
verify: Callable[[str], Awaitable[Tuple[bool, Optional[str]]]] = \
verify: Callable[[str], Awaitable[Tuple[VerifyResult, Optional[str]]]] = \
_not_implemented

def to_dict(self) -> dict:
Expand Down Expand Up @@ -114,7 +115,7 @@ async def verify(
self,
task_work_dir: Path,
subtask_id: str
) -> Tuple[bool, Optional[str]]:
) -> Tuple[VerifyResult, Optional[str]]:
return await self._prereq.verify(subtask_id) # type: ignore

async def discard_subtasks(
Expand Down
22 changes: 9 additions & 13 deletions tests/golem/envs/test_localhost_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
TaskApiService,
RequestorAppClient
)
from golem_task_api.enums import VerifyResult
from golem_task_api.structs import Subtask, Task
from grpclib.exceptions import StreamTerminatedError
from twisted.internet.defer import inlineCallbacks
Expand Down Expand Up @@ -163,25 +164,20 @@ def test_verify(self):

async def verify(subtask_id):
if subtask_id == good_subtask_id:
return True, None
return VerifyResult.SUCCESS, None
elif subtask_id == bad_subtask_id:
return False, 'test_error'
return VerifyResult.FAILURE, 'error'

prereq = LocalhostPrerequisites(verify=verify)
service = self._get_service(prereq)
client_future = asyncio.ensure_future(
RequestorAppClient.create(service))
client = yield deferred_from_future(client_future)
client = yield deferred_from_future(RequestorAppClient.create(service))

good_verify_future = asyncio.ensure_future(
good_verify_result = yield deferred_from_future(
client.verify('test_task', good_subtask_id))
good_verify_result = yield deferred_from_future(good_verify_future)
self.assertTrue(good_verify_result)
self.assertEqual(good_verify_result, (VerifyResult.SUCCESS, ''))

bad_verify_future = asyncio.ensure_future(
bad_verify_result = yield deferred_from_future(
client.verify('test_task', bad_subtask_id))
bad_verify_result = yield deferred_from_future(bad_verify_future)
self.assertFalse(bad_verify_result)
self.assertEqual(bad_verify_result, (VerifyResult.FAILURE, 'error'))

shutdown_future = asyncio.ensure_future(client.shutdown())
yield deferred_from_future(shutdown_future)
yield deferred_from_future(client.shutdown())
Loading

0 comments on commit 3b4dfad

Please sign in to comment.