[tuner] Add BaselineResultHandler class (#789)
This PR addresses the task in #783.

This PR adds the `BaselineResultHandler` class to handle baseline results and related functionality:
- Add reusable benchmarking helper functions `benchmark_baseline` and `benchmark_candidates` to run the baseline and candidate benchmarks.
- Add the `BaselineResultHandler` class to handle both baseline and candidate benchmark results, including performance regression detection, speedup calculation, and candidate selection (see the usage sketch below).
- Add corresponding tests.
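
A minimal usage sketch of the new API (illustrative only, not part of this change), assuming the `tuner` package is importable; the device IDs and timings below are made up:

```python
from tuner.libtuner import BaselineResultHandler, BenchmarkResult

handler = BaselineResultHandler()

# First baseline run: one result per device (made-up device IDs and times).
handler.add_run(
    [
        BenchmarkResult(candidate_id=0, time=100.0, device_id="hip://0"),
        BenchmarkResult(candidate_id=0, time=110.0, device_id="hip://1"),
    ]
)

# Re-run the baseline later and flag devices that slowed down by more than
# the threshold (default 1.03) relative to the stored average.
second_run = [
    BenchmarkResult(candidate_id=0, time=108.0, device_id="hip://0"),
    BenchmarkResult(candidate_id=0, time=111.0, device_id="hip://1"),
]
print(handler.detect_regressions(second_run))  # ['hip://0']
handler.add_run(second_run)

# Rank candidates by speedup (candidate time / average baseline time on the
# same device); lower is better.
candidates = [
    BenchmarkResult(candidate_id=1, time=90.0, device_id="hip://0"),
    BenchmarkResult(candidate_id=2, time=120.0, device_id="hip://1"),
]
for result, speedup in handler.get_candidates_ordered_by_speedup(candidates):
    print(f"candidate {result.candidate_id}: {speedup * 100:.1f}% of baseline")
```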

---------

Signed-off-by: Bangtian Liu <liubangtian@gmail.com>
bangtianliu authored Jan 20, 2025
1 parent 1f50538 commit 26bb250
Showing 3 changed files with 331 additions and 147 deletions.
1 change: 0 additions & 1 deletion tuner/tuner/candidate_gen.py
@@ -25,7 +25,6 @@
from abc import abstractmethod

from iree.compiler import ir # type: ignore

from iree.compiler.dialects import iree_codegen # type: ignore

from .common import *
324 changes: 226 additions & 98 deletions tuner/tuner/libtuner.py
@@ -22,6 +22,7 @@
import shutil
import logging
import argparse
from collections import defaultdict
from datetime import datetime
from enum import Enum
from pathlib import Path
@@ -143,6 +144,12 @@ class BenchmarkResult:
time: float
device_id: str

def is_valid(self) -> bool:
return math.isfinite(self.time)

def __iter__(self):
return iter((self.candidate_id, self.time, self.device_id))


def unit_to_microseconds(real_time: float, time_unit: str) -> float:
unit_conversions = {
@@ -429,7 +436,7 @@ def init_worker_context(queue: multiprocessing.Queue) -> None:
worker_id, device_id = queue.get()


def create_worker_context_queue(device_ids: list[int]) -> queue.Queue[tuple[int, int]]:
def create_worker_context_queue(device_ids: list[str]) -> queue.Queue[tuple[int, int]]:
"""Create queue contains Worker ID and Device ID for worker initialization"""
worker_contexts_queue = multiprocessing.Manager().Queue()
for worker_id, device_id in enumerate(device_ids):
@@ -564,7 +571,7 @@ def run_iree_benchmark_module_command(benchmark_pack: BenchmarkPack):

mean_benchmark_time = sum(times) / float(len(times))
logging.debug(
f"Benchmark time of candidate {candidate_id}: {mean_benchmark_time:.2f}"
f"Benchmark time of candidate {candidate_id}: {mean_benchmark_time:.2f} ms"
)
return BenchmarkResult(
candidate_id=candidate_id,
@@ -749,6 +756,170 @@ def collision_handler(index_hash_list: list[tuple[int, str]]) -> tuple[bool, lis
return collision_detected, unique_indexes


def benchmark_candidates(
candidate_indices, devices, tuning_client, candidate_trackers
) -> list[BenchmarkResult]:
"""
Runs the benchmarking for a given list of candidate indices.
"""
worker_context_queue = create_worker_context_queue(devices)

task_list = [
BenchmarkPack(
iree_benchmark_module_flags=tuning_client.get_iree_benchmark_module_flags(),
benchmark_timeout=tuning_client.get_benchmark_timeout_s(),
candidate_tracker=candidate_trackers[idx],
)
for idx in candidate_indices
]

# Perform benchmarking.
return multiprocess_progress_wrapper(
num_worker=len(devices),
task_list=task_list,
function=run_iree_benchmark_module_command,
initializer=init_worker_context,
initializer_inputs=(worker_context_queue,),
)


def benchmark_baseline(
devices: list[str],
tuning_client: TuningClient,
candidate_tracker: CandidateTracker,
) -> list[BenchmarkResult]:
task_list = [
BenchmarkPack(
iree_benchmark_module_flags=tuning_client.get_iree_benchmark_module_flags(),
benchmark_timeout=tuning_client.get_benchmark_timeout_s(),
candidate_tracker=candidate_tracker,
)
] * len(devices)

worker_context_queue = create_worker_context_queue(devices)
baseline_results = multiprocess_progress_wrapper(
num_worker=len(devices),
task_list=task_list,
function=run_iree_benchmark_module_command,
initializer=init_worker_context,
initializer_inputs=(worker_context_queue,),
)
return baseline_results


class BaselineResultHandler:
def __init__(self) -> None:
# Maps device IDs to a list of `BenchmarkResult`.
self.device_baseline_results: dict[str, list[BenchmarkResult]] = defaultdict(
list
)

def add_run(self, results: list[BenchmarkResult]) -> None:
if not BaselineResultHandler.are_baseline_devices_unique(results):
logging.warning(
"Duplicate device IDs detected in the first baseline results."
)
for result in results:
self.device_baseline_results[result.device_id].append(result)

@staticmethod
def are_baseline_devices_unique(results: list[BenchmarkResult]) -> bool:
return len(results) == len(set(result.device_id for result in results))

def get_valid_time_ms(self, device_id: str) -> list[float]:
return [
result.time
for result in self.device_baseline_results.get(device_id, [])
if result.is_valid()
]

def get_average_result_ms(self, device_id: str) -> Optional[float]:
valid_times = self.get_valid_time_ms(device_id)
if valid_times:
return sum(valid_times) / len(valid_times)
return None

def detect_regressions(
self,
baseline_results: list[BenchmarkResult],
threshold: float = 1.03,
) -> list[str]:
"""
Returns a list of device IDs where performance regressions were detected.
A performance regression is defined as a baseline time from 'baseline_results'
for a device that exceeds the stored average baseline time for that device by
a factor greater than the specified 'threshold'.
"""
regressions = []
for result in baseline_results:
if not result.is_valid():
continue

baseline_avg = self.get_average_result_ms(result.device_id)
if baseline_avg is not None and result.time > baseline_avg * threshold:
regressions.append(result.device_id)

return regressions

def is_valid(self) -> bool:
"""
        Return True iff at least one valid (finite) baseline time has been recorded.
"""
return any(
self.get_valid_time_ms(device_id)
for device_id in self.device_baseline_results
)

def is_valid_for_device(self, device_id: str) -> bool:
return len(self.get_valid_time_ms(device_id)) != 0

def get_candidates_ordered_by_speedup(
self, candidate_results: list[BenchmarkResult]
) -> list[tuple[BenchmarkResult, float]]:
"""
Returns a list of tuples (BenchmarkResult, speedup) sorted in ascending order based on speedup
or raw runtime.
If no valid baseline times are available across all devices, candidates are sorted based on
their raw runtime. A placeholder speedup value of 1.0 is assigned to each candidate.
If valid baseline times exist, speedup is defined as the ratio of the candidate's runtime to
the average baseline time for the corresponding device as:
speedup = candidate_runtime / avg_baseline_time (or fallback_baseline)
If no valid baseline times are available for a specific device, the fallback baseline is used.
The fallback baseline is the average of all valid baseline times across devices.
"""
if not self.is_valid():
logging.warning("No valid baseline times available.")
# Use the candidate time directly when no baselines are available.
return sorted(
[(candidate, 1.0) for candidate in candidate_results],
key=lambda x: x[0].time,
)

# Calculate the fallback baseline as the average of all valid times across devices.
valid_baseline_times = [
result.time
for device_id in self.device_baseline_results
for result in self.device_baseline_results[device_id]
if result.is_valid()
]

fallback_baseline = sum(valid_baseline_times) / len(valid_baseline_times)

candidates_with_speedup = []
for candidate in candidate_results:
baseline_avg_ms = self.get_average_result_ms(candidate.device_id)
if baseline_avg_ms is None:
baseline_avg_ms = fallback_baseline
speedup = candidate.time / baseline_avg_ms
candidates_with_speedup.append((candidate, speedup))
return sorted(candidates_with_speedup, key=lambda x: x[1])
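
As an illustration of the fallback path described in the docstring above (a sketch, not part of the changed file; device IDs and times are made up), a candidate on a device with no recorded baseline is compared against the average of all valid baseline times across devices:

```python
from tuner.libtuner import BaselineResultHandler, BenchmarkResult

handler = BaselineResultHandler()
handler.add_run([BenchmarkResult(candidate_id=0, time=80.0, device_id="dev-1")])
handler.add_run([BenchmarkResult(candidate_id=0, time=120.0, device_id="dev-1")])

# Average baseline on dev-1: (80 + 120) / 2 = 100.0 ms.
# Fallback baseline (all valid times across devices): also 100.0 ms here.
ranked = handler.get_candidates_ordered_by_speedup(
    [
        BenchmarkResult(candidate_id=7, time=90.0, device_id="dev-1"),  # 90 / 100 = 0.90
        BenchmarkResult(candidate_id=8, time=95.0, device_id="dev-2"),  # 95 / 100 = 0.95 (fallback)
    ]
)
# ranked holds (result, speedup) pairs: candidate 7 at 0.90, then candidate 8 at 0.95.
```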


def compile(
args: argparse.Namespace,
path_config: PathConfig,
@@ -832,64 +1003,6 @@ def compile(
return compiled_candidates


def select_best_benchmark_results(
candidate_results: list[BenchmarkResult],
baseline_results: list[BenchmarkResult],
num_candidates: Optional[int],
) -> list[BenchmarkResult]:
filtered_candidate_results = [r for r in candidate_results if math.isfinite(r.time)]
if len(filtered_candidate_results) == 0:
logging.error("No successful candidate benchmarks.")
return []
fallback_baseline_time: Optional[float] = None
filtered_baseline_results: list[BenchmarkResult] = []
for r in baseline_results:
if math.isfinite(r.time):
filtered_baseline_results.append(r)
fallback_baseline_time = r.time
else:
logging.warning(f"Baseline on device {r.device_id} failed.")
if fallback_baseline_time is None:
logging.warning(
f"All baseline benchmarks failed. Baselines will not be used to select top candidates"
)
baseline_times_by_device = {}
for r in filtered_baseline_results:
baseline_times_by_device[r.device_id] = r.time

# Select top candidates
def get_speedup(result: BenchmarkResult) -> float:
if result.device_id in baseline_times_by_device:
return result.time / baseline_times_by_device[result.device_id]
assert fallback_baseline_time is not None, "expected fallback_baseline_time"
return result.time / fallback_baseline_time

num_top_candidates = len(filtered_candidate_results)
if num_candidates is not None:
num_top_candidates = num_candidates

# Sort by the speedup over baseline on the same device. If a device failed
# the baseline benchmark, then use the fallback baseline. If there is no
# successful baseline, then the best we can do is to sort by the actual
# time.
sorting_key = get_speedup
if fallback_baseline_time is None:
sorting_key = lambda result: result.time
best_results = sorted(filtered_candidate_results, key=sorting_key)[
:num_top_candidates
]
logging.info(f"Selected top[{len(best_results)}]:")

for r in best_results:
if fallback_baseline_time is not None:
speedup = f"{round(get_speedup(r) * 100, 2)}% of baseline"
else:
speedup = "baseline unavailable"
result = f"Candidate {r.candidate_id} time: {r.time:.2f} ms ({speedup})"
logging.info(result)
return best_results


def benchmark(
args: argparse.Namespace,
compiled_candidates: list[int],
@@ -902,46 +1015,61 @@ def benchmark(
logging.warning("No candidates to benchmark.")
return []

task_list = [
BenchmarkPack(
iree_benchmark_module_flags=tuning_client.get_iree_benchmark_module_flags(),
benchmark_timeout=tuning_client.get_benchmark_timeout_s(),
candidate_tracker=candidate_trackers[i],
)
for i in compiled_candidates
if i != 0
]
worker_context_queue = create_worker_context_queue(args.devices)
candidate_results: list[BenchmarkResult] = multiprocess_progress_wrapper(
num_worker=len(args.devices),
task_list=task_list,
function=run_iree_benchmark_module_command,
initializer=init_worker_context,
initializer_inputs=(worker_context_queue,),
)

# Benchmarking baselines on each involved device.
worker_context_queue = create_worker_context_queue(args.devices)
baseline_task_list = [
BenchmarkPack(
iree_benchmark_module_flags=tuning_client.get_iree_benchmark_module_flags(),
benchmark_timeout=tuning_client.get_benchmark_timeout_s(),
candidate_tracker=candidate_trackers[0],
)
] * len(args.devices)
baseline_results: list[BenchmarkResult] = multiprocess_progress_wrapper(
num_worker=len(args.devices),
task_list=baseline_task_list,
function=run_iree_benchmark_module_command,
initializer=init_worker_context,
initializer_inputs=(worker_context_queue,),
baseline_tracker = candidate_trackers[0]
first_baseline_result = benchmark_baseline(
devices=args.devices,
tuning_client=tuning_client,
candidate_tracker=baseline_tracker,
)
baseline_handler = BaselineResultHandler()
baseline_handler.add_run(first_baseline_result)
if not baseline_handler.is_valid():
logging.warning("Baseline run failed.")

candidate_indices = [i for i in compiled_candidates if i != 0]
candidate_results = benchmark_candidates(
candidate_indices=candidate_indices,
devices=args.devices,
tuning_client=tuning_client,
candidate_trackers=candidate_trackers,
)

best_results: list[BenchmarkResult] = select_best_benchmark_results(
candidate_results=candidate_results,
baseline_results=baseline_results,
num_candidates=num_candidates,
second_baseline_result = benchmark_baseline(
devices=args.devices,
tuning_client=tuning_client,
candidate_tracker=baseline_tracker,
)

top_candidates = [result.candidate_id for result in best_results]
return top_candidates
regression_devices = baseline_handler.detect_regressions(second_baseline_result)
if regression_devices:
logging.warning(
f"Performance regressions detected for the following devices: {', '.join(regression_devices)}."
)
baseline_handler.add_run(second_baseline_result)

if not baseline_handler.is_valid():
logging.warning("Baseline run failed.")

all_candidates_with_speedup = baseline_handler.get_candidates_ordered_by_speedup(
candidate_results
)
top_candidates_with_speedup = all_candidates_with_speedup[:num_candidates]
top_candidate_ids = []
if baseline_handler.is_valid():
for candidate, speedup in top_candidates_with_speedup:
time_ms = candidate.time
candidate_id = candidate.candidate_id
percentage_of_baseline = speedup * 100
top_candidate_ids.append(candidate_id)
logging.info(
f"Candidate {candidate_id} time: {time_ms:.2f} ms "
f"({percentage_of_baseline:.1f}% of baseline)"
)
else:
for candidate, _ in top_candidates_with_speedup:
time_ms = candidate.time
candidate_id = candidate.candidate_id
top_candidate_ids.append(candidate_id)
logging.info(f"Candidate {candidate_id} time: {time_ms:.2f} ms")
return top_candidate_ids