diff --git a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp index 6c5552a7b5..d161b7b29b 100644 --- a/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp +++ b/src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp @@ -119,9 +119,10 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline { const ov::genai::GenerationConfig& generation_config = {} ); - ov::genai::Tokenizer get_tokenizer(); + ov::genai::Tokenizer get_tokenizer() const; ov::genai::GenerationConfig get_config() const; + void set_config(const ov::genai::GenerationConfig& config); /** * Allows to get the current pipeline metrics. @@ -131,6 +132,7 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline { GenerationHandle add_request(uint64_t request_id, const ov::Tensor& input_ids, const ov::genai::GenerationConfig& sampling_params); GenerationHandle add_request(uint64_t request_id, const std::string& prompt, const ov::genai::GenerationConfig& sampling_params); + GenerationHandle add_request(uint64_t request_id, const std::string& prompt, const std::vector& images, const ov::genai::GenerationConfig& sampling_params); void step(); @@ -139,7 +141,11 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline { // more high level interface, which can process multiple prompts in continuous batching manner std::vector generate(const std::vector& input_ids, const std::vector& sampling_params, const ov::genai::StreamerVariant& streamer=std::monostate{}); std::vector generate(const std::vector& prompts, const std::vector& sampling_params, const ov::genai::StreamerVariant& streamer=std::monostate{}); - + std::vector generate( + const std::vector& prompts, + const std::vector>& images, + const std::vector& sampling_params, + const StreamerVariant& streamer=std::monostate{}); /** * @brief start chat with keeping history in kv cache. * @param system_message optional system message. diff --git a/src/cpp/include/openvino/genai/visual_language/pipeline.hpp b/src/cpp/include/openvino/genai/visual_language/pipeline.hpp index b6b1d5c7f6..d8dfd6e930 100644 --- a/src/cpp/include/openvino/genai/visual_language/pipeline.hpp +++ b/src/cpp/include/openvino/genai/visual_language/pipeline.hpp @@ -190,8 +190,10 @@ class OPENVINO_GENAI_EXPORTS VLMPipeline { void set_generation_config(const GenerationConfig& new_config); private: + class VLMPipelineBase; class VLMPipelineImpl; - std::unique_ptr m_pimpl; + class VLMContinuousBatchingAdapter; + std::unique_ptr m_pimpl; }; /* diff --git a/src/cpp/src/block_manager.hpp b/src/cpp/src/block_manager.hpp index fa5c623f35..06f4f52052 100644 --- a/src/cpp/src/block_manager.hpp +++ b/src/cpp/src/block_manager.hpp @@ -677,11 +677,10 @@ class BlockManager { * Allocates a given number of KV cache blocks to a given sequence. * @param sequence The sequence for the blocks to be allocated to. * @param num_blocks The number of KV cache blocks to be allocated. - * @param prompt_ids Raw token values of the prompt for this sequence. Required if prefix caching is enabled. + * @param prompt_size Prompt size for this sequence. 
*/ - void allocate(ov::genai::Sequence::Ptr sequence, size_t num_blocks, const ov::genai::TokenIds& prompt_ids = {}) { + void allocate(ov::genai::Sequence::Ptr sequence, size_t num_blocks, size_t prompt_size = 0) { OPENVINO_ASSERT(num_blocks > 0 && can_allocate_blocks(num_blocks)); - OPENVINO_ASSERT(!m_enable_prefix_caching || prompt_ids.size() > 0, "prompt_ids should be set for hash calculation."); auto sequence_id = sequence->get_id(); if (m_block_table.find(sequence_id) == m_block_table.end()) { @@ -689,7 +688,7 @@ class BlockManager { } auto& block_table = m_block_table[sequence_id][0]; - auto content_length = sequence->get_generated_len() + prompt_ids.size(); + auto content_length = sequence->get_generated_len() + prompt_size; size_t allocated_blocks = block_table.size(); // assuming all layers have the same number of allocated blocks size_t num_hashed_tokens = allocated_blocks * m_block_size; @@ -1016,7 +1015,7 @@ class BlockManager { if (num_logical_blocks > num_physical_blocks) { OPENVINO_ASSERT(can_allocate_blocks(num_logical_blocks - num_physical_blocks)); - allocate(sequence, num_logical_blocks - num_physical_blocks, seq_group->get_prompt_ids()); + allocate(sequence, num_logical_blocks - num_physical_blocks, seq_group->get_prompt_len()); } else { OPENVINO_ASSERT(num_logical_blocks == num_physical_blocks, "A number of physical and logic blocks must be the same in this code path"); diff --git a/src/cpp/src/continuous_batching_impl.cpp b/src/cpp/src/continuous_batching_impl.cpp index 6cc4824d7c..57b1aa780a 100644 --- a/src/cpp/src/continuous_batching_impl.cpp +++ b/src/cpp/src/continuous_batching_impl.cpp @@ -121,6 +121,20 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::ContinuousBatchingImpl( initialize_pipeline(model, scheduler_config, device, properties, kv_cache_config); } +ContinuousBatchingPipeline::ContinuousBatchingImpl::ContinuousBatchingImpl( + const std::shared_ptr& model, + std::shared_ptr inputs_embedder, + const Tokenizer& tokenizer, + const SchedulerConfig& scheduler_config, + const std::string& device, + const ov::AnyMap& properties, + const ov::genai::GenerationConfig& generation_config, + bool is_validation_mode_enabled) : ContinuousBatchingImpl(model, tokenizer, scheduler_config, device, properties, generation_config, is_validation_mode_enabled){ + m_inputs_embedder = inputs_embedder; + m_model_runner->set_inputs_embedder(inputs_embedder); + m_model_input_type = ModelInputType::EMBEDDINGS; +} + ContinuousBatchingPipeline::ContinuousBatchingImpl::~ContinuousBatchingImpl() { if (m_scheduler) { m_scheduler->release(); @@ -255,6 +269,9 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::add_request(uint64_t request SequenceGroup::Ptr sequence_group = std::make_shared(request_id, input_ids, sampling_params, m_block_size); if (m_scheduler->get_config().enable_prefix_caching) { + if (m_model_input_type == ModelInputType::EMBEDDINGS) { + OPENVINO_THROW("Prefix caching is not supported for VLM models."); + } m_scheduler->restore_cached_blocks(sequence_group); } @@ -270,12 +287,32 @@ GenerationHandle ContinuousBatchingPipeline::ContinuousBatchingImpl::add_request(uint64_t request_id, const std::string& prompt, ov::genai::GenerationConfig sampling_params) { - static ManualTimer timer("tokenize"); - timer.start(); - ov::Tensor input_ids = m_tokenizer.encode(prompt).input_ids; - timer.end(); + ov::Tensor inputs; + ov::genai::VLMPerfMetrics metrics; + if (m_model_input_type == ModelInputType::TOKENS) { + static ManualTimer timer("tokenize"); + timer.start(); + inputs 
= m_tokenizer.encode(prompt).input_ids; + timer.end(); + return add_request(request_id, inputs, sampling_params); + } else if (m_model_input_type == ModelInputType::EMBEDDINGS) { + return add_request(request_id, prompt, {}, sampling_params); + } else { + OPENVINO_THROW("Unknown model input type."); + } + + return add_request(request_id, inputs, sampling_params); +} - return add_request(request_id, input_ids, sampling_params); +GenerationHandle +ContinuousBatchingPipeline::ContinuousBatchingImpl::add_request(uint64_t request_id, + const std::string& prompt, + const std::vector& rgbs, + GenerationConfig sampling_params) { + OPENVINO_ASSERT(m_model_input_type == ModelInputType::EMBEDDINGS, "Model doesn't support embeddings."); + ov::genai::VLMPerfMetrics metrics; + auto inputs = m_inputs_embedder->get_inputs_embeds(prompt, rgbs, metrics); + return add_request(request_id, inputs, sampling_params); } bool ContinuousBatchingPipeline::ContinuousBatchingImpl::has_non_finished_requests() { diff --git a/src/cpp/src/continuous_batching_impl.hpp b/src/cpp/src/continuous_batching_impl.hpp index 1ee40ef73c..5b6817eb0b 100644 --- a/src/cpp/src/continuous_batching_impl.hpp +++ b/src/cpp/src/continuous_batching_impl.hpp @@ -7,6 +7,7 @@ #include "openvino/genai/lora_adapter.hpp" #include "cache_eviction.hpp" +#include "visual_language/inputs_embedder.hpp" namespace ov::genai { @@ -50,6 +51,7 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc std::shared_ptr m_cache_rotation_calculator; + #ifdef DEBUG_CACHE_STATE_DUMP size_t step_count = 0; #endif @@ -103,6 +105,15 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc const ov::AnyMap& properties, const ov::genai::GenerationConfig& generation_config, bool is_validation_mode_enabled = false); + + ContinuousBatchingImpl(const std::shared_ptr& model, + std::shared_ptr inputs_embedder, + const Tokenizer& tokenizer, + const SchedulerConfig& scheduler_config, + const std::string& device, + const ov::AnyMap& properties, + const ov::genai::GenerationConfig& generation_config, + bool is_validation_mode_enabled = false); virtual ~ContinuousBatchingImpl(); @@ -114,6 +125,11 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc const std::string& prompt, ov::genai::GenerationConfig sampling_params) override; + GenerationHandle add_request(uint64_t request_id, + const std::string& prompt, + const std::vector& rgbs, + ov::genai::GenerationConfig sampling_params) override; + bool has_non_finished_requests() override; void step() override; @@ -123,6 +139,7 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc const std::vector& sampling_params, const StreamerVariant& streamer) override; + /** * Updates LoRA adapters for current generation call */ diff --git a/src/cpp/src/continuous_batching_pipeline.cpp b/src/cpp/src/continuous_batching_pipeline.cpp index 42dbffd439..4b3ef67235 100644 --- a/src/cpp/src/continuous_batching_pipeline.cpp +++ b/src/cpp/src/continuous_batching_pipeline.cpp @@ -15,6 +15,7 @@ #include "timer.hpp" #include "utils.hpp" #include "debug_utils.hpp" +#include "visual_language/inputs_embedder.hpp" using namespace ov::genai; @@ -43,6 +44,27 @@ inline float get_load_time(std::chrono::steady_clock::time_point start_time) { return std::chrono::duration_cast(stop_time - start_time).count(); } +inline bool ends_with(std::string str, std::string postfix) { + return str.rfind(postfix) == str.size() - postfix.size(); +} + +std::string 
get_directory(const std::string& s) { + // Linux-style separator + auto pos = s.find_last_of('/'); + if (pos != std::string::npos) { + return s.substr(0, pos ? pos : 1); + } + // Windows-style separator + pos = s.find_last_of('\\'); + if (pos != std::string::npos) { + return s.substr(0, pos); + } else if (s.empty()) { + return {}; + } else { + return {'.'}; + } +} + ContinuousBatchingPipeline::ContinuousBatchingPipeline( const std::filesystem::path& models_path, const SchedulerConfig& scheduler_config, const std::string& device, @@ -54,9 +76,21 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline( const std::filesystem::p auto draft_model_desr = extract_draft_model_from_config(properties_without_draft_model); auto is_prompt_lookup_enabled = extract_prompt_lookup_from_config(properties_without_draft_model); - auto model = utils::singleton_core().read_model(models_path / "openvino_model.xml", {}, properties); - auto tokenizer = ov::genai::Tokenizer(models_path, tokenizer_properties); - auto generation_config = utils::from_config_json_if_exists(models_path); + std::filesystem::path model_path = models_path; + std::filesystem::path directory = models_path; + if (std::filesystem::exists(model_path / "openvino_model.xml")) { + model_path = model_path / "openvino_model.xml"; + } + else if (std::filesystem::exists(model_path / "openvino_language_model.xml")) { + model_path = model_path / "openvino_language_model.xml"; + } + else { + OPENVINO_THROW("Could not find a model in the directory."); + } + + auto model = utils::singleton_core().read_model(model_path, {}, properties); + auto tokenizer = ov::genai::Tokenizer(directory, tokenizer_properties); + auto generation_config = utils::from_config_json_if_exists(directory); if (is_prompt_lookup_enabled) { OPENVINO_ASSERT(draft_model_desr.model == nullptr, "Speculative decoding and prompt lookup decoding are mutually exclusive"); @@ -64,7 +98,12 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline( const std::filesystem::p } else if (draft_model_desr.model != nullptr) { auto main_model_descr = ov::genai::ModelDesc(model, tokenizer, device, properties_without_draft_model, scheduler_config, generation_config); m_impl = std::make_shared(main_model_descr, draft_model_desr); - } else { + } else if (std::filesystem::exists(directory / "openvino_text_embeddings_model.xml") ) { + auto vlm_config = ov::genai::VLMConfig{ utils::from_config_json_if_exists(directory, "config.json")}; + auto inputs_embedder = std::make_shared(vlm_config, directory, device, properties); + m_impl = std::make_shared(model, inputs_embedder, tokenizer, scheduler_config, device, properties, generation_config); + } + else { m_impl = std::make_shared(model, tokenizer, scheduler_config, device, properties, generation_config); } @@ -82,9 +121,19 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline( auto properties_without_draft_model = properties; auto draft_model_desr = extract_draft_model_from_config(properties_without_draft_model); auto is_prompt_lookup_enabled = extract_prompt_lookup_from_config(properties_without_draft_model); - std::filesystem::path openvino_model_name = "openvino_model.xml"; - auto model = utils::singleton_core().read_model(models_path / openvino_model_name, {}, properties_without_draft_model); - auto generation_config = utils::from_config_json_if_exists(models_path); + std::filesystem::path model_path = models_path; + std::filesystem::path directory = models_path; + if (std::filesystem::exists(model_path / "openvino_model.xml")) { + model_path = 
model_path / "openvino_model.xml"; + } + else if (std::filesystem::exists(model_path / "openvino_language_model.xml")) { + model_path = model_path / "openvino_language_model.xml"; + } + else { + OPENVINO_THROW("Could not find a model in the directory."); + } + auto model = utils::singleton_core().read_model(model_path, {}, properties_without_draft_model); + auto generation_config = utils::from_config_json_if_exists(directory); if (is_prompt_lookup_enabled) { OPENVINO_ASSERT(draft_model_desr.model == nullptr, "Speculative decoding and prompt lookup decoding are mutually exclusive"); @@ -92,6 +141,10 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline( } else if (draft_model_desr.model != nullptr) { auto main_model_descr = ov::genai::ModelDesc(model, tokenizer, device, properties_without_draft_model, scheduler_config, generation_config); m_impl = std::make_shared(main_model_descr, draft_model_desr); + } else if (std::filesystem::exists(directory / "openvino_text_embeddings_model.xml") ) { + auto vlm_config = ov::genai::VLMConfig{ utils::from_config_json_if_exists(directory, "config.json")}; + auto inputs_embedder = std::make_shared(vlm_config, directory, device, properties); + m_impl = std::make_shared(model, inputs_embedder, tokenizer, scheduler_config, device, properties, generation_config); } else { m_impl = std::make_shared(model, tokenizer, scheduler_config, device, properties, generation_config); } @@ -113,6 +166,7 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline( auto draft_model_desr = extract_draft_model_from_config(properties_without_draft_model); auto is_prompt_lookup_enabled = extract_prompt_lookup_from_config(properties_without_draft_model); auto model = utils::singleton_core().read_model(model_str, weights_tensor); + auto directory = std::filesystem::path(get_directory(model_str)); if (is_prompt_lookup_enabled) { OPENVINO_ASSERT(draft_model_desr.model == nullptr, "Speculative decoding and prompt lookup decoding are mutually exclusive"); @@ -120,6 +174,10 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline( } else if (draft_model_desr.model != nullptr) { auto main_model_descr = ov::genai::ModelDesc(model, tokenizer, device, properties_without_draft_model, scheduler_config, generation_config); m_impl = std::make_shared(main_model_descr, draft_model_desr); + } else if (std::filesystem::exists(directory / "openvino_text_embeddings_model.xml")) { + auto vlm_config = ov::genai::VLMConfig{ utils::from_config_json_if_exists(directory, "config.json")}; + auto inputs_embedder = std::make_shared(vlm_config, directory, device, properties); + m_impl = std::make_shared(model, inputs_embedder, tokenizer, scheduler_config, device, properties, generation_config); } else { m_impl = std::make_shared(model, tokenizer, scheduler_config, device, properties, generation_config); } @@ -127,7 +185,7 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline( m_impl->m_load_time_ms = get_load_time(start_time); } -ov::genai::Tokenizer ContinuousBatchingPipeline::get_tokenizer() { +ov::genai::Tokenizer ContinuousBatchingPipeline::get_tokenizer() const{ return m_impl->get_tokenizer(); } @@ -135,6 +193,10 @@ ov::genai::GenerationConfig ContinuousBatchingPipeline::get_config() const{ return m_impl->get_config(); } +void ContinuousBatchingPipeline::set_config(const ov::genai::GenerationConfig& config) { + m_impl->set_config(config); +} + PipelineMetrics ContinuousBatchingPipeline::get_metrics() const{ return m_impl->get_metrics(); } @@ -147,6 +209,10 @@ GenerationHandle 
ContinuousBatchingPipeline::add_request(uint64_t request_id, co return m_impl->add_request(request_id, input_ids, sampling_params); } +GenerationHandle ContinuousBatchingPipeline::add_request(uint64_t request_id, const std::string& prompt, const std::vector& images, const ov::genai::GenerationConfig& sampling_params) { + return m_impl->add_request(request_id, prompt, images, sampling_params); +} + void ContinuousBatchingPipeline::step() { m_impl->step(); } @@ -175,6 +241,15 @@ std::vector ContinuousBatchingPipeline::generate(const std::ve return decoded_results; } +std::vector ContinuousBatchingPipeline::generate( + const std::vector& prompts, + const std::vector>& images, + const std::vector& sampling_params, + const StreamerVariant& streamer) { + return m_impl->generate(prompts, images, sampling_params, streamer); +} + + void ContinuousBatchingPipeline::start_chat(const std::string& system_message) { m_impl->start_chat(system_message); }; diff --git a/src/cpp/src/icontinuous_batching.cpp b/src/cpp/src/icontinuous_batching.cpp index cd99b277ae..64b2a128d7 100644 --- a/src/cpp/src/icontinuous_batching.cpp +++ b/src/cpp/src/icontinuous_batching.cpp @@ -5,10 +5,17 @@ namespace ov::genai { +template struct overloaded : Ts... {using Ts::operator()...;}; +template overloaded(Ts...) -> overloaded; + GenerationConfig ContinuousBatchingPipeline::IContinuousBatchingPipeline::get_config() const { return m_generation_config; } +void ContinuousBatchingPipeline::IContinuousBatchingPipeline::set_config(const GenerationConfig& config) { + m_generation_config = config; +} + PipelineMetrics ContinuousBatchingPipeline::IContinuousBatchingPipeline::get_metrics() const { return m_pipeline_metrics; } @@ -18,6 +25,9 @@ Tokenizer ContinuousBatchingPipeline::IContinuousBatchingPipeline::get_tokenizer } void ContinuousBatchingPipeline::IContinuousBatchingPipeline::start_chat(const std::string& system_message) { + if (m_model_input_type == ModelInputType::EMBEDDINGS) { + OPENVINO_THROW("Chat mode is not supported."); + } if (!system_message.empty()) { m_history.push_back({{"role", "system"}, {"content", system_message}}); } @@ -34,6 +44,12 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate( const std::vector& prompts, std::vector sampling_params, const StreamerVariant& streamer) { + if (m_model_input_type == ModelInputType::EMBEDDINGS) { + // TODO: remove this code and within model runner add check: if sequence group type is tokens, + // but embedding model is available => compute embeddings first, then pass to LLM + std::vector> images(prompts.size()); + return generate(prompts, images, sampling_params, streamer); + } std::vector input_ids; auto start_time = std::chrono::steady_clock::now(); @@ -117,6 +133,53 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate( return decoded; } +std::vector +ContinuousBatchingPipeline::IContinuousBatchingPipeline::generate( + const std::vector& prompts, + const std::vector>& rgbs_vector, + const std::vector& sampling_params, + const StreamerVariant& streamer) { + // TODO: Add performance metrics + auto generate_start_time = std::chrono::steady_clock::now(); + OPENVINO_ASSERT(m_model_input_type == ModelInputType::EMBEDDINGS); + OPENVINO_ASSERT(!m_is_chat_conversation, "Chat mode is not supported."); + + OPENVINO_ASSERT(prompts.size() == sampling_params.size(), "Number of prompts should be equal to the number of generation configs."); + OPENVINO_ASSERT(prompts.size() == rgbs_vector.size(), "Number of prompts should be equal to the number of images 
vectors."); + + for (auto config: sampling_params) { + // If eos_token_id was not provided, take value from default m_generation_config + if (config.eos_token_id == -1) { + config.set_eos_token_id(m_generation_config.eos_token_id); + } + if (config.stop_token_ids.empty()) { + config.stop_token_ids = m_generation_config.stop_token_ids; + } + config.validate(); + } + + std::vector input_embeds_list; + for (size_t i = 0; i < prompts.size(); i++) { + auto prompt = prompts[i]; + auto rgbs = rgbs_vector[i]; + + VLMPerfMetrics perf_metrics; + input_embeds_list.emplace_back(m_inputs_embedder->get_inputs_embeds(prompt, rgbs, perf_metrics)); + } + std::vector results; + auto encoded_results = generate(input_embeds_list, sampling_params, streamer); + for (const auto& result: encoded_results) { + GenerationResult gen_result; + for (size_t idx = 0; idx < result.m_generation_ids.size(); ++idx) { + gen_result.m_generation_ids.push_back(m_tokenizer.decode(result.m_generation_ids.at(idx))); + gen_result.m_scores.push_back(result.m_scores.at(idx)); + gen_result.m_status = result.m_status; + } + results.emplace_back(gen_result); + } + return results; +} + void ContinuousBatchingPipeline::IContinuousBatchingPipeline::stream_tokens( const std::shared_ptr& streamer_ptr, const GenerationHandle& handle @@ -146,4 +209,5 @@ void ContinuousBatchingPipeline::IContinuousBatchingPipeline::stream_tokens( const auto tokens = generation_outputs.begin()->second.generated_ids; streamer_ptr->write(tokens); } + } diff --git a/src/cpp/src/icontinuous_batching.hpp b/src/cpp/src/icontinuous_batching.hpp index ea1c307a5a..dc7dc5acdf 100644 --- a/src/cpp/src/icontinuous_batching.hpp +++ b/src/cpp/src/icontinuous_batching.hpp @@ -13,6 +13,12 @@ namespace ov::genai { +enum class ModelInputType { + TOKENS, + EMBEDDINGS +}; + + /** * Base interface for all continuous batching based pipelines */ @@ -47,9 +53,13 @@ class ContinuousBatchingPipeline::IContinuousBatchingPipeline { // to access m_load_time_ms friend class ContinuousBatchingPipeline; + ModelInputType m_model_input_type = ModelInputType::TOKENS; + std::shared_ptr m_inputs_embedder; + void stream_tokens(const std::shared_ptr& streamer_ptr, const GenerationHandle& handle); public: GenerationConfig get_config() const; + void set_config(const GenerationConfig& config); PipelineMetrics get_metrics() const; Tokenizer get_tokenizer(); @@ -67,7 +77,16 @@ class ContinuousBatchingPipeline::IContinuousBatchingPipeline { virtual GenerationHandle add_request(uint64_t request_id, const std::string& prompt, GenerationConfig sampling_params) = 0; - + + /** + * Adds request to running queue based on string input and vector of images + * This step also performs tokenization's encode + */ + virtual GenerationHandle add_request(uint64_t request_id, + const std::string& prompt, + const std::vector& rgbs, + GenerationConfig sampling_params) = 0; + /** * Checks whether server (pipeline) has non-finished requests and step() should be called within a loop */ @@ -94,6 +113,13 @@ class ContinuousBatchingPipeline::IContinuousBatchingPipeline { std::vector sampling_params, const StreamerVariant& streamer); + virtual std::vector + generate( + const std::vector& prompts, + const std::vector>& rgbs, + const std::vector& sampling_params, + const StreamerVariant& streamer); + /** * Starts chat with a given system prompt * diff --git a/src/cpp/src/model_runner.hpp b/src/cpp/src/model_runner.hpp index 1a161b03c8..4fd75ba224 100644 --- a/src/cpp/src/model_runner.hpp +++ b/src/cpp/src/model_runner.hpp @@ -8,6 
+8,7 @@ #include +#include "visual_language/inputs_embedder.hpp" #include "debug_utils.hpp" #include "sequence_group.hpp" #include "scheduler.hpp" @@ -40,6 +41,9 @@ class ModelRunner { std::vector m_cache_rotation_deltas_for_each_layer; ov::Tensor m_cache_rotation_trig_lut; + std::shared_ptr m_inputs_embedder; + bool m_use_embeddings; + public: /** * Constructs the ModelRunner. @@ -75,6 +79,11 @@ class ModelRunner { return m_request; } + void set_inputs_embedder(std::shared_ptr embedder) { + m_use_embeddings = true; + m_inputs_embedder = embedder; + } + /** * @return A map of sequence IDs to vectors of ov::Tensor per-token attention scores. Each vector element is associated with its own * decoder layer, in order of their execution in the model. Each ov::Tensor has a shape of {N_k}, where N_k is the length of @@ -108,6 +117,13 @@ class ModelRunner { size_t batch_size_in_sequences = 0; size_t total_num_tokens = 0, total_num_blocks = 0; size_t max_context_len_val = 0; + size_t hidden_size = 0; + size_t num_generated_ids = 0; + OPENVINO_ASSERT(sequence_groups.size() > 0); + auto sequence_group_type = sequence_groups[0]->get_sequence_group_type(); + if (sequence_group_type == SequenceGroupType::EMBEDDINGS) { + hidden_size = sequence_groups[0]->get_hidden_size(); + } // compute aggregated values for (size_t i = 0; i < num_sequence_groups; ++i) { @@ -118,10 +134,14 @@ class ModelRunner { total_num_tokens += sequence_group->get_num_scheduled_tokens() * num_sequences; total_num_blocks += sequence_group->get_num_blocks() * num_sequences; max_context_len_val = std::max(max_context_len_val, sequence_group->get_context_len()); + for (auto seq: sequence_group->get_running_sequences()) { + num_generated_ids += seq->get_generated_len(); + } } ov::Tensor input_ids(ov::element::i64, {total_num_tokens}), + inputs_embeds(ov::element::f32, {total_num_tokens, hidden_size}), position_ids(ov::element::i64, {total_num_tokens}), // PA specific parameters past_lens(ov::element::i32, {batch_size_in_sequences}), @@ -129,13 +149,48 @@ class ModelRunner { // block_indices are handled in a special fashion below block_indices_begins(ov::element::i32, {batch_size_in_sequences + 1}), max_context_len(ov::element::i32, {}); + + ov::Tensor generated_ids_embeds; + float *generated_ids_embeds_data = nullptr; max_context_len.data()[0] = max_context_len_val; // get raw pointers to copy to + float *inputs_embeds_data = nullptr; + int64_t *input_ids_data = nullptr; + + if (sequence_group_type == SequenceGroupType::EMBEDDINGS) { + OPENVINO_ASSERT(m_use_embeddings, "Got sequence group with embeddings, but inputs embedder wasn't set."); + inputs_embeds_data = inputs_embeds.data(); + + ov::Tensor generated_ids = ov::Tensor(ov::element::i64, {1, num_generated_ids}); + int64_t *generated_ids_data = generated_ids.data(); + size_t pos = 0; + for (size_t i = 0; i < num_sequence_groups; ++i) { + size_t seq_group_id = scheduler_output.m_scheduled_sequence_groups_ids[i]; + SequenceGroup::CPtr sequence_group = sequence_groups[seq_group_id]; + for (auto seq: sequence_group->get_running_sequences()) { + auto generated_ids = seq->get_generated_ids(); + for (size_t token_idx = 0; token_idx < generated_ids.size(); token_idx++) { + generated_ids_data[pos] = generated_ids[token_idx]; + pos++; + } + } + } + if (pos > 0) { + // TODO: Compute embeddings only for last generated token, while previously generated embeddings save in SequenceGroup + auto embedder = m_inputs_embedder->get_embedding_model(); + generated_ids_embeds = 
embedder.infer(generated_ids); + generated_ids_embeds_data = generated_ids_embeds.data(); + } + + } else if (sequence_group_type == SequenceGroupType::TOKENS) { + input_ids_data = input_ids.data(); + } + int64_t - * input_ids_data = input_ids.data(), * position_ids_data = position_ids.data(); + int32_t * past_lens_data = past_lens.data(), * subsequence_begins_data = subsequence_begins.data(), @@ -174,9 +229,17 @@ class ModelRunner { Sequence::CPtr sequence = running_sequences[seq_id]; for (size_t token_id = 0, position_id = group_position_id; token_id < num_scheduled_tokens; ++token_id, ++position_id, ++gathering_current_index) { // compute token for current sequence - input_ids_data[token_id] = position_id < prompt_len ? - sequence_group->get_prompt_ids()[position_id] : - sequence->get_generated_ids()[position_id - prompt_len]; + if (sequence_group_type == SequenceGroupType::TOKENS) { + input_ids_data[token_id] = position_id < prompt_len ? + sequence_group->get_prompt_ids()[position_id] : + sequence->get_generated_ids()[position_id - prompt_len]; + } else if (sequence_group_type == SequenceGroupType::EMBEDDINGS) { + auto embeds_pos = position_id < prompt_len ? 0 : hidden_size * (position_id - prompt_len); + const float* src = position_id < prompt_len ? sequence_group->get_input_embeds()[position_id].data() : generated_ids_embeds_data + embeds_pos; + std::copy_n(src, hidden_size, inputs_embeds_data + token_id * hidden_size); + } else { + OPENVINO_THROW("Unknown model inputs type."); + } position_ids_data[token_id] = position_id; @@ -204,7 +267,13 @@ class ModelRunner { block_indices_begins_data[1] = block_indices_begins_data[0] + num_blocks; // apply strides to shift to a next sequence - input_ids_data += num_scheduled_tokens; + if (sequence_group_type == SequenceGroupType::TOKENS) { + input_ids_data += num_scheduled_tokens; + } else if (sequence_group_type == SequenceGroupType::EMBEDDINGS) { + inputs_embeds_data += num_scheduled_tokens * hidden_size; + generated_ids_embeds_data += sequence->get_generated_len() * hidden_size; + } + position_ids_data += num_scheduled_tokens; past_lens_data += 1; subsequence_begins_data += 1; @@ -213,8 +282,14 @@ class ModelRunner { sequence_group->set_output_seq_len(matmul_gathering_is_available ? 
output_seq_len : num_scheduled_tokens); } + if (sequence_group_type == SequenceGroupType::TOKENS) { + m_request.set_tensor("input_ids", input_ids); + } + else if (sequence_group_type == SequenceGroupType::EMBEDDINGS) { + m_request.set_tensor("inputs_embeds", inputs_embeds); + } + // typical LLM parameters - m_request.set_tensor("input_ids", input_ids); m_request.set_tensor("position_ids", position_ids); // PA specific parameters diff --git a/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp b/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp index 03d41cc462..93856eb167 100644 --- a/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp +++ b/src/cpp/src/prompt_lookup/prompt_lookup_impl.cpp @@ -28,6 +28,17 @@ ContinuousBatchingPipeline::PromptLookupImpl::add_request(uint64_t request_id, return m_pipeline->add_request(request_id, prompt, sampling_params); } +GenerationHandle +ContinuousBatchingPipeline::PromptLookupImpl::add_request(uint64_t request_id, + const std::string& prompt, + const std::vector& rgbs, + GenerationConfig sampling_params) { + OPENVINO_ASSERT(m_model_input_type == ModelInputType::EMBEDDINGS, "Model doesn't support embeddings."); + ov::genai::VLMPerfMetrics metrics; + auto inputs = m_inputs_embedder->get_inputs_embeds(prompt, rgbs, metrics); + return add_request(request_id, inputs, sampling_params); +} + bool ContinuousBatchingPipeline::PromptLookupImpl::has_non_finished_requests() { return m_pipeline->has_non_finished_requests(); } diff --git a/src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp b/src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp index 0535931d81..7fbe56e9f2 100644 --- a/src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp +++ b/src/cpp/src/prompt_lookup/prompt_lookup_impl.hpp @@ -36,7 +36,10 @@ class ContinuousBatchingPipeline::PromptLookupImpl : public ContinuousBatchingPi GenerationHandle add_request(uint64_t request_id, const std::string& prompt, ov::genai::GenerationConfig sampling_params) override; - + GenerationHandle add_request(uint64_t request_id, + const std::string& prompt, + const std::vector& rgbs, + GenerationConfig sampling_params) override; bool has_non_finished_requests() override; void step() override; diff --git a/src/cpp/src/scheduler.hpp b/src/cpp/src/scheduler.hpp index 5d0e475a48..b51f08e587 100644 --- a/src/cpp/src/scheduler.hpp +++ b/src/cpp/src/scheduler.hpp @@ -287,7 +287,7 @@ class Scheduler { if (num_scheduled_tokens > 0) { // allocate KV blocks if required if (num_scheduled_blocks > 0) - m_block_manager->allocate(sequence, num_scheduled_blocks, sequence_group->get_prompt_ids()); + m_block_manager->allocate(sequence, num_scheduled_blocks, sequence_group->get_prompt_len()); // and schedule tokens sequence_group->schedule_tokens(num_scheduled_tokens); diff --git a/src/cpp/src/sequence_group.hpp b/src/cpp/src/sequence_group.hpp index c2861cdf18..7da341f43f 100644 --- a/src/cpp/src/sequence_group.hpp +++ b/src/cpp/src/sequence_group.hpp @@ -22,6 +22,11 @@ enum class SequenceStatus { WAITING = 3 }; +enum class SequenceGroupType { + TOKENS, + EMBEDDINGS +}; + using TokenIds = std::vector; using LogProbs = std::vector; class SequenceGroup; @@ -204,10 +209,12 @@ class SequenceGroup : public std::enable_shared_from_this { ov::genai::GenerationConfig m_sampling_params; std::size_t m_block_size; TokenIds m_prompt_ids; + std::vector> m_input_embeds; std::vector m_prompt_log_probs; GenerationStream::Ptr m_generation_stream; size_t m_num_evicted_tokens = 0; bool m_has_echoed = false; + SequenceGroupType m_sequence_group_type; uint64_t m_next_sequence_id 
= 0; @@ -252,11 +259,31 @@ class SequenceGroup : public std::enable_shared_from_this { SequenceGroup(uint64_t request_id, const ov::Tensor input_ids, const ov::genai::GenerationConfig& sampling_params, std::size_t block_size) : SequenceGroup(request_id, sampling_params, block_size) { - size_t prompt_len = input_ids.get_size(); + + size_t prompt_len; + if (input_ids.get_shape().size() > 1) { + prompt_len = input_ids.get_shape()[1]; + } else { + prompt_len = input_ids.get_size(); + } OPENVINO_ASSERT(prompt_len > 0, "Prompt length cannot be 0"); - m_prompt_ids.resize(prompt_len); - std::copy_n(input_ids.data(), prompt_len, m_prompt_ids.begin()); + if (input_ids.get_element_type() == ov::element::i64) { + m_prompt_ids.resize(prompt_len); + std::copy_n(input_ids.data(), prompt_len, m_prompt_ids.begin()); + m_sequence_group_type = SequenceGroupType::TOKENS; + } else if (input_ids.get_element_type() == ov::element::f32) { + auto embeds_len = input_ids.get_shape()[2]; + m_input_embeds.resize(prompt_len); + for (size_t i = 0; i < prompt_len; i++) { + m_input_embeds[i].resize(embeds_len); + std::copy_n(input_ids.data() + i * embeds_len, embeds_len, m_input_embeds[i].begin()); + } + m_sequence_group_type = SequenceGroupType::EMBEDDINGS; + } + else { + OPENVINO_THROW("Unknown tensor format."); + } m_prompt_log_probs.reserve(prompt_len); // create a single sequence @@ -277,7 +304,15 @@ class SequenceGroup : public std::enable_shared_from_this { } size_t get_prompt_len() const { - return m_prompt_ids.size(); + if (m_sequence_group_type == SequenceGroupType::EMBEDDINGS) { + return m_input_embeds.size(); + } + else if (m_sequence_group_type == SequenceGroupType::TOKENS) { + return m_prompt_ids.size(); + } + else { + OPENVINO_THROW("Not implemented."); + } } void pause_generation(bool status) { @@ -526,10 +561,25 @@ class SequenceGroup : public std::enable_shared_from_this { return m_prompt_ids; } + const std::vector>& get_input_embeds() const { + OPENVINO_ASSERT(m_sequence_group_type == SequenceGroupType::EMBEDDINGS); + return m_input_embeds; + } + + size_t get_hidden_size() const { + OPENVINO_ASSERT(m_sequence_group_type == SequenceGroupType::EMBEDDINGS); + OPENVINO_ASSERT(m_input_embeds.size() > 0, "Embeddings should be set to get hidden size."); + return m_input_embeds[0].size(); + } + void append_prompt_log_prob(float log_prob) { m_prompt_log_probs.push_back(log_prob); } + SequenceGroupType get_sequence_group_type() const { + return m_sequence_group_type; + } + /** * @return The number of logical KV cache blocks required to host all the tokens in this sequence group, taking into account previous token evictions. 
*/ diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp index 0c4d4a2d0a..2407e4ef02 100644 --- a/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp +++ b/src/cpp/src/speculative_decoding/speculative_decoding_impl.cpp @@ -121,6 +121,17 @@ ContinuousBatchingPipeline::SpeculativeDecodingImpl::add_request(uint64_t reques return m_main_pipeline->add_request(request_id, prompt, sampling_params); } +GenerationHandle +ContinuousBatchingPipeline::SpeculativeDecodingImpl::add_request(uint64_t request_id, + const std::string& prompt, + const std::vector& rgbs, + GenerationConfig sampling_params) { + OPENVINO_ASSERT(m_model_input_type == ModelInputType::EMBEDDINGS, "Model doesn't support embeddings."); + ov::genai::VLMPerfMetrics metrics; + auto inputs = m_inputs_embedder->get_inputs_embeds(prompt, rgbs, metrics); + return add_request(request_id, inputs, sampling_params); +} + bool ContinuousBatchingPipeline::SpeculativeDecodingImpl::has_non_finished_requests() { return m_main_pipeline->has_non_finished_requests(); } diff --git a/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp b/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp index 4023519287..612cf99598 100644 --- a/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp +++ b/src/cpp/src/speculative_decoding/speculative_decoding_impl.hpp @@ -58,6 +58,10 @@ class ContinuousBatchingPipeline::SpeculativeDecodingImpl : public ContinuousBat GenerationHandle add_request(uint64_t request_id, const std::string& prompt, ov::genai::GenerationConfig sampling_params) override; + GenerationHandle add_request(uint64_t request_id, + const std::string& prompt, + const std::vector& rgbs, + ov::genai::GenerationConfig sampling_params) override; bool has_non_finished_requests() override; diff --git a/src/cpp/src/utils.cpp b/src/cpp/src/utils.cpp index 7f7683d826..49691379fd 100644 --- a/src/cpp/src/utils.cpp +++ b/src/cpp/src/utils.cpp @@ -343,6 +343,9 @@ void trim_kv_cache(ov::InferRequest request, uint64_t remove_from_end, size_t se return; auto states = request.query_state(); + + OPENVINO_ASSERT(states.size() > 0, "Request contains no states."); + for (auto& state : states) { if(adapter_controller && adapter_controller->has_state_name(state.get_name())) continue; @@ -408,6 +411,28 @@ void print_compiled_model_properties(ov::CompiledModel& compiled_Model, const ch } } } + +const ModelsMap::mapped_type& get_model_weights_pair(const ModelsMap& models_map, const std::string& key) { + auto it = models_map.find(key); + if (it != models_map.end()) { + return it->second; + } + OPENVINO_THROW("Model with key '", key, "' not found in models map."); +} + +std::pair extract_scheduler_config(const ov::AnyMap& properties, std::optional default_config) { + ov::AnyMap plugin_config = properties; + auto it = plugin_config.find(ov::genai::scheduler_config.name()); + SchedulerConfig scheduler_config; + if (it != plugin_config.end()) { + scheduler_config = it->second.as(); + plugin_config.erase(it); + } else if (default_config.has_value()) { + scheduler_config = *default_config; + } + return {plugin_config, scheduler_config}; +}; + } // namespace utils } // namespace genai } // namespace ov diff --git a/src/cpp/src/utils.hpp b/src/cpp/src/utils.hpp index 2027d05a27..8594d1b833 100644 --- a/src/cpp/src/utils.hpp +++ b/src/cpp/src/utils.hpp @@ -8,6 +8,7 @@ #include #include "openvino/genai/llm_pipeline.hpp" +#include 
"openvino/genai/visual_language/pipeline.hpp" #include "openvino/runtime/core.hpp" #include "visual_language/processor_config.hpp" @@ -196,6 +197,12 @@ template struct overloaded : Ts... {using Ts::operator()...;}; template overloaded(Ts...) -> overloaded; std::shared_ptr create_streamer(StreamerVariant streamer, Tokenizer tokenizer); + +const ModelsMap::mapped_type& get_model_weights_pair(const ModelsMap& models_map, const std::string& key); + +std::pair extract_scheduler_config(const ov::AnyMap& properties, std::optional default_config = std::nullopt); + + } // namespace utils } // namespace genai } // namespace ov diff --git a/src/cpp/src/visual_language/continuous_batching_adapter.hpp b/src/cpp/src/visual_language/continuous_batching_adapter.hpp new file mode 100644 index 0000000000..7d734e9bcb --- /dev/null +++ b/src/cpp/src/visual_language/continuous_batching_adapter.hpp @@ -0,0 +1,68 @@ +// Copyright (C) 2023-2025 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include "visual_language/pipeline_base.hpp" +#include "openvino/genai/continuous_batching_pipeline.hpp" + +using namespace ov::genai; + +class ov::genai::VLMPipeline::VLMContinuousBatchingAdapter : public ov::genai::VLMPipeline::VLMPipelineBase { +public: + ContinuousBatchingPipeline m_impl; + + VLMContinuousBatchingAdapter( + const std::filesystem::path& models_dir, + const SchedulerConfig& scheduler_config, + const std::string& device, + const ov::AnyMap& properties + ): m_impl{ + models_dir, + scheduler_config, + device, + properties} { } + + VLMContinuousBatchingAdapter( + const ModelsMap& models_map, + const Tokenizer& tokenizer, + const std::filesystem::path& config_dir_path, + const SchedulerConfig& scheduler_config, + const std::string& device, + const ov::AnyMap& properties, + const ov::genai::GenerationConfig& generation_config + ): m_impl{ + "./", + scheduler_config, + device, + properties} { + OPENVINO_THROW("Not implemented."); + } + + VLMDecodedResults generate( + const std::string& prompt, + const std::vector& rgbs, + GenerationConfig generation_config, + const StreamerVariant& streamer + ) override { + auto result = m_impl.generate({prompt}, {rgbs}, {generation_config}, streamer)[0]; + VLMDecodedResults decoded; + for (size_t idx = 0; idx < result.m_generation_ids.size(); ++idx) { + decoded.texts.push_back(result.m_generation_ids.at(idx)); + decoded.scores.push_back(result.m_scores.at(idx)); + } + return decoded; + } + + virtual void start_chat(const std::string& system_message) override { m_impl.start_chat(system_message); }; + + virtual void finish_chat() override { m_impl.finish_chat(); }; + + virtual Tokenizer get_tokenizer() const override { return m_impl.get_tokenizer(); }; + + virtual void set_chat_template(const std::string& new_template) override { OPENVINO_THROW("Chat mode is not supported."); }; + + virtual GenerationConfig get_generation_config() const override { return m_impl.get_config(); }; + + virtual void set_generation_config(const GenerationConfig& new_config) override { m_impl.set_config(new_config); }; +}; diff --git a/src/cpp/src/visual_language/inputs_embedder.cpp b/src/cpp/src/visual_language/inputs_embedder.cpp index 8516c4c6e5..297edfe217 100644 --- a/src/cpp/src/visual_language/inputs_embedder.cpp +++ b/src/cpp/src/visual_language/inputs_embedder.cpp @@ -145,16 +145,16 @@ class InputsEmbedder::IInputsEmbedder { const ov::AnyMap device_config) : m_vlm_config{vlm_config}, m_vision_encoder( - get_model_weights_pair(models_map, "vision_embeddings").first, - 
get_model_weights_pair(models_map, "vision_embeddings").second, + utils::get_model_weights_pair(models_map, "vision_embeddings").first, + utils::get_model_weights_pair(models_map, "vision_embeddings").second, config_dir_path, m_vlm_config.model_type, device, device_config ), m_embedding( - get_model_weights_pair(models_map, "text_embeddings").first, - get_model_weights_pair(models_map, "text_embeddings").second, + utils::get_model_weights_pair(models_map, "text_embeddings").first, + utils::get_model_weights_pair(models_map, "text_embeddings").second, m_vlm_config.scale_emb, device, device_config @@ -334,8 +334,8 @@ class InputsEmbedderMiniCPM : public InputsEmbedder::IInputsEmbedder { const ov::AnyMap device_config) : IInputsEmbedder(vlm_config, models_map, tokenizer, config_dir_path, device, device_config) { m_resampler = utils::singleton_core().compile_model( - get_model_weights_pair(models_map, "resampler").first, - get_model_weights_pair(models_map, "resampler").second, + utils::get_model_weights_pair(models_map, "resampler").first, + utils::get_model_weights_pair(models_map, "resampler").second, device, device_config ).create_infer_request(); @@ -1619,8 +1619,8 @@ class InputsEmbedderQwen2VL : public InputsEmbedder::IInputsEmbedder { const ov::AnyMap device_config) : IInputsEmbedder(vlm_config, models_map, tokenizer, config_dir_path, device, device_config) { m_vision_embeddings_merger = utils::singleton_core().compile_model( - get_model_weights_pair(models_map, "vision_embeddings_merger").first, - get_model_weights_pair(models_map, "vision_embeddings_merger").second, + utils::get_model_weights_pair(models_map, "vision_embeddings_merger").first, + utils::get_model_weights_pair(models_map, "vision_embeddings_merger").second, device, device_config ).create_infer_request(); diff --git a/src/cpp/src/visual_language/pipeline.cpp b/src/cpp/src/visual_language/pipeline.cpp index 7786b0e3cc..4af2b76014 100644 --- a/src/cpp/src/visual_language/pipeline.cpp +++ b/src/cpp/src/visual_language/pipeline.cpp @@ -12,6 +12,8 @@ #include "visual_language/vlm_config.hpp" #include "visual_language/inputs_embedder.hpp" #include "visual_language/embedding_model.hpp" +#include "visual_language/pipeline_base.hpp" +#include "visual_language/continuous_batching_adapter.hpp" #include "sampler.hpp" #include "utils.hpp" @@ -20,24 +22,8 @@ using namespace ov::genai; -namespace { -} // namespace - -namespace ov::genai { - -const ModelsMap::mapped_type& get_model_weights_pair(const ModelsMap& models_map, const std::string& key) { - auto it = models_map.find(key); - if (it != models_map.end()) { - return it->second; - } - OPENVINO_THROW("Model with key '", key, "' not found in models map."); -} - -} - -class ov::genai::VLMPipeline::VLMPipelineImpl { -public: +class ov::genai::VLMPipeline::VLMPipelineImpl : public VLMPipelineBase{ // A config to follow for LLM input construction. VLMConfig m_vlm_config; // A config to follow for text generation. 
@@ -58,13 +44,11 @@ class ov::genai::VLMPipeline::VLMPipelineImpl { bool m_is_chat_conversation; // InputsEmbedder std::shared_ptr m_inputs_embedder; - // Load pipeline time - float m_load_time_ms = 0; // Axis num in kv cache from m_language model, which contains information about history len size_t m_kv_cache_seq_length_axis = 2; // Component for applying sampling to lm outputs Sampler m_sampler; - +public: VLMPipelineImpl( const std::filesystem::path& models_dir, const std::string& device, @@ -107,6 +91,7 @@ class ov::genai::VLMPipeline::VLMPipelineImpl { m_sampler.set_seed(m_generation_config.rng_seed); } + VLMPipelineImpl( const ModelsMap& models_map, const Tokenizer& tokenizer, @@ -129,7 +114,7 @@ class ov::genai::VLMPipeline::VLMPipelineImpl { m_tokenizer = m_inputs_embedder->get_tokenizer(); m_embedding = m_inputs_embedder->get_embedding_model(); - auto m_language_pair = get_model_weights_pair(models_map, "language"); + auto m_language_pair = utils::get_model_weights_pair(models_map, "language"); m_language = utils::singleton_core().compile_model( m_language_pair.first, m_language_pair.second, device, properties ).create_infer_request(); @@ -150,7 +135,7 @@ class ov::genai::VLMPipeline::VLMPipelineImpl { const std::vector& rgbs, GenerationConfig generation_config, const StreamerVariant& streamer - ) { + ) override { auto generate_start_time = std::chrono::steady_clock::now(); VLMPerfMetrics perf_metrics; auto& raw_counters = perf_metrics.raw_metrics; @@ -240,7 +225,7 @@ class ov::genai::VLMPipeline::VLMPipelineImpl { // Common perf metrics auto& res_raw_counters = decoded.perf_metrics.raw_metrics; decoded.perf_metrics.num_input_tokens = prompt_ids.get_size(); - decoded.perf_metrics.load_time = m_load_time_ms; + decoded.perf_metrics.load_time = this->get_load_time(); res_raw_counters.generate_durations.emplace_back(PerfMetrics::get_microsec(generate_end_time - generate_start_time)); res_raw_counters.detokenization_durations.emplace_back(PerfMetrics::get_microsec(decode_end_time - decode_start_time)); res_raw_counters.tokenization_durations.insert(res_raw_counters.tokenization_durations.end(), raw_counters.tokenization_durations.begin(), raw_counters.tokenization_durations.end()); @@ -259,43 +244,7 @@ class ov::genai::VLMPipeline::VLMPipelineImpl { return decoded; } - VLMDecodedResults generate( - const std::string& prompt, - const ov::AnyMap& config_map - ) { - auto image = config_map.find(ov::genai::image.name()); - auto images = config_map.find(ov::genai::images.name()); - OPENVINO_ASSERT( - config_map.end() == image || config_map.end() == images, - "Only one property can be set: image of images." - ); - std::vector rgbs; - if (config_map.end() != image) { - rgbs = {image->second.as()}; - } if (config_map.end() != images) { - if (images->second.is>()) { - rgbs = images->second.as>(); - } - else if (images->second.is()){ - rgbs = {images->second.as()}; - } - else { - OPENVINO_THROW("Unknown images type."); - } - } - ov::genai::OptionalGenerationConfig config_arg = utils::get_config_from_map(config_map); - GenerationConfig config = (config_arg.has_value()) ? 
*config_arg : get_generation_config(); - config.update_generation_config(config_map); - - return generate( - prompt, - rgbs, - config, - utils::get_streamer_from_map(config_map) - ); - } - - void start_chat(const std::string& system_message) { + void start_chat(const std::string& system_message) override { m_is_chat_conversation = true; bool have_state = 0 != m_language.get_tensor("attention_mask").get_size(); if (have_state) { @@ -307,7 +256,7 @@ class ov::genai::VLMPipeline::VLMPipelineImpl { m_inputs_embedder->start_chat(system_message); } - void finish_chat() { + void finish_chat() override { m_is_chat_conversation = false; // Resetting state may be slow. m_language.reset_state(); @@ -316,20 +265,20 @@ class ov::genai::VLMPipeline::VLMPipelineImpl { m_inputs_embedder->finish_chat(); } - Tokenizer get_tokenizer() const { + Tokenizer get_tokenizer() const override { return m_tokenizer; } - void set_chat_template(const std::string& new_template) { + void set_chat_template(const std::string& new_template) override { OPENVINO_ASSERT(!m_is_chat_conversation, "Chat template cannot be changed once start_chat() is called. Please, finish current chat via finish_chat()"); m_tokenizer.set_chat_template(new_template); } - GenerationConfig get_generation_config() const { + GenerationConfig get_generation_config() const override { return m_generation_config; } - void set_generation_config(const GenerationConfig& new_config) { + void set_generation_config(const GenerationConfig& new_config) override { int64_t default_eos_token_id = m_generation_config.eos_token_id; auto default_stop_token_ids = m_generation_config.stop_token_ids; m_generation_config = new_config; @@ -351,9 +300,18 @@ VLMPipeline::VLMPipeline( const ov::AnyMap& properties ) { auto start_time = std::chrono::steady_clock::now(); - m_pimpl = std::make_unique(models_dir, device, properties); + + if (properties.find(ov::genai::scheduler_config.name()) != properties.end() || + properties.find(utils::DRAFT_MODEL_ARG_NAME) != properties.end() || + properties.find(ov::genai::prompt_lookup.name()) != properties.end()) { + auto [plugin_config, scheduler_config] = utils::extract_scheduler_config(properties); + m_pimpl = std::make_unique(models_dir, scheduler_config, device, plugin_config); + } + else { + m_pimpl = std::make_unique(models_dir, device, properties); + } auto stop_time = std::chrono::steady_clock::now(); - m_pimpl->m_load_time_ms = std::chrono::duration_cast(stop_time - start_time).count(); + m_pimpl->set_load_time(std::chrono::duration_cast(stop_time - start_time).count()); } VLMPipeline::VLMPipeline( @@ -365,9 +323,17 @@ VLMPipeline::VLMPipeline( const ov::genai::GenerationConfig& generation_config ) { auto start_time = std::chrono::steady_clock::now(); - m_pimpl = std::make_unique(models_map, tokenizer, config_dir_path, device, properties, generation_config); + if (properties.find(ov::genai::scheduler_config.name()) != properties.end() || + properties.find(utils::DRAFT_MODEL_ARG_NAME) != properties.end() || + properties.find(ov::genai::prompt_lookup.name()) != properties.end()) { + auto [plugin_config, scheduler_config] = utils::extract_scheduler_config(properties); + m_pimpl = std::make_unique(models_map, tokenizer, config_dir_path, scheduler_config, device, plugin_config, generation_config); + } + else { + m_pimpl = std::make_unique(models_map, tokenizer, config_dir_path, device, properties, generation_config); + } auto stop_time = std::chrono::steady_clock::now(); - m_pimpl->m_load_time_ms = std::chrono::duration_cast(stop_time 
- start_time).count(); + m_pimpl->set_load_time(std::chrono::duration_cast(stop_time - start_time).count()); } ov::genai::VLMPipeline::~VLMPipeline() = default; diff --git a/src/cpp/src/visual_language/pipeline_base.hpp b/src/cpp/src/visual_language/pipeline_base.hpp new file mode 100644 index 0000000000..6ee56f7e4e --- /dev/null +++ b/src/cpp/src/visual_language/pipeline_base.hpp @@ -0,0 +1,87 @@ +// Copyright (C) 2023-2025 Intel Corporation +// SPDX-License-Identifier: Apache-2.0 + +#pragma once + +#include "openvino/genai/visual_language/pipeline.hpp" +#include "utils.hpp" + +using namespace ov::genai; + +namespace { + +} // namespace + +namespace ov::genai { +class ov::genai::VLMPipeline::VLMPipelineBase { + // Load pipeline time + float m_load_time_ms = 0; +public: + + virtual ~VLMPipelineBase() = default; + + virtual VLMDecodedResults generate( + const std::string& prompt, + const std::vector& rgbs, + GenerationConfig generation_config, + const StreamerVariant& streamer + ) = 0; + + VLMDecodedResults generate( + const std::string& prompt, + const ov::AnyMap& config_map + ) { + auto image = config_map.find(ov::genai::image.name()); + auto images = config_map.find(ov::genai::images.name()); + OPENVINO_ASSERT( + config_map.end() == image || config_map.end() == images, + "Only one property can be set: image of images." + ); + std::vector rgbs; + if (config_map.end() != image) { + rgbs = {image->second.as()}; + } if (config_map.end() != images) { + if (images->second.is>()) { + rgbs = images->second.as>(); + } + else if (images->second.is()){ + rgbs = {images->second.as()}; + } + else { + OPENVINO_THROW("Unknown images type."); + } + } + + ov::genai::OptionalGenerationConfig config_arg = utils::get_config_from_map(config_map); + GenerationConfig config = (config_arg.has_value()) ? 
*config_arg : get_generation_config(); + config.update_generation_config(config_map); + + return generate( + prompt, + rgbs, + config, + utils::get_streamer_from_map(config_map) + ); + } + + virtual void start_chat(const std::string& system_message) = 0; + + virtual void finish_chat() = 0; + + virtual Tokenizer get_tokenizer() const = 0; + + virtual void set_chat_template(const std::string& new_template) = 0; + + virtual GenerationConfig get_generation_config() const = 0; + + virtual void set_generation_config(const GenerationConfig& new_config) = 0; + + void set_load_time(float load_time_ms) { + m_load_time_ms = load_time_ms; + } + + float get_load_time() { + return m_load_time_ms; + } +}; +} diff --git a/src/python/openvino_genai/__init__.py b/src/python/openvino_genai/__init__.py index b031b45957..9439ea2279 100644 --- a/src/python/openvino_genai/__init__.py +++ b/src/python/openvino_genai/__init__.py @@ -86,6 +86,7 @@ from .py_openvino_genai import ( ContinuousBatchingPipeline, GenerationResult, + GenerationStatus, SchedulerConfig, CacheEvictionConfig, AggregationMode diff --git a/src/python/openvino_genai/__init__.pyi b/src/python/openvino_genai/__init__.pyi index b3950af6b0..a1bd90c552 100644 --- a/src/python/openvino_genai/__init__.pyi +++ b/src/python/openvino_genai/__init__.pyi @@ -18,6 +18,7 @@ from openvino_genai.py_openvino_genai import EncodedResults from openvino_genai.py_openvino_genai import FluxTransformer2DModel from openvino_genai.py_openvino_genai import GenerationConfig from openvino_genai.py_openvino_genai import GenerationResult +from openvino_genai.py_openvino_genai import GenerationStatus from openvino_genai.py_openvino_genai import Generator from openvino_genai.py_openvino_genai import Image2ImagePipeline from openvino_genai.py_openvino_genai import ImageGenerationConfig @@ -49,5 +50,5 @@ from openvino_genai.py_openvino_genai import draft_model from openvino_genai.py_openvino_genai import get_version import os as os from . 
import py_openvino_genai -__all__ = ['Adapter', 'AdapterConfig', 'AggregationMode', 'AutoencoderKL', 'CLIPTextModel', 'CLIPTextModelWithProjection', 'CacheEvictionConfig', 'ChunkStreamerBase', 'ContinuousBatchingPipeline', 'CppStdGenerator', 'DecodedResults', 'EncodedResults', 'FluxTransformer2DModel', 'GenerationConfig', 'GenerationResult', 'Generator', 'Image2ImagePipeline', 'ImageGenerationConfig', 'ImageGenerationPerfMetrics', 'InpaintingPipeline', 'LLMPipeline', 'PerfMetrics', 'RawImageGenerationPerfMetrics', 'RawPerfMetrics', 'SD3Transformer2DModel', 'Scheduler', 'SchedulerConfig', 'StopCriteria', 'StreamerBase', 'StreamingStatus', 'T5EncoderModel', 'Text2ImagePipeline', 'TextStreamer', 'TokenizedInputs', 'Tokenizer', 'TorchGenerator', 'UNet2DConditionModel', 'VLMPipeline', 'WhisperGenerationConfig', 'WhisperPerfMetrics', 'WhisperPipeline', 'WhisperRawPerfMetrics', 'draft_model', 'get_version', 'openvino', 'os', 'py_openvino_genai'] +__all__ = ['Adapter', 'AdapterConfig', 'AggregationMode', 'AutoencoderKL', 'CLIPTextModel', 'CLIPTextModelWithProjection', 'CacheEvictionConfig', 'ChunkStreamerBase', 'ContinuousBatchingPipeline', 'CppStdGenerator', 'DecodedResults', 'EncodedResults', 'FluxTransformer2DModel', 'GenerationConfig', 'GenerationResult', 'GenerationStatus', 'Generator', 'Image2ImagePipeline', 'ImageGenerationConfig', 'ImageGenerationPerfMetrics', 'InpaintingPipeline', 'LLMPipeline', 'PerfMetrics', 'RawImageGenerationPerfMetrics', 'RawPerfMetrics', 'SD3Transformer2DModel', 'Scheduler', 'SchedulerConfig', 'StopCriteria', 'StreamerBase', 'StreamingStatus', 'T5EncoderModel', 'Text2ImagePipeline', 'TextStreamer', 'TokenizedInputs', 'Tokenizer', 'TorchGenerator', 'UNet2DConditionModel', 'VLMPipeline', 'WhisperGenerationConfig', 'WhisperPerfMetrics', 'WhisperPipeline', 'WhisperRawPerfMetrics', 'draft_model', 'get_version', 'openvino', 'os', 'py_openvino_genai'] __version__: str diff --git a/src/python/openvino_genai/py_openvino_genai.pyi b/src/python/openvino_genai/py_openvino_genai.pyi index 48a4c03aaa..5e2b70d5d6 100644 --- a/src/python/openvino_genai/py_openvino_genai.pyi +++ b/src/python/openvino_genai/py_openvino_genai.pyi @@ -391,11 +391,17 @@ class ContinuousBatchingPipeline: def add_request(self, request_id: int, prompt: str, generation_config: GenerationConfig) -> GenerationHandle: ... @typing.overload + def add_request(self, request_id: int, prompt: str, images: list[openvino._pyopenvino.Tensor], generation_config: GenerationConfig) -> GenerationHandle: + ... + @typing.overload def generate(self, input_ids: list[openvino._pyopenvino.Tensor], generation_config: list[GenerationConfig], streamer: typing.Callable[[str], int | None] | StreamerBase | None = None) -> list[EncodedGenerationResult]: ... @typing.overload def generate(self, prompts: list[str], generation_config: list[GenerationConfig], streamer: typing.Callable[[str], int | None] | StreamerBase | None = None) -> list[GenerationResult]: ... + @typing.overload + def generate(self, prompts: list[str], images: list[list[openvino._pyopenvino.Tensor]], generation_config: list[GenerationConfig], streamer: typing.Callable[[str], int | None] | StreamerBase | None = None) -> list[GenerationResult]: + ... def get_config(self) -> GenerationConfig: ... 
def get_metrics(self) -> PipelineMetrics: diff --git a/src/python/py_continuous_batching_pipeline.cpp b/src/python/py_continuous_batching_pipeline.cpp index eba4b661f4..233bdbea06 100644 --- a/src/python/py_continuous_batching_pipeline.cpp +++ b/src/python/py_continuous_batching_pipeline.cpp @@ -278,6 +278,7 @@ void init_continuous_batching_pipeline(py::module_& m) { .def("get_metrics", &ContinuousBatchingPipeline::get_metrics) .def("add_request", py::overload_cast<uint64_t, const ov::Tensor&, const ov::genai::GenerationConfig&>(&ContinuousBatchingPipeline::add_request), py::arg("request_id"), py::arg("input_ids"), py::arg("generation_config")) .def("add_request", py::overload_cast<uint64_t, const std::string&, const ov::genai::GenerationConfig&>(&ContinuousBatchingPipeline::add_request), py::arg("request_id"), py::arg("prompt"), py::arg("generation_config")) + .def("add_request", py::overload_cast<uint64_t, const std::string&, const std::vector<ov::Tensor>&, const ov::genai::GenerationConfig&>(&ContinuousBatchingPipeline::add_request), py::arg("request_id"), py::arg("prompt"), py::arg("images"), py::arg("generation_config")) .def("step", &ContinuousBatchingPipeline::step) .def("has_non_finished_requests", &ContinuousBatchingPipeline::has_non_finished_requests) @@ -308,5 +309,27 @@ void init_continuous_batching_pipeline(py::module_& m) { py::arg("prompts"), py::arg("generation_config"), py::arg("streamer") = std::monostate{} + ) + + .def( + "generate", + [](ContinuousBatchingPipeline& pipe, + const std::vector<std::string>& prompts, + const std::vector<std::vector<ov::Tensor>>& images, + const std::vector<ov::genai::GenerationConfig>& generation_config, + const pyutils::PyBindStreamerVariant& py_streamer + ) -> py::typing::Union<std::vector<ov::genai::GenerationResult>> { + ov::genai::StreamerVariant streamer = pyutils::pystreamer_to_streamer(py_streamer); + std::vector<ov::genai::GenerationResult> generated_results; + { + py::gil_scoped_release rel; + generated_results = pipe.generate(prompts, images, generation_config, streamer); + } + return py::cast(generated_results); + }, + py::arg("prompts"), + py::arg("images"), + py::arg("generation_config"), + py::arg("streamer") = std::monostate{} ); } diff --git a/tests/python_tests/test_vlm_pipeline.py b/tests/python_tests/test_vlm_pipeline.py index 102809ac84..414e26c176 100644 --- a/tests/python_tests/test_vlm_pipeline.py +++ b/tests/python_tests/test_vlm_pipeline.py @@ -6,10 +6,10 @@ import pytest import transformers from optimum.intel.openvino import OVModelForVisualCausalLM -from openvino_genai import VLMPipeline, GenerationConfig +from openvino_genai import VLMPipeline, GenerationConfig, SchedulerConfig, ContinuousBatchingPipeline, GenerationStatus from utils.network import retry_request -from utils.generation_config import get_beam_search, get_multinomial_all_parameters +from utils.generation_config import get_beam_search, get_multinomial_all_parameters, get_greedy from utils.constants import get_default_llm_properties def get_ov_model(model_id, cache): @@ -93,6 +93,125 @@ def streamer(word: str) -> bool: assert res.texts[0] == ''.join(result_from_streamer) +configs = [ + get_greedy(), + get_beam_search(), +] + +@pytest.mark.precommit +@pytest.mark.nightly +@pytest.mark.parametrize("config", configs) +def test_vlm_continuous_batching_generate_vs_add_request(config, cache): + scheduler_config = SchedulerConfig() + models_path = get_ov_model(model_ids[0], cache) + ov_pipe = VLMPipeline(models_path, "CPU", scheduler_config=scheduler_config, **get_default_llm_properties()) + generation_config = config + generation_config.max_new_tokens = 30 + image_links_list = [ + [], + [image_links[0]] + ] + + res_generate = [] + for links in image_links_list: + images = [] + for link in links: + images.append(get_image_by_link(link)) + + 
res_generate.append(ov_pipe.generate(prompts[0], images=images, generation_config=generation_config)) + + cb_pipe = ContinuousBatchingPipeline(models_path, scheduler_config=scheduler_config, device="CPU", properties=get_default_llm_properties()) + tokenizer = cb_pipe.get_tokenizer() + + for idx, links in enumerate(image_links_list): + images = [] + for link in links: + images.append(get_image_by_link(link)) + handle = cb_pipe.add_request(idx, prompts[0], images, generation_config) + while handle.get_status() != GenerationStatus.FINISHED: + cb_pipe.step() + outputs = handle.read_all() + for out_idx, output in enumerate(outputs): + text = tokenizer.decode(output.generated_ids) + assert text == res_generate[idx].texts[out_idx] + assert output.score == res_generate[idx].scores[out_idx] + + +@pytest.mark.precommit +@pytest.mark.nightly +@pytest.mark.parametrize("config", configs) +def test_vlm_continuous_batching_vs_stateful(config, cache): + scheduler_config = SchedulerConfig() + models_path = get_ov_model(model_ids[0], cache) + cb_pipe = ContinuousBatchingPipeline(models_path, scheduler_config=scheduler_config, device="CPU", properties=get_default_llm_properties()) + generation_config = config + generation_config.max_new_tokens = 25 + eps = 0.001 + image_links_list = [ + [], + [image_links[0]] + ] + + res_cb = [] + for links in image_links_list: + images = [] + for link in links: + images.append(get_image_by_link(link)) + + res_cb.append(cb_pipe.generate([prompts[0]], images=[images], generation_config=[generation_config])) + + models_path = get_ov_model(model_ids[0], cache) + for idx, links in enumerate(image_links_list): + stateful_pipe = VLMPipeline(models_path, "CPU", **get_default_llm_properties()) + + images = [] + for link in links: + images.append(get_image_by_link(link)) + + res_stateful = stateful_pipe.generate(prompts[0], images=images, generation_config=generation_config) + for out_idx, text in enumerate(res_stateful.texts): + assert text == res_cb[idx][0].m_generation_ids[out_idx] + assert abs(res_stateful.scores[out_idx] - res_cb[idx][0].m_scores[out_idx]) < eps + + + +@pytest.mark.precommit +@pytest.mark.nightly +@pytest.mark.parametrize("config", configs) +def test_vlm_with_scheduler_vs_default(config, cache): + scheduler_config = SchedulerConfig() + models_path = get_ov_model(model_ids[0], cache) + cb_pipe = VLMPipeline(models_path, "CPU", scheduler_config=scheduler_config, **get_default_llm_properties()) + generation_config = config + generation_config.max_new_tokens = 25 + eps = 0.001 + image_links_list = [ + [], + [image_links[0]] + ] + + res_cb = [] + for links in image_links_list: + images = [] + for link in links: + images.append(get_image_by_link(link)) + + res_cb.append(cb_pipe.generate(prompts[0], images=images, generation_config=generation_config)) + + models_path = get_ov_model(model_ids[0], cache) + for idx, links in enumerate(image_links_list): + stateful_pipe = VLMPipeline(models_path, "CPU", **get_default_llm_properties()) + + images = [] + for link in links: + images.append(get_image_by_link(link)) + + res_stateful = stateful_pipe.generate(prompts[0], images=images, generation_config=generation_config) + for out_idx, text in enumerate(res_stateful.texts): + assert text == res_cb[idx].texts[out_idx] + assert abs(res_stateful.scores[out_idx] - res_cb[idx].scores[out_idx]) < eps + + @pytest.mark.precommit @pytest.mark.nightly @pytest.mark.parametrize("model_id", model_ids)