Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add jitter to ExponentialBackoff #4476

Merged
merged 2 commits into from
Mar 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/subgraph/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,10 @@ where
should_try_unfail_non_deterministic: true,
synced: false,
skip_ptr_updates_timer: Instant::now(),
backoff: ExponentialBackoff::new(
backoff: ExponentialBackoff::with_jitter(
(MINUTE * 2).min(env_vars.subgraph_error_retry_ceil),
env_vars.subgraph_error_retry_ceil,
env_vars.subgraph_error_retry_jitter,
),
entity_lfu_cache: LfuCache::new(),
},
Expand Down
12 changes: 10 additions & 2 deletions graph/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,13 @@ pub struct EnvVars {
/// Ceiling for the backoff retry of non-deterministic errors.
///
/// Set by the environment variable `GRAPH_SUBGRAPH_ERROR_RETRY_CEIL_SECS`
/// (expressed in seconds). The default value is 1800s (30 minutes).
/// (expressed in seconds). The default value is 3600s (60 minutes).
pub subgraph_error_retry_ceil: Duration,
/// Jitter factor for the backoff retry of non-deterministic errors.
///
/// Set by the environment variable `GRAPH_SUBGRAPH_ERROR_RETRY_JITTER`
/// (clamped between 0.0 and 1.0). The default value is 0.2.
pub subgraph_error_retry_jitter: f64,
/// Experimental feature.
///
/// Set by the flag `GRAPH_ENABLE_SELECT_BY_SPECIFIC_ATTRIBUTES`. Off by
Expand Down Expand Up @@ -210,6 +215,7 @@ impl EnvVars {
subgraph_max_data_sources: inner.subgraph_max_data_sources.0,
disable_fail_fast: inner.disable_fail_fast.0,
subgraph_error_retry_ceil: Duration::from_secs(inner.subgraph_error_retry_ceil_in_secs),
subgraph_error_retry_jitter: inner.subgraph_error_retry_jitter,
enable_select_by_specific_attributes: inner.enable_select_by_specific_attributes.0,
log_trigger_data: inner.log_trigger_data.0,
explorer_ttl: Duration::from_secs(inner.explorer_ttl_in_secs),
Expand Down Expand Up @@ -313,8 +319,10 @@ struct Inner {
subgraph_max_data_sources: NoUnderscores<usize>,
#[envconfig(from = "GRAPH_DISABLE_FAIL_FAST", default = "false")]
disable_fail_fast: EnvVarBoolean,
#[envconfig(from = "GRAPH_SUBGRAPH_ERROR_RETRY_CEIL_SECS", default = "1800")]
#[envconfig(from = "GRAPH_SUBGRAPH_ERROR_RETRY_CEIL_SECS", default = "3600")]
subgraph_error_retry_ceil_in_secs: u64,
#[envconfig(from = "GRAPH_SUBGRAPH_ERROR_RETRY_JITTER", default = "0.2")]
subgraph_error_retry_jitter: f64,
#[envconfig(from = "GRAPH_ENABLE_SELECT_BY_SPECIFIC_ATTRIBUTES", default = "false")]
enable_select_by_specific_attributes: EnvVarBoolean,
#[envconfig(from = "GRAPH_LOG_TRIGGER_DATA", default = "false")]
Expand Down
40 changes: 39 additions & 1 deletion graph/src/util/backoff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ pub struct ExponentialBackoff {
pub attempt: u64,
base: Duration,
ceiling: Duration,
jitter: f64,
}

impl ExponentialBackoff {
Expand All @@ -16,6 +17,19 @@ impl ExponentialBackoff {
attempt: 0,
base,
ceiling,
jitter: 0.0,
}
}

/// Creates an `ExponentialBackoff` whose sleep delay is randomized.
///
/// `jitter` is a fraction between 0.0 and 1.0: the delay is randomized
/// within `jitter` of the normal sleep delay. Values outside that range are
/// clamped into it.
///
/// A NaN `jitter` is treated as 0.0 (no jitter). `f64::clamp` passes NaN
/// through unchanged, and a NaN bound would later produce an empty range
/// when the jitter factor is sampled, which panics.
pub fn with_jitter(base: Duration, ceiling: Duration, jitter: f64) -> Self {
    let jitter = if jitter.is_nan() {
        0.0
    } else {
        jitter.clamp(0.0, 1.0)
    };

    ExponentialBackoff {
        attempt: 0,
        base,
        ceiling,
        jitter,
    }
}

Expand All @@ -37,7 +51,8 @@ impl ExponentialBackoff {
if delay > self.ceiling {
delay = self.ceiling;
}
delay
let jitter = rand::Rng::gen_range(&mut rand::thread_rng(), -self.jitter..=self.jitter);
delay.mul_f64(1.0 + jitter)
}

fn next_attempt(&mut self) -> Duration {
Expand Down Expand Up @@ -80,6 +95,29 @@ mod tests {
assert_eq!(backoff.next_attempt(), Duration::from_secs(5));
}

#[test]
fn test_delay_with_jitter() {
    let mut backoff = ExponentialBackoff::with_jitter(
        Duration::from_millis(1000),
        Duration::from_secs(5),
        0.1,
    );

    // With a 10% jitter around the expected 1s delay, every sample must fall
    // within [0.9s, 1.1s]. Both bounds are inclusive because the jitter
    // factor is drawn from an inclusive range.
    let delay1 = backoff.delay();
    assert!(delay1 >= Duration::from_millis(900) && delay1 <= Duration::from_millis(1100));
    let delay2 = backoff.delay();
    assert!(delay2 >= Duration::from_millis(900) && delay2 <= Duration::from_millis(1100));

    // Two consecutive samples should differ; a collision is possible in
    // principle but vanishingly unlikely.
    assert_ne!(delay1, delay2);

    // A very large attempt count pushes the delay to the 5s ceiling; jitter
    // is applied after the ceiling, so the result lies within [4.5s, 5.5s].
    backoff.attempt = 123456;
    let delay = backoff.delay();
    assert!(delay >= Duration::from_millis(4500) && delay <= Duration::from_millis(5500));
}

#[test]
fn test_overflow_delay() {
let mut backoff =
Expand Down