[ Test ][ PR4 ] Splitting & Refactoring Common.py (#1722)
Details:
* Implement `run_ov_pipeline`, parameterized by `pipeline_type`
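A minimal usage sketch of the new helper (the model directory below is a placeholder and is deleted by the helper when it finishes; the prompt is illustrative):

```python
from pathlib import Path

from openvino_genai import GenerationConfig
from utils.ov_genai_pipelines import PipelineType, run_ov_pipeline

config = GenerationConfig()
config.max_new_tokens = 16

# PipelineType selects the backend: STATEFUL, PAGED_ATTENTION, CONTINIOUS_BATCHING,
# SPECULATIVE_DECODING or PROMPT_LOOKUP_DECODING
results = run_ov_pipeline(models_path=Path("/tmp/converted_model"),  # placeholder, removed by the helper
                          prompt=["What is OpenVINO?"],
                          generation_config=config,
                          pipeline_type=PipelineType.STATEFUL)
print(results[0].m_generation_ids[0])
```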

Tickets:
* [159925](https://jira.devtools.intel.com/browse/CVS-159925)

MERGE AFTER:
* #1691
* #1702
* #1718

---------

Co-authored-by: Ilya Lavrenov <ilya.lavrenov@intel.com>
iefode and ilya-lavrenov authored Feb 22, 2025
1 parent 4d97d69 commit 3d3d44f
Showing 7 changed files with 163 additions and 89 deletions.
3 changes: 3 additions & 0 deletions src/python/openvino_genai/py_openvino_genai.pyi
@@ -396,6 +396,9 @@ class ContinuousBatchingPipeline:
@typing.overload
def generate(self, prompts: list[str], generation_config: list[GenerationConfig], streamer: typing.Callable[[str], int | None] | StreamerBase | None = None) -> list[GenerationResult]:
...
@typing.overload
def generate(self, prompt: str, generation_config: GenerationConfig, streamer: typing.Callable[[str], int | None] | StreamerBase | None = None) -> list[GenerationResult]:
...
def get_config(self) -> GenerationConfig:
...
def get_metrics(self) -> PipelineMetrics:
16 changes: 16 additions & 0 deletions src/python/py_continuous_batching_pipeline.cpp
@@ -308,5 +308,21 @@ void init_continuous_batching_pipeline(py::module_& m) {
py::arg("prompts"),
py::arg("generation_config"),
py::arg("streamer") = std::monostate{}
)

.def(
"generate",
[](ContinuousBatchingPipeline& pipe,
const std::string& prompt,
const ov::genai::GenerationConfig& generation_config,
const pyutils::PyBindStreamerVariant& streamer
) -> py::typing::Union<std::vector<ov::genai::GenerationResult>> {
std::vector<std::string> prompts = { prompt };
std::vector<ov::genai::GenerationConfig> generation_configs = { generation_config };
return __call_cb_generate(pipe, prompts, generation_configs, streamer);
},
py::arg("prompt"),
py::arg("generation_config"),
py::arg("streamer") = std::monostate{}
);
}
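From Python, the new overload can then be called with a plain string instead of one-element lists; a minimal sketch, assuming an already-converted model at a placeholder path (the empty properties dict is a simplification):

```python
import openvino_genai as ov_genai

# placeholder path to an already converted model
pipe = ov_genai.ContinuousBatchingPipeline("/tmp/converted_model",
                                           scheduler_config=ov_genai.SchedulerConfig(),
                                           device="CPU",
                                           tokenizer_properties={},
                                           properties={})

config = ov_genai.GenerationConfig()
config.max_new_tokens = 16

# new overload: single prompt + single config; still returns list[GenerationResult]
results = pipe.generate("What is OpenVINO?", config)
print(results[0].m_generation_ids[0])
```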
97 changes: 18 additions & 79 deletions tests/python_tests/common.py
@@ -18,7 +18,7 @@
from utils.constants import get_default_llm_properties
from utils.hugging_face import download_and_convert_model, run_hugging_face
from utils.comparation import compare_generation_results
from utils.ov_genai_pipelines import dict_to_scheduler_config
from utils.ov_genai_pipelines import dict_to_scheduler_config, run_ov_pipeline, StreamerWithResults, PipelineType

TESTS_ROOT = Path(__file__).parent

@@ -46,46 +46,13 @@ def run_continuous_batching(
) -> List[GenerationResult]:
if type(generation_configs) is not list:
generation_configs = [generation_configs] * len(prompts)

cb_pipe = ContinuousBatchingPipeline(models_path, scheduler_config=scheduler_config, device='CPU', tokenizer_properties={}, properties=get_default_llm_properties())
output = cb_pipe.generate(prompts, generation_configs)

del cb_pipe
shutil.rmtree(models_path)

return output


def get_models_list_from_path(file_name: str):
models = []
with open(file_name) as f:
for model_name in f:
model_name = model_name.strip()
# skip comment in model scope file
if model_name.startswith('#'):
continue
models.append(model_name)
return models


class StreamerWithResults:
# Return a streamer which accumulates results in order to compare with results returned from generate.
results: List[str] = []
def __init__(self):
self.results = []

def accumulate(self, subword) -> bool:
self.results.append(subword)
return False

def get_results(self) -> List[GenerationResult]:
streaming_result = GenerationResult()
streaming_result.m_generation_ids = [''.join(self.results)]
return [streaming_result]

def reset(self):
self.results = []

return run_ov_pipeline(models_path=models_path,
prompt=prompts,
generation_config=generation_configs,
pipeline_type=PipelineType.CONTINIOUS_BATCHING,
scheduler_config=scheduler_config,
ov_config=get_default_llm_properties())


def run_llm_pipeline(
@@ -96,44 +63,12 @@ def run_llm_pipeline(
streamer: StreamerWithResults | Callable | StreamerBase = None
) -> List[GenerationResult]:
properties = get_default_llm_properties()
if use_cb:
properties['scheduler_config'] = SchedulerConfig()
ov_pipe = LLMPipeline(models_path, device='CPU', **properties)

if streamer is None and not (generation_config.is_beam_search() or generation_config.num_return_sequences > 1) and len(prompts) == 1:
# We can use streamer only if we have a single prompt and not beam search.
streamer = StreamerWithResults()
if isinstance(streamer, StreamerWithResults):
# Clear the accumulated strings to avoid side effects
streamer.reset()

generate_outputs : DecodedResults = ov_pipe.generate(
inputs=prompts,
generation_config=generation_config,
streamer=streamer.accumulate if isinstance(streamer, StreamerWithResults) else streamer
)

index = 0
generation_results = []

for _ in prompts:
generation_result = GenerationResult()

generation_result.m_generation_ids = generate_outputs.texts[index : index + generation_config.num_return_sequences]
# sequences_scores are available only for beam search case
if generation_config.is_beam_search():
generation_result.m_scores = generate_outputs.scores[index : index + generation_config.num_return_sequences]
generation_results.append(generation_result)

index += generation_config.num_return_sequences

del ov_pipe
shutil.rmtree(models_path)

if isinstance(streamer, StreamerWithResults):
compare_generation_results(prompts, generation_results, streamer.get_results(), generation_config)

return generation_results
return run_ov_pipeline(models_path=models_path,
prompt=prompts,
generation_config=generation_config,
pipeline_type=(PipelineType.PAGED_ATTENTION if use_cb else PipelineType.STATEFUL),
streamer=streamer,
ov_config=properties)
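For context, a sketch of how a test now drives this thinned-out wrapper; keyword arguments are used because only the parameter names (not their order) are visible in this hunk, and the model directory is a placeholder:

```python
from pathlib import Path

from openvino_genai import GenerationConfig

from common import run_llm_pipeline

config = GenerationConfig()
config.max_new_tokens = 16

# use_cb=True routes through PipelineType.PAGED_ATTENTION, otherwise STATEFUL
results = run_llm_pipeline(models_path=Path("/tmp/converted_model"),  # placeholder
                           prompts=["What is OpenVINO?"],
                           generation_config=config,
                           use_cb=True)
assert len(results) == 1
```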


def run_llm_pipeline_with_ref(model_id: str,
Expand Down Expand Up @@ -175,7 +110,11 @@ def run_cb_pipeline_with_ref(tmp_path: str,


# TODO: remove after Generator property is supported by LLMPipeline / VLMPipeline
def generate_and_compare_with_reference_text(models_path: Path, prompts: List[str], reference_texts_per_prompt: List[List[str]], generation_configs: List[GenerationConfig], scheduler_config: SchedulerConfig):
def generate_and_compare_with_reference_text(models_path: Path,
prompts: List[str],
reference_texts_per_prompt: List[List[str]],
generation_configs: List[GenerationConfig],
scheduler_config: SchedulerConfig):
ov_results : List[GenerationResult] = run_continuous_batching(models_path, scheduler_config, prompts, generation_configs)

assert len(prompts) == len(reference_texts_per_prompt)
8 changes: 4 additions & 4 deletions tests/python_tests/test_llm_pipeline.py
@@ -140,14 +140,14 @@ def test_chat_scenario(model_id, intpus):
opt_model, hf_tokenizer, models_path = download_and_convert_model(model_id)
ov_pipe = create_ov_pipeline(models_path)

generation_config_kwargs, system_massage = intpus
generation_config_kwargs, system_message = intpus

ov_generation_config = ov_genai.GenerationConfig(**generation_config_kwargs)
hf_generation_config = generation_config_to_hf(opt_model.generation_config, ov_generation_config)

ov_pipe.start_chat(system_massage)
chat_history_hf.append({"role": "system", "content": system_massage})
chat_history_ov.append({"role": "system", "content": system_massage})
ov_pipe.start_chat(system_message)
chat_history_hf.append({"role": "system", "content": system_message})
chat_history_ov.append({"role": "system", "content": system_message})
for prompt in questions:
chat_history_hf.append({'role': 'user', 'content': prompt})
chat_history_ov.append({'role': 'user', 'content': prompt})
8 changes: 4 additions & 4 deletions tests/python_tests/test_sampling.py
@@ -75,10 +75,10 @@ def test_greedy(tmp_path, generation_config, prompt, use_cb):
prompt = prompt.decode('unicode_escape') if isinstance(prompt, bytes) else prompt

run_llm_pipeline_with_ref(model_id=model_id,
prompts=[prompt],
generation_config=generation_config,
tmp_path=tmp_path,
use_cb=use_cb)
prompts=[prompt],
generation_config=generation_config,
tmp_path=tmp_path,
use_cb=use_cb)


@pytest.mark.precommit
118 changes: 117 additions & 1 deletion tests/python_tests/utils/ov_genai_pipelines.py
@@ -3,10 +3,14 @@

from enum import Enum
from pathlib import Path
from typing import List, Callable
from shutil import rmtree

from openvino_genai import SchedulerConfig, draft_model, ContinuousBatchingPipeline, LLMPipeline
from openvino_genai import SchedulerConfig, draft_model, ContinuousBatchingPipeline, \
LLMPipeline, GenerationConfig, GenerationResult, StreamerBase, DecodedResults

from utils.constants import get_default_llm_properties
from utils.comparation import compare_generation_results

def dict_to_scheduler_config(scheduler_params: dict = None) -> SchedulerConfig:
scheduler_config = SchedulerConfig()
@@ -38,6 +42,25 @@ class PipelineType(Enum):
PROMPT_LOOKUP_DECODING = 5


class StreamerWithResults:
# Return a streamer which accumulates results in order to compare with results returned from generate.
results: List[str] = []
def __init__(self):
self.results = []

def accumulate(self, subword) -> bool:
self.results.append(subword)
return False

def get_results(self) -> List[GenerationResult]:
streaming_result = GenerationResult()
streaming_result.m_generation_ids = [''.join(self.results)]
return [streaming_result]

def reset(self):
self.results = []
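A quick standalone illustration of the contract this relocated class provides (no pipeline involved):

```python
from utils.ov_genai_pipelines import StreamerWithResults

streamer = StreamerWithResults()
for subword in ["Open", "VINO", " GenAI"]:
    streamer.accumulate(subword)   # returns False, i.e. "keep generating"

[joined] = streamer.get_results()  # a single GenerationResult with the concatenated text
assert joined.m_generation_ids == ["OpenVINO GenAI"]

streamer.reset()                   # clear the accumulated text between runs
assert streamer.get_results()[0].m_generation_ids == [""]
```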


def create_ov_pipeline(models_path: Path,
pipeline_type: PipelineType = PipelineType.PAGED_ATTENTION,
device: str = "CPU",
@@ -57,3 +80,96 @@ def create_ov_pipeline(models_path: Path,
else:
raise Exception(f"Unsupported pipeline type: {pipeline_type}")


def prepare_generation_config_by_pipe_type(generation_config : GenerationConfig,
pipeline_type: PipelineType = PipelineType.PAGED_ATTENTION):
if pipeline_type == PipelineType.SPECULATIVE_DECODING:
generation_config.assistant_confidence_threshold = 0.9
elif pipeline_type == PipelineType.PROMPT_LOOKUP_DECODING:
generation_config.num_assistant_tokens = 5
generation_config.max_ngram_size = 3
return generation_config

def prepare_generation_configs_by_pipe_type(generation_configs : List[GenerationConfig],
pipeline_type: PipelineType = PipelineType.PAGED_ATTENTION):
return [ prepare_generation_config_by_pipe_type(generation_config, pipeline_type) for generation_config in generation_configs ]
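For illustration, what these helpers do to a base config; note they modify the passed config in place and return it:

```python
from openvino_genai import GenerationConfig
from utils.ov_genai_pipelines import PipelineType, prepare_generation_config_by_pipe_type

sd = prepare_generation_config_by_pipe_type(GenerationConfig(), PipelineType.SPECULATIVE_DECODING)
assert abs(sd.assistant_confidence_threshold - 0.9) < 1e-6

pl = prepare_generation_config_by_pipe_type(GenerationConfig(), PipelineType.PROMPT_LOOKUP_DECODING)
assert pl.num_assistant_tokens == 5 and pl.max_ngram_size == 3

# any other pipeline type leaves the config untouched
untouched = prepare_generation_config_by_pipe_type(GenerationConfig(), PipelineType.STATEFUL)
```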


def convert_decoded_results_to_generation_result(generate_outputs: DecodedResults,
num_prompts: int,
num_return_sequences: int,
is_beam_search: bool) -> List[GenerationResult]:
index = 0
generation_results = []

for _ in range(num_prompts):
generation_result = GenerationResult()

generation_result.m_generation_ids = generate_outputs.texts[index : index + num_return_sequences]
# sequences_scores are available only for beam search case
if is_beam_search:
generation_result.m_scores = generate_outputs.scores[index : index + num_return_sequences]
generation_results.append(generation_result)

index += num_return_sequences
return generation_results
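A small worked illustration of the slicing above; DecodedResults is produced on the C++ side, so a SimpleNamespace stand-in with the same .texts/.scores attributes is used here purely for demonstration:

```python
from types import SimpleNamespace

from utils.ov_genai_pipelines import convert_decoded_results_to_generation_result

# stand-in for DecodedResults: 2 prompts, 2 returned sequences per prompt
outputs = SimpleNamespace(texts=["a1", "a2", "b1", "b2"],
                          scores=[0.1, 0.2, 0.3, 0.4])

results = convert_decoded_results_to_generation_result(outputs,
                                                       num_prompts=2,
                                                       num_return_sequences=2,
                                                       is_beam_search=True)
assert results[0].m_generation_ids == ["a1", "a2"]
assert results[1].m_generation_ids == ["b1", "b2"]
assert len(results[1].m_scores) == 2   # scores are only filled for beam search
```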


def run_ov_pipeline(models_path: Path,
prompt : str | List[str],
generation_config : GenerationConfig | List[GenerationConfig],
pipeline_type : PipelineType = PipelineType.PAGED_ATTENTION,
streamer: StreamerWithResults | Callable | StreamerBase = None,
scheduler_config: SchedulerConfig = SchedulerConfig(),
draft_model: draft_model = None,
ov_config: dict = {},
device: str = "CPU"
) -> List[GenerationResult]:
# update the generation config according to pipeline_type
updated_generation_config = None
if isinstance(generation_config, List):
if pipeline_type != PipelineType.CONTINIOUS_BATCHING:
raise Exception("'generation_config' is 'List[GenerationConfig]'. This type is supported only for 'PipelineType.CONTINIOUS_BATCHING'! Please change pipeline_type or generation_config type!")
assert isinstance(prompt, List)
assert len(generation_config) == len(prompt)
updated_generation_config = prepare_generation_configs_by_pipe_type(generation_config, pipeline_type)
else:
updated_generation_config = prepare_generation_config_by_pipe_type(generation_config, pipeline_type)

# checking streamer
if isinstance(prompt, str):
if streamer is None and not (generation_config.is_beam_search() or generation_config.num_return_sequences > 1):
# We can use streamer only if we have a single prompt and not beam search.
streamer = StreamerWithResults()
if isinstance(streamer, StreamerWithResults):
# Clear the accumulated strings to avoid side effects
streamer.reset()
else:
assert streamer is None

# create pipeline and generate results
ov_pipe = create_ov_pipeline(models_path=models_path,
pipeline_type=pipeline_type,
device=device,
ov_config=ov_config,
scheduler_config=scheduler_config,
draft_model=draft_model)
generation_results = ov_pipe.generate(prompt, updated_generation_config, streamer.accumulate if isinstance(streamer, StreamerWithResults) else streamer)

# convert results to `List[GenerationResult]`
if isinstance(generation_results, DecodedResults):
assert isinstance(generation_config, GenerationConfig)
num_prompts = 1 if isinstance(prompt, str) else len(prompt)
generation_results = convert_decoded_results_to_generation_result(generation_results, num_prompts, generation_config.num_return_sequences, generation_config.is_beam_search())

# cleanup test artifacts
del ov_pipe
rmtree(models_path)

# compare streaming results with generated results
if isinstance(streamer, StreamerWithResults):
prompts = [ prompt ] if isinstance(prompt, str) else prompt
compare_generation_results(prompts, generation_results, streamer.get_results(), generation_config)

return generation_results
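And a sketch of the list-based mode, which is only accepted together with PipelineType.CONTINIOUS_BATCHING (one GenerationConfig per prompt; the model directory is a placeholder and is removed by the helper afterwards):

```python
from pathlib import Path

from openvino_genai import GenerationConfig
from utils.ov_genai_pipelines import PipelineType, run_ov_pipeline

short = GenerationConfig()
short.max_new_tokens = 8
longer = GenerationConfig()
longer.max_new_tokens = 32

results = run_ov_pipeline(models_path=Path("/tmp/converted_model"),  # placeholder
                          prompt=["What is OpenVINO?", "Describe continuous batching."],
                          generation_config=[short, longer],          # must match len(prompt)
                          pipeline_type=PipelineType.CONTINIOUS_BATCHING)
assert len(results) == 2
```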

2 changes: 1 addition & 1 deletion tests/python_tests/utils/tokenizers.py
@@ -42,4 +42,4 @@ def delete_rt_info(configs: List[Tuple], temp_path):
del rt_info[modified_key]
except KeyError:
pass
openvino.save_model(tokenizer, model_path)
openvino.save_model(tokenizer, model_path)
