diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml index d8276ee111..aab4a49ce5 100644 --- a/.github/workflows/pull_request.yml +++ b/.github/workflows/pull_request.yml @@ -31,8 +31,8 @@ jobs: uses: ./.github/workflows/ci_pipe.yml with: run_check: ${{ startsWith(github.ref_name, 'pull-request/') }} - container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-driver-230213 - test_container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-test-230213 + container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-driver-230214 + test_container: nvcr.io/ea-nvidia-morpheus/morpheus:morpheus-ci-test-230214 secrets: GHA_AWS_ACCESS_KEY_ID: ${{ secrets.GHA_AWS_ACCESS_KEY_ID }} GHA_AWS_SECRET_ACCESS_KEY: ${{ secrets.GHA_AWS_SECRET_ACCESS_KEY }} diff --git a/ci/runner/Dockerfile b/ci/runner/Dockerfile index 80800a6588..1d3c75c5dc 100644 --- a/ci/runner/Dockerfile +++ b/ci/runner/Dockerfile @@ -58,6 +58,7 @@ ARG CUDA_PKG_VER RUN apt update && \ DEBIAN_FRONTEND=noninteractive TZ=Etc/UTC \ apt install --no-install-recommends -y \ + cuda-nvtx-${CUDA_PKG_VER} \ libcublas-dev-${CUDA_PKG_VER} \ libcufft-dev-${CUDA_PKG_VER} \ libcurand-dev-${CUDA_PKG_VER} \ diff --git a/external/utilities b/external/utilities index 6e1d4e62e8..fedba7fd5d 160000 --- a/external/utilities +++ b/external/utilities @@ -1 +1 @@ -Subproject commit 6e1d4e62e8ad36a3ff45652f4d1aa03810de3751 +Subproject commit fedba7fd5d646fa742cb62aac45be5265f6cc206 diff --git a/morpheus/_lib/include/morpheus/objects/rmm_tensor.hpp b/morpheus/_lib/include/morpheus/objects/rmm_tensor.hpp index d8d7ce1ce1..916daa4cd3 100644 --- a/morpheus/_lib/include/morpheus/objects/rmm_tensor.hpp +++ b/morpheus/_lib/include/morpheus/objects/rmm_tensor.hpp @@ -28,6 +28,7 @@ #include namespace morpheus { +#pragma GCC visibility push(default) /****** Component public implementations *******************/ /****** RMMTensor****************************************/ @@ -164,5 +165,7 @@ class RMMTensor : public ITensor std::vector m_shape; std::vector m_stride; }; + +#pragma GCC visibility pop /** @} */ // end of group } // namespace morpheus diff --git a/morpheus/_lib/include/morpheus/utilities/matx_util.hpp b/morpheus/_lib/include/morpheus/utilities/matx_util.hpp index 76842a5466..85be5a149a 100644 --- a/morpheus/_lib/include/morpheus/utilities/matx_util.hpp +++ b/morpheus/_lib/include/morpheus/utilities/matx_util.hpp @@ -102,7 +102,7 @@ struct MatxUtil static std::shared_ptr reduce_max(const DevMemInfo& input, const std::vector& seq_ids, size_t seq_id_offset, - const std::vector& output_shape); + const std::vector& output_shape); }; /** @} */ // end of group } // namespace morpheus diff --git a/morpheus/_lib/src/messages/multi_tensor.cpp b/morpheus/_lib/src/messages/multi_tensor.cpp index 70a4569116..058a07e539 100644 --- a/morpheus/_lib/src/messages/multi_tensor.cpp +++ b/morpheus/_lib/src/messages/multi_tensor.cpp @@ -83,7 +83,7 @@ void MultiTensorMessage::get_slice_impl(std::shared_ptr new_messag sliced_message->offset = start; sliced_message->count = stop - start; - // If we have more inference rows than message rows, we need to use the seq_ids to figure out the slicing. This + // If we have more tensor rows than message rows, we need to use the seq_ids to figure out the slicing. 
This // will be slow and should be avoided at all costs if (this->count != this->mess_count && this->memory->has_tensor("seq_ids")) { diff --git a/morpheus/_lib/src/stages/triton_inference.cpp b/morpheus/_lib/src/stages/triton_inference.cpp index 903183ce6d..3ce3f89e2f 100644 --- a/morpheus/_lib/src/stages/triton_inference.cpp +++ b/morpheus/_lib/src/stages/triton_inference.cpp @@ -23,14 +23,16 @@ #include "morpheus/messages/multi_response_probs.hpp" #include "morpheus/objects/dev_mem_info.hpp" // for DevMemInfo #include "morpheus/objects/dtype.hpp" // for DType +#include "morpheus/objects/rmm_tensor.hpp" #include "morpheus/objects/tensor.hpp" #include "morpheus/objects/tensor_object.hpp" // for TensorIndex, TensorObject #include "morpheus/objects/triton_in_out.hpp" #include "morpheus/utilities/matx_util.hpp" -#include "morpheus/utilities/stage_util.hpp" +#include "morpheus/utilities/stage_util.hpp" // for foreach_map #include "morpheus/utilities/string_util.hpp" // for MORPHEUS_CONCAT_STR #include "morpheus/utilities/tensor_util.hpp" // for get_elem_count +#include #include // for cudaMemcpy, cudaMemcpy2D, cudaMemcpyDeviceToHost, cudaMemcpyHostToDevice #include #include @@ -54,6 +56,11 @@ #define CHECK_TRITON(method) ::InferenceClientStage__check_triton_errors(method, #method, __FILE__, __LINE__); namespace { + +using namespace morpheus; +using tensor_map_t = TensorMemory::tensor_map_t; +using buffer_map_t = std::map>; + // Component-private free functions. void InferenceClientStage__check_triton_errors(triton::client::Error status, const std::string& methodName, @@ -70,6 +77,177 @@ void InferenceClientStage__check_triton_errors(triton::client::Error status, } } +void build_output_tensors(std::size_t count, + const std::vector& model_outputs, + buffer_map_t& output_buffers, + tensor_map_t& output_tensors) +{ + // Create the output memory blocks + for (auto& model_output : model_outputs) + { + std::vector total_shape{model_output.shape.begin(), model_output.shape.end()}; + + // First dimension will always end up being the number of rows in the dataframe + total_shape[0] = static_cast(count); + auto elem_count = TensorUtils::get_elem_count(total_shape); + + // Create the output memory + auto output_buffer = std::make_shared(elem_count * model_output.datatype.item_size(), + rmm::cuda_stream_per_thread); + + output_buffers[model_output.mapped_name] = output_buffer; + + // Triton results are always in row-major as required by the KServe protocol + // https://github.com/kserve/kserve/blob/master/docs/predict-api/v2/required_api.md#tensor-data + std::vector stride{total_shape[1], 1}; + output_tensors[model_output.mapped_name] = + Tensor::create(std::move(output_buffer), model_output.datatype, total_shape, stride, 0); + } +} + +std::vector get_seq_ids(const InferenceClientStage::sink_type_t& message) +{ + // Take a copy of the sequence Ids allowing us to map rows in the response to rows in the dataframe + // The output tensors we store in `response_memory` will all be of the same length as the + // dataframe. seq_ids has three columns, but we are only interested in the first column.
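For readers unfamiliar with the strided copy that follows: cudaMemcpy2D is used here to pull only the first column of the row-major seq_ids tensor back to the host. A minimal, self-contained sketch of that pattern, assuming a [rows x 3] int32 device matrix (the helper name and sizes are illustrative, not part of the patch):

    #include <cstdint>
    #include <vector>

    #include <cuda_runtime.h>

    // Copy column 0 of a row-major [rows x 3] int32 device matrix into a host vector.
    // dpitch is one element per destination row, spitch is one full source row
    // (3 elements), width is the number of bytes copied per row, height is the row count.
    std::vector<std::int32_t> copy_first_column(const std::int32_t* device_rows, std::size_t rows)
    {
        std::vector<std::int32_t> host_col(rows);
        cudaMemcpy2D(host_col.data(),
                     sizeof(std::int32_t),      // dpitch: destination row stride in bytes
                     device_rows,
                     3 * sizeof(std::int32_t),  // spitch: source row stride in bytes
                     sizeof(std::int32_t),      // width: bytes to copy per row (one element)
                     rows,                      // height: number of rows
                     cudaMemcpyDeviceToHost);
        return host_col;
    }

The call in the patch follows the same shape, except that the source pitch comes from the tensor's own stride and item size rather than a hard-coded column count.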
+ auto seq_ids = message->get_input("seq_ids"); + const auto item_size = seq_ids.dtype().item_size(); + + std::vector host_seq_ids(message->count); + MRC_CHECK_CUDA(cudaMemcpy2D(host_seq_ids.data(), + item_size, + seq_ids.data(), + seq_ids.stride(0) * item_size, + item_size, + host_seq_ids.size(), + cudaMemcpyDeviceToHost)); + + return host_seq_ids; +} + +std::pair, std::vector> build_input( + const InferenceClientStage::sink_type_t& msg_slice, const TritonInOut& model_input) +{ + DCHECK(msg_slice->memory->has_tensor(model_input.mapped_name)) + << "Model input '" << model_input.mapped_name << "' not found in InferenceMemory"; + + auto const& inp_tensor = msg_slice->get_input(model_input.mapped_name); + + // Convert to the right type. Make shallow if necessary + auto final_tensor = inp_tensor.as_type(model_input.datatype); + + std::vector inp_data = final_tensor.get_host_data(); + + // Test + triton::client::InferInput* inp_ptr; + + triton::client::InferInput::Create( + &inp_ptr, model_input.name, {inp_tensor.shape(0), inp_tensor.shape(1)}, model_input.datatype.triton_str()); + + std::shared_ptr inp_shared; + inp_shared.reset(inp_ptr); + + inp_ptr->AppendRaw(inp_data); + + return std::make_pair(inp_shared, std::move(inp_data)); +} + +std::shared_ptr build_output(const TritonInOut& model_output) +{ + triton::client::InferRequestedOutput* out_ptr; + + triton::client::InferRequestedOutput::Create(&out_ptr, model_output.name); + std::shared_ptr out_shared; + out_shared.reset(out_ptr); + + return out_shared; +} + +void reduce_outputs(const InferenceClientStage::sink_type_t& x, + buffer_map_t& output_buffers, + tensor_map_t& output_tensors) +{ + // When our tensor lengths are longer than our dataframe we will need to use the seq_ids array to + // lookup how the values should map back into the dataframe. 
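To make the seq_ids mapping concrete, here is a host-side sketch of the grouping that the device-side reduction performs: response rows that share a sequence id are collapsed into a single dataframe row by an element-wise max. The function name and values are illustrative assumptions, not code from this patch:

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <limits>
    #include <vector>

    // Collapse rows of a row-major [num_rows x num_cols] matrix that share a sequence id,
    // keeping the element-wise max of each group (host-side illustration of the device reduction).
    std::vector<float> reduce_max_by_seq_id(const std::vector<float>& data,
                                            const std::vector<std::int32_t>& seq_ids,
                                            std::size_t num_cols,
                                            std::size_t num_output_rows)
    {
        std::vector<float> out(num_output_rows * num_cols, std::numeric_limits<float>::lowest());
        for (std::size_t row = 0; row < seq_ids.size(); ++row)
        {
            const auto out_row = static_cast<std::size_t>(seq_ids[row]);
            for (std::size_t col = 0; col < num_cols; ++col)
            {
                out[out_row * num_cols + col] =
                    std::max(out[out_row * num_cols + col], data[row * num_cols + col]);
            }
        }
        return out;
    }

With seq_ids {0, 0, 1, 2, 2} and num_output_rows 3, for example, five response rows collapse into the three dataframe rows they were generated from.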
+ auto host_seq_ids = get_seq_ids(x); + + tensor_map_t reduced_outputs; + + for (const auto& output : output_tensors) + { + DCHECK(std::dynamic_pointer_cast(output.second.get_tensor()) != nullptr); + auto tensor = std::static_pointer_cast(output.second.get_tensor()); + + const auto rank = tensor->rank(); + std::vector shape(rank); + tensor->get_shape(shape); + + std::vector stride(rank); + tensor->get_stride(stride); + + // DevMemInfo wants the shape & stride in size_t + std::vector tensor_shape(shape.size()); + std::copy(shape.cbegin(), shape.cend(), tensor_shape.begin()); + + std::vector tensor_stride(stride.size()); + std::copy(stride.cbegin(), stride.cend(), tensor_stride.begin()); + + std::vector reduced_shape{tensor_shape}; + reduced_shape[0] = x->mess_count; + + auto& buffer = output_buffers[output.first]; + auto reduced_buffer = MatxUtil::reduce_max( + DevMemInfo{buffer, tensor->dtype(), tensor_shape, tensor_stride}, host_seq_ids, 0, reduced_shape); + + output_buffers[output.first] = reduced_buffer; + + reduced_outputs[output.first] = + Tensor::create(std::move(reduced_buffer), + tensor->dtype(), + {static_cast(reduced_shape[0]), static_cast(reduced_shape[1])}, + stride, + 0); + } + + output_tensors = std::move(reduced_outputs); +} + +void apply_logits(buffer_map_t& output_buffers, tensor_map_t& output_tensors) +{ + tensor_map_t logit_outputs; + + for (const auto& output : output_tensors) + { + DCHECK(std::dynamic_pointer_cast(output.second.get_tensor()) != nullptr); + auto input_tensor = std::static_pointer_cast(output.second.get_tensor()); + + const auto rank = input_tensor->rank(); + std::vector shape(rank); + input_tensor->get_shape(shape); + + std::vector stride(rank); + input_tensor->get_stride(stride); + + // DevMemInfo wants the shape & stride in size_t + std::vector input_shape(shape.size()); + std::copy(shape.cbegin(), shape.cend(), input_shape.begin()); + + std::vector input_stride(stride.size()); + std::copy(stride.cbegin(), stride.cend(), input_stride.begin()); + + auto& buffer = output_buffers[output.first]; + + auto output_buffer = MatxUtil::logits(DevMemInfo{buffer, input_tensor->dtype(), input_shape, input_stride}); + + output_buffers[output.first] = output_buffer; + + // For logits the input and output shapes will be the same + logit_outputs[output.first] = Tensor::create(std::move(output_buffer), input_tensor->dtype(), shape, stride, 0); + } + + output_tensors = std::move(logit_outputs); +} + } // namespace namespace morpheus { @@ -103,121 +281,33 @@ InferenceClientStage::subscribe_fn_t InferenceClientStage::build_operator() return input.subscribe(rxcpp::make_observer( [this, &output, &client](sink_type_t x) { - // When our tensor lengths are longer than our dataframe we will need to use the seq_ids - // array to lookup how the values should map back into the dataframe - const bool needs_seq_ids = x->mess_count != x->count; - std::map response_outputs; - - // Create the output memory blocks - for (auto& model_output : m_model_outputs) - { - std::vector total_shape{model_output.shape.begin(), model_output.shape.end()}; - - // First dimension will always end up being the number of rows in the dataframe - total_shape[0] = static_cast(x->mess_count); - auto elem_count = TensorUtils::get_elem_count(total_shape); - - // Create the output memory - auto output_buffer = std::make_shared( - elem_count * model_output.datatype.item_size(), rmm::cuda_stream_per_thread); - - response_outputs[model_output.mapped_name] = Tensor::create( - std::move(output_buffer), 
model_output.datatype, total_shape, std::vector{}, 0); - } - - // This will be the final output of all mini-batches - auto response_mem_probs = - std::make_shared(x->mess_count, std::move(response_outputs)); - auto response = std::make_shared(x->meta, - x->mess_offset, - x->mess_count, - std::move(response_mem_probs), - 0, - response_mem_probs->count); - - std::unique_ptr> host_seq_ids{nullptr}; - if (needs_seq_ids) - { - // Take a copy of the sequence Ids allowing us to map rows in the response to rows in the dataframe - // The output tensors we store in `reponse_memory` will all be of the same length as the the - // dataframe. seq_ids has three columns, but we are only interested in the first column. - auto seq_ids = x->get_input("seq_ids"); - const auto item_size = seq_ids.dtype().item_size(); - - host_seq_ids = std::make_unique>(x->count); - MRC_CHECK_CUDA(cudaMemcpy2D(host_seq_ids->data(), - item_size, - seq_ids.data(), - seq_ids.stride(0) * item_size, - item_size, - host_seq_ids->size(), - cudaMemcpyDeviceToHost)); - } - - for (size_t i = 0; i < x->count; i += m_max_batch_size) + // Using `count`, which is the number of rows in the inference tensors. We will check later if this + // doesn't match the number of rows in the dataframe (`mess_count`). This happens when the size of the + // input is too large and needs to be broken up in chunks in the pre-process stage. When this is the + // case we will reduce the rows in the response outputs such that we have a single response for each + // row in the dataframe. + tensor_map_t output_tensors; + buffer_map_t output_buffers; + build_output_tensors(x->count, m_model_outputs, output_buffers, output_tensors); + + for (size_t start = 0; start < x->count; start += m_max_batch_size) { triton::client::InferInput* input1; - size_t start = i; - size_t stop = std::min(i + m_max_batch_size, x->count); + size_t stop = std::min(start + m_max_batch_size, x->count); sink_type_t mini_batch_input = x->get_slice(start, stop); - size_t out_start = start; - size_t out_stop = stop; - if (needs_seq_ids) - { - out_start = (*host_seq_ids)[out_start]; - if (out_stop < host_seq_ids->size()) - { - out_stop = (*host_seq_ids)[out_stop]; - } - else - { - out_stop = x->mess_count; - } - } - - source_type_t mini_batch_output = response->get_slice(out_start, out_stop); - // Iterate on the model inputs in case the model takes less than what tensors are available std::vector, std::vector>> - saved_inputs = foreach_map(m_model_inputs, [this, &mini_batch_input](auto const& model_input) { - DCHECK(mini_batch_input->memory->has_tensor(model_input.mapped_name)) - << "Model input '" << model_input.mapped_name << "' not found in InferenceMemory"; - - auto const& inp_tensor = mini_batch_input->get_input(model_input.mapped_name); - - // Convert to the right type.
Make shallow if necessary - auto final_tensor = inp_tensor.as_type(model_input.datatype); - - std::vector inp_data = final_tensor.get_host_data(); - - // Test - triton::client::InferInput* inp_ptr; - - triton::client::InferInput::Create(&inp_ptr, - model_input.name, - {inp_tensor.shape(0), inp_tensor.shape(1)}, - model_input.datatype.triton_str()); - std::shared_ptr inp_shared; - inp_shared.reset(inp_ptr); - - inp_ptr->AppendRaw(inp_data); - - return std::make_pair(inp_shared, std::move(inp_data)); + saved_inputs = foreach_map(m_model_inputs, [&mini_batch_input](auto const& model_input) { + return (build_input(mini_batch_input, model_input)); }); std::vector> saved_outputs = - foreach_map(m_model_outputs, [this](auto const& model_output) { + foreach_map(m_model_outputs, [](auto const& model_output) { // Generate the outputs to be requested. - triton::client::InferRequestedOutput* out_ptr; - - triton::client::InferRequestedOutput::Create(&out_ptr, model_output.name); - std::shared_ptr out_shared; - out_shared.reset(out_ptr); - - return out_shared; + return build_output(model_output); }); std::vector inputs = @@ -226,8 +316,6 @@ InferenceClientStage::subscribe_fn_t InferenceClientStage::build_operator() std::vector outputs = foreach_map(saved_outputs, [](auto x) { return x.get(); }); - // this->segment().resources().fiber_pool().enqueue([client, output](){}); - triton::client::InferResult* results; CHECK_TRITON(client->Infer(&results, m_options, inputs, outputs)); @@ -248,58 +336,38 @@ InferenceClientStage::subscribe_fn_t InferenceClientStage::build_operator() size_t output_ptr_size = 0; CHECK_TRITON(results->RawData(model_output.name, &output_ptr, &output_ptr_size)); - auto output_buffer = - std::make_shared(output_ptr_size, rmm::cuda_stream_per_thread); + auto output_tensor = output_tensors[model_output.mapped_name].slice( + {static_cast(start), 0}, {static_cast(stop), -1}); - MRC_CHECK_CUDA( - cudaMemcpy(output_buffer->data(), output_ptr, output_ptr_size, cudaMemcpyHostToDevice)); + DCHECK_EQ(stop - start, output_shape[0]); + DCHECK_EQ(output_tensor.bytes(), output_ptr_size); - if (needs_seq_ids && output_shape[0] != mini_batch_output->count) - { - // Since we are working with slices of both the input and the output, the seq_ids have - // already been applied to the output's start & stop, so we only need to reduce the - // response tensort when the size doesn't match our output - std::vector mapped_output_shape{output_shape}; - mapped_output_shape[0] = mini_batch_output->count; - - // The shape of the triton output is the input to the reduce_max method - std::vector input_shape(output_shape.size()); - std::copy(output_shape.cbegin(), output_shape.cend(), input_shape.begin()); - - // Triton results are always in row-major as required by the KServe protocol - // https://github.com/kserve/kserve/blob/master/docs/predict-api/v2/required_api.md#tensor-data - std::vector stride{static_cast(output_shape[1]), 1}; - output_buffer = MatxUtil::reduce_max( - DevMemInfo{output_buffer, model_output.datatype, input_shape, stride}, - *host_seq_ids, - mini_batch_input->offset, - mapped_output_shape); - output_shape = std::move(mapped_output_shape); - } + MRC_CHECK_CUDA( + cudaMemcpy(output_tensor.data(), output_ptr, output_ptr_size, cudaMemcpyHostToDevice)); + } + } - // If we need to do logits, do that here - if (m_needs_logits) - { - std::vector input_shape(output_shape.size()); - std::copy(output_shape.cbegin(), output_shape.cend(), input_shape.begin()); - - output_buffer = - 
MatxUtil::logits(DevMemInfo{output_buffer, - model_output.datatype, - input_shape, - {static_cast(output_shape[1]), 1}}); - } + if (x->mess_count != x->count) + { + reduce_outputs(x, output_buffers, output_tensors); + } - mini_batch_output->set_output( - model_output.mapped_name, - Tensor::create(std::move(output_buffer), - model_output.datatype, - std::vector{static_cast(output_shape[0]), - static_cast(output_shape[1])}, - std::vector{}, - 0)); - } + // If we need to do logits, do that here + if (m_needs_logits) + { + apply_logits(output_buffers, output_tensors); } + + // Final output of all mini-batches + auto response_mem_probs = + std::make_shared(x->mess_count, std::move(output_tensors)); + auto response = std::make_shared(x->meta, + x->mess_offset, + x->mess_count, + std::move(response_mem_probs), + 0, + response_mem_probs->count); + output.on_next(std::move(response)); }, [&](std::exception_ptr error_ptr) { output.on_error(error_ptr); }, diff --git a/morpheus/_lib/src/utilities/matx_util.cu b/morpheus/_lib/src/utilities/matx_util.cu index f4fb4419b0..72019f34f1 100644 --- a/morpheus/_lib/src/utilities/matx_util.cu +++ b/morpheus/_lib/src/utilities/matx_util.cu @@ -27,10 +27,14 @@ #include #include +#include #include namespace morpheus { + using tensorShape_1d = std::array; + using tensorShape_2d = std::array; + // Component-private classes. // ************ MatxUtil__MatxCast**************// /** @@ -57,10 +61,10 @@ namespace morpheus { typename OutputT, std::enable_if_t() && cudf::is_numeric()> * = nullptr> void operator()(void *input_data, void *output_data) { - matx::tensorShape_t<1> shape({static_cast(element_count)}); + tensorShape_1d shape({static_cast(element_count)}); - matx::tensor_t input_tensor(static_cast(input_data), shape); - matx::tensor_t output_tensor(static_cast(output_data), shape); + auto input_tensor = matx::make_tensor(static_cast(input_data), shape); + auto output_tensor = matx::make_tensor(static_cast(output_data), shape); (output_tensor = input_tensor).run(stream.value()); } @@ -88,14 +92,14 @@ namespace morpheus { */ template> * = nullptr> void operator()(void *output_data) { - matx::tensorShape_t<2> shape({static_cast(element_count), 3}); + auto matx_count = static_cast(element_count); + tensorShape_2d shape({matx_count, 3}); - matx::tensor_t output_tensor(static_cast(output_data), shape); + auto output_tensor = matx::make_tensor(static_cast(output_data), shape); auto col0 = output_tensor.template Slice<1>({0, 0}, {matx::matxEnd, matx::matxDropDim}); auto col2 = output_tensor.template Slice<1>({0, 2}, {matx::matxEnd, matx::matxDropDim}); - auto range_col = - matx::range_x(matx::tensorShape_t<1>({static_cast(element_count)}), 0, 1); + auto range_col = matx::range<0, tensorShape_1d, OutputT>({matx_count}, 0, 1); (col0 = range_col).run(stream.value()); (col2 = fea_len - 1).run(stream.value()); @@ -123,11 +127,11 @@ namespace morpheus { */ template()> * = nullptr> void operator()(void *input_data, void *output_data) { - matx::tensorShape_t<1> shape({static_cast(element_count)}); + tensorShape_1d shape({static_cast(element_count)}); - matx::tensor_t input_tensor(static_cast(input_data), shape); + auto input_tensor = matx::make_tensor(static_cast(input_data), shape); - matx::tensor_t output_tensor(static_cast(output_data), shape); + auto output_tensor = matx::make_tensor(static_cast(output_data), shape); (output_tensor = (InputT) 1 / ((InputT) 1 + matx::exp((InputT) -1 * input_tensor))).run(stream.value()); } @@ -156,11 +160,11 @@ namespace morpheus { */ 
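The recurring change in matx_util.cu replaces direct matx::tensor_t construction with matx::make_tensor views over existing device pointers, as in the functors above. A minimal sketch of that pattern, mirroring the calls used in this patch (the function name, element type, and scaling operation are illustrative assumptions):

    #include <array>

    #include <matx.h>

    // Wrap an existing device allocation in a non-owning 1-D MatX tensor view and
    // run an element-wise expression on it, mirroring the updated functors above.
    void scale_in_place(float* device_data, matx::index_t element_count, cudaStream_t stream)
    {
        std::array<matx::index_t, 1> shape({element_count});

        auto tensor = matx::make_tensor<float>(device_data, shape);

        (tensor = tensor * 2.0f).run(stream);
    }

The view does not own the allocation, so the caller keeps responsibility for the buffer's lifetime, just as the DevMemInfo-backed buffers do elsewhere in this file.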
template()> * = nullptr> void operator()(void *input_data, void *output_data) { - matx::tensorShape_t<2> input_shape({static_cast(rows), static_cast(cols)}); - matx::tensorShape_t<2> output_shape({static_cast(cols), static_cast(rows)}); + tensorShape_2d input_shape({static_cast(rows), static_cast(cols)}); + tensorShape_2d output_shape({static_cast(cols), static_cast(rows)}); - matx::tensor_t input_tensor(static_cast(input_data), input_shape); - matx::tensor_t output_tensor(static_cast(output_data), output_shape); + auto input_tensor = matx::make_tensor(static_cast(input_data), input_shape); + auto output_tensor = matx::make_tensor(static_cast(output_data), output_shape); (output_tensor = input_tensor.Permute({1, 0})).run(stream.value()); } @@ -205,24 +209,22 @@ namespace morpheus { template void threshold_by_row(void *input_data, void *output_data, double threshold, const std::vector& stride) { - matx::tensorShape_t<2> input_shape({static_cast(rows), static_cast(cols)}); - // Output is always 1 column - matx::tensorShape_t<1> output_shape({static_cast(rows)}); + tensorShape_1d output_shape({static_cast(rows)}); + + matx::DefaultDescriptor<2> desc{{static_cast(rows), static_cast(cols)}, + {static_cast(stride[0]), static_cast(stride[1])}}; - // Specify the stride here since the data comes in column major order. - matx::tensor_t input_tensor(static_cast(input_data), - input_shape, - {static_cast(stride[0]), - static_cast(stride[1])}); + auto input_tensor = + matx::make_tensor>(static_cast(input_data), std::move(desc)); // Tmp array to hold max value - matx::tensor_t max_tensor(output_shape); + auto max_tensor = matx::make_tensor(output_shape); // row-wise reduction matx::rmax(max_tensor, input_tensor, stream.value()); - matx::tensor_t output_tensor(static_cast(output_data), output_shape); + auto output_tensor = matx::make_tensor(static_cast(output_data), output_shape); // Convert max value to bool (output_tensor = max_tensor > (InputT) threshold).run(stream.value()); @@ -234,13 +236,16 @@ namespace morpheus { template void threshold(void *input_data, void *output_data, double threshold, const std::vector& stride) { - matx::tensorShape_t<2> shape({static_cast(rows), static_cast(cols)}); + matx::DefaultDescriptor<2> input_desc{{static_cast(rows), static_cast(cols)}, + {static_cast(stride[0]), static_cast(stride[1])}}; - matx::index_t matx_stride[2] = {static_cast(stride[0]), - static_cast(stride[1])}; + // Input & Output have the same shape & stride. The make_tensor API requires a move for the descriptor + // so we need to take a copy of it here. 
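A standalone sketch of the descriptor pattern described in the comment above: one matx::DefaultDescriptor carries the shape and strides, a copy is taken because each make_tensor call consumes its descriptor by move, and the thresholding is then expressed element-wise. The function name and element types are illustrative assumptions; the calls mirror the ones in this patch:

    #include <utility>

    #include <matx.h>

    // Wrap two equally shaped, equally strided device buffers in MatX tensor views
    // and write (input > threshold) into the output, element by element.
    void threshold_to_bool(float* input, bool* output,
                           matx::index_t rows, matx::index_t cols,
                           matx::index_t stride0, matx::index_t stride1,
                           float threshold, cudaStream_t stream)
    {
        matx::DefaultDescriptor<2> input_desc{{rows, cols}, {stride0, stride1}};

        // make_tensor takes the descriptor by move, so keep a copy for the output tensor
        matx::DefaultDescriptor<2> output_desc = input_desc;

        auto input_tensor  = matx::make_tensor<float, matx::DefaultDescriptor<2>>(input, std::move(input_desc));
        auto output_tensor = matx::make_tensor<bool, matx::DefaultDescriptor<2>>(output, std::move(output_desc));

        (output_tensor = input_tensor > threshold).run(stream);
    }

Passing the stride explicitly is what lets the same code path handle both row-major and column-major DevMemInfo buffers without a copy.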
+ matx::DefaultDescriptor<2> output_desc = input_desc; - matx::tensor_t input_tensor(static_cast(input_data), shape, matx_stride); - matx::tensor_t output_tensor(static_cast(output_data), shape, matx_stride); + + auto input_tensor = matx::make_tensor(static_cast(input_data), std::move(input_desc)); + auto output_tensor = matx::make_tensor(static_cast(output_data), std::move(output_desc)); // Convert max value to bool (output_tensor = input_tensor > (InputT) threshold).run(stream.value()); @@ -249,23 +254,26 @@ namespace morpheus { struct MatxUtil__MatxReduceMax { matx::index_t num_input_rows; + matx::index_t num_output_rows; matx::index_t num_cols; std::vector input_stride; - matx::index_t num_output_rows; - void *input_data; - void *output_data; + const std::vector &seq_ids; + size_t seq_id_offset; rmm::cuda_stream_view stream; template()> * = nullptr> - void operator()(std::size_t start, std::size_t stop, int32_t output_idx) { + void operator()(void *input_data, void *output_data) { throw std::invalid_argument("Unsupported conversion"); } template()> * = nullptr> - void operator()(std::size_t start, std::size_t stop, int32_t output_idx) { - auto input_count = stop - start; - matx::tensorShape_t<2> input_shape({static_cast(input_count), num_cols}); - matx::tensorShape_t<1> output_shape({num_cols}); + void operator()(void *input_data, void *output_data) + { + auto input_ptr = static_cast(input_data); + matx::DefaultDescriptor<2> input_desc{{num_input_rows, num_cols}, {input_stride[0], input_stride[1]}}; + auto input_tensor = matx::make_tensor>(input_ptr, std::move(input_desc)); + + auto output_ptr = static_cast(output_data); matx::index_t output_stride[2] = {input_stride[0], input_stride[1]}; if (output_stride[0] == 1) @@ -273,16 +281,41 @@ namespace morpheus { output_stride[1] = num_output_rows; } - auto input_ptr = static_cast(input_data) + (start * input_stride[0]); - auto output_ptr = static_cast(output_data) + (output_idx * output_stride[0]); + matx::DefaultDescriptor<2> output_desc{{num_output_rows, num_cols}, output_stride}; + auto output_tensor = matx::make_tensor>(output_ptr, std::move(output_desc)); + + matx::index_t start = 0; + auto output_offset = static_cast(seq_ids[seq_id_offset]); + for (matx::index_t i=1; i < num_input_rows; ++i) + { + auto idx = seq_ids[i+seq_id_offset]; + if (idx != seq_ids[start+seq_id_offset]) + { + DCHECK(seq_ids[start+seq_id_offset]-output_offset < num_output_rows); + reduce_rows(input_tensor, output_tensor, start, i, static_cast(seq_ids[start+seq_id_offset])-output_offset); + start = i; + } + } + + + DCHECK(seq_ids[start+seq_id_offset]-output_offset < num_output_rows) << "\nstart=" << start << " seq_ids[start+seq_id_offset]-output_offset=" << seq_ids[start+seq_id_offset]-output_offset << " num_output_rows=" << num_output_rows; + reduce_rows(input_tensor, output_tensor, start, num_input_rows, static_cast(seq_ids[start+seq_id_offset])-output_offset); + } + + template + void reduce_rows(matx::tensor_t& input_tensor, + matx::tensor_t& output_tensor, + matx::index_t start, + matx::index_t stop, + matx::index_t output_idx) + { + auto input_slice = input_tensor.Slice({start, 0}, {stop, matx::matxEnd}); + auto tmp_tensor = matx::make_tensor({num_cols}); - matx::tensor_t input_tensor(input_ptr, input_shape, {input_stride[0], input_stride[1]}); - matx::tensor_t output_tensor(output_ptr, output_shape, {output_stride[1]}); + matx::rmax(tmp_tensor, input_slice.Permute({1, 0}), stream.value()); - // We need to transpose the input such that rmax will reduce the 
rows - // Matx performs reductions over the innermost dimensions. - // see https://nvidia.github.io/MatX/api/reduce.html - matx::rmax(output_tensor, input_tensor.Permute({1, 0}), stream.value()); + auto output_slice = output_tensor.template Slice<1>({output_idx, 0}, {matx::matxDropDim, matx::matxEnd}); + (output_slice = tmp_tensor).run(stream.value()); } }; @@ -374,14 +407,16 @@ namespace morpheus { MatxUtil::reduce_max(const DevMemInfo &input, const std::vector &seq_ids, size_t seq_id_offset, - const std::vector &output_shape) + const std::vector &output_shape) { const auto& dtype = input.dtype(); auto cudf_type = cudf::data_type{dtype.cudf_type_id()}; auto num_input_rows = input.shape(0); auto num_input_cols = input.shape(1); - std::vector matx_stride{static_cast(input.stride(0)), static_cast(input.stride(1))}; + std::vector matx_stride{static_cast(input.stride(0)), + static_cast(input.stride(1))}; + std::size_t output_element_count = output_shape[0] * output_shape[1]; std::size_t output_buff_size = dtype.item_size() * output_element_count; @@ -391,34 +426,14 @@ namespace morpheus { auto output = input.make_new_buffer(output_buff_size); MatxUtil__MatxReduceMax matx_reduce_max{static_cast(num_input_rows), + static_cast(output_shape[0]), static_cast(num_input_cols), matx_stride, - output_shape[0], - input.data(), - output->data(), + seq_ids, + seq_id_offset, output->stream()}; - std::size_t start = 0; - auto output_offset = seq_ids[seq_id_offset]; - for (std::size_t i=0; i < num_input_rows; ++i) - { - auto idx = seq_ids[i+seq_id_offset]; - if (idx != seq_ids[start+seq_id_offset]) - { - cudf::type_dispatcher(cudf_type, - matx_reduce_max, - start, - i, - seq_ids[start+seq_id_offset]-output_offset); - start = i; - } - } - - cudf::type_dispatcher(cudf_type, - matx_reduce_max, - start, - num_input_rows, - seq_ids[start+seq_id_offset]-output_offset); + cudf::type_dispatcher(cudf_type, matx_reduce_max, input.data(), output->data()); mrc::enqueue_stream_sync_event(output->stream()).get(); return output; diff --git a/morpheus/_lib/tests/test_matx_util.cpp b/morpheus/_lib/tests/test_matx_util.cpp index cce7192cc8..d957fe68e3 100644 --- a/morpheus/_lib/tests/test_matx_util.cpp +++ b/morpheus/_lib/tests/test_matx_util.cpp @@ -57,7 +57,7 @@ TEST_F(TestMatxUtil, ReduceMax1d) MRC_CHECK_CUDA(cudaMemcpy(input_buffer->data(), input.data(), input_buffer->size(), cudaMemcpyHostToDevice)); DevMemInfo dm{input_buffer, dtype, {input.size(), 1}, {1, 0}}; - std::vector output_shape{static_cast(expected_output.size()), 1}; + std::vector output_shape{expected_output.size(), 1}; auto output_buffer = MatxUtil::reduce_max(dm, seq_ids, 0, output_shape); std::vector output(expected_output.size()); @@ -111,7 +111,7 @@ TEST_F(TestMatxUtil, ReduceMax2dRowMajor) MRC_CHECK_CUDA(cudaMemcpy(input_buffer->data(), input.data(), input_buffer->size(), cudaMemcpyHostToDevice)); DevMemInfo dm{input_buffer, dtype, {num_rows, num_cols}, {num_cols, 1}}; - std::vector output_shape{static_cast(expected_rows), static_cast(num_cols)}; + std::vector output_shape{expected_rows, num_cols}; auto output_buffer = MatxUtil::reduce_max(dm, seq_ids, 0, output_shape); EXPECT_EQ(output_buffer->size(), expected_rows * num_cols * dtype.item_size()); @@ -172,7 +172,7 @@ TEST_F(TestMatxUtil, ReduceMax2dColMajor) EXPECT_EQ(expected_rows * num_cols, expected_output.size()); DevMemInfo dm{input_buffer, dtype, {num_rows, num_cols}, {1, num_rows}}; - std::vector output_shape{static_cast(expected_rows), static_cast(num_cols)}; + std::vector 
output_shape{expected_rows, num_cols}; auto output_buffer = MatxUtil::reduce_max(dm, seq_ids, 0, output_shape); EXPECT_EQ(output_buffer->size(), expected_rows * num_cols * dtype.item_size()); @@ -186,3 +186,129 @@ TEST_F(TestMatxUtil, ReduceMax2dColMajor) EXPECT_DOUBLE_EQ(output[i], expected_output[i]); } } + +TEST_F(TestMatxUtil, Cast) +{ + std::vector float_vec{5.1, 2.2, 8.3, 9.4, 8.5, 2.6, 1.7, 8.1}; + + DType float_type(TypeId::FLOAT32); + + auto float_buffer = + std::make_shared(float_vec.size() * float_type.item_size(), rmm::cuda_stream_per_thread); + + MRC_CHECK_CUDA(cudaMemcpy(float_buffer->data(), float_vec.data(), float_buffer->size(), cudaMemcpyHostToDevice)); + + DevMemInfo dm{float_buffer, float_type, {4, 2}, {1, 4}}; + + DType double_type(TypeId::FLOAT64); + auto double_buffer = MatxUtil::cast(dm, double_type.type_id()); + EXPECT_EQ(float_vec.size() * double_type.item_size(), double_buffer->size()); + + std::vector double_vec(float_vec.size()); + MRC_CHECK_CUDA(cudaMemcpy(double_vec.data(), double_buffer->data(), double_buffer->size(), cudaMemcpyDeviceToHost)); + + EXPECT_EQ(double_vec.size(), float_vec.size()); + for (std::size_t i = 0; i < double_vec.size(); ++i) + { + EXPECT_DOUBLE_EQ(double_vec[i], float_vec[i]); + } +} + +TEST_F(TestMatxUtil, Threshold) +{ + // clang-format off + // disabling clang-format to illustrate row-major layout + + std::vector input + { + 1.0, 0.2, 0.7, 0.9, + 1.0, 0.6, 0.1, 0.9, + 0.2, 0.8, 1.0, 0.9, + 0.1, 0.4, 0.1, 0.3, + 0.8, 1.0, 1.0, 0.8 + }; + + std::vector expected_output + { + true, false, true, true, + true, true, false, true, + false, true, true, true, + false, false, false, false, + true, true, true, true, + }; + // clang-format on + + std::size_t num_cols = 4; + std::size_t num_rows = 5; + EXPECT_EQ(num_cols * num_rows, input.size()); + + DType dtype(TypeId::FLOAT32); + + std::size_t buff_size = input.size() * dtype.item_size(); + auto input_buffer = std::make_shared(buff_size, rmm::cuda_stream_per_thread); + + MRC_CHECK_CUDA(cudaMemcpy(input_buffer->data(), input.data(), input_buffer->size(), cudaMemcpyHostToDevice)); + + DevMemInfo dm{input_buffer, dtype, {num_rows, num_cols}, {num_cols, 1}}; + + auto output = MatxUtil::threshold(dm, 0.5, false); + + // output holds 1-byte bool values, so the byte size and element count should be the same + EXPECT_EQ(output->size(), expected_output.size()); + + std::vector host_byte_output(expected_output.size()); + + MRC_CHECK_CUDA(cudaMemcpy(host_byte_output.data(), output->data(), output->size(), cudaMemcpyDeviceToHost)); + + for (std::size_t i = 0; i < host_byte_output.size(); ++i) + { + bool output_val = host_byte_output[i]; + EXPECT_EQ(output_val, expected_output[i]); + } +} + +TEST_F(TestMatxUtil, ThresholdByRow) +{ + // clang-format off + // disabling clang-format to illustrate row-major layout + + std::vector input + { + 1.0, 0.2, 0.7, 0.9, + 1.0, 0.6, 0.1, 0.9, + 0.2, 0.8, 1.0, 0.9, + 0.1, 0.4, 0.1, 0.3, + 0.8, 1.0, 1.0, 0.8 + }; + + std::vector expected_output{true, true, true, false, true}; + // clang-format on + + std::size_t num_cols = 4; + std::size_t num_rows = 5; + EXPECT_EQ(num_cols * num_rows, input.size()); + + DType dtype(TypeId::FLOAT32); + + std::size_t buff_size = input.size() * dtype.item_size(); + auto input_buffer = std::make_shared(buff_size, rmm::cuda_stream_per_thread); + + MRC_CHECK_CUDA(cudaMemcpy(input_buffer->data(), input.data(), input_buffer->size(), cudaMemcpyHostToDevice)); + + DevMemInfo dm{input_buffer, dtype, {num_rows, num_cols},
{num_cols, 1}}; + + auto output = MatxUtil::threshold(dm, 0.5, true); + + // output holds 1-byte bool values, so the byte size and element count should be the same + EXPECT_EQ(output->size(), expected_output.size()); + + std::vector host_byte_output(expected_output.size()); + + MRC_CHECK_CUDA(cudaMemcpy(host_byte_output.data(), output->data(), output->size(), cudaMemcpyDeviceToHost)); + + for (std::size_t i = 0; i < host_byte_output.size(); ++i) + { + bool output_val = host_byte_output[i]; + EXPECT_EQ(output_val, expected_output[i]); + } +} diff --git a/morpheus/_lib/tests/test_tensor.cpp b/morpheus/_lib/tests/test_tensor.cpp index 3fbb5cbd8f..095261925a 100644 --- a/morpheus/_lib/tests/test_tensor.cpp +++ b/morpheus/_lib/tests/test_tensor.cpp @@ -17,12 +17,15 @@ #include "./test_morpheus.hpp" // IWYU pragma: associated +#include "morpheus/objects/dtype.hpp" // for DType +#include "morpheus/objects/rmm_tensor.hpp" #include "morpheus/objects/tensor_object.hpp" // for TensorIndex #include "morpheus/utilities/tensor_util.hpp" // for TensorUtils, TensorUtils::shape_type_t #include // for AssertionResult, SuiteApiResolver, TestInfo, EXPECT_TRUE, Message, TEST_F, Test, TestFactoryImpl, TestPartResult #include // for size_t +#include // shared_ptr #include // for allocator, operator==, basic_string, string #include // for vector // IWYU pragma: no_include "morpheus/utilities/string_util.hpp" @@ -41,7 +44,7 @@ class TestTensor : public ::testing::Test TEST_F(TestTensor, UtilsShapeString) { TensorUtils::shape_type_t shape = {100, 10, 1}; - auto shape_str = TensorUtils::shape_to_string(shape); + auto shape_str = TensorUtils::shape_to_string(shape); EXPECT_TRUE(shape_str == std::string("(100, 10, 1)")); } @@ -78,6 +81,38 @@ TEST_F(TestTensor, GetElementStride) } } +TEST_F(TestTensor, AsType) +{ + std::vector float_vec{5.1, 2.2, 8.3, 9.4, 8.5, 2.6, 1.7, 8.1}; + + DType float_type(TypeId::FLOAT32); + + auto float_buffer = + std::make_shared(float_vec.size() * float_type.item_size(), rmm::cuda_stream_per_thread); + + MRC_CHECK_CUDA(cudaMemcpy(float_buffer->data(), float_vec.data(), float_buffer->size(), cudaMemcpyHostToDevice)); + + std::vector shape{4, 2}; + std::vector stride{1, 4}; + auto float_tensor = std::make_shared(float_buffer, 0, float_type, shape, stride); + + DType double_type(TypeId::FLOAT64); + auto double_tensor = float_tensor->as_type(double_type); + + EXPECT_EQ(float_vec.size(), double_tensor->count()); + EXPECT_EQ(float_vec.size() * double_type.item_size(), double_tensor->bytes()); + + std::vector double_vec(float_vec.size()); + MRC_CHECK_CUDA( + cudaMemcpy(double_vec.data(), double_tensor->data(), double_tensor->bytes(), cudaMemcpyDeviceToHost)); + + EXPECT_EQ(double_vec.size(), float_vec.size()); + for (std::size_t i = 0; i < double_vec.size(); ++i) + { + EXPECT_DOUBLE_EQ(double_vec[i], float_vec[i]); + } +} + /* TEST_F(TestTensor, UtilsValidateShapeAndStride) {
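One detail worth spelling out from the new Cast and AsType tests above: the shape {4, 2} with stride {1, 4} describes a column-major view of the eight-element buffer, so element (row, col) lives at flat index row * stride[0] + col * stride[1]. A small host-side illustration of that index math (the helper name is illustrative, not part of the patch):

    #include <cstddef>
    #include <vector>

    // Read element (row, col) from a flat buffer using explicit strides.
    // With shape {4, 2} and stride {1, 4} this walks the buffer column-major,
    // e.g. element (0, 1) of {5.1, 2.2, 8.3, 9.4, 8.5, 2.6, 1.7, 8.1} is 8.5.
    float element_at(const std::vector<float>& flat,
                     std::size_t row, std::size_t col,
                     std::size_t stride0, std::size_t stride1)
    {
        return flat[row * stride0 + col * stride1];
    }

The tests compare the cast output to the input in flat buffer order, so the strides only affect how MatxUtil and RMMTensor interpret the data, not the order of the comparison loop.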