diff --git a/BUILD.bazel b/BUILD.bazel index 4558ba5cca..4ba330cd7c 100644 --- a/BUILD.bazel +++ b/BUILD.bazel @@ -23,6 +23,7 @@ py_library( "//compiler_gym/datasets", "//compiler_gym/envs", "//compiler_gym/service", + "//compiler_gym/service/runtime", "//compiler_gym/spaces", "//compiler_gym/views", "//examples/sensitivity_analysis:action_sensitivity_analysis", diff --git a/compiler_gym/service/runtime/BUILD b/compiler_gym/service/runtime/BUILD new file mode 100644 index 0000000000..5c75bc297e --- /dev/null +++ b/compiler_gym/service/runtime/BUILD @@ -0,0 +1,40 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. +# +# This package implements the CompilerGym service runtime, which is the utility +# code that creates RPC servers and dispatches to CompilationServices. +load("@rules_cc//cc:defs.bzl", "cc_library") +load("@rules_python//python:defs.bzl", "py_library") + +py_library( + name = "runtime", + srcs = ["__init__.py"], + visibility = ["//visibility:public"], + deps = [ + ":benchmark_cache", + ], +) + +py_library( + name = "benchmark_cache", + srcs = ["benchmark_cache.py"], + visibility = ["//tests/service/runtime:__subpackages__"], + deps = [ + "//compiler_gym/service/proto", + ], +) + +cc_library( + name = "BenchmarkCache", + srcs = ["BenchmarkCache.cc"], + hdrs = ["BenchmarkCache.h"], + visibility = ["//tests/service/runtime:__subpackages__"], + deps = [ + "//compiler_gym/service/proto:compiler_gym_service_cc", + "@boost//:filesystem", + "@com_github_grpc_grpc//:grpc++", + "@glog", + ], +) diff --git a/compiler_gym/service/runtime/BenchmarkCache.cc b/compiler_gym/service/runtime/BenchmarkCache.cc new file mode 100644 index 0000000000..cbeffee010 --- /dev/null +++ b/compiler_gym/service/runtime/BenchmarkCache.cc @@ -0,0 +1,83 @@ +// Copyright (c) Facebook, Inc. and its affiliates. +// +// This source code is licensed under the MIT license found in the +// LICENSE file in the root directory of this source tree. +#include "compiler_gym/service/runtime/BenchmarkCache.h" + +#include + +using grpc::Status; +using grpc::StatusCode; + +namespace compiler_gym::runtime { + +BenchmarkCache::BenchmarkCache(std::optional rand, size_t maxSizeInBytes) + : rand_(rand.has_value() ? *rand : std::mt19937_64(std::random_device()())), + maxSizeInBytes_(maxSizeInBytes), + sizeInBytes_(0){}; + +const Benchmark* BenchmarkCache::get(const std::string& uri) const { + auto it = benchmarks_.find(uri); + if (it == benchmarks_.end()) { + return nullptr; + } + + return &it->second; +} + +void BenchmarkCache::add(const Benchmark&& benchmark) { + VLOG(3) << "Caching benchmark " << benchmark.uri() << ". Cache size = " << sizeInBytes() + << " bytes, " << size() << " items"; + + // Remove any existing value to keep the cache size consistent. + const auto it = benchmarks_.find(benchmark.uri()); + if (it != benchmarks_.end()) { + const size_t replacedSize = it->second.ByteSizeLong(); + benchmarks_.erase(it); + sizeInBytes_ -= replacedSize; + } + + const size_t size = benchmark.ByteSizeLong(); + if (sizeInBytes() + size > maxSizeInBytes()) { + if (size > maxSizeInBytes()) { + LOG(WARNING) << "Adding new benchmark with size " << size + << " bytes exceeds total target cache size of " << maxSizeInBytes() << " bytes"; + } else { + VLOG(3) << "Adding new benchmark with size " << size << " bytes exceeds maximum size " + << maxSizeInBytes() << " bytes, " << this->size() << " items"; + } + prune(); + } + + benchmarks_.insert({benchmark.uri(), std::move(benchmark)}); + sizeInBytes_ += size; +} + +void BenchmarkCache::prune(std::optional targetSize) { + int evicted = 0; + targetSize = targetSize.has_value() ? targetSize : maxSizeInBytes() / 2; + + while (size() && sizeInBytes() > targetSize) { + // Select a benchmark randomly. + std::uniform_int_distribution distribution(0, benchmarks_.size() - 1); + size_t index = distribution(rand_); + auto iterator = std::next(std::begin(benchmarks_), index); + + // Evict the benchmark from the pool of loaded benchmarks. + ++evicted; + sizeInBytes_ -= iterator->second.ByteSizeLong(); + benchmarks_.erase(iterator); + } + + if (evicted) { + VLOG(2) << "Evicted " << evicted << " benchmarks from cache. Benchmark cache " + << "size now " << sizeInBytes() << " bytes, " << benchmarks_.size() << " items"; + } +} + +void BenchmarkCache::setMaxSizeInBytes(size_t maxSizeInBytes) { + maxSizeInBytes_ = maxSizeInBytes; + prune(maxSizeInBytes); +} + +} // namespace compiler_gym::runtime diff --git a/compiler_gym/service/runtime/BenchmarkCache.h b/compiler_gym/service/runtime/BenchmarkCache.h new file mode 100644 index 0000000000..36ad6d481f --- /dev/null +++ b/compiler_gym/service/runtime/BenchmarkCache.h @@ -0,0 +1,54 @@ +// Copyright (c) Facebook, Inc. and its affiliates. +// +// This source code is licensed under the MIT license found in the +// LICENSE file in the root directory of this source tree. +#pragma once + +#include + +#include +#include +#include +#include + +#include "boost/filesystem.hpp" +#include "compiler_gym/service/proto/compiler_gym_service.pb.h" + +namespace compiler_gym::runtime { + +constexpr size_t kEvictionSizeInBytes = 512 * 1024 * 1024; + +// An in-memory cache of Benchmark protocol buffers. +// +// This object caches Benchmark messages by URI. Once the cache reaches a +// predetermined size, benchmarks are evicted randomly until the capacity is +// reduced to 50%. +class BenchmarkCache { + public: + BenchmarkCache(std::optional rand = std::nullopt, + size_t maxSizeInBytes = kEvictionSizeInBytes); + + // The pointer set by benchmark is valid only until the next call to add(). + const Benchmark* get(const std::string& uri) const; + + // Move-insert the given benchmark to the cache. + void add(const Benchmark&& benchmark); + + inline size_t size() const { return benchmarks_.size(); }; + inline size_t sizeInBytes() const { return sizeInBytes_; }; + inline size_t maxSizeInBytes() const { return maxSizeInBytes_; }; + + void setMaxSizeInBytes(size_t maxSizeInBytes); + + // Evict benchmarks randomly to reduce the capacity below 50%. + void prune(std::optional targetSize = std::nullopt); + + private: + std::unordered_map benchmarks_; + + std::mt19937_64 rand_; + size_t maxSizeInBytes_; + size_t sizeInBytes_; +}; + +} // namespace compiler_gym::runtime diff --git a/compiler_gym/service/runtime/__init__.py b/compiler_gym/service/runtime/__init__.py new file mode 100644 index 0000000000..6264236915 --- /dev/null +++ b/compiler_gym/service/runtime/__init__.py @@ -0,0 +1,4 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. diff --git a/compiler_gym/service/runtime/benchmark_cache.py b/compiler_gym/service/runtime/benchmark_cache.py new file mode 100644 index 0000000000..efd9a13f56 --- /dev/null +++ b/compiler_gym/service/runtime/benchmark_cache.py @@ -0,0 +1,128 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. +import logging +from typing import Dict, Optional + +import numpy as np + +from compiler_gym.service.proto import Benchmark + +MAX_SIZE_IN_BYTES = 512 * 104 * 1024 + + +class BenchmarkCache: + """An in-memory cache of Benchmark messages. + + This object caches Benchmark messages by URI. Once the cache reaches a + predetermined size, benchmarks are evicted randomly until the capacity is + reduced to 50%. + """ + + def __init__( + self, + max_size_in_bytes: int = MAX_SIZE_IN_BYTES, + logger: Optional[logging.Logger] = None, + rng: Optional[np.random.Generator] = None, + ): + self.rng = rng or np.random.default_rng() + self._max_size_in_bytes = max_size_in_bytes + self.logger = logger or logging.getLogger("compiler_gym") + + self._benchmarks: Dict[str, Benchmark] = {} + self._size_in_bytes = 0 + + def __getitem__(self, uri: str) -> Benchmark: + """Get a benchmark by URI. Raises KeyError.""" + item = self._benchmarks.get(uri) + if item is None: + raise KeyError(uri) + return item + + def __contains__(self, uri: str): + """Whether URI is in cache.""" + return uri in self._benchmarks + + def __setitem__(self, uri: str, benchmark: Benchmark): + """Add benchmark to cache.""" + self.logger.debug( + "Caching benchmark %s. Cache size = %d bytes, %d items", + uri, + self.size_in_bytes, + self.size, + ) + + # Remove any existing value to keep the cache size consistent. + if uri in self._benchmarks: + self._size_in_bytes -= self._benchmarks[uri].ByteSize() + del self._benchmarks[uri] + + size = benchmark.ByteSize() + if self.size_in_bytes + size > self.max_size_in_bytes: + if size > self.max_size_in_bytes: + self.logger.warning( + "Adding new benchmark with size %d bytes exceeds total " + "target cache size of %d bytes", + size, + self.max_size_in_bytes, + ) + else: + self.logger.debug( + "Adding new benchmark with size %d bytes " + "exceeds maximum size %d bytes, %d items", + size, + self.max_size_in_bytes, + self.size, + ) + self.prune() + + self._benchmarks[uri] = benchmark + self._size_in_bytes += size + + def prune(self, target_size_in_bytes: Optional[int] = None) -> None: + """Evict benchmarks randomly to reduce the capacity below 50%.""" + evicted = 0 + target_size_in_bytes = ( + self.max_size_in_bytes // 2 + if target_size_in_bytes is None + else target_size_in_bytes + ) + + while self.size and self.size_in_bytes > target_size_in_bytes: + evicted += 1 + key = self.rng.choice(list(self._benchmarks.keys())) + self._size_in_bytes -= self._benchmarks[key].ByteSize() + del self._benchmarks[key] + + if evicted: + self.logger.info( + "Evicted %d benchmarks from cache. " + "Benchmark cache size now %d bytes, %d items", + evicted, + self.size_in_bytes, + self.size, + ) + + @property + def size(self) -> int: + """The number of items in the cache.""" + return len(self._benchmarks) + + @property + def size_in_bytes(self) -> int: + """The combined size of the elements in the cache, excluding the + cache overhead. + """ + return self._size_in_bytes + + @property + def max_size_in_bytes(self) -> int: + """The maximum size of the cache.""" + return self._max_size_in_bytes + + @max_size_in_bytes.setter + def max_size_in_bytes(self, value: int) -> None: + """Set a new maximum cache size.""" + self._max_size_in_bytes = value + self.prune(target_size_in_bytes=value) diff --git a/setup.py b/setup.py index d361a6a574..2841bded9e 100644 --- a/setup.py +++ b/setup.py @@ -58,8 +58,9 @@ def get_tag(self): "compiler_gym.envs", "compiler_gym.envs", "compiler_gym.leaderboard", - "compiler_gym.service.proto", "compiler_gym.service", + "compiler_gym.service.proto", + "compiler_gym.service.runtime", "compiler_gym.spaces", "compiler_gym.third_party.autophase", "compiler_gym.third_party.inst2vec", diff --git a/tests/service/runtime/BUILD b/tests/service/runtime/BUILD new file mode 100644 index 0000000000..9e659686bc --- /dev/null +++ b/tests/service/runtime/BUILD @@ -0,0 +1,27 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. +load("@rules_cc//cc:defs.bzl", "cc_test") +load("@rules_python//python:defs.bzl", "py_test") + +py_test( + name = "benchmark_cache_test", + srcs = ["benchmark_cache_test.py"], + deps = [ + "//compiler_gym/service/proto", + "//compiler_gym/service/runtime:benchmark_cache", + "//tests:test_main", + ], +) + +cc_test( + name = "BenchmarkCacheTest", + srcs = ["BenchmarkCacheTest.cc"], + deps = [ + "//compiler_gym/service/proto:compiler_gym_service_cc", + "//compiler_gym/service/runtime:BenchmarkCache", + "//tests:TestMain", + "@gtest", + ], +) diff --git a/tests/service/runtime/BenchmarkCacheTest.cc b/tests/service/runtime/BenchmarkCacheTest.cc new file mode 100644 index 0000000000..c120a06d43 --- /dev/null +++ b/tests/service/runtime/BenchmarkCacheTest.cc @@ -0,0 +1,100 @@ +// Copyright (c) Facebook, Inc. and its affiliates. +// +// This source code is licensed under the MIT license found in the +// LICENSE file in the root directory of this source tree. +#include + +#include + +#include "compiler_gym/service/proto/compiler_gym_service.pb.h" +#include "compiler_gym/service/runtime/BenchmarkCache.h" + +using namespace ::testing; + +namespace compiler_gym::runtime { +namespace { + +// Test helper. Generate a benchmark of the given size in bytes. +Benchmark makeBenchmarkOfSize(const std::string& uri, int sizeInBytes, int target) { + Benchmark bm; + bm.set_uri(uri); + std::vector contents(target, '0'); + *bm.mutable_program()->mutable_contents() = {contents.begin(), contents.end()}; + int sizeOffset = bm.ByteSizeLong() - sizeInBytes; + if (sizeOffset) { + return makeBenchmarkOfSize(uri, sizeInBytes, sizeInBytes - sizeOffset); + } + + return bm; +} + +Benchmark makeBenchmarkOfSize(const std::string& uri, int sizeInBytes) { + return makeBenchmarkOfSize(uri, sizeInBytes, sizeInBytes); +} + +TEST(BenchmarkCache, makeBenchmarkOfSize) { + // Sanity check for test helper function. + ASSERT_EQ(makeBenchmarkOfSize("a", 10).ByteSizeLong(), 10); + ASSERT_EQ(makeBenchmarkOfSize("abc", 10).ByteSizeLong(), 10); + ASSERT_EQ(makeBenchmarkOfSize("a", 50).ByteSizeLong(), 50); + ASSERT_EQ(makeBenchmarkOfSize("a", 100).ByteSizeLong(), 100); + ASSERT_EQ(makeBenchmarkOfSize("a", 1024).ByteSizeLong(), 1024); +} + +TEST(BenchmarkCache, replaceExistingItem) { + BenchmarkCache cache; + + cache.add(makeBenchmarkOfSize("a", 30)); + ASSERT_EQ(cache.size(), 1); + ASSERT_EQ(cache.sizeInBytes(), 30); + + cache.add(makeBenchmarkOfSize("a", 50)); + ASSERT_EQ(cache.size(), 1); + ASSERT_EQ(cache.sizeInBytes(), 50); +} + +TEST(BenchmarkCache, pruneOnMaxSizeReached) { + BenchmarkCache cache; + cache.setMaxSizeInBytes(100); + + cache.add(makeBenchmarkOfSize("a", 30)); + cache.add(makeBenchmarkOfSize("b", 30)); + cache.add(makeBenchmarkOfSize("c", 30)); + ASSERT_EQ(cache.sizeInBytes(), 90); + ASSERT_EQ(cache.size(), 3); + + cache.add(makeBenchmarkOfSize("d", 30)); + ASSERT_EQ(cache.sizeInBytes(), 60); + ASSERT_EQ(cache.size(), 2); +} + +TEST(BenchmarkCache, getter) { + BenchmarkCache cache; + + const auto a = makeBenchmarkOfSize("a", 30); + cache.add(makeBenchmarkOfSize("a", 30)); + + const auto b = makeBenchmarkOfSize("b", 50); + cache.add(makeBenchmarkOfSize("b", 50)); + + ASSERT_EQ(cache.get("a")->DebugString(), a.DebugString()); + ASSERT_NE(cache.get("a")->DebugString(), b.DebugString()); + ASSERT_EQ(cache.get("b")->DebugString(), b.DebugString()); +} + +TEST(BenchmarkCache, pruneOnMaximumSizeUpdate) { + BenchmarkCache cache; + + cache.add(makeBenchmarkOfSize("a", 30)); + cache.add(makeBenchmarkOfSize("b", 30)); + cache.add(makeBenchmarkOfSize("c", 30)); + ASSERT_EQ(cache.sizeInBytes(), 90); + ASSERT_EQ(cache.size(), 3); + + cache.setMaxSizeInBytes(50); + ASSERT_EQ(cache.size(), 1); + ASSERT_EQ(cache.sizeInBytes(), 30); +} + +} // anonymous namespace +} // namespace compiler_gym::runtime diff --git a/tests/service/runtime/benchmark_cache_test.py b/tests/service/runtime/benchmark_cache_test.py new file mode 100644 index 0000000000..28e3ac6517 --- /dev/null +++ b/tests/service/runtime/benchmark_cache_test.py @@ -0,0 +1,132 @@ +# Copyright (c) Facebook, Inc. and its affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. +"""Unit tests for //compiler_gym/service/runtime:benchmark_cache.""" + +import pytest + +from compiler_gym.service.proto import Benchmark, File +from compiler_gym.service.runtime.benchmark_cache import BenchmarkCache +from tests.test_main import main + + +def make_benchmark_of_size(size_in_bytes: int, target: int = 0) -> Benchmark: + """Test helper. Generate a benchmark of the given size in bytes.""" + target = target or size_in_bytes + bm = Benchmark(program=File(contents=("." * target).encode("utf-8"))) + size_offset = bm.ByteSize() - size_in_bytes + if size_offset: + return make_benchmark_of_size(size_in_bytes, size_in_bytes - size_offset) + return bm + + +@pytest.mark.parametrize("size", [5, 10, 100, 1024]) +def test_make_benchmark_of_size(size: int): + """Sanity check for test helper function.""" + assert make_benchmark_of_size(size).ByteSize() == size + + +def test_oversized_benchmark_triggers_prune(mocker): + cache = BenchmarkCache(max_size_in_bytes=10) + + mocker.spy(cache, "prune") + + cache["test"] = make_benchmark_of_size(50) + + assert cache.size == 1 + assert cache.size_in_bytes == 50 + + cache.prune.assert_called_once() + + +def test_prune_on_max_size_reached(mocker): + """Test that cache is pruned when the maximum size is exceeded.""" + cache = BenchmarkCache(max_size_in_bytes=100) + + mocker.spy(cache, "prune") + mocker.spy(cache.logger, "info") + + cache["a"] = make_benchmark_of_size(30) + cache["b"] = make_benchmark_of_size(30) + cache["c"] = make_benchmark_of_size(30) + assert cache.prune.call_count == 0 + + cache["d"] = make_benchmark_of_size(30) + assert cache.prune.call_count == 1 + + assert cache.size == 2 + assert cache.size_in_bytes == 60 + + cache.logger.info.assert_called_once_with( + "Evicted %d benchmarks from cache. Benchmark cache size now %d bytes, " + "%d items", + 2, + 30, + 1, + ) + + +def test_oversized_benchmark_emits_warning(mocker): + """Test that a warning is emitted when a single item is larger than the + entire target cache size. + """ + cache = BenchmarkCache(max_size_in_bytes=10) + + mocker.spy(cache.logger, "warning") + + cache["test"] = make_benchmark_of_size(50) + + cache.logger.warning.assert_called_once_with( + "Adding new benchmark with size %d bytes exceeds total target cache " + "size of %d bytes", + 50, + 10, + ) + + +def test_contains(): + cache = BenchmarkCache(max_size_in_bytes=100) + + cache["a"] = make_benchmark_of_size(30) + + assert "a" in cache + assert "b" not in cache + + +def test_getter(): + cache = BenchmarkCache(max_size_in_bytes=100) + + a = make_benchmark_of_size(30) + b = make_benchmark_of_size(40) + + cache["a"] = a + cache["b"] = b + + assert cache["a"] == a + assert cache["a"] != b + assert cache["b"] == b + + with pytest.raises(KeyError, match="c"): + cache["c"] + + +def test_prune_on_maximum_size_update(mocker): + """Test that cache is pruned when the maximum size is exceeded.""" + cache = BenchmarkCache(max_size_in_bytes=100) + + mocker.spy(cache, "prune") + mocker.spy(cache.logger, "info") + + cache["a"] = make_benchmark_of_size(30) + cache["b"] = make_benchmark_of_size(30) + cache["c"] = make_benchmark_of_size(30) + assert cache.prune.call_count == 0 + + cache.max_size_in_bytes = 50 + assert cache.prune.call_count == 1 + assert cache.size_in_bytes == 30 + + +if __name__ == "__main__": + main()