Skip to content

Commit

Permalink
kfserver add load/unload endpoint (kubeflow#1082)
Browse files Browse the repository at this point in the history
* kfserver add load/unload endpoint

* address comments

* revert license

* fix python packages bugs

* revert crd serving.kubeflow.org_inferenceservices

* add missing packages in setup.py

* update package import and trigger build/test

* revert back pytorchserver

* update

* fix sklearn docker

* fix model package init

* revert back sklearn/xgboost and add SKLearnModelRepository/XGBoostModelRepository

* remove unused file

* mount directory follows /mnt/models/

* remove unused files

* create storage destination dir if not exist

* model dir -> /mnt/models/

* fix unit test

* revert back model repo

* pass model dir to KFModelRepository

* move registered_models to the last on the argument list
  • Loading branch information
wengyao04 authored Sep 16, 2020
1 parent 67e3d1a commit d883c34
Show file tree
Hide file tree
Showing 31 changed files with 406 additions and 62 deletions.
2 changes: 1 addition & 1 deletion python/alibiexplainer/alibiexplainer/explainer.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def __init__( # pylint:disable=too-many-arguments
else:
raise NotImplementedError

def load(self):
def load(self) -> bool:
pass

def _predict_fn(self, arr: Union[np.ndarray, List]) -> np.ndarray:
Expand Down
11 changes: 5 additions & 6 deletions python/kfserving/kfserving/handlers/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,21 @@
import inspect
import tornado.web
import json
from typing import Dict
from http import HTTPStatus
from kfserving.kfmodel import KFModel
from kfserving.kfmodel_repository import KFModelRepository


class HTTPHandler(tornado.web.RequestHandler):
def initialize(self, models: Dict[str, KFModel]):
self.models = models # pylint:disable=attribute-defined-outside-init
def initialize(self, models: KFModelRepository):
self.models = models # pylint:disable=attribute-defined-outside-init

def get_model(self, name: str):
if name not in self.models:
model = self.models.get_model(name)
if model is None:
raise tornado.web.HTTPError(
status_code=HTTPStatus.NOT_FOUND,
reason="Model with name %s does not exist." % name
)
model = self.models[name]
if not model.ready:
model.load()
return model
Expand Down
3 changes: 2 additions & 1 deletion python/kfserving/kfserving/kfmodel.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@ def _http_client(self):
self._http_client_instance = AsyncHTTPClient(max_clients=sys.maxsize)
return self._http_client_instance

def load(self):
def load(self) -> bool:
self.ready = True
return self.ready

def preprocess(self, request: Dict) -> Dict:
return request
Expand Down
54 changes: 54 additions & 0 deletions python/kfserving/kfserving/kfmodel_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Copyright 2020 kubeflow.org.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Optional
from kfserving import KFModel

MODEL_MOUNT_DIRS = "/mnt/models"


class KFModelRepository:
"""
Model repository interface, follows NVIDIA Triton's `model-repository`
extension.
"""

def __init__(self, models_dir: str = MODEL_MOUNT_DIRS):
self.models = {}
self.models_dir = models_dir

def set_models_dir(self, models_dir): # used for unit tests
self.models_dir = models_dir

def get_model(self, name: str) -> Optional[KFModel]:
return self.models.get(name, None)

def get_models(self) -> List[KFModel]:
return list(self.models.values())

def is_model_ready(self, name: str):
model = self.get_model(name)
return False if model is None else model.ready

def update(self, model: KFModel):
self.models[model.name] = model

def load(self, name: str) -> bool:
pass

def unload(self, name: str):
if name in self.models:
del self.models[name]
else:
raise KeyError(f"model {name} does not exist")
74 changes: 64 additions & 10 deletions python/kfserving/kfserving/kfserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
import argparse
import logging
import json
from typing import List, Dict
import inspect
import sys
from typing import List, Optional
import tornado.ioloop
import tornado.web
import tornado.httpserver
import tornado.log

from kfserving.handlers.http import PredictHandler, ExplainHandler
from kfserving import KFModel
from kfserving.kfmodel_repository import KFModelRepository

DEFAULT_HTTP_PORT = 8080
DEFAULT_GRPC_PORT = 8081
Expand All @@ -40,17 +44,19 @@

tornado.log.enable_pretty_logging()


class KFServer:
def __init__(self, http_port: int = args.http_port,
grpc_port: int = args.grpc_port,
max_buffer_size: int = args.max_buffer_size,
workers: int = args.workers):
self.registered_models = {}
workers: int = args.workers,
registered_models: KFModelRepository = KFModelRepository()):
self.registered_models = registered_models
self.http_port = http_port
self.grpc_port = grpc_port
self.max_buffer_size = max_buffer_size
self.workers = workers
self._http_server = None
self._http_server: Optional[tornado.httpserver.HTTPServer] = None

def create_application(self):
return tornado.web.Application([
Expand All @@ -65,6 +71,10 @@ def create_application(self):
PredictHandler, dict(models=self.registered_models)),
(r"/v1/models/([a-zA-Z0-9_-]+):explain",
ExplainHandler, dict(models=self.registered_models)),
(r"/v1/models/([a-zA-Z0-9_-]+)/load",
LoadHandler, dict(models=self.registered_models)),
(r"/v1/models/([a-zA-Z0-9_-]+)/unload",
UnloadHandler, dict(models=self.registered_models)),
])

def start(self, models: List[KFModel], nest_asyncio: bool = False):
Expand Down Expand Up @@ -92,7 +102,7 @@ def register_model(self, model: KFModel):
if not model.name:
raise Exception(
"Failed to register model, model.name must be provided.")
self.registered_models[model.name] = model
self.registered_models.update(model)
logging.info("Registering model: %s", model.name)


Expand All @@ -102,17 +112,17 @@ def get(self):


class HealthHandler(tornado.web.RequestHandler):
def initialize(self, models: Dict[str, KFModel]):
def initialize(self, models: KFModelRepository):
self.models = models # pylint:disable=attribute-defined-outside-init

def get(self, name: str):
if name not in self.models:
model = self.models.get_model(name)
if model is None:
raise tornado.web.HTTPError(
status_code=404,
reason="Model with name %s does not exist." % name
)

model = self.models[name]
if not model.ready:
raise tornado.web.HTTPError(
status_code=503,
Expand All @@ -126,8 +136,52 @@ def get(self, name: str):


class ListHandler(tornado.web.RequestHandler):
def initialize(self, models: Dict[str, KFModel]):
def initialize(self, models: KFModelRepository):
self.models = models # pylint:disable=attribute-defined-outside-init

def get(self):
self.write(json.dumps(list(self.models.values())))
self.write(json.dumps([ob.name for ob in self.models.get_models()]))


class LoadHandler(tornado.web.RequestHandler):
def initialize(self, models: KFModelRepository): # pylint:disable=attribute-defined-outside-init
self.models = models

async def post(self, name: str):
try:
(await self.models.load(name)) if inspect.iscoroutinefunction(self.models.load) else self.models.load(name)
except Exception as e:
ex_type, ex_value, ex_traceback = sys.exc_info()
raise tornado.web.HTTPError(
status_code=500,
reason=f"Model with name {name} is not ready. "
f"Error type: {ex_type} error msg: {ex_value}"
)

if not self.models.is_model_ready(name):
raise tornado.web.HTTPError(
status_code=503,
reason=f"Model with name {name} is not ready."
)
self.write(json.dumps({
"name": name,
"load": True
}))


class UnloadHandler(tornado.web.RequestHandler):
def initialize(self, models: KFModelRepository): # pylint:disable=attribute-defined-outside-init
self.models = models

def post(self, name: str):
try:
self.models.unload(name)
except KeyError:
raise tornado.web.HTTPError(
status_code=404,
reason="Model with name %s does not exist." % name
)
self.write(json.dumps({
"name": name,
"unload": True
}))
2 changes: 2 additions & 0 deletions python/kfserving/kfserving/storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ def download(uri: str, out_dir: str = None) -> str:
# noop if out_dir is not set and the path is local
return Storage._download_local(uri)
out_dir = tempfile.mkdtemp()
elif not os.path.exists(out_dir):
os.mkdir(out_dir)

if uri.startswith(_GCS_PREFIX):
Storage._download_gcs(uri, out_dir)
Expand Down
2 changes: 1 addition & 1 deletion python/kfserving/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ google-cloud-storage>=1.16.0
adal>=1.2.2
table_logger>=0.3.5
numpy>=1.17.3
azure-storage-blob>=1.3.0,<=2.1.0
azure-storage-blob>=1.3.0,<=2.1.0
68 changes: 64 additions & 4 deletions python/kfserving/test/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
# limitations under the License.

import pytest
import kfserving
from kfserving import kfmodel
from kfserving import kfserver
from tornado.httpclient import HTTPClientError
from kfserving.kfmodel_repository import KFModelRepository


class DummyModel(kfserving.KFModel):
class DummyModel(kfmodel.KFModel):
def __init__(self, name):
super().__init__(name)
self.name = name
Expand All @@ -33,13 +35,28 @@ async def explain(self, request):
return {"predictions": request["instances"]}


class DummyKFModelRepository(KFModelRepository):
def __init__(self, test_load_success: bool):
super().__init__()
self.test_load_success = test_load_success

async def load(self, name: str) -> bool:
if self.test_load_success:
model = DummyModel(name)
model.load()
self.update(model)
return model.ready
else:
return False


class TestTFHttpServer():

@pytest.fixture(scope="class")
def app(self): # pylint: disable=no-self-use
model = DummyModel("TestModel")
model.load()
server = kfserving.KFServer()
server = kfserver.KFServer()
server.register_model(model)
return server.create_application()

Expand Down Expand Up @@ -67,13 +84,56 @@ async def test_explain(self, http_server_client):
assert resp.body == b'{"predictions": [[1, 2]]}'
assert resp.headers['content-type'] == "application/json; charset=UTF-8"

async def test_list(self, http_server_client):
resp = await http_server_client.fetch('/v1/models')
assert resp.code == 200
assert resp.body == b'["TestModel"]'


class TestTFHttpServerLoadAndUnLoad():
@pytest.fixture(scope="class")
def app(self): # pylint: disable=no-self-use
server = kfserver.KFServer(registered_models=DummyKFModelRepository(test_load_success=True))
return server.create_application()

async def test_load(self, http_server_client):
resp = await http_server_client.fetch('/v1/models/model/load',
method="POST", body=b'')
assert resp.code == 200
assert resp.body == b'{"name": "model", "load": true}'

async def test_unload(self, http_server_client):
resp = await http_server_client.fetch('/v1/models/model/unload',
method="POST", body=b'')
assert resp.code == 200
assert resp.body == b'{"name": "model", "unload": true}'


class TestTFHttpServerLoadAndUnLoadFailure():
@pytest.fixture(scope="class")
def app(self): # pylint: disable=no-self-use
server = kfserver.KFServer(registered_models=DummyKFModelRepository(test_load_success=False))
return server.create_application()

async def test_load_fail(self, http_server_client):
with pytest.raises(HTTPClientError) as err:
_ = await http_server_client.fetch('/v1/models/model/load',
method="POST", body=b'')
assert err.value.code == 503

async def test_unload_fail(self, http_server_client):
with pytest.raises(HTTPClientError) as err:
_ = await http_server_client.fetch('/v1/models/model/unload',
method="POST", body=b'')
assert err.value.code == 404


class TestTFHttpServerModelNotLoaded():

@pytest.fixture(scope="class")
def app(self): # pylint: disable=no-self-use
model = DummyModel("TestModel")
server = kfserving.KFServer()
server = kfserver.KFServer()
server.register_model(model)
return server.create_application()

Expand Down
5 changes: 3 additions & 2 deletions python/pytorchserver/pytorchserver/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ def __init__(self, name: str, model_class_name: str, model_dir: str):
self.model = None
self.device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

def load(self):
model_file_dir = kfserving.Storage.download(self.model_dir)
def load(self) -> bool:
model_file_dir = kfserving.Storage.download(self.model_dir, self.name)
model_file = os.path.join(model_file_dir, PYTORCH_FILE)
py_files = []
for filename in os.listdir(model_file_dir):
Expand All @@ -58,6 +58,7 @@ def load(self):
self.model.load_state_dict(torch.load(model_file, map_location=self.device))
self.model.eval()
self.ready = True
return self.ready

def predict(self, request: Dict) -> Dict:
inputs = []
Expand Down
Loading

0 comments on commit d883c34

Please sign in to comment.