fix metrics async
alexm-neuralmagic committed Aug 23, 2024
1 parent fdc2a9c commit 9ddb985
Showing 1 changed file with 13 additions and 9 deletions.
22 changes: 13 additions & 9 deletions vllm/engine/llm_engine.py
@@ -1439,18 +1439,18 @@ def step(self) -> List[Union[RequestOutput, EmbeddingRequestOutput]]:
         self.do_tracing(scheduler_outputs)
 
         if not self.has_unfinished_requests():
+            # Drain async postprocessor
+            if len(self.output_queue) > 0:
+                self._process_model_outputs(is_async=True, clear_outputs=False)
+            assert len(self.output_queue) == 0
+
             # Stop the execute model loop in parallel workers until there are
             # more requests to process. This avoids waiting indefinitely in
             # torch.distributed ops which may otherwise timeout, and unblocks
             # the RPC thread in the workers so that they can process any other
             # queued control plane messages, such as add/remove lora adapters.
             self.model_executor.stop_remote_worker_execution_loop()
 
-            # Drain async postprocessor
-            if len(self.output_queue) > 0:
-                self._process_model_outputs(is_async=True, clear_outputs=False)
-            assert len(self.output_queue) == 0
-
         return self.request_outputs
 
     def add_logger(self, logger_name: str, logger: StatLoggerBase) -> None:
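For context, the hunk above reorders the idle path at the end of step(): the async postprocessor queue is now drained before the workers' execute-model loop is stopped. Below is a minimal paraphrase of the resulting control flow, assuming a generic engine object that carries the attributes named in the diff; it is a sketch, not the full method.

from typing import Any


def finish_step_when_idle(engine: Any) -> None:
    # Condensed paraphrase of the tail of LLMEngine.step() after this fix.
    if not engine.has_unfinished_requests():
        # 1. Drain the async postprocessor queue first, so every pending
        #    output is processed before the engine goes idle.
        if len(engine.output_queue) > 0:
            engine._process_model_outputs(is_async=True, clear_outputs=False)
        assert len(engine.output_queue) == 0

        # 2. Only then stop the workers' execute-model loop, which avoids
        #    indefinite waits in torch.distributed ops and unblocks the
        #    workers' RPC threads for control-plane messages such as
        #    add/remove LoRA adapters.
        engine.model_executor.stop_remote_worker_execution_loop()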
@@ -1539,6 +1539,8 @@ def _get_stats(self,
         n_requests: List[int] = []
         finished_reason_requests: List[str] = []
 
+        actual_num_batched_tokens = scheduler_outputs.num_batched_tokens  # type: ignore
+
         # NOTE: This loop assumes prefill seq_groups are before
         # decode seq_groups in scheduled_seq_groups.
         if scheduler_outputs is not None:
@@ -1550,6 +1552,11 @@

             for idx, scheduled_seq_group in enumerate(
                     scheduler_outputs.scheduled_seq_groups):
+                # Skip double logging when using async output proc
+                if finished_before and idx in finished_before:
+                    actual_num_batched_tokens -= 1
+                    continue
+
                 group_was_prefill = idx < scheduler_outputs.num_prefill_groups
                 seq_group = scheduled_seq_group.seq_group

@@ -1581,9 +1588,6 @@
                 # on logging request level information for finished requests,
                 # which can only happen once.
                 if (seq_group.is_finished()):
-                    # Skip double logging when using async output proc
-                    if finished_before and idx in finished_before:
-                        continue
                     # Latency timings
                     time_e2e_requests.append(now -
                                              seq_group.metrics.arrival_time)
@@ -1610,7 +1614,7 @@
         # + num_generation_tokens_from_prefill_groups (since we generate
         # one token on prefills on iters where the prefill finishes).
         num_generation_tokens_iter = (
-            scheduler_outputs.num_batched_tokens - num_prompt_tokens_iter +
+            actual_num_batched_tokens - num_prompt_tokens_iter +
             num_generation_tokens_from_prefill_groups)
 
         # Spec decode, if enabled, emits specialized metrics from the worker in
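The remaining hunks fix the generation-token accounting in _get_stats when async output processing is used: sequence groups that already finished in an earlier step (those listed in finished_before) are skipped for per-request logging, and the one batched token each of them contributed is subtracted so it is not counted twice. Below is a self-contained sketch of that arithmetic with a hypothetical function name and simplified parameters; the real code reads these values off scheduler_outputs.

from typing import List, Optional


def generation_tokens_this_iter(
        num_batched_tokens: int,
        num_prompt_tokens_iter: int,
        num_generation_tokens_from_prefill_groups: int,
        scheduled_group_ids: List[int],
        finished_before: Optional[List[int]]) -> int:
    # Mirror of the fixed accounting in _get_stats: drop groups that
    # finished in a previous iteration so their token is not double
    # counted in this iteration's generation-token metric.
    actual_num_batched_tokens = num_batched_tokens
    for idx in scheduled_group_ids:
        if finished_before and idx in finished_before:
            actual_num_batched_tokens -= 1
    return (actual_num_batched_tokens - num_prompt_tokens_iter +
            num_generation_tokens_from_prefill_groups)


# Example: four scheduled decode groups, one token each, one of which
# finished in the previous step -> 3 generation tokens this iteration.
assert generation_tokens_this_iter(
    num_batched_tokens=4,
    num_prompt_tokens_iter=0,
    num_generation_tokens_from_prefill_groups=0,
    scheduled_group_ids=[0, 1, 2, 3],
    finished_before=[2]) == 3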
