From d820deb53283da31556ebee07638b7c763da4919 Mon Sep 17 00:00:00 2001
From: Will Lin
Date: Wed, 7 Aug 2024 17:35:21 -0700
Subject: [PATCH] format, remove profiler

---
 benchmarks/backend_request_func.py     |  6 +-
 benchmarks/benchmark_serving.py        | 46 ---------------
 vllm/engine/async_llm_engine.py        | 12 +---
 vllm/entrypoints/openai/api_server.py  | 14 -----
 vllm/sequence.py                       |  3 +-
 vllm/worker/multi_step_model_runner.py | 82 +++++++++++++++-----------
 vllm/worker/multi_step_worker.py       | 14 ++---
 vllm/worker/worker.py                  |  9 ---
 8 files changed, 60 insertions(+), 126 deletions(-)

diff --git a/benchmarks/backend_request_func.py b/benchmarks/backend_request_func.py
index 10f2f5ab2f9b..8593e2b538c2 100644
--- a/benchmarks/backend_request_func.py
+++ b/benchmarks/backend_request_func.py
@@ -224,9 +224,9 @@ async def async_request_openai_completions(
     pbar: Optional[tqdm] = None,
 ) -> RequestFuncOutput:
     api_url = request_func_input.api_url
-    # assert api_url.endswith(
-    #     "completions"
-    # ), "OpenAI Completions API URL must end with 'completions'."
+    assert api_url.endswith(
+        "completions"
+    ), "OpenAI Completions API URL must end with 'completions'."
     async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session:
         assert not request_func_input.use_beam_search
         payload = {
diff --git a/benchmarks/benchmark_serving.py b/benchmarks/benchmark_serving.py
index 62b47addd6f9..fc0dbf77f16b 100644
--- a/benchmarks/benchmark_serving.py
+++ b/benchmarks/benchmark_serving.py
@@ -295,7 +295,6 @@ def calculate_metrics(
 async def benchmark(
     backend: str,
     api_url: str,
-    base_url: str,
     model_id: str,
     tokenizer: PreTrainedTokenizerBase,
     input_requests: List[Tuple[str, int, int]],
@@ -303,7 +302,6 @@ async def benchmark(
     use_beam_search: bool,
     request_rate: float,
     disable_tqdm: bool,
-    profile: bool,
 ):
     if backend in ASYNC_REQUEST_FUNCS:
         request_func = ASYNC_REQUEST_FUNCS[backend]
@@ -328,24 +326,6 @@ async def benchmark(
             f"are correctly specified. Error: {test_output.error}")
     else:
         print("Initial test run completed. Starting main benchmark run...")
-
-    if profile:
-        print("Starting profiler")
-        profile_input = RequestFuncInput(
-            model=model_id,
-            prompt=test_prompt,
-            api_url=base_url + "/start_profile",
-            prompt_len=test_prompt_len,
-            output_len=test_output_len,
-            best_of=best_of,
-            use_beam_search=use_beam_search,
-        )
-        profile_output = await request_func(request_func_input=profile_input)
-        if profile_output.success:
-            print("profiler started")
-        else:
-            print("profiler failed to start")
-
     print(f"Traffic request rate: {request_rate}")
 
     pbar = None if disable_tqdm else tqdm(total=len(input_requests))
@@ -369,23 +349,6 @@ async def benchmark(
                              pbar=pbar)))
     outputs: List[RequestFuncOutput] = await asyncio.gather(*tasks)
 
-    if profile:
-        print("Stopping profiler")
-        profile_input = RequestFuncInput(
-            model=model_id,
-            prompt=test_prompt,
-            api_url=base_url + "/stop_profile",
-            prompt_len=test_prompt_len,
-            output_len=test_output_len,
-            best_of=best_of,
-            use_beam_search=use_beam_search,
-        )
-        profile_output = await request_func(request_func_input=profile_input)
-        if profile_output.success:
-            print("profiler stopped")
-        else:
-            print("profiler failed to stop")
-
     if pbar is not None:
         pbar.close()
 
@@ -470,10 +433,8 @@ def main(args: argparse.Namespace):
 
     if args.base_url is not None:
         api_url = f"{args.base_url}{args.endpoint}"
-        base_url = f"{args.base_url}"
     else:
         api_url = f"http://{args.host}:{args.port}{args.endpoint}"
-        base_url = f"http://{args.host}:{args.port}"
 
     tokenizer = get_tokenizer(tokenizer_id,
                               trust_remote_code=args.trust_remote_code)
@@ -545,7 +506,6 @@ def main(args: argparse.Namespace):
         benchmark(
             backend=backend,
             api_url=api_url,
-            base_url=base_url,
             model_id=model_id,
             tokenizer=tokenizer,
             input_requests=input_requests,
@@ -553,7 +513,6 @@ def main(args: argparse.Namespace):
             use_beam_search=args.use_beam_search,
             request_rate=args.request_rate,
             disable_tqdm=args.disable_tqdm,
-            profile=args.profile,
         ))
 
     # Save config and results to json
@@ -734,11 +693,6 @@ def main(args: argparse.Namespace):
         action="store_true",
         help="Specify to disable tqdm progress bar.",
     )
-    parser.add_argument(
-        "--profile",
-        action="store_true",
-        help="Use Torch Profiler",
-    )
     parser.add_argument(
         "--save-result",
         action="store_true",
diff --git a/vllm/engine/async_llm_engine.py b/vllm/engine/async_llm_engine.py
index 25294e8bfeac..61f35cf83429 100644
--- a/vllm/engine/async_llm_engine.py
+++ b/vllm/engine/async_llm_engine.py
@@ -309,14 +309,14 @@ async def step_async(
                 virtual_engine].last_output
             # make sure we don't incur a GPU->CPU transfer if we don't need to
             if (self.parallel_config.pipeline_parallel_size > 1
-                    and cached_last_output is not None
-                    and cached_last_output.sampled_token_ids_numpy is not None):
+                    and cached_last_output is not None and
+                    cached_last_output.sampled_token_ids_numpy is not None):
                 last_sampled_token_ids = \
                     torch.from_numpy(cached_last_output.sampled_token_ids_numpy)
                 # last_sampled_token_ids = \
                 #     torch.Tensor(cached_last_output.sampled_token_ids_numpy).long()
                 # \
-                #     cached_last_output.sampled_token_ids.cpu()
+                # cached_last_output.sampled_token_ids.cpu()
             else:
                 last_sampled_token_ids = None
 
@@ -515,12 +515,6 @@ def __init__(self,
         # Lazy initialized fields
         self._request_tracker: RequestTracker
 
-    def start_profile(self):
-        self.engine.model_executor._run_workers('start_profile')
-
-    def stop_profile(self):
-        self.engine.model_executor._run_workers('stop_profile')
-
     @classmethod
     def _get_executor_cls(
             cls, engine_config: EngineConfig) -> Type[ExecutorAsyncBase]:
diff --git a/vllm/entrypoints/openai/api_server.py b/vllm/entrypoints/openai/api_server.py
index e2de166dfd41..d44604b12fb6 100644
--- a/vllm/entrypoints/openai/api_server.py
+++ b/vllm/entrypoints/openai/api_server.py
@@ -196,20 +196,6 @@ async def show_version():
     return JSONResponse(content=ver)
 
 
-@router.post("/start_profile")
-async def start_profile():
-    engine.start_profile()
-    print('Profile started')
-    return JSONResponse(content={"message": "Profile started"})
-
-
-@router.post("/stop_profile")
-async def stop_profile():
-    engine.stop_profile()
-    print('Profile stopped')
-    return JSONResponse(content={"message": "Profile stopped"})
-
-
 @router.post("/v1/chat/completions")
 async def create_chat_completion(request: ChatCompletionRequest,
                                  raw_request: Request):
diff --git a/vllm/sequence.py b/vllm/sequence.py
index 27f1c58b1afc..ae5d94fa3f5c 100644
--- a/vllm/sequence.py
+++ b/vllm/sequence.py
@@ -1140,4 +1140,5 @@ def clone(
             previous_hidden_states=self.previous_hidden_states,
             num_steps=self.num_steps,
             finished_requests_ids=self.finished_requests_ids,
-            last_sampled_token_ids=self.last_sampled_token_ids.clone())
+            last_sampled_token_ids=self.last_sampled_token_ids.clone()
+            if self.last_sampled_token_ids is not None else None)
diff --git a/vllm/worker/multi_step_model_runner.py b/vllm/worker/multi_step_model_runner.py
index f9f7f49b8d3e..eeddd72afb57 100644
--- a/vllm/worker/multi_step_model_runner.py
+++ b/vllm/worker/multi_step_model_runner.py
@@ -26,7 +26,6 @@
                                 CompletionSequenceGroupOutput, Logprob)
 from vllm import _custom_ops as ops
 
-
 import torch
 
 logger = init_logger(__name__)
@@ -48,39 +47,48 @@ class ModelOutput:
     sampled_token_ids: Optional[torch.Tensor] = None
     pythonized: bool = False
 
-    def pythonize(self, input_metadata: "MutableModelInputForGPUWithMultiStepMetadata",
-                  copy_stream: torch.cuda.Stream,
-                  pinned_sampled_token_buffer: torch.Tensor) -> None:
+    def pythonize(
+            self,
+            input_metadata: "MutableModelInputForGPUWithMultiStepMetadata",
+            copy_stream: torch.cuda.Stream,
+            pinned_sampled_token_buffer: torch.Tensor) -> None:
         """Pythonize the output. Blocking."""
         if not self.pythonized:
             self._pythonize_sampler_output_wait_on_event(
                 input_metadata, copy_stream, pinned_sampled_token_buffer)
             self.pythonized = True
 
-    def maybe_pythonize(self,
-                        input_metadata: "MutableModelInputForGPUWithMultiStepMetadata",
-                        copy_stream: torch.cuda.Stream,
-                        pinned_sampled_token_buffer: torch.Tensor) -> None:
+    def maybe_pythonize(
+            self,
+            input_metadata: "MutableModelInputForGPUWithMultiStepMetadata",
+            copy_stream: torch.cuda.Stream,
+            pinned_sampled_token_buffer: torch.Tensor) -> None:
         """Pythonize the output if ready, else return None. Non-blocking."""
         if not self.pythonized:
             self.pythonized = self._pythonize_sampler_output_if_event_ready(
                 input_metadata, copy_stream, pinned_sampled_token_buffer)
 
     def _pythonize_sampler_output_wait_on_event(
-            self, input_metadata: "MutableModelInputForGPUWithMultiStepMetadata",
+            self,
+            input_metadata: "MutableModelInputForGPUWithMultiStepMetadata",
             copy_stream: torch.cuda.Stream,
             pinned_sampled_token_buffer: torch.Tensor) -> None:
         self.sampler_output_ready_event.synchronize()
         with torch.cuda.stream(copy_stream):
-            _pythonize_sampler_output(input_metadata, self.sampler_output, pinned_sampled_token_buffer, self.sampled_token_ids)
+            _pythonize_sampler_output(input_metadata, self.sampler_output,
+                                      pinned_sampled_token_buffer,
+                                      self.sampled_token_ids)
 
     def _pythonize_sampler_output_if_event_ready(
-            self, input_metadata: "MutableModelInputForGPUWithMultiStepMetadata",
+            self,
+            input_metadata: "MutableModelInputForGPUWithMultiStepMetadata",
             copy_stream: torch.cuda.Stream,
             pinned_sampled_token_buffer: torch.Tensor) -> bool:
         if self.sampler_output_ready_event.query():
             with torch.cuda.stream(copy_stream):
-                _pythonize_sampler_output(input_metadata, self.sampler_output, pinned_sampled_token_buffer, self.sampled_token_ids)
+                _pythonize_sampler_output(input_metadata, self.sampler_output,
+                                          pinned_sampled_token_buffer,
+                                          self.sampled_token_ids)
                 return True
         return False
 
@@ -142,7 +150,9 @@ def wait_previous_step(self):
         self.step_cuda_events[(self.current_step + 1) % 2].wait()
         self.step_cuda_events[(self.current_step + 1) % 2] = None
 
-    def add_sampler_output(self, sampler_output: SamplerOutput, sampled_token_ids: Optional[torch.Tensor] = None):
+    def add_sampler_output(self,
+                           sampler_output: SamplerOutput,
+                           sampled_token_ids: Optional[torch.Tensor] = None):
         self.outputs.append(
             ModelOutput(sampler_output=sampler_output,
                         sampler_output_ready_event=None,
@@ -253,7 +263,6 @@ def execute_model(
         print(f'current_step: {model_input.current_step}')
         print(f'is_first_multi_step: {model_input.is_first_multi_step}')
 
-
         # make sure we skip the sampler on the lask rank and only pythonize
         # if CPU is ahead.
         if self.is_driver_worker and get_pp_group().is_last_rank:
@@ -262,12 +271,14 @@ def execute_model(
                 self.pinned_sampled_token_ids = torch.zeros(
                     (self.scheduler_config.max_num_seqs, 1),
                     dtype=torch.long,
-                    device="cpu", pin_memory=True)
+                    device="cpu",
+                    pin_memory=True)
                 self._base_model_runner.model.sampler.include_gpu_probs_tensor = True
             frozen_model_input.sampling_metadata.skip_sampler_cpu_output = True
 
             for output in model_input.outputs:
-                output.maybe_pythonize(model_input, self._copy_stream, self.pinned_sampled_token_ids)
+                output.maybe_pythonize(model_input, self._copy_stream,
+                                       self.pinned_sampled_token_ids)
 
         # some pre-execute model logic for multi-step:
         # - if it's the first step, we need to reset the sampling tensors
@@ -288,8 +299,6 @@ def execute_model(
 
             if frozen_model_input.sampling_metadata:
                 frozen_model_input.sampling_metadata.reuse_sampling_tensors = False
-
-
         # Execute the model
         output = self._base_model_runner.execute_model(frozen_model_input,
                                                        kv_caches,
@@ -309,10 +318,12 @@ def execute_model(
             output_ready_event = torch.cuda.Event()
             output_ready_event.record(current_stream)
             if self.parallel_config.pipeline_parallel_size > 1:
-                output[0].sampled_token_ids_numpy = output[0].sampled_token_ids.numpy(force=True)
+                output[0].sampled_token_ids_numpy = output[
+                    0].sampled_token_ids.numpy(force=True)
             # output[0].sampled_token_ids_numpy = output[0].sampled_token_ids.tolist()
             model_input.outputs.append(
-                ModelOutput(output[0], output_ready_event, output[0].sampled_token_ids, False))
+                ModelOutput(output[0], output_ready_event,
+                            output[0].sampled_token_ids, False))
             # make sure we dont try to serialize any GPU tensors
             output[0].sampled_token_ids = None
             output[0].sampled_token_probs = None
@@ -331,7 +342,8 @@ def execute_model(
         if model_input.is_last_step:
             outputs = []
             for output in model_input.outputs:
-                output.pythonize(model_input, self._copy_stream, self.pinned_sampled_token_ids)
+                output.pythonize(model_input, self._copy_stream,
+                                 self.pinned_sampled_token_ids)
                 outputs.append(output.sampler_output)
             return outputs
 
@@ -407,16 +419,17 @@ def _advance_step(
         self._update_flash_attn_metadata(attn_metadata, num_seqs, num_queries)
 
         # Update GPU tensors
-        ops.advance_step(num_seqs=num_seqs,
-                         num_queries=num_queries,
-                         block_size=self.block_size,
-                         input_tokens=frozen_model_input.input_tokens,
-                         # sampled_token_ids=out.sampled_token_ids,
-                         sampled_token_ids=model_input.outputs[-1].sampled_token_ids,
-                         input_positions=frozen_model_input.input_positions,
-                         seq_lens=attn_metadata.seq_lens_tensor,
-                         slot_mapping=attn_metadata.slot_mapping,
-                         block_tables=attn_metadata.block_tables)
+        ops.advance_step(
+            num_seqs=num_seqs,
+            num_queries=num_queries,
+            block_size=self.block_size,
+            input_tokens=frozen_model_input.input_tokens,
+            # sampled_token_ids=out.sampled_token_ids,
+            sampled_token_ids=model_input.outputs[-1].sampled_token_ids,
+            input_positions=frozen_model_input.input_positions,
+            seq_lens=attn_metadata.seq_lens_tensor,
+            slot_mapping=attn_metadata.slot_mapping,
+            block_tables=attn_metadata.block_tables)
 
         # Update sampling_metadata
         # model_input.seq_lens = attn_metadata.seq_lens
@@ -432,8 +445,7 @@ def _advance_step(
 
 def _pythonize_sampler_output(
         model_input: MutableModelInputForGPUWithMultiStepMetadata,
-        output: SamplerOutput,
-        pinned_sampled_token_buffer: torch.Tensor,
+        output: SamplerOutput, pinned_sampled_token_buffer: torch.Tensor,
         sampled_token_ids: Optional[torch.Tensor]) -> SamplerOutput:
 
     # TODO(will): fix logprobs
@@ -463,7 +475,7 @@ def _pythonize_sampler_output(
 
     sampling_metadata = frozen_model_input.sampling_metadata
     for (seq_group, sample_result) in zip(sampling_metadata.seq_groups,
-                                      samples_list):
+                                          samples_list):
         seq_ids = seq_group.seq_ids
         # next_token_ids, parent_ids = sample_result
         next_token_ids = sample_result
@@ -474,7 +486,7 @@ def _pythonize_sampler_output(
             # XXX Hard coded logprob
             seq_outputs.append(
                 SequenceOutput(seq_ids[parent_id], next_token_id,
-                                   {next_token_id: Logprob(logprob=42)}))
+                               {next_token_id: Logprob(logprob=42)}))
         # print('CompletionSequenceGroupOutput', seq_outputs)
         output.outputs.append(CompletionSequenceGroupOutput(seq_outputs, None))
     assert len(output.outputs) > 0
diff --git a/vllm/worker/multi_step_worker.py b/vllm/worker/multi_step_worker.py
index 5f8dcfcc3e87..4d38cd36edc5 100644
--- a/vllm/worker/multi_step_worker.py
+++ b/vllm/worker/multi_step_worker.py
@@ -63,7 +63,8 @@ def _get_driver_input_and_broadcast(
         # update their model input metadata inplace.
         if not is_first_multi_step:
             if get_pp_group().is_last_rank:
-                assert model_input.outputs[-1].sampler_output.sampled_token_ids is None
+                assert model_input.outputs[
+                    -1].sampler_output.sampled_token_ids is None
                 assert model_input.outputs[-1].sampled_token_ids is not None
                 model_input.last_sampled_token_ids = model_input.outputs[
                     -1].sampled_token_ids
@@ -80,11 +81,9 @@ def _get_driver_input_and_broadcast(
                 model_input.last_sampled_token_ids = execute_model_req.last_sampled_token_ids.cuda(
                 )
                 model_input.add_sampler_output(
-                    SamplerOutput(
-                        outputs=[],
-                        sampled_token_ids=None),
+                    SamplerOutput(outputs=[], sampled_token_ids=None),
                     model_input.last_sampled_token_ids)
-            
+
             # free sampled token ids from the previous step.
             # TODO(will) we could reuse the sampled token ids tensor from
             # the previous step instead.
@@ -97,7 +96,6 @@ def _get_driver_input_and_broadcast(
 
             broadcast_data.update(model_input.as_broadcastable_tensor_dict())
             broadcast_tensor_dict(broadcast_data, src=0)
-
 
         return model_input, worker_input
 
@@ -161,9 +159,7 @@ def prepare_input(
                 # we need to update the last sampled token ids in the model input
                 # for the workers so that they can run inplace advance_step
                 model_input.add_sampler_output(
-                    SamplerOutput(
-                        outputs=[],
-                        sampled_token_ids=None),
+                    SamplerOutput(outputs=[], sampled_token_ids=None),
                     model_input.last_sampled_token_ids)
             # self.multi_step_states[virtual_engine] = MultiStepState(
             #     worker_input=worker_input, model_input=model_input)
diff --git a/vllm/worker/worker.py b/vllm/worker/worker.py
index 15d95a11bdfd..3d5c191e79ea 100644
--- a/vllm/worker/worker.py
+++ b/vllm/worker/worker.py
@@ -133,15 +133,6 @@ def __init__(
         # Initialize gpu_cache as embedding models don't initialize kv_caches
         self.gpu_cache: Optional[List[List[torch.Tensor]]] = None
 
-        self.profiler = torch.profiler.profile(
-            activities=[
-                torch.profiler.ProfilerActivity.CPU,
-                torch.profiler.ProfilerActivity.CUDA,
-            ],
-            with_stack=True,
-            on_trace_ready=torch.profiler.tensorboard_trace_handler(
-                str('/mnt/user_storage/traces/'), use_gzip=True))
-
     def _is_encoder_decoder_model(self):
         return is_encoder_decoder_model_config(self.model_config)