
Commit

format, remove profiler
SolitaryThinker committed Aug 8, 2024
1 parent 6921f1c commit d820deb
Showing 8 changed files with 60 additions and 126 deletions.
6 changes: 3 additions & 3 deletions benchmarks/backend_request_func.py
@@ -224,9 +224,9 @@ async def async_request_openai_completions(
pbar: Optional[tqdm] = None,
) -> RequestFuncOutput:
api_url = request_func_input.api_url
# assert api_url.endswith(
# "completions"
# ), "OpenAI Completions API URL must end with 'completions'."
assert api_url.endswith(
"completions"
), "OpenAI Completions API URL must end with 'completions'."
async with aiohttp.ClientSession(timeout=AIOHTTP_TIMEOUT) as session:
assert not request_func_input.use_beam_search
payload = {
46 changes: 0 additions & 46 deletions benchmarks/benchmark_serving.py
@@ -295,15 +295,13 @@ def calculate_metrics(
async def benchmark(
backend: str,
api_url: str,
base_url: str,
model_id: str,
tokenizer: PreTrainedTokenizerBase,
input_requests: List[Tuple[str, int, int]],
best_of: int,
use_beam_search: bool,
request_rate: float,
disable_tqdm: bool,
profile: bool,
):
if backend in ASYNC_REQUEST_FUNCS:
request_func = ASYNC_REQUEST_FUNCS[backend]
@@ -328,24 +326,6 @@ async def benchmark(
f"are correctly specified. Error: {test_output.error}")
else:
print("Initial test run completed. Starting main benchmark run...")

if profile:
print("Starting profiler")
profile_input = RequestFuncInput(
model=model_id,
prompt=test_prompt,
api_url=base_url + "/start_profile",
prompt_len=test_prompt_len,
output_len=test_output_len,
best_of=best_of,
use_beam_search=use_beam_search,
)
profile_output = await request_func(request_func_input=profile_input)
if profile_output.success:
print("profiler started")
else:
print("profiler failed to start")

print(f"Traffic request rate: {request_rate}")

pbar = None if disable_tqdm else tqdm(total=len(input_requests))
@@ -369,23 +349,6 @@ async def benchmark(
pbar=pbar)))
outputs: List[RequestFuncOutput] = await asyncio.gather(*tasks)

if profile:
print("Stopping profiler")
profile_input = RequestFuncInput(
model=model_id,
prompt=test_prompt,
api_url=base_url + "/stop_profile",
prompt_len=test_prompt_len,
output_len=test_output_len,
best_of=best_of,
use_beam_search=use_beam_search,
)
profile_output = await request_func(request_func_input=profile_input)
if profile_output.success:
print("profiler stopped")
else:
print("profiler failed to stop")

if pbar is not None:
pbar.close()

@@ -470,10 +433,8 @@ def main(args: argparse.Namespace):

if args.base_url is not None:
api_url = f"{args.base_url}{args.endpoint}"
base_url = f"{args.base_url}"
else:
api_url = f"http://{args.host}:{args.port}{args.endpoint}"
base_url = f"http://{args.host}:{args.port}"

tokenizer = get_tokenizer(tokenizer_id,
trust_remote_code=args.trust_remote_code)
@@ -545,15 +506,13 @@ def main(args: argparse.Namespace):
benchmark(
backend=backend,
api_url=api_url,
base_url=base_url,
model_id=model_id,
tokenizer=tokenizer,
input_requests=input_requests,
best_of=args.best_of,
use_beam_search=args.use_beam_search,
request_rate=args.request_rate,
disable_tqdm=args.disable_tqdm,
profile=args.profile,
))

# Save config and results to json
@@ -734,11 +693,6 @@ def main(args: argparse.Namespace):
action="store_true",
help="Specify to disable tqdm progress bar.",
)
parser.add_argument(
"--profile",
action="store_true",
help="Use Torch Profiler",
)
parser.add_argument(
"--save-result",
action="store_true",
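For reference, the removed --profile path toggled profiling by POSTing to the server's /start_profile and /stop_profile routes (themselves removed from vllm/entrypoints/openai/api_server.py further down). A minimal client-side sketch of that pattern, assuming an OpenAI-compatible vLLM server reachable at base_url; the helper name is illustrative, not part of the benchmark script:

import asyncio

import aiohttp


async def toggle_profiler(base_url: str, action: str) -> bool:
    # action is "start_profile" or "stop_profile"; both routes are dropped by this commit.
    async with aiohttp.ClientSession() as session:
        async with session.post(f"{base_url}/{action}") as resp:
            return resp.status == 200

# Hypothetical usage against a local server:
# asyncio.run(toggle_profiler("http://localhost:8000", "start_profile"))
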
12 changes: 3 additions & 9 deletions vllm/engine/async_llm_engine.py
@@ -309,14 +309,14 @@ async def step_async(
virtual_engine].last_output
# make sure we don't incur a GPU->CPU transfer if we don't need to
if (self.parallel_config.pipeline_parallel_size > 1
and cached_last_output is not None
and cached_last_output.sampled_token_ids_numpy is not None):
and cached_last_output is not None and
cached_last_output.sampled_token_ids_numpy is not None):
last_sampled_token_ids = \
torch.from_numpy(cached_last_output.sampled_token_ids_numpy)
# last_sampled_token_ids = \
# torch.Tensor(cached_last_output.sampled_token_ids_numpy).long()
# \
# cached_last_output.sampled_token_ids.cpu()
# cached_last_output.sampled_token_ids.cpu()
else:
last_sampled_token_ids = None

@@ -515,12 +515,6 @@ def __init__(self,
# Lazy initialized fields
self._request_tracker: RequestTracker

def start_profile(self):
self.engine.model_executor._run_workers('start_profile')

def stop_profile(self):
self.engine.model_executor._run_workers('stop_profile')

@classmethod
def _get_executor_cls(
cls, engine_config: EngineConfig) -> Type[ExecutorAsyncBase]:
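The pipeline-parallel branch above caches sampled token ids as a NumPy array (sampled_token_ids_numpy, produced with .numpy(force=True) in the multi-step worker further down) and rebuilds a CPU tensor with torch.from_numpy, so no extra GPU->CPU transfer happens at this point. A small standalone sketch of that round-trip, with illustrative variable names:

import torch

# Producer side (worker): force a single host copy and drop the GPU reference.
device = "cuda" if torch.cuda.is_available() else "cpu"
sampled_gpu = torch.tensor([[3], [7], [11]], device=device)  # shape (num_seqs, 1)
sampled_numpy = sampled_gpu.numpy(force=True)  # copies to CPU only if needed

# Consumer side (engine): wrap the array again without another copy.
last_sampled_token_ids = torch.from_numpy(sampled_numpy)
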
14 changes: 0 additions & 14 deletions vllm/entrypoints/openai/api_server.py
@@ -196,20 +196,6 @@ async def show_version():
return JSONResponse(content=ver)


@router.post("/start_profile")
async def start_profile():
engine.start_profile()
print('Profile started')
return JSONResponse(content={"message": "Profile started"})


@router.post("/stop_profile")
async def stop_profile():
engine.stop_profile()
print('Profile stopped')
return JSONResponse(content={"message": "Profile stopped"})


@router.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest,
raw_request: Request):
3 changes: 2 additions & 1 deletion vllm/sequence.py
@@ -1140,4 +1140,5 @@ def clone(
previous_hidden_states=self.previous_hidden_states,
num_steps=self.num_steps,
finished_requests_ids=self.finished_requests_ids,
last_sampled_token_ids=self.last_sampled_token_ids.clone())
last_sampled_token_ids=self.last_sampled_token_ids.clone()
if self.last_sampled_token_ids is not None else None)
82 changes: 47 additions & 35 deletions vllm/worker/multi_step_model_runner.py
@@ -26,7 +26,6 @@
CompletionSequenceGroupOutput, Logprob)
from vllm import _custom_ops as ops


import torch

logger = init_logger(__name__)
@@ -48,39 +47,48 @@ class ModelOutput:
sampled_token_ids: Optional[torch.Tensor] = None
pythonized: bool = False

def pythonize(self, input_metadata: "MutableModelInputForGPUWithMultiStepMetadata",
copy_stream: torch.cuda.Stream,
pinned_sampled_token_buffer: torch.Tensor) -> None:
def pythonize(
self,
input_metadata: "MutableModelInputForGPUWithMultiStepMetadata",
copy_stream: torch.cuda.Stream,
pinned_sampled_token_buffer: torch.Tensor) -> None:
"""Pythonize the output. Blocking."""
if not self.pythonized:
self._pythonize_sampler_output_wait_on_event(
input_metadata, copy_stream, pinned_sampled_token_buffer)
self.pythonized = True

def maybe_pythonize(self,
input_metadata: "MutableModelInputForGPUWithMultiStepMetadata",
copy_stream: torch.cuda.Stream,
pinned_sampled_token_buffer: torch.Tensor) -> None:
def maybe_pythonize(
self,
input_metadata: "MutableModelInputForGPUWithMultiStepMetadata",
copy_stream: torch.cuda.Stream,
pinned_sampled_token_buffer: torch.Tensor) -> None:
"""Pythonize the output if ready, else return None. Non-blocking."""
if not self.pythonized:
self.pythonized = self._pythonize_sampler_output_if_event_ready(
input_metadata, copy_stream, pinned_sampled_token_buffer)

def _pythonize_sampler_output_wait_on_event(
self, input_metadata: "MutableModelInputForGPUWithMultiStepMetadata",
self,
input_metadata: "MutableModelInputForGPUWithMultiStepMetadata",
copy_stream: torch.cuda.Stream,
pinned_sampled_token_buffer: torch.Tensor) -> None:
self.sampler_output_ready_event.synchronize()
with torch.cuda.stream(copy_stream):
_pythonize_sampler_output(input_metadata, self.sampler_output, pinned_sampled_token_buffer, self.sampled_token_ids)
_pythonize_sampler_output(input_metadata, self.sampler_output,
pinned_sampled_token_buffer,
self.sampled_token_ids)

def _pythonize_sampler_output_if_event_ready(
self, input_metadata: "MutableModelInputForGPUWithMultiStepMetadata",
self,
input_metadata: "MutableModelInputForGPUWithMultiStepMetadata",
copy_stream: torch.cuda.Stream,
pinned_sampled_token_buffer: torch.Tensor) -> bool:
if self.sampler_output_ready_event.query():
with torch.cuda.stream(copy_stream):
_pythonize_sampler_output(input_metadata, self.sampler_output, pinned_sampled_token_buffer, self.sampled_token_ids)
_pythonize_sampler_output(input_metadata, self.sampler_output,
pinned_sampled_token_buffer,
self.sampled_token_ids)
return True
return False

@@ -142,7 +150,9 @@ def wait_previous_step(self):
self.step_cuda_events[(self.current_step + 1) % 2].wait()
self.step_cuda_events[(self.current_step + 1) % 2] = None

def add_sampler_output(self, sampler_output: SamplerOutput, sampled_token_ids: Optional[torch.Tensor] = None):
def add_sampler_output(self,
sampler_output: SamplerOutput,
sampled_token_ids: Optional[torch.Tensor] = None):
self.outputs.append(
ModelOutput(sampler_output=sampler_output,
sampler_output_ready_event=None,
@@ -253,7 +263,6 @@ def execute_model(
print(f'current_step: {model_input.current_step}')
print(f'is_first_multi_step: {model_input.is_first_multi_step}')


# make sure we skip the sampler on the last rank and only pythonize
# if CPU is ahead.
if self.is_driver_worker and get_pp_group().is_last_rank:
@@ -262,12 +271,14 @@ def execute_model(
self.pinned_sampled_token_ids = torch.zeros(
(self.scheduler_config.max_num_seqs, 1),
dtype=torch.long,
device="cpu", pin_memory=True)
device="cpu",
pin_memory=True)

self._base_model_runner.model.sampler.include_gpu_probs_tensor = True
frozen_model_input.sampling_metadata.skip_sampler_cpu_output = True
for output in model_input.outputs:
output.maybe_pythonize(model_input, self._copy_stream, self.pinned_sampled_token_ids)
output.maybe_pythonize(model_input, self._copy_stream,
self.pinned_sampled_token_ids)

# some pre-execute model logic for multi-step:
# - if it's the first step, we need to reset the sampling tensors
@@ -288,8 +299,6 @@ def execute_model(
if frozen_model_input.sampling_metadata:
frozen_model_input.sampling_metadata.reuse_sampling_tensors = False



# Execute the model
output = self._base_model_runner.execute_model(frozen_model_input,
kv_caches,
@@ -309,10 +318,12 @@ def execute_model(
output_ready_event = torch.cuda.Event()
output_ready_event.record(current_stream)
if self.parallel_config.pipeline_parallel_size > 1:
output[0].sampled_token_ids_numpy = output[0].sampled_token_ids.numpy(force=True)
output[0].sampled_token_ids_numpy = output[
0].sampled_token_ids.numpy(force=True)
# output[0].sampled_token_ids_numpy = output[0].sampled_token_ids.tolist()
model_input.outputs.append(
ModelOutput(output[0], output_ready_event, output[0].sampled_token_ids, False))
ModelOutput(output[0], output_ready_event,
output[0].sampled_token_ids, False))
# make sure we don't try to serialize any GPU tensors
output[0].sampled_token_ids = None
output[0].sampled_token_probs = None
@@ -331,7 +342,8 @@ def execute_model(
if model_input.is_last_step:
outputs = []
for output in model_input.outputs:
output.pythonize(model_input, self._copy_stream, self.pinned_sampled_token_ids)
output.pythonize(model_input, self._copy_stream,
self.pinned_sampled_token_ids)
outputs.append(output.sampler_output)
return outputs

@@ -407,16 +419,17 @@ def _advance_step(
self._update_flash_attn_metadata(attn_metadata, num_seqs, num_queries)

# Update GPU tensors
ops.advance_step(num_seqs=num_seqs,
num_queries=num_queries,
block_size=self.block_size,
input_tokens=frozen_model_input.input_tokens,
# sampled_token_ids=out.sampled_token_ids,
sampled_token_ids=model_input.outputs[-1].sampled_token_ids,
input_positions=frozen_model_input.input_positions,
seq_lens=attn_metadata.seq_lens_tensor,
slot_mapping=attn_metadata.slot_mapping,
block_tables=attn_metadata.block_tables)
ops.advance_step(
num_seqs=num_seqs,
num_queries=num_queries,
block_size=self.block_size,
input_tokens=frozen_model_input.input_tokens,
# sampled_token_ids=out.sampled_token_ids,
sampled_token_ids=model_input.outputs[-1].sampled_token_ids,
input_positions=frozen_model_input.input_positions,
seq_lens=attn_metadata.seq_lens_tensor,
slot_mapping=attn_metadata.slot_mapping,
block_tables=attn_metadata.block_tables)

# Update sampling_metadata
# model_input.seq_lens = attn_metadata.seq_lens
@@ -432,8 +445,7 @@

def _pythonize_sampler_output(
model_input: MutableModelInputForGPUWithMultiStepMetadata,
output: SamplerOutput,
pinned_sampled_token_buffer: torch.Tensor,
output: SamplerOutput, pinned_sampled_token_buffer: torch.Tensor,
sampled_token_ids: Optional[torch.Tensor]) -> SamplerOutput:
# TODO(will): fix logprobs

@@ -463,7 +475,7 @@ def _pythonize_sampler_output(
sampling_metadata = frozen_model_input.sampling_metadata

for (seq_group, sample_result) in zip(sampling_metadata.seq_groups,
samples_list):
samples_list):
seq_ids = seq_group.seq_ids
# next_token_ids, parent_ids = sample_result
next_token_ids = sample_result
@@ -474,7 +486,7 @@ def _pythonize_sampler_output(
# XXX Hard coded logprob
seq_outputs.append(
SequenceOutput(seq_ids[parent_id], next_token_id,
{next_token_id: Logprob(logprob=42)}))
{next_token_id: Logprob(logprob=42)}))
# print('CompletionSequenceGroupOutput', seq_outputs)
output.outputs.append(CompletionSequenceGroupOutput(seq_outputs, None))
assert len(output.outputs) > 0
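The pythonization machinery above combines a pinned CPU staging buffer, a dedicated copy stream, and CUDA events, so the CPU can either poll for completed copies (maybe_pythonize) or block on them (pythonize). A minimal standalone sketch of that pattern, assuming a CUDA device; the names are illustrative rather than the runner's actual API:

import torch


def make_staging(max_rows: int) -> torch.Tensor:
    # Pinned (page-locked) CPU memory so the device-to-host copy can run asynchronously.
    return torch.zeros((max_rows, 1), dtype=torch.long, device="cpu", pin_memory=True)


def launch_copy(sampled_gpu: torch.Tensor, staging: torch.Tensor,
                copy_stream: torch.cuda.Stream) -> torch.cuda.Event:
    # sampled_gpu is expected to have shape (n, 1) and live on the GPU.
    n = sampled_gpu.shape[0]
    with torch.cuda.stream(copy_stream):
        staging[:n].copy_(sampled_gpu, non_blocking=True)
        done = torch.cuda.Event()
        done.record(copy_stream)
    return done

# Non-blocking check (maybe_pythonize): if done.query(): tokens = staging[:n].tolist()
# Blocking path (pythonize):            done.synchronize(); tokens = staging[:n].tolist()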
