Skip to content

Commit

Permalink
Merge branch 'andriy/run-851-max-expiry' into 'master'
Browse files Browse the repository at this point in the history
feat: RUN-851: Implement max expiry for cache entries

With System API tracking it will be possible for the cache entries to never be invalidated. This MR caps the maximum cache entry lifetime to 60s.

Closes RUN-851 

Closes RUN-851

See merge request dfinity-lab/public/ic!16643
  • Loading branch information
berestovskyy committed Dec 12, 2023
2 parents ea368f6 + 80675d8 commit a9fc954
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 10 deletions.
7 changes: 7 additions & 0 deletions rs/config/src/execution_environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ const QUERY_SCHEDULING_TIME_SLICE_PER_CANISTER: Duration = Duration::from_millis
/// executions and user errors.
const QUERY_CACHE_CAPACITY: NumBytes = NumBytes::new(100 * MIB);

/// The upper limit on how long the cache entry stays valid in the query cache.
const QUERY_CACHE_MAX_EXPIRY_TIME: Duration = Duration::from_secs(60);

// The ID of the Bitcoin testnet canister.
pub const BITCOIN_TESTNET_CANISTER_ID: &str = "g4xu7-jiaaa-aaaan-aaaaq-cai";

Expand Down Expand Up @@ -224,6 +227,9 @@ pub struct Config {
/// Query cache capacity in bytes
pub query_cache_capacity: NumBytes,

/// The upper limit on how long the cache entry stays valid in the query cache.
pub query_cache_max_expiry_time: Duration,

/// The capacity of the Wasm compilation cache.
pub max_compilation_cache_size: NumBytes,

Expand Down Expand Up @@ -297,6 +303,7 @@ impl Default for Config {
composite_queries: FlagStatus::Enabled,
query_caching: FlagStatus::Enabled,
query_cache_capacity: QUERY_CACHE_CAPACITY,
query_cache_max_expiry_time: QUERY_CACHE_MAX_EXPIRY_TIME,
max_compilation_cache_size: MAX_COMPILATION_CACHE_SIZE,
query_stats_aggregation: FlagStatus::Disabled,
wasm_chunk_store: FlagStatus::Disabled,
Expand Down
7 changes: 6 additions & 1 deletion rs/execution_environment/src/query_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ impl InternalHttpQueryHandler {
local_query_execution_stats: QueryStatsCollector,
) -> Self {
let query_cache_capacity = config.query_cache_capacity;
let query_cache_max_expiry_time = config.query_cache_max_expiry_time;
Self {
log,
hypervisor,
Expand All @@ -139,7 +140,11 @@ impl InternalHttpQueryHandler {
max_instructions_per_query,
cycles_account_manager,
local_query_execution_stats,
query_cache: query_cache::QueryCache::new(metrics_registry, query_cache_capacity),
query_cache: query_cache::QueryCache::new(
metrics_registry,
query_cache_capacity,
query_cache_max_expiry_time,
),
}
}

Expand Down
40 changes: 34 additions & 6 deletions rs/execution_environment/src/query_handler/query_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use ic_replicated_state::ReplicatedState;
use ic_types::{ingress::WasmResult, messages::UserQuery, CountBytes, Cycles, Time, UserId};
use ic_utils_lru_cache::LruCache;
use prometheus::{Histogram, IntCounter, IntGauge};
use std::{mem::size_of_val, sync::Mutex};
use std::{mem::size_of_val, sync::Mutex, time::Duration};

use crate::metrics::duration_histogram;

Expand All @@ -21,6 +21,7 @@ pub(crate) struct QueryCacheMetrics {
pub evicted_entries_duration: Histogram,
pub invalidated_entries: IntCounter,
pub invalidated_entries_by_time: IntCounter,
pub invalidated_entries_by_max_expiry_time: IntCounter,
pub invalidated_entries_by_canister_version: IntCounter,
pub invalidated_entries_by_canister_balance: IntCounter,
pub invalidated_entries_duration: Histogram,
Expand Down Expand Up @@ -56,6 +57,10 @@ impl QueryCacheMetrics {
"execution_query_cache_invalidated_entries_by_time_total",
"The total number of invalidated entries due to the changed time",
),
invalidated_entries_by_max_expiry_time: metrics_registry.int_counter(
"execution_query_cache_invalidated_entries_by_max_expiry_time_total",
"The total number of invalidated entries due to the max expiry time",
),
invalidated_entries_by_canister_version: metrics_registry.int_counter(
"execution_query_cache_invalidated_entries_by_canister_version_total",
"The total number of invalidated entries due to the changed canister version",
Expand Down Expand Up @@ -167,14 +172,26 @@ impl EntryValue {
Self { env, result }
}

fn is_valid(&self, env: &EntryEnv) -> bool {
self.env == *env
fn is_valid(&self, env: &EntryEnv, max_expiry_time: Duration) -> bool {
self.is_valid_time(env)
&& self.is_not_expired(env, max_expiry_time)
&& self.is_valid_canister_version(env)
&& self.is_valid_canister_balance(env)
}

fn is_valid_time(&self, env: &EntryEnv) -> bool {
self.env.batch_time == env.batch_time
}

/// Check cache entry max expiration time.
fn is_not_expired(&self, env: &EntryEnv, max_expiry_time: Duration) -> bool {
if let Some(duration) = env.batch_time.checked_sub(self.env.batch_time) {
duration <= max_expiry_time
} else {
true
}
}

fn is_valid_canister_version(&self, env: &EntryEnv) -> bool {
self.env.canister_version == env.canister_version
}
Expand All @@ -198,7 +215,9 @@ pub(crate) struct QueryCache {
// We can't use `RwLock`, as the `LruCache::get()` requires mutable reference
// to update the LRU.
cache: Mutex<LruCache<EntryKey, EntryValue>>,
// Query cache metrics (public for tests)
/// The upper limit on how long the cache entry stays valid in the query cache.
max_expiry_time: Duration,
/// Query cache metrics (public for tests)
pub(crate) metrics: QueryCacheMetrics,
}

Expand All @@ -209,9 +228,15 @@ impl CountBytes for QueryCache {
}

impl QueryCache {
pub(crate) fn new(metrics_registry: &MetricsRegistry, capacity: NumBytes) -> Self {
/// Create a new `QueryCache` instance.
pub(crate) fn new(
metrics_registry: &MetricsRegistry,
capacity: NumBytes,
max_expiry_time: Duration,
) -> Self {
QueryCache {
cache: Mutex::new(LruCache::new(capacity)),
max_expiry_time,
metrics: QueryCacheMetrics::new(metrics_registry),
}
}
Expand All @@ -225,7 +250,7 @@ impl QueryCache {
let now = env.batch_time;

if let Some(value) = cache.get(key) {
if value.is_valid(env) {
if value.is_valid(env, self.max_expiry_time) {
let res = value.result();
// Update the metrics.
self.metrics.hits.inc();
Expand All @@ -243,6 +268,9 @@ impl QueryCache {
if !value.is_valid_time(env) {
self.metrics.invalidated_entries_by_time.inc();
}
if !value.is_not_expired(env, self.max_expiry_time) {
self.metrics.invalidated_entries_by_max_expiry_time.inc();
}
if !value.is_valid_canister_version(env) {
self.metrics.invalidated_entries_by_canister_version.inc();
}
Expand Down
73 changes: 71 additions & 2 deletions rs/execution_environment/src/query_handler/query_cache/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -480,6 +480,7 @@ fn query_cache_env_different_batch_time_returns_different_results() {
assert_eq!(output_1, output_2);
assert_eq!(1, metrics.invalidated_entries.get());
assert_eq!(1, metrics.invalidated_entries_by_time.get());
assert_eq!(0, metrics.invalidated_entries_by_max_expiry_time.get());
assert_eq!(0, metrics.invalidated_entries_by_canister_version.get());
assert_eq!(0, metrics.invalidated_entries_by_canister_balance.get());
assert_eq!(
Expand All @@ -493,6 +494,66 @@ fn query_cache_env_different_batch_time_returns_different_results() {
}
}

#[test]
fn query_cache_env_expired_entries_returns_different_results() {
let max_expiry_time = Duration::from_secs(10);
let mut test = ExecutionTestBuilder::new()
.with_query_caching()
.with_query_cache_max_expiry_time(max_expiry_time)
.build();
let canister_id = test.universal_canister_with_cycles(CYCLES_BALANCE).unwrap();
let output_1 = test.query(
UserQuery {
source: user_test_id(1),
receiver: canister_id,
method_name: "query".into(),
method_payload: wasm().reply_data(&[42]).build(),
ingress_expiry: 0,
nonce: None,
},
Arc::new(test.state().clone()),
vec![],
);
{
let query_handler = downcast_query_handler(test.query_handler());
assert_eq!(query_handler.query_cache.metrics.misses.get(), 1);
assert_eq!(output_1, Ok(WasmResult::Reply([42].into())));
}
test.state_mut().metadata.batch_time += max_expiry_time + Duration::from_secs(1);
let output_2 = test.query(
UserQuery {
source: user_test_id(1),
receiver: canister_id,
method_name: "query".into(),
method_payload: wasm().reply_data(&[42]).build(),
ingress_expiry: 0,
nonce: None,
},
Arc::new(test.state().clone()),
vec![],
);
{
let metrics = &downcast_query_handler(test.query_handler())
.query_cache
.metrics;
assert_eq!(2, metrics.misses.get());
assert_eq!(output_1, output_2);
assert_eq!(1, metrics.invalidated_entries.get());
assert_eq!(1, metrics.invalidated_entries_by_time.get());
assert_eq!(1, metrics.invalidated_entries_by_max_expiry_time.get());
assert_eq!(0, metrics.invalidated_entries_by_canister_version.get());
assert_eq!(0, metrics.invalidated_entries_by_canister_balance.get());
assert_eq!(
(max_expiry_time + Duration::from_secs(1)).as_secs(),
metrics.invalidated_entries_duration.get_sample_sum() as u64
);
assert_eq!(
1,
metrics.invalidated_entries_duration.get_sample_count() as usize
);
}
}

#[test]
fn query_cache_env_invalidated_entries_negative_duration_works() {
let mut test = ExecutionTestBuilder::new().with_query_caching().build();
Expand Down Expand Up @@ -537,6 +598,7 @@ fn query_cache_env_invalidated_entries_negative_duration_works() {
.metrics;
assert_eq!(output_1, output_2);
assert_eq!(1, metrics.invalidated_entries_by_time.get());
assert_eq!(0, metrics.invalidated_entries_by_max_expiry_time.get());
// Negative durations should give just 0.
assert_eq!(
0,
Expand Down Expand Up @@ -593,6 +655,7 @@ fn query_cache_env_different_canister_version_returns_different_results() {
assert_eq!(output_1, output_2);
assert_eq!(1, metrics.invalidated_entries.get());
assert_eq!(0, metrics.invalidated_entries_by_time.get());
assert_eq!(0, metrics.invalidated_entries_by_max_expiry_time.get());
assert_eq!(1, metrics.invalidated_entries_by_canister_version.get());
assert_eq!(0, metrics.invalidated_entries_by_canister_balance.get());
assert_eq!(
Expand Down Expand Up @@ -650,6 +713,7 @@ fn query_cache_env_different_canister_balance_returns_different_results() {
assert_eq!(output_1, output_2);
assert_eq!(1, metrics.invalidated_entries.get());
assert_eq!(0, metrics.invalidated_entries_by_time.get());
assert_eq!(0, metrics.invalidated_entries_by_max_expiry_time.get());
assert_eq!(0, metrics.invalidated_entries_by_canister_version.get());
assert_eq!(1, metrics.invalidated_entries_by_canister_balance.get());
assert_eq!(
Expand All @@ -665,7 +729,11 @@ fn query_cache_env_different_canister_balance_returns_different_results() {

#[test]
fn query_cache_env_combined_invalidation() {
let mut test = ExecutionTestBuilder::new().with_query_caching().build();
let max_expiry_time = Duration::from_secs(10);
let mut test = ExecutionTestBuilder::new()
.with_query_caching()
.with_query_cache_max_expiry_time(max_expiry_time)
.build();
let canister_id = test.universal_canister_with_cycles(CYCLES_BALANCE).unwrap();
let output_1 = test.query(
UserQuery {
Expand All @@ -679,7 +747,7 @@ fn query_cache_env_combined_invalidation() {
Arc::new(test.state().clone()),
vec![],
);
test.state_mut().metadata.batch_time += Duration::from_secs(1);
test.state_mut().metadata.batch_time += max_expiry_time + Duration::from_secs(1);
test.canister_state_mut(canister_id)
.system_state
.canister_version += 1;
Expand All @@ -706,6 +774,7 @@ fn query_cache_env_combined_invalidation() {
assert_eq!(output_1, output_2);
assert_eq!(1, metrics.invalidated_entries.get());
assert_eq!(1, metrics.invalidated_entries_by_time.get());
assert_eq!(1, metrics.invalidated_entries_by_max_expiry_time.get());
assert_eq!(1, metrics.invalidated_entries_by_canister_version.get());
assert_eq!(1, metrics.invalidated_entries_by_canister_balance.get());
}
Expand Down
10 changes: 9 additions & 1 deletion rs/test_utilities/execution_environment/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,12 @@ use ic_wasm_types::BinaryEncodedWasm;

use ic_config::embedders::MeteringType;
use maplit::{btreemap, btreeset};
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::convert::TryFrom;
use std::sync::Arc;
use std::{
collections::{BTreeMap, BTreeSet, HashMap},
time::Duration,
};
use std::{os::unix::prelude::FileExt, str::FromStr};
use tempfile::NamedTempFile;

Expand Down Expand Up @@ -1799,6 +1802,11 @@ impl ExecutionTestBuilder {
self
}

pub fn with_query_cache_max_expiry_time(mut self, max_expiry_time: Duration) -> Self {
self.execution_config.query_cache_max_expiry_time = max_expiry_time;
self
}

pub fn with_query_stats(mut self) -> Self {
self.execution_config.query_stats_aggregation = FlagStatus::Enabled;
self
Expand Down
14 changes: 14 additions & 0 deletions rs/tests/dashboards/IC/execution-metrics.json
Original file line number Diff line number Diff line change
Expand Up @@ -8871,6 +8871,20 @@
"range": true,
"refId": "A"
},
{
"datasource": {
"type": "prometheus",
"uid": "000000001"
},
"editorMode": "code",
"exemplar": true,
"expr": "label_replace(\n sum(\n rate(execution_query_cache_invalidated_entries_by_max_expiry_time_total{job=\"replica\",ic=\"$ic\",ic_subnet=~\"$ic_subnet\",instance=~\"$instance\"}[$__rate_interval])\n )\n ,\n \"ic_subnet\", \"$1\", \"ic_subnet\", \"([a-z0-9]+)-.*\"\n)\n",
"hide": false,
"interval": "",
"legendFormat": "Max expiry time",
"range": true,
"refId": "D"
},
{
"datasource": {
"type": "prometheus",
Expand Down

0 comments on commit a9fc954

Please sign in to comment.