[V1][Metrics] Add e2e/queue/prefill/decode/inference time histograms
Follow on from vllm-project#12579, part of vllm-project#10582.

Add the following:

- vllm:e2e_request_latency_seconds
- vllm:request_queue_time_seconds
- vllm:request_inference_time_seconds
- vllm:request_prefill_time_seconds
- vllm:request_decode_time_seconds

e2e_request_latency is calculated relative to the arrival_time
timestamp recorded by the frontend.

For the rest ... we want to capture (in histograms) precise
per-request timing intervals between certain events in the engine
core:

```
  << queued timestamp >>
    [ queue interval ]
  << scheduled timestamp >>
    [ prefill interval ]
  << new token timestamp (FIRST) >>
    [ inter-token interval ]
  << new token timestamp >>
    [ decode interval (relative to first token time) ]
    [ inference interval (relative to scheduled time) ]
  << new token timestamp (FINISHED) >>
```

We want to collect these metrics in the frontend process, to keep the
engine core as unburdened as possible, so these intervals must be
calculated from timestamps recorded by the engine core.

Engine core will include these timestamps in EngineCoreOutput (per
request) as a sequence of timestamped events, and the frontend will
calculate intervals and log them. Where we record these timestamped
events:

- QUEUED: scheduler add_request()
- SCHEDULED: scheduler schedule()

There is an implicit NEW_TOKENS timestamp based on an initialization
timestamp recorded on EngineCoreOutputs.
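
To make the interval logic concrete, here is a rough frontend-side
sketch (not the actual vLLM implementation). The event types mirror
EngineCoreEventType from this commit, but the RequestTimings container,
the record_intervals() helper, and their fields are illustrative
assumptions:

```
import time
from dataclasses import dataclass
from enum import IntEnum
from typing import Dict, List, Optional


class EventType(IntEnum):
    # Mirrors EngineCoreEventType in this commit.
    QUEUED = 1
    SCHEDULED = 2


@dataclass
class Event:
    type: EventType
    timestamp: float  # time.monotonic() taken in the engine core process


@dataclass
class RequestTimings:
    """Illustrative per-request state tracked by the frontend."""
    arrival_time: float  # frontend monotonic timestamp at request arrival
    queued_ts: Optional[float] = None
    scheduled_ts: Optional[float] = None
    first_token_ts: Optional[float] = None


def record_intervals(timings: RequestTimings, events: List[Event],
                     new_token_ts: float, finished: bool) -> Dict[str, float]:
    """Derive histogram observations for one request's engine core output.

    new_token_ts plays the role of the implicit NEW_TOKENS timestamp,
    i.e. the initialization timestamp on EngineCoreOutputs.
    """
    observations: Dict[str, float] = {}

    for event in events or []:
        if event.type == EventType.QUEUED:
            timings.queued_ts = event.timestamp
        elif event.type == EventType.SCHEDULED:
            timings.scheduled_ts = event.timestamp
            if timings.queued_ts is not None:
                # queue interval: QUEUED -> SCHEDULED
                observations["queue"] = event.timestamp - timings.queued_ts

    if timings.first_token_ts is None:
        # prefill interval: SCHEDULED -> first new token
        timings.first_token_ts = new_token_ts
        if timings.scheduled_ts is not None:
            observations["prefill"] = new_token_ts - timings.scheduled_ts

    if finished:
        # decode interval: first token -> last token
        observations["decode"] = new_token_ts - timings.first_token_ts
        # inference interval: SCHEDULED -> last token
        if timings.scheduled_ts is not None:
            observations["inference"] = new_token_ts - timings.scheduled_ts
        # e2e latency uses the frontend's own clock and arrival_time
        observations["e2e"] = time.monotonic() - timings.arrival_time

    return observations
```

The important property is that engine-core monotonic timestamps are
only ever compared with each other; the frontend's clock is used only
for the e2e latency, which is anchored to arrival_time.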

Signed-off-by: Mark McLoughlin <markmc@redhat.com>
markmc committed Feb 7, 2025
1 parent 0630d45 commit 37e5b11
Showing 9 changed files with 241 additions and 50 deletions.
31 changes: 31 additions & 0 deletions tests/entrypoints/openai/test_metrics.py
@@ -85,6 +85,10 @@ async def client(server):
"vllm:time_per_output_token_seconds":
[("_count", _NUM_REQUESTS * (_NUM_GENERATION_TOKENS_PER_REQUEST - 1))],
"vllm:e2e_request_latency_seconds": [("_count", _NUM_REQUESTS)],
"vllm:request_queue_time_seconds": [("_count", _NUM_REQUESTS)],
"vllm:request_inference_time_seconds": [("_count", _NUM_REQUESTS)],
"vllm:request_prefill_time_seconds": [("_count", _NUM_REQUESTS)],
"vllm:request_decode_time_seconds": [("_count", _NUM_REQUESTS)],
"vllm:request_prompt_tokens":
[("_sum", _NUM_REQUESTS * _NUM_PROMPT_TOKENS_PER_REQUEST),
("_count", _NUM_REQUESTS)],
@@ -169,6 +173,18 @@ async def test_metrics_counts(server: RemoteOpenAIServer,
"vllm:e2e_request_latency_seconds_sum",
"vllm:e2e_request_latency_seconds_bucket",
"vllm:e2e_request_latency_seconds_count",
"vllm:request_queue_time_seconds_sum",
"vllm:request_queue_time_seconds_bucket",
"vllm:request_queue_time_seconds_count",
"vllm:request_inference_time_seconds_sum",
"vllm:request_inference_time_seconds_bucket",
"vllm:request_inference_time_seconds_count",
"vllm:request_prefill_time_seconds_sum",
"vllm:request_prefill_time_seconds_bucket",
"vllm:request_prefill_time_seconds_count",
"vllm:request_decode_time_seconds_sum",
"vllm:request_decode_time_seconds_bucket",
"vllm:request_decode_time_seconds_count",
"vllm:request_prompt_tokens_sum",
"vllm:request_prompt_tokens_bucket",
"vllm:request_prompt_tokens_count",
@@ -218,6 +234,21 @@ async def test_metrics_counts(server: RemoteOpenAIServer,
"vllm:time_per_output_token_seconds_sum",
"vllm:time_per_output_token_seconds_bucket",
"vllm:time_per_output_token_seconds_count",
"vllm:e2e_request_latency_seconds_sum",
"vllm:e2e_request_latency_seconds_bucket",
"vllm:e2e_request_latency_seconds_count",
"vllm:request_queue_time_seconds_sum",
"vllm:request_queue_time_seconds_bucket",
"vllm:request_queue_time_seconds_count",
"vllm:request_inference_time_seconds_sum",
"vllm:request_inference_time_seconds_bucket",
"vllm:request_inference_time_seconds_count",
"vllm:request_prefill_time_seconds_sum",
"vllm:request_prefill_time_seconds_bucket",
"vllm:request_prefill_time_seconds_count",
"vllm:request_decode_time_seconds_sum",
"vllm:request_decode_time_seconds_bucket",
"vllm:request_decode_time_seconds_count",
]


23 changes: 15 additions & 8 deletions tests/v1/engine/test_output_processor.py
@@ -1,6 +1,7 @@
# SPDX-License-Identifier: Apache-2.0

import math
import time
from typing import Dict, List, Optional

import pytest
@@ -15,6 +16,7 @@
from vllm.transformers_utils.tokenizer import AnyTokenizer
from vllm.v1.engine import EngineCoreRequest
from vllm.v1.engine.output_processor import OutputProcessor
from vllm.v1.metrics.stats import IterationStats


def _ref_convert_id_to_token(
@@ -603,6 +605,7 @@ def test_iteration_stats(dummy_test_vectors):
output_processor = OutputProcessor(dummy_test_vectors.tokenizer_group,
log_stats=True)
engine_core = MockEngineCore(dummy_test_vectors.generation_tokens)
engine_core_timestamp = time.monotonic()

# Make N requests.
requests = [
@@ -630,8 +633,9 @@

# First iteration has 2 prefills.
outputs = engine_core.get_outputs()[:num_active]
processed_outputs = output_processor.process_outputs(outputs)
iteration_stats = processed_outputs.iteration_stats
iteration_stats = IterationStats(output_processor.log_stats)
output_processor.process_outputs(outputs, engine_core_timestamp,
iteration_stats)
total_prompt_tokens = sum([
len(prompt_tokens)
for prompt_tokens in dummy_test_vectors.prompt_tokens[:num_active]
@@ -642,8 +646,9 @@

# Just decodes in this step.
outputs = engine_core.get_outputs()[:num_active]
processed_outputs = output_processor.process_outputs(outputs)
iteration_stats = processed_outputs.iteration_stats
iteration_stats = IterationStats(output_processor.log_stats)
output_processor.process_outputs(outputs, engine_core_timestamp,
iteration_stats)

assert iteration_stats.num_prompt_tokens == 0
assert iteration_stats.num_generation_tokens == num_active
@@ -652,17 +657,19 @@
output_processor.add_request(inactive_request)
num_active += 1
outputs = engine_core.get_outputs()[:num_active]
processed_outputs = output_processor.process_outputs(outputs)
iteration_stats = processed_outputs.iteration_stats
iteration_stats = IterationStats(output_processor.log_stats)
output_processor.process_outputs(outputs, engine_core_timestamp,
iteration_stats)
total_prompt_tokens = len(dummy_test_vectors.prompt_tokens[num_active - 1])

assert iteration_stats.num_prompt_tokens == total_prompt_tokens
assert iteration_stats.num_generation_tokens == num_active

# Just decodes in this step.
outputs = engine_core.get_outputs()[:num_active]
processed_outputs = output_processor.process_outputs(outputs)
iteration_stats = processed_outputs.iteration_stats
iteration_stats = IterationStats(output_processor.log_stats)
output_processor.process_outputs(outputs, engine_core_timestamp,
iteration_stats)

assert iteration_stats.num_prompt_tokens == 0
assert iteration_stats.num_generation_tokens == num_active
8 changes: 7 additions & 1 deletion vllm/v1/core/scheduler.py
@@ -1,5 +1,6 @@
# SPDX-License-Identifier: Apache-2.0

import time
from collections import deque
from dataclasses import dataclass
from typing import (TYPE_CHECKING, Deque, Dict, Iterable, List, Optional, Set,
@@ -113,6 +114,8 @@ def schedule(self) -> "SchedulerOutput":
scheduled_encoder_inputs: Dict[str, List[int]] = {}
encoder_budget = self.max_num_encoder_input_tokens

scheduled_timestamp = time.monotonic()

# First, schedule the RUNNING requests.
req_index = 0
while req_index < len(self.running) and token_budget > 0:
@@ -252,6 +255,7 @@ def schedule(self) -> "SchedulerOutput":
self.running.append(request)
if request.status == RequestStatus.WAITING:
scheduled_new_reqs.append(request)
request.scheduled(scheduled_timestamp)
elif request.status == RequestStatus.PREEMPTED:
scheduled_resumed_reqs.append(request)
else:
@@ -514,7 +518,8 @@ def update_from_output(
finish_reason=request.get_finished_reason(),
new_logprobs=new_logprobs,
new_prompt_logprobs_tensors=prompt_logprobs_tensors,
stop_reason=request.stop_reason))
stop_reason=request.stop_reason,
events=request.take_events()))

if not stopped:
new_running.append(request)
@@ -547,6 +552,7 @@ def _check_stop(self, request: Request) -> bool:
def add_request(self, request: Request) -> None:
self.waiting.append(request)
self.requests[request.request_id] = request
request.queued()

def finish_requests(
self,
31 changes: 31 additions & 0 deletions vllm/v1/engine/__init__.py
@@ -1,6 +1,7 @@
# SPDX-License-Identifier: Apache-2.0

import enum
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING, List, Optional, Union

@@ -39,6 +40,31 @@ def __str__(self):
return FINISH_REASON_STRINGS[self.value]


class EngineCoreEventType(enum.IntEnum):
"""The type of engine core request event."""
QUEUED = 1
SCHEDULED = 2


@dataclass
class EngineCoreEvent:
"""A timestamped engine core event associated with a request.
The timestamp is a monotonic timestamp and is used by the engine
frontend to calculate intervals between engine core events. These
timestamps should not be compared with timestamps from other processes.
"""
type: EngineCoreEventType
timestamp: float

@classmethod
def new_event(cls,
event_type: EngineCoreEventType,
timestamp: Optional[float] = None) -> "EngineCoreEvent":
timestamp = time.monotonic() if timestamp is None else timestamp
return cls(event_type, timestamp)


@dataclass
class EngineCoreRequest:

@@ -74,6 +100,7 @@ class EngineCoreOutput(

finish_reason: Optional[FinishReason] = None
stop_reason: Union[int, str, None] = None
events: Optional[List[EngineCoreEvent]] = None

@property
def finished(self) -> bool:
@@ -92,6 +119,10 @@ class EngineCoreOutputs(
# [num_reqs]
outputs: List[EngineCoreOutput]
scheduler_stats: SchedulerStats
timestamp: float = 0.0

def __post_init__(self):
self.timestamp = time.monotonic()


@dataclass
6 changes: 3 additions & 3 deletions vllm/v1/engine/async_llm.py
@@ -246,6 +246,8 @@ async def _run_output_handler(self):
# 1) Pull EngineCoreOutputs from the EngineCore.
outputs = await self.engine_core.get_output_async()

iteration_stats = IterationStats(self.log_stats)

# Split outputs into chunks of at most
# VLLM_V1_OUTPUT_PROC_CHUNK_SIZE, so that we don't block the
# event loop for too long.
@@ -257,14 +259,12 @@
outputs.outputs,
cdiv(num_outputs, VLLM_V1_OUTPUT_PROC_CHUNK_SIZE))

iteration_stats = None
for i, outputs_slice in enumerate(slices):
# 2) Process EngineCoreOutputs.
processed_outputs = self.output_processor.process_outputs(
outputs_slice, iteration_stats)
outputs_slice, outputs.timestamp, iteration_stats)
# NOTE: RequestOutputs are pushed to their queues.
assert not processed_outputs.request_outputs
iteration_stats = processed_outputs.iteration_stats

# Allow other asyncio tasks to run between chunks
if i + 1 < len(slices):
27 changes: 14 additions & 13 deletions vllm/v1/engine/output_processor.py
@@ -19,7 +19,6 @@ class OutputProcessorOutput:

request_outputs: List[RequestOutput]
reqs_to_abort: List[str]
iteration_stats: IterationStats


class RequestState:
@@ -45,7 +44,7 @@ def __init__(
self.is_prefilling = True
self.queue = queue

self.stats = RequestStateStats(last_token_time=arrival_time)
self.stats = RequestStateStats(arrival_time=arrival_time)

@classmethod
def from_new_request(
@@ -117,6 +116,7 @@ def add_request(
def process_outputs(
self,
engine_core_outputs: List[EngineCoreOutput],
engine_core_timestamp: Optional[float] = None,
iteration_stats: Optional[IterationStats] = None,
) -> OutputProcessorOutput:
"""
@@ -145,8 +145,6 @@ def process_outputs(

request_outputs: List[RequestOutput] = []
reqs_to_abort: List[str] = []
if not iteration_stats:
iteration_stats = IterationStats(self.log_stats)
for engine_core_output in engine_core_outputs:
req_id = engine_core_output.request_id
req_state = self.request_states.get(req_id)
@@ -155,10 +153,13 @@
continue

# 1) Compute stats for this iteration.
iteration_stats.update_from_output(engine_core_output,
req_state.is_prefilling,
req_state.prompt_len,
req_state.stats)
if iteration_stats is not None:
assert engine_core_timestamp is not None
iteration_stats.update_from_output(engine_core_output,
engine_core_timestamp,
req_state.is_prefilling,
req_state.prompt_len,
req_state.stats)

new_token_ids = engine_core_output.new_token_ids
finish_reason = engine_core_output.finish_reason
@@ -205,15 +206,15 @@ def process_outputs(
# detected stop string, abort needed in EngineCore.
reqs_to_abort.append(req_id)

# Track per-request stats.
assert finish_reason is not None
iteration_stats.update_from_finished_request(
finish_reason, request_output, req_state.stats)
# Track per-request stats
if iteration_stats is not None:
assert finish_reason is not None
iteration_stats.update_from_finished_request(
finish_reason, request_output, req_state.stats)

return OutputProcessorOutput(
request_outputs=request_outputs,
reqs_to_abort=reqs_to_abort,
iteration_stats=iteration_stats,
)

@staticmethod
49 changes: 49 additions & 0 deletions vllm/v1/metrics/loggers.py
@@ -162,6 +162,45 @@ def __init__(self, model_config: ModelConfig):
],
labelnames=labelnames).labels(*labelvalues)

request_latency_buckets = [
0.3, 0.5, 0.8, 1.0, 1.5, 2.0, 2.5, 5.0, 10.0, 15.0, 20.0, 30.0,
40.0, 50.0, 60.0
]
self.histogram_e2e_time_request = \
prometheus_client.Histogram(
name="vllm:e2e_request_latency_seconds",
documentation="Histogram of e2e request latency in seconds.",
buckets=request_latency_buckets,
labelnames=labelnames).labels(*labelvalues)
self.histogram_queue_time_request = \
prometheus_client.Histogram(
name="vllm:request_queue_time_seconds",
documentation=
"Histogram of time spent in WAITING phase for request.",
buckets=request_latency_buckets,
labelnames=labelnames).labels(*labelvalues)
self.histogram_inference_time_request = \
prometheus_client.Histogram(
name="vllm:request_inference_time_seconds",
documentation=
"Histogram of time spent in RUNNING phase for request.",
buckets=request_latency_buckets,
labelnames=labelnames).labels(*labelvalues)
self.histogram_prefill_time_request = \
prometheus_client.Histogram(
name="vllm:request_prefill_time_seconds",
documentation=
"Histogram of time spent in PREFILL phase for request.",
buckets=request_latency_buckets,
labelnames=labelnames).labels(*labelvalues)
self.histogram_decode_time_request = \
prometheus_client.Histogram(
name="vllm:request_decode_time_seconds",
documentation=
"Histogram of time spent in DECODE phase for request.",
buckets=request_latency_buckets,
labelnames=labelnames).labels(*labelvalues)

def log(self, scheduler_stats: SchedulerStats,
iteration_stats: IterationStats):
"""Log to prometheus."""
@@ -176,6 +215,12 @@ def log(self, scheduler_stats: SchedulerStats,

for finished_request in iteration_stats.finished_requests:
self.counter_request_success[finished_request.finish_reason].inc()
self.histogram_e2e_time_request.observe(
finished_request.e2e_latency)
self.histogram_inference_time_request.observe(
finished_request.inference_time)
self.histogram_decode_time_request.observe(
finished_request.decode_time)
self.histogram_num_prompt_tokens_request.observe(
finished_request.num_prompt_tokens)
self.histogram_num_generation_tokens_request.observe(
@@ -185,6 +230,10 @@ def log(self, scheduler_stats: SchedulerStats,
self.histogram_time_to_first_token.observe(ttft)
for tpot in iteration_stats.time_per_output_tokens_iter:
self.histogram_time_per_output_token.observe(tpot)
for queue_time in iteration_stats.queue_times_iter:
self.histogram_queue_time_request.observe(queue_time)
for prefill_time in iteration_stats.prefill_times_iter:
self.histogram_prefill_time_request.observe(prefill_time)

@staticmethod
def _unregister_vllm_metrics():