Continuous Batching in VLM [Draft] #1704

Open · wants to merge 25 commits into base: master
10 changes: 8 additions & 2 deletions src/cpp/include/openvino/genai/continuous_batching_pipeline.hpp
@@ -119,9 +119,10 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline {
const ov::genai::GenerationConfig& generation_config = {}
);

ov::genai::Tokenizer get_tokenizer();
ov::genai::Tokenizer get_tokenizer() const;

ov::genai::GenerationConfig get_config() const;
void set_config(const ov::genai::GenerationConfig& config);

/**
* Allows to get the current pipeline metrics.
@@ -131,6 +132,7 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline {

GenerationHandle add_request(uint64_t request_id, const ov::Tensor& input_ids, const ov::genai::GenerationConfig& sampling_params);
GenerationHandle add_request(uint64_t request_id, const std::string& prompt, const ov::genai::GenerationConfig& sampling_params);
GenerationHandle add_request(uint64_t request_id, const std::string& prompt, const std::vector<ov::Tensor>& images, const ov::genai::GenerationConfig& sampling_params);

void step();

@@ -139,7 +141,11 @@ class OPENVINO_GENAI_EXPORTS ContinuousBatchingPipeline {
// more high level interface, which can process multiple prompts in continuous batching manner
std::vector<EncodedGenerationResult> generate(const std::vector<ov::Tensor>& input_ids, const std::vector<ov::genai::GenerationConfig>& sampling_params, const ov::genai::StreamerVariant& streamer=std::monostate{});
std::vector<GenerationResult> generate(const std::vector<std::string>& prompts, const std::vector<ov::genai::GenerationConfig>& sampling_params, const ov::genai::StreamerVariant& streamer=std::monostate{});

std::vector<GenerationResult> generate(
const std::vector<std::string>& prompts,
const std::vector<std::vector<ov::Tensor>>& images,
const std::vector<GenerationConfig>& sampling_params,
const StreamerVariant& streamer=std::monostate{});
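
For orientation, here is a hedged caller-side sketch of the new image-aware generate() overload declared above. The model path, the prompt text, and the image tensor layout are assumptions for illustration, not something this PR defines, and the exact prompt format a given VLM expects is not addressed here.

#include <iostream>
#include "openvino/genai/continuous_batching_pipeline.hpp"

int main() {
    ov::genai::SchedulerConfig scheduler_config;   // defaults assumed to be workable
    ov::genai::ContinuousBatchingPipeline pipe("/path/to/exported_vlm", scheduler_config, "CPU");

    // Assumption: the VLM expects an RGB image as a u8 NHWC tensor; filling it
    // with real pixel data (via an image decoding library) is out of scope here.
    ov::Tensor image(ov::element::u8, ov::Shape{1, 448, 448, 3});

    ov::genai::GenerationConfig config = ov::genai::greedy();
    config.max_new_tokens = 100;

    // One prompt is paired with one vector of images and one sampling config.
    auto results = pipe.generate({"Describe this image."}, {{image}}, {config});
    for (const auto& res : results)
        std::cout << res.m_generation_ids.front() << std::endl;
    return 0;
}
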
/**
* @brief start chat with keeping history in kv cache.
* @param system_message optional system message.
4 changes: 3 additions & 1 deletion src/cpp/include/openvino/genai/visual_language/pipeline.hpp
@@ -190,8 +190,10 @@ class OPENVINO_GENAI_EXPORTS VLMPipeline {
void set_generation_config(const GenerationConfig& new_config);

private:
class VLMPipelineBase;
class VLMPipelineImpl;
std::unique_ptr<VLMPipelineImpl> m_pimpl;
class VLMContinuousBatchingAdapter;
std::unique_ptr<VLMPipelineBase> m_pimpl;
};
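
A standalone sketch (not code from this PR) of the pimpl refactor these declarations imply: the pimpl now points at a common base class so that either the classic implementation or a continuous-batching adapter can sit behind VLMPipeline. All names other than VLMPipelineBase, VLMPipelineImpl, and VLMContinuousBatchingAdapter are illustrative.

#include <iostream>
#include <memory>
#include <string>

class Pipeline {
public:
    explicit Pipeline(bool use_continuous_batching);
    std::string generate(const std::string& prompt);

private:
    // Plays the role of VLMPipelineBase: a pure interface both impls share.
    struct Base {
        virtual ~Base() = default;
        virtual std::string generate(const std::string& prompt) = 0;
    };
    // Plays the role of VLMPipelineImpl: the existing single-request path.
    struct ClassicImpl : Base {
        std::string generate(const std::string& prompt) override { return "classic: " + prompt; }
    };
    // Plays the role of VLMContinuousBatchingAdapter: would forward to a
    // ContinuousBatchingPipeline instance in the real code.
    struct CBAdapter : Base {
        std::string generate(const std::string& prompt) override { return "batched: " + prompt; }
    };
    std::unique_ptr<Base> m_pimpl;
};

Pipeline::Pipeline(bool use_continuous_batching)
    : m_pimpl(use_continuous_batching ? std::unique_ptr<Base>(std::make_unique<CBAdapter>())
                                      : std::unique_ptr<Base>(std::make_unique<ClassicImpl>())) {}

std::string Pipeline::generate(const std::string& prompt) { return m_pimpl->generate(prompt); }

int main() {
    Pipeline pipe(/*use_continuous_batching=*/true);
    std::cout << pipe.generate("hello") << std::endl;
    return 0;
}
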

/*
9 changes: 4 additions & 5 deletions src/cpp/src/block_manager.hpp
@@ -677,19 +677,18 @@ class BlockManager {
* Allocates a given number of KV cache blocks to a given sequence.
* @param sequence The sequence for the blocks to be allocated to.
* @param num_blocks The number of KV cache blocks to be allocated.
* @param prompt_ids Raw token values of the prompt for this sequence. Required if prefix caching is enabled.
* @param prompt_size Prompt size for this sequence.
*/
void allocate(ov::genai::Sequence::Ptr sequence, size_t num_blocks, const ov::genai::TokenIds& prompt_ids = {}) {
void allocate(ov::genai::Sequence::Ptr sequence, size_t num_blocks, size_t prompt_size = 0) {
OPENVINO_ASSERT(num_blocks > 0 && can_allocate_blocks(num_blocks));
OPENVINO_ASSERT(!m_enable_prefix_caching || prompt_ids.size() > 0, "prompt_ids should be set for hash calculation.");

auto sequence_id = sequence->get_id();
if (m_block_table.find(sequence_id) == m_block_table.end()) {
m_block_table[sequence_id].resize(m_num_layers);
}

auto& block_table = m_block_table[sequence_id][0];
auto content_length = sequence->get_generated_len() + prompt_ids.size();
auto content_length = sequence->get_generated_len() + prompt_size;
size_t allocated_blocks = block_table.size(); // assuming all layers have the same number of allocated blocks
size_t num_hashed_tokens = allocated_blocks * m_block_size;

@@ -1016,7 +1015,7 @@ class BlockManager {

if (num_logical_blocks > num_physical_blocks) {
OPENVINO_ASSERT(can_allocate_blocks(num_logical_blocks - num_physical_blocks));
allocate(sequence, num_logical_blocks - num_physical_blocks, seq_group->get_prompt_ids());
allocate(sequence, num_logical_blocks - num_physical_blocks, seq_group->get_prompt_len());
} else {
OPENVINO_ASSERT(num_logical_blocks == num_physical_blocks, "A number of physical and logic blocks must be the same in this code path");

47 changes: 42 additions & 5 deletions src/cpp/src/continuous_batching_impl.cpp
@@ -121,6 +121,20 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::ContinuousBatchingImpl(
initialize_pipeline(model, scheduler_config, device, properties, kv_cache_config);
}

ContinuousBatchingPipeline::ContinuousBatchingImpl::ContinuousBatchingImpl(
const std::shared_ptr<ov::Model>& model,
std::shared_ptr<InputsEmbedder> inputs_embedder,
const Tokenizer& tokenizer,
const SchedulerConfig& scheduler_config,
const std::string& device,
const ov::AnyMap& properties,
const ov::genai::GenerationConfig& generation_config,
bool is_validation_mode_enabled) : ContinuousBatchingImpl(model, tokenizer, scheduler_config, device, properties, generation_config, is_validation_mode_enabled){
m_inputs_embedder = inputs_embedder;
m_model_runner->set_inputs_embedder(inputs_embedder);
Reviewer comment (Contributor): it looks like we can set only the EmbeddingModel here, since the whole inputs embedder is not required for the LLM part.

m_model_input_type = ModelInputType::EMBEDDINGS;
}

ContinuousBatchingPipeline::ContinuousBatchingImpl::~ContinuousBatchingImpl() {
if (m_scheduler) {
m_scheduler->release();
@@ -255,6 +269,9 @@ ContinuousBatchingPipeline::ContinuousBatchingImpl::add_request(uint64_t request
SequenceGroup::Ptr sequence_group = std::make_shared<SequenceGroup>(request_id, input_ids, sampling_params, m_block_size);

if (m_scheduler->get_config().enable_prefix_caching) {
if (m_model_input_type == ModelInputType::EMBEDDINGS) {
OPENVINO_THROW("Prefix caching is not supported for VLM models.");
}
m_scheduler->restore_cached_blocks(sequence_group);
}

@@ -270,12 +287,32 @@ GenerationHandle
ContinuousBatchingPipeline::ContinuousBatchingImpl::add_request(uint64_t request_id,
const std::string& prompt,
ov::genai::GenerationConfig sampling_params) {
static ManualTimer timer("tokenize");
timer.start();
ov::Tensor input_ids = m_tokenizer.encode(prompt).input_ids;
timer.end();
ov::Tensor inputs;
ov::genai::VLMPerfMetrics metrics;
if (m_model_input_type == ModelInputType::TOKENS) {
static ManualTimer timer("tokenize");
timer.start();
inputs = m_tokenizer.encode(prompt).input_ids;
timer.end();
return add_request(request_id, inputs, sampling_params);
} else if (m_model_input_type == ModelInputType::EMBEDDINGS) {
return add_request(request_id, prompt, {}, sampling_params);
} else {
OPENVINO_THROW("Unknown model input type.");
}

return add_request(request_id, inputs, sampling_params);
}

return add_request(request_id, input_ids, sampling_params);
GenerationHandle
ContinuousBatchingPipeline::ContinuousBatchingImpl::add_request(uint64_t request_id,
const std::string& prompt,
const std::vector<ov::Tensor>& rgbs,
GenerationConfig sampling_params) {
OPENVINO_ASSERT(m_model_input_type == ModelInputType::EMBEDDINGS, "Model doesn't support embeddings.");
ov::genai::VLMPerfMetrics metrics;
auto inputs = m_inputs_embedder->get_inputs_embeds(prompt, rgbs, metrics);
return add_request(request_id, inputs, sampling_params);
}

bool ContinuousBatchingPipeline::ContinuousBatchingImpl::has_non_finished_requests() {
17 changes: 17 additions & 0 deletions src/cpp/src/continuous_batching_impl.hpp
@@ -7,6 +7,7 @@

#include "openvino/genai/lora_adapter.hpp"
#include "cache_eviction.hpp"
#include "visual_language/inputs_embedder.hpp"

namespace ov::genai {

@@ -50,6 +51,7 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc

std::shared_ptr<ov::genai::CacheRotationCalculator> m_cache_rotation_calculator;


#ifdef DEBUG_CACHE_STATE_DUMP
size_t step_count = 0;
#endif
@@ -103,6 +105,15 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc
const ov::AnyMap& properties,
const ov::genai::GenerationConfig& generation_config,
bool is_validation_mode_enabled = false);

ContinuousBatchingImpl(const std::shared_ptr<ov::Model>& model,
std::shared_ptr<InputsEmbedder> inputs_embedder,
const Tokenizer& tokenizer,
const SchedulerConfig& scheduler_config,
const std::string& device,
const ov::AnyMap& properties,
const ov::genai::GenerationConfig& generation_config,
bool is_validation_mode_enabled = false);

virtual ~ContinuousBatchingImpl();

@@ -114,6 +125,11 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc
const std::string& prompt,
ov::genai::GenerationConfig sampling_params) override;

GenerationHandle add_request(uint64_t request_id,
const std::string& prompt,
const std::vector<ov::Tensor>& rgbs,
ov::genai::GenerationConfig sampling_params) override;

bool has_non_finished_requests() override;

void step() override;
@@ -123,6 +139,7 @@ class ContinuousBatchingPipeline::ContinuousBatchingImpl : public ContinuousBatc
const std::vector<GenerationConfig>& sampling_params,
const StreamerVariant& streamer) override;


/**
* Updates LoRA adapters for current generation call
*/
91 changes: 83 additions & 8 deletions src/cpp/src/continuous_batching_pipeline.cpp
@@ -15,6 +15,7 @@
#include "timer.hpp"
#include "utils.hpp"
#include "debug_utils.hpp"
#include "visual_language/inputs_embedder.hpp"

using namespace ov::genai;

@@ -43,6 +44,27 @@ inline float get_load_time(std::chrono::steady_clock::time_point start_time) {
return std::chrono::duration_cast<std::chrono::milliseconds>(stop_time - start_time).count();
}

inline bool ends_with(std::string str, std::string postfix) {
return str.rfind(postfix) == str.size() - postfix.size();
}

std::string get_directory(const std::string& s) {
Reviewer comment (Contributor): this function and the one above are not required.

// Linux-style separator
auto pos = s.find_last_of('/');
if (pos != std::string::npos) {
return s.substr(0, pos ? pos : 1);
}
// Windows-style separator
pos = s.find_last_of('\\');
if (pos != std::string::npos) {
return s.substr(0, pos);
} else if (s.empty()) {
return {};
} else {
return {'.'};
}
}

ContinuousBatchingPipeline::ContinuousBatchingPipeline( const std::filesystem::path& models_path,
const SchedulerConfig& scheduler_config,
const std::string& device,
@@ -54,17 +76,34 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline( const std::filesystem::p
auto draft_model_desr = extract_draft_model_from_config(properties_without_draft_model);
auto is_prompt_lookup_enabled = extract_prompt_lookup_from_config(properties_without_draft_model);

auto model = utils::singleton_core().read_model(models_path / "openvino_model.xml", {}, properties);
auto tokenizer = ov::genai::Tokenizer(models_path, tokenizer_properties);
auto generation_config = utils::from_config_json_if_exists(models_path);
std::filesystem::path model_path = models_path;
std::filesystem::path directory = models_path;
if (std::filesystem::exists(model_path / "openvino_model.xml")) {
model_path = model_path / "openvino_model.xml";
}
else if (std::filesystem::exists(model_path / "openvino_language_model.xml")) {
model_path = model_path / "openvino_language_model.xml";
}
else {
OPENVINO_THROW("Could not find a model in the directory.");
}

auto model = utils::singleton_core().read_model(model_path, {}, properties);
auto tokenizer = ov::genai::Tokenizer(directory, tokenizer_properties);
auto generation_config = utils::from_config_json_if_exists(directory);

if (is_prompt_lookup_enabled) {
OPENVINO_ASSERT(draft_model_desr.model == nullptr, "Speculative decoding and prompt lookup decoding are mutually exclusive");
m_impl = std::make_shared<PromptLookupImpl>(model, tokenizer, scheduler_config, device, properties_without_draft_model, generation_config);
} else if (draft_model_desr.model != nullptr) {
auto main_model_descr = ov::genai::ModelDesc(model, tokenizer, device, properties_without_draft_model, scheduler_config, generation_config);
m_impl = std::make_shared<SpeculativeDecodingImpl>(main_model_descr, draft_model_desr);
} else {
} else if (std::filesystem::exists(directory / "openvino_text_embeddings_model.xml") ) {
auto vlm_config = ov::genai::VLMConfig{ utils::from_config_json_if_exists<VLMConfig>(directory, "config.json")};
auto inputs_embedder = std::make_shared<InputsEmbedder>(vlm_config, directory, device, properties);
m_impl = std::make_shared<ContinuousBatchingImpl>(model, inputs_embedder, tokenizer, scheduler_config, device, properties, generation_config);
}
else {
m_impl = std::make_shared<ContinuousBatchingImpl>(model, tokenizer, scheduler_config, device, properties, generation_config);
}

@@ -82,16 +121,30 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline(
auto properties_without_draft_model = properties;
auto draft_model_desr = extract_draft_model_from_config(properties_without_draft_model);
auto is_prompt_lookup_enabled = extract_prompt_lookup_from_config(properties_without_draft_model);
std::filesystem::path openvino_model_name = "openvino_model.xml";
auto model = utils::singleton_core().read_model(models_path / openvino_model_name, {}, properties_without_draft_model);
auto generation_config = utils::from_config_json_if_exists(models_path);
std::filesystem::path model_path = models_path;
std::filesystem::path directory = models_path;
if (std::filesystem::exists(model_path / "openvino_model.xml")) {
model_path = model_path / "openvino_model.xml";
}
else if (std::filesystem::exists(model_path / "openvino_language_model.xml")) {
model_path = model_path / "openvino_language_model.xml";
}
else {
OPENVINO_THROW("Could not find a model in the directory.");
}
auto model = utils::singleton_core().read_model(model_path, {}, properties_without_draft_model);
auto generation_config = utils::from_config_json_if_exists(directory);

if (is_prompt_lookup_enabled) {
OPENVINO_ASSERT(draft_model_desr.model == nullptr, "Speculative decoding and prompt lookup decoding are mutually exclusive");
m_impl = std::make_shared<PromptLookupImpl>(model, tokenizer, scheduler_config, device, properties_without_draft_model, generation_config);
} else if (draft_model_desr.model != nullptr) {
auto main_model_descr = ov::genai::ModelDesc(model, tokenizer, device, properties_without_draft_model, scheduler_config, generation_config);
m_impl = std::make_shared<SpeculativeDecodingImpl>(main_model_descr, draft_model_desr);
} else if (std::filesystem::exists(directory / "openvino_text_embeddings_model.xml") ) {
auto vlm_config = ov::genai::VLMConfig{ utils::from_config_json_if_exists<VLMConfig>(directory, "config.json")};
auto inputs_embedder = std::make_shared<InputsEmbedder>(vlm_config, directory, device, properties);
m_impl = std::make_shared<ContinuousBatchingImpl>(model, inputs_embedder, tokenizer, scheduler_config, device, properties, generation_config);
} else {
m_impl = std::make_shared<ContinuousBatchingImpl>(model, tokenizer, scheduler_config, device, properties, generation_config);
}
@@ -113,28 +166,37 @@ ContinuousBatchingPipeline::ContinuousBatchingPipeline(
auto draft_model_desr = extract_draft_model_from_config(properties_without_draft_model);
auto is_prompt_lookup_enabled = extract_prompt_lookup_from_config(properties_without_draft_model);
auto model = utils::singleton_core().read_model(model_str, weights_tensor);
auto directory = std::filesystem::path(get_directory(model_str));
Reviewer comment (Contributor): model_str is the content of the XML file, so we cannot extract a directory here. Since the 2025.0 release, the IR frontend inserts __weights_path as runtime info for ov::Model (see openvinotoolkit/openvino#29101), so we can try to check this information and restore the directory the model was read from.
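
A minimal sketch of the direction that comment points at, assuming the __weights_path runtime-info key is present (IR frontend, OpenVINO 2025.0+); the helper name and the empty-path fallback are illustrative and not part of this PR.

#include <filesystem>
#include <memory>
#include <string>
#include "openvino/core/model.hpp"

std::filesystem::path get_model_directory(const std::shared_ptr<ov::Model>& model) {
    // The IR frontend reportedly attaches the weights file path as runtime info;
    // if the key is absent (older IRs, purely in-memory models), return an empty
    // path and let the caller fall back to the text-only code path.
    if (model->has_rt_info("__weights_path")) {
        const auto weights_path = model->get_rt_info<std::string>("__weights_path");
        return std::filesystem::path(weights_path).parent_path();
    }
    return {};
}
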


if (is_prompt_lookup_enabled) {
OPENVINO_ASSERT(draft_model_desr.model == nullptr, "Speculative decoding and prompt lookup decoding are mutually exclusive");
m_impl = std::make_shared<PromptLookupImpl>(model, tokenizer, scheduler_config, device, properties_without_draft_model, generation_config);
} else if (draft_model_desr.model != nullptr) {
auto main_model_descr = ov::genai::ModelDesc(model, tokenizer, device, properties_without_draft_model, scheduler_config, generation_config);
m_impl = std::make_shared<SpeculativeDecodingImpl>(main_model_descr, draft_model_desr);
} else if (std::filesystem::exists(directory / "openvino_text_embeddings_model.xml")) {
auto vlm_config = ov::genai::VLMConfig{ utils::from_config_json_if_exists<VLMConfig>(directory, "config.json")};
auto inputs_embedder = std::make_shared<InputsEmbedder>(vlm_config, directory, device, properties);
m_impl = std::make_shared<ContinuousBatchingImpl>(model, inputs_embedder, tokenizer, scheduler_config, device, properties, generation_config);
} else {
m_impl = std::make_shared<ContinuousBatchingImpl>(model, tokenizer, scheduler_config, device, properties, generation_config);
}

m_impl->m_load_time_ms = get_load_time(start_time);
}

ov::genai::Tokenizer ContinuousBatchingPipeline::get_tokenizer() {
ov::genai::Tokenizer ContinuousBatchingPipeline::get_tokenizer() const{
return m_impl->get_tokenizer();
}

ov::genai::GenerationConfig ContinuousBatchingPipeline::get_config() const{
return m_impl->get_config();
}

void ContinuousBatchingPipeline::set_config(const ov::genai::GenerationConfig& config) {
m_impl->set_config(config);
}

PipelineMetrics ContinuousBatchingPipeline::get_metrics() const{
return m_impl->get_metrics();
}
Expand All @@ -147,6 +209,10 @@ GenerationHandle ContinuousBatchingPipeline::add_request(uint64_t request_id, co
return m_impl->add_request(request_id, input_ids, sampling_params);
}

GenerationHandle ContinuousBatchingPipeline::add_request(uint64_t request_id, const std::string& prompt, const std::vector<ov::Tensor>& images, const ov::genai::GenerationConfig& sampling_params) {
return m_impl->add_request(request_id, prompt, images, sampling_params);
}
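
For completeness, a hedged sketch of driving the lower-level overload just above together with the step() loop; the request id, prompt, image handling, and output decoding are simplified assumptions rather than anything this PR prescribes.

#include <iostream>
#include "openvino/genai/continuous_batching_pipeline.hpp"

// `pipe` is assumed to be a ContinuousBatchingPipeline built for a VLM and
// `image` an ov::Tensor already holding RGB data, as in the earlier sketch.
void run_single_vlm_request(ov::genai::ContinuousBatchingPipeline& pipe, const ov::Tensor& image) {
    auto handle = pipe.add_request(/*request_id=*/0,
                                   "What is in this picture?",
                                   {image},
                                   ov::genai::greedy());

    // Drive the pipeline manually: each step() performs one scheduling and
    // inference iteration over all queued requests.
    while (pipe.has_non_finished_requests()) {
        pipe.step();
    }

    if (handle->get_status() == ov::genai::GenerationStatus::FINISHED) {
        auto outputs = handle->read_all();   // std::vector<GenerationOutput>
        std::cout << pipe.get_tokenizer().decode(outputs.front().generated_ids) << std::endl;
    }
}
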

void ContinuousBatchingPipeline::step() {
m_impl->step();
}
Expand Down Expand Up @@ -175,6 +241,15 @@ std::vector<GenerationResult> ContinuousBatchingPipeline::generate(const std::ve
return decoded_results;
}

std::vector<GenerationResult> ContinuousBatchingPipeline::generate(
const std::vector<std::string>& prompts,
const std::vector<std::vector<ov::Tensor>>& images,
const std::vector<GenerationConfig>& sampling_params,
const StreamerVariant& streamer) {
return m_impl->generate(prompts, images, sampling_params, streamer);
}


void ContinuousBatchingPipeline::start_chat(const std::string& system_message) {
m_impl->start_chat(system_message);
};