Fixing triton inference stage resource pool (nv-morpheus#722)
Fixes a copy/paste error which prevented memory resource objects from being returned to the pool.

Closes nv-morpheus#726

Authors:
  - Michael Demoret (https://github.com/mdemoret-nv)
  - David Gardner (https://github.com/dagardner-nv)

Approvers:
  - David Gardner (https://github.com/dagardner-nv)

URL: nv-morpheus#722
mdemoret-nv authored and jjacobelli committed Mar 7, 2023
1 parent 28d23cb commit de4c8af
Showing 5 changed files with 233 additions and 27 deletions.
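To make the effect of the leak concrete: when borrowed objects are never handed back, every borrow falls through to `create_fn` until the pool's `max_size` cap is reached, after which further borrows can only block or time out. Below is a minimal sketch of that failure mode against the reworked `ResourcePool` API introduced in this commit (illustrative only; it assumes a working Morpheus installation and uses made-up sizes):

```python
import queue

from morpheus.stages.inference.triton_inference_stage import ResourcePool

# A tiny pool standing in for the stage's pool of CUDA input buffers.
pool = ResourcePool[bytes](create_fn=lambda: bytes(1024), max_size=2)

# Borrow everything the pool is allowed to create and "forget" to return it.
leaked = [pool.borrow_obj() for _ in range(2)]

try:
    pool.borrow_obj(timeout=0)  # nothing left to hand out, and no room to create more
except queue.Empty:
    print("pool exhausted: borrowed objects were never returned")

# Returning an object makes it immediately available to the next borrower.
pool.return_obj(leaked.pop())
assert pool.borrow_obj(timeout=0) == bytes(1024)
```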
2 changes: 1 addition & 1 deletion examples/ransomware_detection/stages/create_features.py
@@ -21,10 +21,10 @@

 from dask.distributed import Client

-from morpheus._lib.messages import MessageMeta
 from morpheus.cli.register_stage import register_stage
 from morpheus.config import Config
 from morpheus.config import PipelineModes
+from morpheus.messages import MessageMeta
 from morpheus.messages import MultiMessage
 from morpheus.pipeline.multi_message_stage import MultiMessageStage
 from morpheus.pipeline.stream_pair import StreamPair
2 changes: 1 addition & 1 deletion examples/sid_visualization/run.py
@@ -20,11 +20,11 @@
 import mrc

 from morpheus._lib.common import FileTypes
-from morpheus._lib.messages import MessageMeta
 from morpheus.config import Config
 from morpheus.config import CppConfig
 from morpheus.config import PipelineModes
 from morpheus.io.deserializers import read_file_to_df
+from morpheus.messages import MessageMeta
 from morpheus.pipeline.linear_pipeline import LinearPipeline
 from morpheus.pipeline.preallocator_mixin import PreallocatorMixin
 from morpheus.pipeline.single_output_source import SingleOutputSource
3 changes: 2 additions & 1 deletion morpheus.code-workspace
@@ -44,7 +44,8 @@
         ],
         "python.linting.flake8Enabled": true,
         "python.linting.pylintArgs": [
-            "--rcfile=${workspaceFolder}/.pylintrc"
+            "--rcfile=${workspaceFolder}/.pylintrc",
+            "--init-hook=import sys; sys.path.append(\"${workspaceFolder}\")",
         ],
         "python.linting.pylintEnabled": true,
         "rewrap.wrappingColumn": 120,
64 changes: 40 additions & 24 deletions morpheus/stages/inference/triton_inference_stage.py
@@ -16,7 +16,6 @@
 import dataclasses
 import logging
 import queue
-import threading
 import typing
 import warnings
 from abc import abstractmethod
@@ -41,6 +40,8 @@
 from morpheus.stages.inference.inference_stage import InferenceWorker
 from morpheus.utils.producer_consumer_queue import ProducerConsumerQueue

+_T = typing.TypeVar("_T")
+
 logger = logging.getLogger(__name__)


@@ -97,7 +98,7 @@ class TritonInOut:
     ptr: cp.cuda.MemoryPointer = None


-class ResourcePool:
+class ResourcePool(typing.Generic[_T]):
     """
     This class provides a bounded pool of resources. Users of the pool can borrow a resource where they will
     get exclusive access to that resource until it is returned. New objects will be created if the pool is
@@ -116,57 +117,72 @@ class ResourcePool:
     """

-    def __init__(self, create_fn: typing.Callable[[], typing.Any], max_size: int = 1000):
+    def __init__(self, create_fn: typing.Callable[[], _T], max_size: int = 1000):
         self._create_fn = create_fn
         self._max_size = max_size
         self._added_count = 0

-        self._queue = ProducerConsumerQueue()
-
-        self._adding_condition = threading.Condition(self._queue.mutex)
-
-        self._outstanding = []
+        self._queue: ProducerConsumerQueue[_T] = ProducerConsumerQueue(maxsize=self._max_size)

-    def _borrow(self):
+    def _add_item(self):
         try:
-            return self._queue.get_nowait()
-        except queue.Empty:
-            # Now try and create one
-
             # Hold the queue mutex while we create this
             with self._queue.mutex:

-                # Only add it if we have room
+                # Only add it if we have room. Otherwise we allocate memory each time we try to exceed the size
                 if (self._added_count < self._max_size):
-                    self._queue.put(self._create_fn())
+                    self._queue.put_nowait(self._create_fn())
                     self._added_count += 1

-            return self._queue.get()
+        except queue.Full:
+            logger.error(
+                "Failed to add item to the Triton ResourcePool. The ResourcePool and queue size are out of sync.")
+            raise

-    def borrow(self):
+    @property
+    def added_count(self):
+        """
+        The number of items that have been generated by the pool. Starts at 0 and increases for every borrow
+        request when the current pool is empty.
+
+        Returns
+        -------
+        int
+            Current number of added items.
+        """
+        return self._added_count
+
+    def borrow_obj(self, timeout: float = None) -> _T:
         """
         Returns an item from the pool. If the pool is empty, a new item will be created and returned.

         Returns
         -------
-        obj : typing.Any
+        obj
             Item from the queue.
         """

-        obj = self._borrow()
+        try:
+            return self._queue.get_nowait()
+        except queue.Empty:
+            # Now try and create one
+            self._add_item()

-        return obj
+            return self._queue.get(timeout=timeout)

-    def return_obj(self, obj):
+    def return_obj(self, obj: _T):
         """
         Returns a borrowed item back to the pool to be used by new calls to `borrow()`.

         Parameters
         ----------
-        obj : typing.Any
+        obj
             An item to be added to the queue.
         """

-        self._queue.put(obj)
+        # Use put_nowait here because we should never exceed the size and this should fail instead of blocking
+        self._queue.put_nowait(obj)


 class InputWrapper:
     """
@@ -600,7 +616,7 @@ def process(self, batch: MultiInferenceMessage, cb: typing.Callable[[ResponseMem
             Callback to set the values for the inference response.
         """
-        mem: InputWrapper = self._mem_pool.borrow()
+        mem: InputWrapper = self._mem_pool.borrow_obj()

         inputs: typing.List[tritonclient.InferInput] = [
             mem.build_input(input.name,
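The class docstring above spells out the intended contract: a caller gets exclusive use of a borrowed resource and must hand it back so the pool stays bounded. A small sketch of how a caller would typically honour that contract follows (a generic illustration against the new API, assuming a Morpheus installation; it is not a quote of the stage's own `process()` logic):

```python
from morpheus.stages.inference.triton_inference_stage import ResourcePool


def run_one_request(pool: ResourcePool[dict]) -> None:
    # borrow_obj() gives exclusive access; the same object is reused across
    # calls once it has been returned, so create_fn runs at most max_size times.
    resource = pool.borrow_obj()
    try:
        resource["in_use"] = True  # ... do the actual work with the resource ...
    finally:
        # Always hand the object back, even on error; otherwise the pool drains
        # one object per failure until borrow_obj() can only time out.
        pool.return_obj(resource)


pool = ResourcePool[dict](create_fn=dict, max_size=8)
for _ in range(100):
    run_one_request(pool)

assert pool.added_count == 1  # a single pooled object served all 100 calls
```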
189 changes: 189 additions & 0 deletions tests/test_triton_inference_stage.py
@@ -0,0 +1,189 @@
#!/usr/bin/env python
# SPDX-FileCopyrightText: Copyright (c) 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import csv
import os
import queue
from unittest import mock

import numpy as np
import pytest

from morpheus.config import ConfigFIL
from morpheus.config import PipelineModes
from morpheus.pipeline import LinearPipeline
from morpheus.stages.inference.triton_inference_stage import ResourcePool
from morpheus.stages.inference.triton_inference_stage import TritonInferenceStage
from morpheus.stages.input.file_source_stage import FileSourceStage
from morpheus.stages.output.write_to_file_stage import WriteToFileStage
from morpheus.stages.postprocess.add_scores_stage import AddScoresStage
from morpheus.stages.postprocess.serialize_stage import SerializeStage
from morpheus.stages.preprocess.deserialize_stage import DeserializeStage
from morpheus.stages.preprocess.preprocess_fil_stage import PreprocessFILStage

MODEL_MAX_BATCH_SIZE = 1024


def test_resource_pool():
    create_fn = mock.MagicMock()

    # If called a third time this will raise a StopIteration exception
    create_fn.side_effect = range(2)

    rp = ResourcePool[int](create_fn=create_fn, max_size=2)

    assert rp._queue.qsize() == 0

    # Check for normal allocation
    assert rp.borrow_obj() == 0
    assert rp._queue.qsize() == 0
    assert rp.added_count == 1
    create_fn.assert_called_once()

    assert rp.borrow_obj() == 1
    assert rp._queue.qsize() == 0
    assert rp.added_count == 2
    assert create_fn.call_count == 2

    rp.return_obj(0)
    assert rp._queue.qsize() == 1
    rp.return_obj(1)
    assert rp._queue.qsize() == 2

    assert rp.borrow_obj() == 0
    assert rp._queue.qsize() == 1
    assert rp._added_count == 2
    assert create_fn.call_count == 2

    assert rp.borrow_obj() == 1
    assert rp._queue.qsize() == 0
    assert rp._added_count == 2
    assert create_fn.call_count == 2


def test_resource_pool_overallocate():
    create_fn = mock.MagicMock()

    # Provide more values than max_size so exhaustion comes from the pool, not from create_fn
    create_fn.side_effect = range(5)

    rp = ResourcePool[int](create_fn=create_fn, max_size=2)

    assert rp.borrow_obj() == 0
    assert rp.borrow_obj() == 1

    with pytest.raises(queue.Empty):
        rp.borrow_obj(timeout=0)


def test_resource_pool_large_count():
    create_fn = mock.MagicMock()
    create_fn.side_effect = range(10000)

    rp = ResourcePool[int](create_fn=create_fn, max_size=10000)

    for _ in range(10000):
        rp.borrow_obj(timeout=0)

    assert rp._queue.qsize() == 0
    assert create_fn.call_count == 10000


def test_resource_pool_create_raises_error():
    create_fn = mock.MagicMock()
    create_fn.side_effect = (10, RuntimeError, 20)

    rp = ResourcePool[int](create_fn=create_fn, max_size=10)

    assert rp.borrow_obj() == 10

    with pytest.raises(RuntimeError):
        rp.borrow_obj()

    assert rp.borrow_obj() == 20


@pytest.mark.slow
@pytest.mark.use_python
@pytest.mark.parametrize('num_records', [1000, 2000, 4000])
@mock.patch('tritonclient.grpc.InferenceServerClient')
def test_triton_stage_pipe(mock_triton_client, config, tmp_path, num_records):
    mock_metadata = {
        "inputs": [{
            'name': 'input__0', 'datatype': 'FP32', "shape": [-1, 1]
        }],
        "outputs": [{
            'name': 'output__0', 'datatype': 'FP32', 'shape': ['-1', '1']
        }]
    }
    mock_model_config = {"config": {"max_batch_size": MODEL_MAX_BATCH_SIZE}}

    input_file = os.path.join(tmp_path, "input_data.csv")
    with open(input_file, 'w') as fh:
        writer = csv.writer(fh, dialect=csv.excel)
        writer.writerow(['v'])
        for i in range(num_records):
            writer.writerow([i * 2])

    mock_triton_client.return_value = mock_triton_client
    mock_triton_client.is_server_live.return_value = True
    mock_triton_client.is_server_ready.return_value = True
    mock_triton_client.is_model_ready.return_value = True
    mock_triton_client.get_model_metadata.return_value = mock_metadata
    mock_triton_client.get_model_config.return_value = mock_model_config

    data = np.loadtxt(input_file, delimiter=',', skiprows=1)
    inf_results = np.split(data, range(MODEL_MAX_BATCH_SIZE, len(data), MODEL_MAX_BATCH_SIZE))

    mock_infer_result = mock.MagicMock()
    mock_infer_result.as_numpy.side_effect = inf_results

    def async_infer(callback=None, **k):
        callback(mock_infer_result, None)

    mock_triton_client.async_infer.side_effect = async_infer

    config.mode = PipelineModes.FIL
    config.class_labels = ["test"]
    config.model_max_batch_size = MODEL_MAX_BATCH_SIZE
    config.pipeline_batch_size = 1024
    config.feature_length = 1
    config.edge_buffer_size = 128
    config.num_threads = 1

    config.fil = ConfigFIL()

    config.fil.feature_columns = ['v']

    out_file = os.path.join(tmp_path, 'results.csv')

    pipe = LinearPipeline(config)
    pipe.set_source(FileSourceStage(config, filename=input_file, iterative=False))
    pipe.add_stage(DeserializeStage(config))
    pipe.add_stage(PreprocessFILStage(config))
    pipe.add_stage(
        TritonInferenceStage(config, model_name='abp-nvsmi-xgb', server_url='test:0000', force_convert_inputs=True))
    pipe.add_stage(AddScoresStage(config, prefix="score_"))
    pipe.add_stage(SerializeStage(config))
    pipe.add_stage(WriteToFileStage(config, filename=out_file, overwrite=False))

    pipe.run()

    results = np.loadtxt(out_file, delimiter=',', skiprows=1)
    assert len(results) == num_records

    for (i, row) in enumerate(results):
        assert (row == [i, i * 2, i * 2]).all()
