Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial dask implementation (local only) #94

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions deepaas/api/v2/predict.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,7 @@ async def post(self, request, wsk_args=None):
if wsk_args:
args.update(wsk_args)
task = self.model_obj.predict(**args)
await task

ret = task.result()['output']

if isinstance(ret, model.v2.wrapper.ReturnedFile):
ret = open(ret.filename, 'rb')
ret = await task.result()

accept = args.get("accept", "application/json")
if accept not in ["application/json", "*/*"]:
Expand Down
27 changes: 15 additions & 12 deletions deepaas/api/v2/train.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ def __init__(self, model_name, model_obj):
self._trainings = {}

@staticmethod
def build_train_response(uuid, training):
async def build_train_response(uuid, training):
if not training:
return

Expand All @@ -57,18 +57,20 @@ def build_train_response(uuid, training):
if training["task"].cancelled():
ret["status"] = "cancelled"
elif training["task"].done():
exc = training["task"].exception()
exc = await training["task"].exception()
if exc:
ret["status"] = "error"
ret["message"] = "%s" % exc
else:
ret["status"] = "done"
ret["result"] = training["task"].result()
end = datetime.strptime(ret["result"]["finish_date"],
'%Y-%m-%d %H:%M:%S.%f')
start = datetime.strptime(ret["date"],
'%Y-%m-%d %H:%M:%S.%f')
ret["result"]["duration"] = str(end - start)
ret["result"] = {}
ret["result"]["output"] = await training["task"].result()
# FIXME(aloga): when moving to dask, this has disspared.
# end = datetime.strptime(ret["result"]["finish_date"],
# '%Y-%m-%d %H:%M:%S.%f')
# start = datetime.strptime(ret["date"],
# '%Y-%m-%d %H:%M:%S.%f')
# ret["result"]["duration"] = str(end - start)
else:
ret["status"] = "running"
return ret
Expand All @@ -87,7 +89,8 @@ async def post(self, request, args, wsk_args=None):
"task": train_task,
"args": args,
}
ret = self.build_train_response(uuid_, self._trainings[uuid_])
ret = await self.build_train_response(uuid_,
self._trainings[uuid_])
return web.json_response(ret)

@aiohttp_apispec.docs(
Expand All @@ -105,7 +108,7 @@ async def delete(self, request, wsk_args=None):
except asyncio.TimeoutError:
pass
LOG.info("Training %s has been cancelled" % uuid_)
ret = self.build_train_response(uuid_, training)
ret = await self.build_train_response(uuid_, training)
return web.json_response(ret)

@aiohttp_apispec.docs(
Expand All @@ -117,7 +120,7 @@ async def index(self, request, wsk_args=None):
ret = []
for uuid_, training in self._trainings.items():
training = self._trainings.get(uuid_, None)
aux = self.build_train_response(uuid_, training)
aux = await self.build_train_response(uuid_, training)
ret.append(aux)

return web.json_response(ret)
Expand All @@ -130,7 +133,7 @@ async def index(self, request, wsk_args=None):
async def get(self, request, wsk_args=None):
uuid_ = request.match_info["uuid"]
training = self._trainings.get(uuid_, None)
ret = self.build_train_response(uuid_, training)
ret = await self.build_train_response(uuid_, training)
if ret:
return web.json_response(ret)
raise web.HTTPNotFound()
Expand Down
33 changes: 30 additions & 3 deletions deepaas/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,37 @@
cfg.IntOpt('workers',
short='p',
default=1,
deprecated_for_removal=True,
help="""
Specify the number of workers to spawn. If using a CPU you probably want to
increase this number, if using a GPU probably you want to leave it to 1.
(defaults to 1)
Specify the number of workers to spawn for prediction tasks. If using a CPU you
probably want to increase this number, if using a GPU probably you want to
leave it to 1. (defaults to 1)

This option is deprecated for removal, as DEEPaaS has switched to Dask to
manage the execution of background tasks. Please check the documentation
for the 'dask-config' option for more details.
"""),
cfg.IntOpt('train-workers',
default=1,
deprecated_for_removal=True,
help="""
Specify the number of workers to spawn for training tasks. Unless you know what
you are doing you should leave this number to 1. (defaults to 1)

This option is deprecated for removal, as DEEPaaS has switched to Dask to
manage the execution of background tasks. Please check the documentation
for the 'dask-config' option for more details.
>>>>>>> 68dcd2d (dask: deprecate old worker options)
"""),
cfg.StrOpt('dask-config',
default=None,
help="""
Specify the path where the dask configuration file is stored. If no file is
provided, the default Dask configuration file locations will be used. If none
of them exists, default configuration values will be used.

Check https://docs.dask.org/en/latest/configuration.html for more information
about Dask configurations.
"""),
cfg.IntOpt('client-max-size',
default=0,
Expand Down
77 changes: 77 additions & 0 deletions deepaas/model/v2/executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
# -*- coding: utf-8 -*-

# Copyright 2021 Spanish National Research Council (CSIC)
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import dask.distributed
from oslo_config import cfg
from oslo_log import log

LOG = log.getLogger(__name__)

CONF = cfg.CONF


def get_default_executor():
return LocalDaskExecutor()


class Executor:
def __repr__(self) -> str:
return "<Executor: {}>".format(type(self).__name__)

def submit(self, fn, *args, extra_context=None, **kwargs):
"""Submit a function to the executor for execution.

:param fn: function that is being submitted for execution
:param *args: arguments to be passed to `fn`
:param extra_context: an optional dictionary with extra information
about the submitted task
:param **kwargs: keyword arguments to be passed to `fn`

:returns: a future-like object
"""
raise NotImplementedError()

def shutdown(self):
"""Shut down the executor."""
raise NotImplementedError()


class LocalDaskExecutor(Executor):
def __init__(self):
config = {
"temporary-directory": "/tmp/",
}
paths = CONF.dask_config
if paths:
tmp_conf = dask.config.collect(paths=paths)
else:
tmp_conf = dask.config.collect()
config.update(tmp_conf)
dask.config.set(config)

self.client = dask.distributed.Client(
asynchronous=True,
)
super().__init__()

def submit(self, fn, *args, **kwargs):
"""FIXME(aloga): document this."""

fut = self.client.submit(fn, *args, **kwargs)
return fut

def shutdown(self):
self.client.shutdown()
Loading