From d883c34c516e302f2afd32aafb3e3b36ef0c892e Mon Sep 17 00:00:00 2001 From: Yao Weng Date: Wed, 16 Sep 2020 18:46:45 -0400 Subject: [PATCH] kfserver add load/unload endpoint (#1082) * kfserver add load/unload endpoint * address comments * revert license * fix python packages bugs * revert crd serving.kubeflow.org_inferenceservices * add missing packages in setup.py * update package import and trigger build/test * revert back pytorchserver * update * fix sklearn docker * fix model package init * revert back sklearn/xgboost and add SKLearnModelRepository/XGBoostModelRepository * remove unused file * mount directory follows /mnt/models/ * remove unused files * create storage destination dir if not exist * model dir -> /mnt/models/ * fix unit test * revert back model repo * pass model dir to KFModelRepository * move registered_models to the last on the argument list --- .../alibiexplainer/explainer.py | 2 +- python/kfserving/kfserving/handlers/http.py | 11 ++- python/kfserving/kfserving/kfmodel.py | 3 +- .../kfserving/kfserving/kfmodel_repository.py | 54 +++++++++++++ python/kfserving/kfserving/kfserver.py | 74 +++++++++++++++--- python/kfserving/kfserving/storage.py | 2 + python/kfserving/requirements.txt | 2 +- python/kfserving/test/test_server.py | 68 +++++++++++++++- .../example_model/{ => model}/cifar10.py | 0 .../example_model/{ => model}/model.pt | Bin python/pytorchserver/pytorchserver/model.py | 5 +- .../pytorchserver/pytorchserver/test_model.py | 9 +-- python/sklearnserver/setup.py | 1 + .../sklearnserver/sklearnserver/__init__.py | 1 + .../sklearnserver/sklearnserver/__init__.pyc | Bin 170 -> 0 bytes .../sklearnserver/sklearnserver/__main__.py | 17 ++-- .../joblib/{ => model}/model.joblib | Bin .../pickle/{ => model}/model.pickle | Bin .../example_models/pkl/model/model.pkl | Bin 0 -> 4516 bytes python/sklearnserver/sklearnserver/model.py | 17 ++-- .../sklearnserver/sklearn_model_repository.py | 29 +++++++ .../sklearnserver/sklearnserver/test_model.py | 10 
+-- .../test_sklearn_model_repository.py | 51 ++++++++++++ python/xgbserver/setup.py | 1 + python/xgbserver/xgbserver/__init__.py | 1 + python/xgbserver/xgbserver/__main__.py | 20 +++-- .../example_model/{ => model}/model.bst | Bin python/xgbserver/xgbserver/model.py | 15 ++-- python/xgbserver/xgbserver/test_model.py | 7 +- .../test_xgboost_model_repository.py | 39 +++++++++ .../xgbserver/xgboost_model_repository.py | 29 +++++++ 31 files changed, 406 insertions(+), 62 deletions(-) create mode 100644 python/kfserving/kfserving/kfmodel_repository.py rename python/pytorchserver/pytorchserver/example_model/{ => model}/cifar10.py (100%) rename python/pytorchserver/pytorchserver/example_model/{ => model}/model.pt (100%) delete mode 100644 python/sklearnserver/sklearnserver/__init__.pyc rename python/sklearnserver/sklearnserver/example_models/joblib/{ => model}/model.joblib (100%) rename python/sklearnserver/sklearnserver/example_models/pickle/{ => model}/model.pickle (100%) create mode 100644 python/sklearnserver/sklearnserver/example_models/pkl/model/model.pkl create mode 100644 python/sklearnserver/sklearnserver/sklearn_model_repository.py create mode 100644 python/sklearnserver/sklearnserver/test_sklearn_model_repository.py rename python/xgbserver/xgbserver/example_model/{ => model}/model.bst (100%) create mode 100644 python/xgbserver/xgbserver/test_xgboost_model_repository.py create mode 100644 python/xgbserver/xgbserver/xgboost_model_repository.py diff --git a/python/alibiexplainer/alibiexplainer/explainer.py b/python/alibiexplainer/alibiexplainer/explainer.py index 61fe0ff1b45..74650da2ff8 100644 --- a/python/alibiexplainer/alibiexplainer/explainer.py +++ b/python/alibiexplainer/alibiexplainer/explainer.py @@ -61,7 +61,7 @@ def __init__( # pylint:disable=too-many-arguments else: raise NotImplementedError - def load(self): + def load(self) -> bool: pass def _predict_fn(self, arr: Union[np.ndarray, List]) -> np.ndarray: diff --git 
a/python/kfserving/kfserving/handlers/http.py b/python/kfserving/kfserving/handlers/http.py index 7b8984c963b..25d4b6bcd69 100644 --- a/python/kfserving/kfserving/handlers/http.py +++ b/python/kfserving/kfserving/handlers/http.py @@ -15,22 +15,21 @@ import inspect import tornado.web import json -from typing import Dict from http import HTTPStatus -from kfserving.kfmodel import KFModel +from kfserving.kfmodel_repository import KFModelRepository class HTTPHandler(tornado.web.RequestHandler): - def initialize(self, models: Dict[str, KFModel]): - self.models = models # pylint:disable=attribute-defined-outside-init + def initialize(self, models: KFModelRepository): + self.models = models # pylint:disable=attribute-defined-outside-init def get_model(self, name: str): - if name not in self.models: + model = self.models.get_model(name) + if model is None: raise tornado.web.HTTPError( status_code=HTTPStatus.NOT_FOUND, reason="Model with name %s does not exist." % name ) - model = self.models[name] if not model.ready: model.load() return model diff --git a/python/kfserving/kfserving/kfmodel.py b/python/kfserving/kfserving/kfmodel.py index 215e815a3ad..9fa57068603 100644 --- a/python/kfserving/kfserving/kfmodel.py +++ b/python/kfserving/kfserving/kfmodel.py @@ -43,8 +43,9 @@ def _http_client(self): self._http_client_instance = AsyncHTTPClient(max_clients=sys.maxsize) return self._http_client_instance - def load(self): + def load(self) -> bool: self.ready = True + return self.ready def preprocess(self, request: Dict) -> Dict: return request diff --git a/python/kfserving/kfserving/kfmodel_repository.py b/python/kfserving/kfserving/kfmodel_repository.py new file mode 100644 index 00000000000..9a01db2c61c --- /dev/null +++ b/python/kfserving/kfserving/kfmodel_repository.py @@ -0,0 +1,54 @@ +# Copyright 2020 kubeflow.org. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import List, Optional +from kfserving import KFModel + +MODEL_MOUNT_DIRS = "/mnt/models" + + +class KFModelRepository: + """ + Model repository interface, follows NVIDIA Triton's `model-repository` + extension. + """ + + def __init__(self, models_dir: str = MODEL_MOUNT_DIRS): + self.models = {} + self.models_dir = models_dir + + def set_models_dir(self, models_dir): # used for unit tests + self.models_dir = models_dir + + def get_model(self, name: str) -> Optional[KFModel]: + return self.models.get(name, None) + + def get_models(self) -> List[KFModel]: + return list(self.models.values()) + + def is_model_ready(self, name: str): + model = self.get_model(name) + return False if model is None else model.ready + + def update(self, model: KFModel): + self.models[model.name] = model + + def load(self, name: str) -> bool: + pass + + def unload(self, name: str): + if name in self.models: + del self.models[name] + else: + raise KeyError(f"model {name} does not exist") diff --git a/python/kfserving/kfserving/kfserver.py b/python/kfserving/kfserving/kfserver.py index 923197b7ca0..52b2d9fa23d 100644 --- a/python/kfserving/kfserving/kfserver.py +++ b/python/kfserving/kfserving/kfserver.py @@ -15,13 +15,17 @@ import argparse import logging import json -from typing import List, Dict +import inspect +import sys +from typing import List, Optional import tornado.ioloop import tornado.web import tornado.httpserver import tornado.log + from kfserving.handlers.http import PredictHandler, ExplainHandler from kfserving import KFModel +from 
kfserving.kfmodel_repository import KFModelRepository DEFAULT_HTTP_PORT = 8080 DEFAULT_GRPC_PORT = 8081 @@ -40,17 +44,19 @@ tornado.log.enable_pretty_logging() + class KFServer: def __init__(self, http_port: int = args.http_port, grpc_port: int = args.grpc_port, max_buffer_size: int = args.max_buffer_size, - workers: int = args.workers): - self.registered_models = {} + workers: int = args.workers, + registered_models: KFModelRepository = KFModelRepository()): + self.registered_models = registered_models self.http_port = http_port self.grpc_port = grpc_port self.max_buffer_size = max_buffer_size self.workers = workers - self._http_server = None + self._http_server: Optional[tornado.httpserver.HTTPServer] = None def create_application(self): return tornado.web.Application([ @@ -65,6 +71,10 @@ def create_application(self): PredictHandler, dict(models=self.registered_models)), (r"/v1/models/([a-zA-Z0-9_-]+):explain", ExplainHandler, dict(models=self.registered_models)), + (r"/v1/models/([a-zA-Z0-9_-]+)/load", + LoadHandler, dict(models=self.registered_models)), + (r"/v1/models/([a-zA-Z0-9_-]+)/unload", + UnloadHandler, dict(models=self.registered_models)), ]) def start(self, models: List[KFModel], nest_asyncio: bool = False): @@ -92,7 +102,7 @@ def register_model(self, model: KFModel): if not model.name: raise Exception( "Failed to register model, model.name must be provided.") - self.registered_models[model.name] = model + self.registered_models.update(model) logging.info("Registering model: %s", model.name) @@ -102,17 +112,17 @@ def get(self): class HealthHandler(tornado.web.RequestHandler): - def initialize(self, models: Dict[str, KFModel]): + def initialize(self, models: KFModelRepository): self.models = models # pylint:disable=attribute-defined-outside-init def get(self, name: str): - if name not in self.models: + model = self.models.get_model(name) + if model is None: raise tornado.web.HTTPError( status_code=404, reason="Model with name %s does not exist." 
% name ) - model = self.models[name] if not model.ready: raise tornado.web.HTTPError( status_code=503, @@ -126,8 +136,52 @@ def get(self, name: str): class ListHandler(tornado.web.RequestHandler): - def initialize(self, models: Dict[str, KFModel]): + def initialize(self, models: KFModelRepository): self.models = models # pylint:disable=attribute-defined-outside-init def get(self): - self.write(json.dumps(list(self.models.values()))) + self.write(json.dumps([ob.name for ob in self.models.get_models()])) + + +class LoadHandler(tornado.web.RequestHandler): + def initialize(self, models: KFModelRepository): # pylint:disable=attribute-defined-outside-init + self.models = models + + async def post(self, name: str): + try: + (await self.models.load(name)) if inspect.iscoroutinefunction(self.models.load) else self.models.load(name) + except Exception as e: + ex_type, ex_value, ex_traceback = sys.exc_info() + raise tornado.web.HTTPError( + status_code=500, + reason=f"Model with name {name} is not ready. " + f"Error type: {ex_type} error msg: {ex_value}" + ) + + if not self.models.is_model_ready(name): + raise tornado.web.HTTPError( + status_code=503, + reason=f"Model with name {name} is not ready." + ) + self.write(json.dumps({ + "name": name, + "load": True + })) + + +class UnloadHandler(tornado.web.RequestHandler): + def initialize(self, models: KFModelRepository): # pylint:disable=attribute-defined-outside-init + self.models = models + + def post(self, name: str): + try: + self.models.unload(name) + except KeyError: + raise tornado.web.HTTPError( + status_code=404, + reason="Model with name %s does not exist." 
% name + ) + self.write(json.dumps({ + "name": name, + "unload": True + })) diff --git a/python/kfserving/kfserving/storage.py b/python/kfserving/kfserving/storage.py index dfda1f5353b..664e22506a1 100644 --- a/python/kfserving/kfserving/storage.py +++ b/python/kfserving/kfserving/storage.py @@ -50,6 +50,8 @@ def download(uri: str, out_dir: str = None) -> str: # noop if out_dir is not set and the path is local return Storage._download_local(uri) out_dir = tempfile.mkdtemp() + elif not os.path.exists(out_dir): + os.mkdir(out_dir) if uri.startswith(_GCS_PREFIX): Storage._download_gcs(uri, out_dir) diff --git a/python/kfserving/requirements.txt b/python/kfserving/requirements.txt index 455c148da27..37d0f4d52d8 100644 --- a/python/kfserving/requirements.txt +++ b/python/kfserving/requirements.txt @@ -11,4 +11,4 @@ google-cloud-storage>=1.16.0 adal>=1.2.2 table_logger>=0.3.5 numpy>=1.17.3 -azure-storage-blob>=1.3.0,<=2.1.0 \ No newline at end of file +azure-storage-blob>=1.3.0,<=2.1.0 diff --git a/python/kfserving/test/test_server.py b/python/kfserving/test/test_server.py index dad18002a33..7dcb57ddc67 100644 --- a/python/kfserving/test/test_server.py +++ b/python/kfserving/test/test_server.py @@ -13,11 +13,13 @@ # limitations under the License. 
import pytest -import kfserving +from kfserving import kfmodel +from kfserving import kfserver from tornado.httpclient import HTTPClientError +from kfserving.kfmodel_repository import KFModelRepository -class DummyModel(kfserving.KFModel): +class DummyModel(kfmodel.KFModel): def __init__(self, name): super().__init__(name) self.name = name @@ -33,13 +35,28 @@ async def explain(self, request): return {"predictions": request["instances"]} +class DummyKFModelRepository(KFModelRepository): + def __init__(self, test_load_success: bool): + super().__init__() + self.test_load_success = test_load_success + + async def load(self, name: str) -> bool: + if self.test_load_success: + model = DummyModel(name) + model.load() + self.update(model) + return model.ready + else: + return False + + class TestTFHttpServer(): @pytest.fixture(scope="class") def app(self): # pylint: disable=no-self-use model = DummyModel("TestModel") model.load() - server = kfserving.KFServer() + server = kfserver.KFServer() server.register_model(model) return server.create_application() @@ -67,13 +84,56 @@ async def test_explain(self, http_server_client): assert resp.body == b'{"predictions": [[1, 2]]}' assert resp.headers['content-type'] == "application/json; charset=UTF-8" + async def test_list(self, http_server_client): + resp = await http_server_client.fetch('/v1/models') + assert resp.code == 200 + assert resp.body == b'["TestModel"]' + + +class TestTFHttpServerLoadAndUnLoad(): + @pytest.fixture(scope="class") + def app(self): # pylint: disable=no-self-use + server = kfserver.KFServer(registered_models=DummyKFModelRepository(test_load_success=True)) + return server.create_application() + + async def test_load(self, http_server_client): + resp = await http_server_client.fetch('/v1/models/model/load', + method="POST", body=b'') + assert resp.code == 200 + assert resp.body == b'{"name": "model", "load": true}' + + async def test_unload(self, http_server_client): + resp = await 
http_server_client.fetch('/v1/models/model/unload', + method="POST", body=b'') + assert resp.code == 200 + assert resp.body == b'{"name": "model", "unload": true}' + + +class TestTFHttpServerLoadAndUnLoadFailure(): + @pytest.fixture(scope="class") + def app(self): # pylint: disable=no-self-use + server = kfserver.KFServer(registered_models=DummyKFModelRepository(test_load_success=False)) + return server.create_application() + + async def test_load_fail(self, http_server_client): + with pytest.raises(HTTPClientError) as err: + _ = await http_server_client.fetch('/v1/models/model/load', + method="POST", body=b'') + assert err.value.code == 503 + + async def test_unload_fail(self, http_server_client): + with pytest.raises(HTTPClientError) as err: + _ = await http_server_client.fetch('/v1/models/model/unload', + method="POST", body=b'') + assert err.value.code == 404 + class TestTFHttpServerModelNotLoaded(): @pytest.fixture(scope="class") def app(self): # pylint: disable=no-self-use model = DummyModel("TestModel") - server = kfserving.KFServer() + server = kfserver.KFServer() server.register_model(model) return server.create_application() diff --git a/python/pytorchserver/pytorchserver/example_model/cifar10.py b/python/pytorchserver/pytorchserver/example_model/model/cifar10.py similarity index 100% rename from python/pytorchserver/pytorchserver/example_model/cifar10.py rename to python/pytorchserver/pytorchserver/example_model/model/cifar10.py diff --git a/python/pytorchserver/pytorchserver/example_model/model.pt b/python/pytorchserver/pytorchserver/example_model/model/model.pt similarity index 100% rename from python/pytorchserver/pytorchserver/example_model/model.pt rename to python/pytorchserver/pytorchserver/example_model/model/model.pt diff --git a/python/pytorchserver/pytorchserver/model.py b/python/pytorchserver/pytorchserver/model.py index d9a159f6624..c2fdb17b69a 100644 --- a/python/pytorchserver/pytorchserver/model.py +++ 
b/python/pytorchserver/pytorchserver/model.py @@ -32,8 +32,8 @@ def __init__(self, name: str, model_class_name: str, model_dir: str): self.model = None self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu') - def load(self): - model_file_dir = kfserving.Storage.download(self.model_dir) + def load(self) -> bool: + model_file_dir = kfserving.Storage.download(self.model_dir, self.name) model_file = os.path.join(model_file_dir, PYTORCH_FILE) py_files = [] for filename in os.listdir(model_file_dir): @@ -58,6 +58,7 @@ def load(self): self.model.load_state_dict(torch.load(model_file, map_location=self.device)) self.model.eval() self.ready = True + return self.ready def predict(self, request: Dict) -> Dict: inputs = [] diff --git a/python/pytorchserver/pytorchserver/test_model.py b/python/pytorchserver/pytorchserver/test_model.py index 30656049f36..3c7f1de5ea0 100644 --- a/python/pytorchserver/pytorchserver/test_model.py +++ b/python/pytorchserver/pytorchserver/test_model.py @@ -18,12 +18,11 @@ import torchvision.transforms as transforms import os -model_dir = model_dir = os.path.join(os.path.dirname(__file__), "example_model") -MODEL_FILE = "model.pt" +model_dir = model_dir = os.path.join(os.path.dirname(__file__), "example_model", "model") def test_model(): - server = PyTorchModel("pytorchmodel", "Net", model_dir) + server = PyTorchModel("model", "Net", model_dir) server.load() transform = transforms.Compose([transforms.ToTensor(), @@ -35,6 +34,6 @@ def test_model(): dataiter = iter(testloader) images, _ = dataiter.next() - request = {"instances" : images[0:1].tolist()} + request = {"instances": images[0:1].tolist()} response = server.predict(request) - assert isinstance(response["instances"][0], list) + assert isinstance(response["predictions"][0], list) diff --git a/python/sklearnserver/setup.py b/python/sklearnserver/setup.py index 75d996135a5..ab4efa4824b 100644 --- a/python/sklearnserver/setup.py +++ b/python/sklearnserver/setup.py @@ -16,6 
+16,7 @@ tests_require = [ 'pytest', + 'pytest-asyncio', 'pytest-tornasync', 'mypy' ] diff --git a/python/sklearnserver/sklearnserver/__init__.py b/python/sklearnserver/sklearnserver/__init__.py index 3fab53d97b3..fe4f23dd0db 100644 --- a/python/sklearnserver/sklearnserver/__init__.py +++ b/python/sklearnserver/sklearnserver/__init__.py @@ -13,3 +13,4 @@ # limitations under the License. from .model import SKLearnModel +from .sklearn_model_repository import SKLearnModelRepository diff --git a/python/sklearnserver/sklearnserver/__init__.pyc b/python/sklearnserver/sklearnserver/__init__.pyc deleted file mode 100644 index 697bc7646168de52867dbde44564aefa0e970d94..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 170 zcmZSn%*z$O_IgY*0~9a;X$K%K76B3|K*Y$9!@!Ws$PmTIz?j0s5Ujxrl*nWR5*i?) zga=3jd;6p&7UlWor=;fiX@HC#!JvUk6iWj6#o0L^O~t82WvNB_@$s2? bnI-Y@dIgmw96(tcpytw?R6CFz#UR}PJsux5 diff --git a/python/sklearnserver/sklearnserver/__main__.py b/python/sklearnserver/sklearnserver/__main__.py index 30bbbb36a38..11c5e0a9882 100644 --- a/python/sklearnserver/sklearnserver/__main__.py +++ b/python/sklearnserver/sklearnserver/__main__.py @@ -12,11 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import kfserving import argparse +import logging +import sys - -from sklearnserver import SKLearnModel +import kfserving +from sklearnserver import SKLearnModel, SKLearnModelRepository DEFAULT_MODEL_NAME = "model" DEFAULT_LOCAL_MODEL_DIR = "/tmp/model" @@ -30,5 +31,11 @@ if __name__ == "__main__": model = SKLearnModel(args.model_name, args.model_dir) - model.load() - kfserving.KFServer().start([model]) + try: + model.load() + except Exception as e: + ex_type, ex_value, _ = sys.exc_info() + logging.error(f"fail to load model {args.model_name} from dir {args.model_dir}. 
" + f"exception type {ex_type}, exception msg: {ex_value}") + model.ready = False + kfserving.KFServer(registered_models=SKLearnModelRepository(args.model_dir)).start([model] if model.ready else []) diff --git a/python/sklearnserver/sklearnserver/example_models/joblib/model.joblib b/python/sklearnserver/sklearnserver/example_models/joblib/model/model.joblib similarity index 100% rename from python/sklearnserver/sklearnserver/example_models/joblib/model.joblib rename to python/sklearnserver/sklearnserver/example_models/joblib/model/model.joblib diff --git a/python/sklearnserver/sklearnserver/example_models/pickle/model.pickle b/python/sklearnserver/sklearnserver/example_models/pickle/model/model.pickle similarity index 100% rename from python/sklearnserver/sklearnserver/example_models/pickle/model.pickle rename to python/sklearnserver/sklearnserver/example_models/pickle/model/model.pickle diff --git a/python/sklearnserver/sklearnserver/example_models/pkl/model/model.pkl b/python/sklearnserver/sklearnserver/example_models/pkl/model/model.pkl new file mode 100644 index 0000000000000000000000000000000000000000..67aeb9a99cda737cbd4de94a24e391a3341be2bb GIT binary patch literal 4516 zcmcIo>u(fQ6yLVAEZc|ly=_4dMasjq@+?@{LXiRs1-Cp_9*(oyX?N+q&dh9UF=~k^ zP>Ci;FbaQw;fulO2aFK#i;<{_0?}X$@r(M+6dweIfalIVX9i|L!01hWJ2Us5$M2kb z?%laFTog0QhrG6};BH z5mS>IOt9`_8LhVke4Qv_HOq>bNkVC7a6@$6*-J;SMGA1F1IjvsVHK4Kwa5uz2b7_kH)GTe?>j#zeI%h|P#Ch^>fih#iP1LPP9Cq!AeeAWXy{VhHgf;$_4uh~0=+5hI8_h*89A zi2aDy5pN(4Al^hAL>xjuy9vvpk%@TA*&NVfW=1y@Sk77P5LsOjhNrEphQ{rImGNn# z%ZG6heffoGb?v=Yt7xBRwW~pSJr_Luj%c;iUuuub{xd^5((a$G@hY-k*e?10Zu7g)zQB-za^ws<%*+$^n5nITl2vAE3)OJ-!aO6BjtCz?Vs}` z^W(S2Wxo@@9ItbYm)ZPyT>A6c{N?kNptR?w{1xPl%Q))o{K~qjCi~ZD9vet+9LYKP zW&ZM7V&_NN`OS`B^5OZZq&!?E`%Uy*LUC5xRoVK?UtYa-ex%+NJHIl2g``($&##oLqi4>4 zCB@JA^^;#O@ptM+>ht<9q;Vg~bA9sq=KSzHT_S#5A5MNGUtY&t53FD1nx96BYl59$ z$;(6iyuOP`ua@MlQ~lNvU$)PDt-k{_<*pCT*EPF7WSw*Uv;GyTPu^ckh!4l(AwCX2 
zX~$U~a$fz!*QsCGe~s)r^K*OsNfe)#{Bgdy{-%)q3Oiq%Ke^6$f8zbk9iQK>A31Lv zud}~$eM-Ce`IG(3&xv2g(w_;s59 ze0|QJA6m`-+-t-?cPI>N={0U$eCcRMnRxk~jPN6% z%Y^mPKu=ChD&h@Zfemcn8DStChHhb?Fx0jOHj2Og)8f!T?dPHCod;H(r%-sy8Ve?F zoCqA7`axblsSV%Xb?lT|{*#Z+hfhwtkk{{)yT9jrKmShOZO8rW8NNTe`SJ_q9`%-2 zesjn8PaVJU`FAINyyN)Y{MYP?9(=F)>mbQ_KK=D%L({v5g5SPh@kiZXU)lYK-#*g2 zzx*OSJM%B~?)90yxaIHE@0J@2Zu@0r^XfBa@51BW?Va2AegC#2Ke_C&K3{oW|F0|0 z`+sjAHkq(luKYwA&%H4%i$|#~yz(PrL zJ0(5_1o}Qc1=~&7A&+(9K)y_Y=h%=U3_TwPRT#o!Zd4fZK+J?5M`5`F8Y}b)g}yMv zNg;s>aTMPfwGRBf3DY`|=kSQqAM#jxpY<(65hIWcy(La4DN9zlDY7)%&j|b3FaSlS t|8r!938wU{+_XdH;@#G%NulpA3Vuxx-&bJ3I@`7c=C%ap!C)>L_!|vQ+KT`H literal 0 HcmV?d00001 diff --git a/python/sklearnserver/sklearnserver/model.py b/python/sklearnserver/sklearnserver/model.py index 8a16dddac59..f5d37b2da27 100644 --- a/python/sklearnserver/sklearnserver/model.py +++ b/python/sklearnserver/sklearnserver/model.py @@ -16,26 +16,29 @@ import joblib import numpy as np import os -from typing import List, Dict +from typing import Dict MODEL_BASENAME = "model" MODEL_EXTENSIONS = [".joblib", ".pkl", ".pickle"] -class SKLearnModel(kfserving.KFModel): #pylint:disable=c-extension-no-member +class SKLearnModel(kfserving.KFModel): # pylint:disable=c-extension-no-member def __init__(self, name: str, model_dir: str): super().__init__(name) self.name = name self.model_dir = model_dir self.ready = False - def load(self): + def load(self) -> bool: model_path = kfserving.Storage.download(self.model_dir) paths = [os.path.join(model_path, MODEL_BASENAME + model_extension) for model_extension in MODEL_EXTENSIONS] - model_file = next(path for path in paths if os.path.exists(path)) - self._model = joblib.load(model_file) #pylint:disable=attribute-defined-outside-init - self.ready = True + for path in paths: + if os.path.exists(path): + self._model = joblib.load(path) + self.ready = True + break + return self.ready def predict(self, request: Dict) -> Dict: instances = request["instances"] @@ -46,6 +49,6 @@ def 
predict(self, request: Dict) -> Dict: "Failed to initialize NumPy array from inputs: %s, %s" % (e, instances)) try: result = self._model.predict(inputs).tolist() - return { "predictions" : result } + return {"predictions": result} except Exception as e: raise Exception("Failed to predict %s" % e) diff --git a/python/sklearnserver/sklearnserver/sklearn_model_repository.py b/python/sklearnserver/sklearnserver/sklearn_model_repository.py new file mode 100644 index 00000000000..867406aee72 --- /dev/null +++ b/python/sklearnserver/sklearnserver/sklearn_model_repository.py @@ -0,0 +1,29 @@ +# Copyright 2020 kubeflow.org. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os +from kfserving.kfmodel_repository import KFModelRepository, MODEL_MOUNT_DIRS +from sklearnserver import SKLearnModel + + +class SKLearnModelRepository(KFModelRepository): + + def __init__(self, model_dir: str = MODEL_MOUNT_DIRS): + super().__init__(model_dir) + + async def load(self, name: str) -> bool: + model = SKLearnModel(name, os.path.join(self.models_dir, name)) + if model.load(): + self.update(model) + return model.ready diff --git a/python/sklearnserver/sklearnserver/test_model.py b/python/sklearnserver/sklearnserver/test_model.py index 1f861f6844c..628c8fe5c1e 100644 --- a/python/sklearnserver/sklearnserver/test_model.py +++ b/python/sklearnserver/sklearnserver/test_model.py @@ -20,9 +20,9 @@ import os _MODEL_DIR = os.path.join(os.path.dirname(__file__), "example_models") -JOBLIB_FILE = [os.path.join(_MODEL_DIR, "joblib"), "model.joblib"] -PICKLE_FILES = [[os.path.join(_MODEL_DIR, "pkl"), "model.pkl"], - [os.path.join(_MODEL_DIR, "pickle"), "model.pickle"]] +JOBLIB_FILE = [os.path.join(_MODEL_DIR, "joblib", "model"), "model.joblib"] +PICKLE_FILES = [[os.path.join(_MODEL_DIR, "pkl", "model"), "model.pkl"], + [os.path.join(_MODEL_DIR, "pickle", "model"), "model.pickle"]] def _train_sample_model(): @@ -37,7 +37,7 @@ def _run_pickle_model(model_dir, model_name): sklearn_model, data = _train_sample_model() model_file = os.path.join(model_dir, model_name) pickle.dump(sklearn_model, open(model_file, 'wb')) - model = SKLearnModel("sklearnmodel", model_dir) + model = SKLearnModel("model", model_dir) model.load() request = data[0:1].tolist() response = model.predict({"instances": request}) @@ -48,7 +48,7 @@ def test_model_joblib(): sklearn_model, data = _train_sample_model() model_file = os.path.join(JOBLIB_FILE[0], JOBLIB_FILE[1]) joblib.dump(value=sklearn_model, filename=model_file) - model = SKLearnModel("sklearnmodel", JOBLIB_FILE[0]) + model = SKLearnModel("model", JOBLIB_FILE[0]) model.load() request = data[0:1].tolist() response = 
model.predict({"instances": request}) diff --git a/python/sklearnserver/sklearnserver/test_sklearn_model_repository.py b/python/sklearnserver/sklearnserver/test_sklearn_model_repository.py new file mode 100644 index 00000000000..8fe5049a500 --- /dev/null +++ b/python/sklearnserver/sklearnserver/test_sklearn_model_repository.py @@ -0,0 +1,51 @@ +# Copyright 2020 kubeflow.org. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import pytest +from sklearnserver import SKLearnModelRepository + +_MODEL_DIR = os.path.join(os.path.dirname(__file__), "example_models") +JOBLIB_FILE_DIR = os.path.join(_MODEL_DIR, "joblib") +PICKLE_FILE_DIRS = [os.path.join(_MODEL_DIR, "pkl"), os.path.join(_MODEL_DIR, "pickle")] +INVALID_MODEL_DIR = os.path.join(os.path.dirname(__file__), "models_not_exist") + + +@pytest.mark.asyncio +async def test_load_pickle(): + for model_dir in PICKLE_FILE_DIRS: + repo = SKLearnModelRepository(model_dir) + model_name = "model" + await repo.load(model_name) + assert repo.get_model(model_name) is not None + assert repo.is_model_ready(model_name) + + +@pytest.mark.asyncio +async def test_load_joblib(): + repo = SKLearnModelRepository(JOBLIB_FILE_DIR) + model_name = "model" + await repo.load(model_name) + assert repo.get_model(model_name) is not None + assert repo.is_model_ready(model_name) + + +@pytest.mark.asyncio +async def test_load_fail(): + repo = SKLearnModelRepository(INVALID_MODEL_DIR) + model_name = "model" + with pytest.raises(Exception): + 
await repo.load(model_name) + assert repo.get_model(model_name) is None + assert not repo.is_model_ready(model_name) diff --git a/python/xgbserver/setup.py b/python/xgbserver/setup.py index b12fc2fdcc1..34974ed8a84 100644 --- a/python/xgbserver/setup.py +++ b/python/xgbserver/setup.py @@ -16,6 +16,7 @@ tests_require = [ 'pytest', + 'pytest-asyncio', 'pytest-tornasync', 'mypy' ] diff --git a/python/xgbserver/xgbserver/__init__.py b/python/xgbserver/xgbserver/__init__.py index 2ccfb91fdb7..7f9c0ed79d8 100644 --- a/python/xgbserver/xgbserver/__init__.py +++ b/python/xgbserver/xgbserver/__init__.py @@ -13,3 +13,4 @@ # limitations under the License. from .model import XGBoostModel +from .xgboost_model_repository import XGBoostModelRepository diff --git a/python/xgbserver/xgbserver/__main__.py b/python/xgbserver/xgbserver/__main__.py index 06f64889377..ece5fc3b6eb 100644 --- a/python/xgbserver/xgbserver/__main__.py +++ b/python/xgbserver/xgbserver/__main__.py @@ -12,16 +12,19 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import kfserving import argparse +import logging +import sys +import kfserving -from xgbserver import XGBoostModel + +from xgbserver import XGBoostModel, XGBoostModelRepository DEFAULT_MODEL_NAME = "default" DEFAULT_LOCAL_MODEL_DIR = "/tmp/model" DEFAULT_NTHREAD = 1 -parser = argparse.ArgumentParser(parents=[kfserving.kfserver.parser]) #pylint:disable=c-extension-no-member +parser = argparse.ArgumentParser(parents=[kfserving.kfserver.parser]) # pylint:disable=c-extension-no-member parser.add_argument('--model_dir', required=True, help='A URI pointer to the model directory') parser.add_argument('--model_name', default=DEFAULT_MODEL_NAME, @@ -32,5 +35,12 @@ if __name__ == "__main__": model = XGBoostModel(args.model_name, args.model_dir, args.nthread) - model.load() - kfserving.KFServer().start([model]) #pylint:disable=c-extension-no-member + try: + model.load() + except Exception as e: + ex_type, ex_value, _ = sys.exc_info() + logging.error(f"fail to load model {args.model_name} from dir {args.model_dir}. 
" + f"exception type {ex_type}, exception msg: {ex_value}") + model.ready = False + + kfserving.KFServer(registered_models=XGBoostModelRepository(args.model_dir, args.nthread)).start([model] if model.ready else []) # pylint:disable=c-extension-no-member diff --git a/python/xgbserver/xgbserver/example_model/model.bst b/python/xgbserver/xgbserver/example_model/model/model.bst similarity index 100% rename from python/xgbserver/xgbserver/example_model/model.bst rename to python/xgbserver/xgbserver/example_model/model/model.bst diff --git a/python/xgbserver/xgbserver/model.py b/python/xgbserver/xgbserver/model.py index 2e16c650728..e42bc41e097 100644 --- a/python/xgbserver/xgbserver/model.py +++ b/python/xgbserver/xgbserver/model.py @@ -16,14 +16,14 @@ import xgboost as xgb from xgboost import XGBModel import os -import numpy as np -from typing import List, Dict +from typing import Dict BOOSTER_FILE = "model.bst" + class XGBoostModel(kfserving.KFModel): - def __init__(self, name: str, model_dir: str, nthread: int, booster: \ - XGBModel = None): + def __init__(self, name: str, model_dir: str, nthread: int, + booster: XGBModel = None): super().__init__(name) self.name = name self.model_dir = model_dir @@ -32,18 +32,19 @@ def __init__(self, name: str, model_dir: str, nthread: int, booster: \ self._booster = booster self.ready = True - def load(self): + def load(self) -> bool: model_file = os.path.join( kfserving.Storage.download(self.model_dir), BOOSTER_FILE) - self._booster = xgb.Booster(params={"nthread" : self.nthread}, + self._booster = xgb.Booster(params={"nthread": self.nthread}, model_file=model_file) self.ready = True + return self.ready def predict(self, request: Dict) -> Dict: try: # Use of list as input is deprecated see https://github.com/dmlc/xgboost/pull/3970 dmatrix = xgb.DMatrix(request["instances"], nthread=self.nthread) result: xgb.DMatrix = self._booster.predict(dmatrix) - return { "predictions": result.tolist() } + return {"predictions": 
result.tolist()} except Exception as e: raise Exception("Failed to predict %s" % e) diff --git a/python/xgbserver/xgbserver/test_model.py b/python/xgbserver/xgbserver/test_model.py index 2e8329b6cfc..a45040960be 100644 --- a/python/xgbserver/xgbserver/test_model.py +++ b/python/xgbserver/xgbserver/test_model.py @@ -17,10 +17,11 @@ from sklearn.datasets import load_iris from xgbserver import XGBoostModel -model_dir = model_dir = os.path.join(os.path.dirname(__file__), "example_model") +model_dir = os.path.join(os.path.dirname(__file__), "example_model", "model") BST_FILE = "model.bst" NTHREAD = 1 + def test_model(): iris = load_iris() y = iris['target'] @@ -34,9 +35,9 @@ def test_model(): 'objective': 'multi:softmax' } xgb_model = xgb.train(params=param, dtrain=dtrain) - model_file = os.path.join((model_dir), BST_FILE) + model_file = os.path.join(model_dir, BST_FILE) xgb_model.save_model(model_file) - model = XGBoostModel("xgbmodel", model_dir, NTHREAD) + model = XGBoostModel("model", model_dir, NTHREAD) model.load() request = [X[0].tolist()] response = model.predict({"instances": request}) diff --git a/python/xgbserver/xgbserver/test_xgboost_model_repository.py b/python/xgbserver/xgbserver/test_xgboost_model_repository.py new file mode 100644 index 00000000000..a733d38d715 --- /dev/null +++ b/python/xgbserver/xgbserver/test_xgboost_model_repository.py @@ -0,0 +1,39 @@ +# Copyright 2020 kubeflow.org. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import os
import pytest
from xgbserver import XGBoostModelRepository

# Directory next to this file from which test_load loads the model named "model".
model_dir = os.path.join(os.path.dirname(__file__), "example_model")
# Path under a "model_not_exist" directory; presumably absent on disk so that
# loading from it fails -- verify against the repository layout.
invalid_model_dir = os.path.join(os.path.dirname(__file__), "model_not_exist", "model")


@pytest.mark.asyncio
async def test_load():
    """Loading "model" from the example directory registers a ready model."""
    repo = XGBoostModelRepository(model_dir=model_dir, nthread=1)
    model_name = "model"
    await repo.load(model_name)
    assert repo.get_model(model_name) is not None
    assert repo.is_model_ready(model_name)


@pytest.mark.asyncio
async def test_load_fail():
    """Loading from the invalid directory raises and leaves nothing registered."""
    # Use the invalid directory: with the valid `model_dir` the load succeeds
    # and `pytest.raises` fails the test because nothing was raised.
    repo = XGBoostModelRepository(model_dir=invalid_model_dir, nthread=1)
    model_name = "model"
    with pytest.raises(Exception):
        await repo.load(model_name)
    # These checks live outside the `raises` block; statements placed after
    # the raising call inside the block are never executed.
    # NOTE(review): assumes KFModelRepository.get_model returns None and
    # is_model_ready returns a falsy value for an unregistered name -- confirm.
    assert repo.get_model(model_name) is None
    assert not repo.is_model_ready(model_name)


# diff --git a/python/xgbserver/xgbserver/xgboost_model_repository.py b/python/xgbserver/xgbserver/xgboost_model_repository.py
# new file mode 100644
# index 00000000000..e353b560b54
# --- /dev/null
# +++ b/python/xgbserver/xgbserver/xgboost_model_repository.py
# @@ -0,0 +1,29 @@
# Copyright 2020 kubeflow.org.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

from kfserving.kfmodel_repository import KFModelRepository, MODEL_MOUNT_DIRS
from xgbserver import XGBoostModel


class XGBoostModelRepository(KFModelRepository):
    """Model repository that builds ``XGBoostModel`` instances on request.

    Each model is read from the path formed by joining ``self.models_dir``
    with the model's name.
    """

    def __init__(self, model_dir: str = MODEL_MOUNT_DIRS, nthread: int = 1):
        """Initialise the repository.

        Args:
            model_dir: Directory passed on to ``KFModelRepository.__init__``.
            nthread: Thread count forwarded to every ``XGBoostModel`` created
                by :meth:`load`.
        """
        super().__init__(model_dir)
        self.nthread = nthread

    async def load(self, name: str) -> bool:
        """Build the model called *name*, load it, and register it on success.

        Args:
            name: Model name; also the sub-directory joined onto
                ``self.models_dir`` to locate the model files.

        Returns:
            The ``ready`` attribute of the newly built model.
        """
        # NOTE(review): `models_dir` is presumably set by the base class from
        # the `model_dir` argument -- confirm against KFModelRepository.
        model_path = os.path.join(self.models_dir, name)
        candidate = XGBoostModel(name, model_path, self.nthread)
        loaded = candidate.load()
        if loaded:
            # Only a model whose load() reported success is registered.
            self.update(candidate)
        return candidate.ready