Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom backoff support for raw api #350

Merged
merged 3 commits into from
Jun 21, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 139 additions & 14 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,15 @@ impl<PdC: PdClient> Client<PdC> {
/// # });
/// ```
pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
self.get_opt(key, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`get`](Client::get) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn get_opt(&self, key: impl Into<Key>, backoff: Backoff) -> Result<Option<Value>> {
debug!(self.logger, "invoking raw get request");
let request = new_raw_get_request(key.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.merge(CollectSingle)
.post_process_default()
.plan();
Expand Down Expand Up @@ -219,11 +224,20 @@ impl<PdC: PdClient> Client<PdC> {
pub async fn batch_get(
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<Vec<KvPair>> {
self.batch_get_opt(keys, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`batch_get`](Client::batch_get) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn batch_get_opt(
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
backoff: Backoff,
) -> Result<Vec<KvPair>> {
debug!(self.logger, "invoking raw batch_get request");
let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.merge(Collect)
.plan();
plan.execute()
Expand All @@ -248,10 +262,20 @@ impl<PdC: PdClient> Client<PdC> {
/// # });
/// ```
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
self.put_opt(key, value, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`put`](Client::put) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn put_opt(
&self,
key: impl Into<Key>,
value: impl Into<Value>,
backoff: Backoff,
) -> Result<()> {
debug!(self.logger, "invoking raw put request");
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.merge(CollectSingle)
.extract_error()
.plan();
Expand Down Expand Up @@ -279,6 +303,15 @@ impl<PdC: PdClient> Client<PdC> {
pub async fn batch_put(
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
) -> Result<()> {
self.batch_put_opt(pairs, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`batch_put`](Client::batch_put) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn batch_put_opt(
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
backoff: Backoff,
) -> Result<()> {
debug!(self.logger, "invoking raw batch_put request");
let request = new_raw_batch_put_request(
Expand All @@ -287,7 +320,7 @@ impl<PdC: PdClient> Client<PdC> {
self.atomic,
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.extract_error()
.plan();
plan.execute().await?;
Expand All @@ -312,10 +345,15 @@ impl<PdC: PdClient> Client<PdC> {
/// # });
/// ```
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
self.delete_opt(key, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`delete`](Client::delete) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn delete_opt(&self, key: impl Into<Key>, backoff: Backoff) -> Result<()> {
debug!(self.logger, "invoking raw delete request");
let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.merge(CollectSingle)
.extract_error()
.plan();
Expand All @@ -341,12 +379,21 @@ impl<PdC: PdClient> Client<PdC> {
/// # });
/// ```
pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
self.batch_delete_opt(keys, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`batch_delete`](Client::batch_delete) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn batch_delete_opt(
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
backoff: Backoff,
) -> Result<()> {
debug!(self.logger, "invoking raw batch_delete request");
self.assert_non_atomic()?;
let request =
new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.extract_error()
.plan();
plan.execute().await?;
Expand All @@ -372,6 +419,7 @@ impl<PdC: PdClient> Client<PdC> {
self.delete_range_opt(range, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`delete_range`](Client::delete_range) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn delete_range_opt(
&self,
range: impl Into<BoundRange>,
Expand Down Expand Up @@ -408,8 +456,18 @@ impl<PdC: PdClient> Client<PdC> {
/// # });
/// ```
pub async fn scan(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<KvPair>> {
self.scan_opt(range, limit, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`scan`](Client::scan) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn scan_opt(
&self,
range: impl Into<BoundRange>,
limit: u32,
backoff: Backoff,
) -> Result<Vec<KvPair>> {
debug!(self.logger, "invoking raw scan request");
self.scan_inner(range.into(), limit, false).await
self.scan_inner(range.into(), limit, false, backoff).await
}

/// Create a new 'scan' request that only returns the keys.
Expand All @@ -432,9 +490,20 @@ impl<PdC: PdClient> Client<PdC> {
/// # });
/// ```
pub async fn scan_keys(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<Key>> {
self.scan_keys_opt(range, limit, DEFAULT_REGION_BACKOFF)
.await
}

/// Same as [`scan_keys`](Client::scan_keys) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn scan_keys_opt(
&self,
range: impl Into<BoundRange>,
limit: u32,
backoff: Backoff,
) -> Result<Vec<Key>> {
debug!(self.logger, "invoking raw scan_keys request");
Ok(self
.scan_inner(range, limit, true)
.scan_inner(range, limit, true, backoff)
.await?
.into_iter()
.map(KvPair::into_key)
Expand Down Expand Up @@ -468,9 +537,21 @@ impl<PdC: PdClient> Client<PdC> {
&self,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
each_limit: u32,
) -> Result<Vec<KvPair>> {
self.batch_scan_opt(ranges, each_limit, DEFAULT_REGION_BACKOFF)
.await
}

/// Same as [`batch_scan`](Client::batch_scan) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn batch_scan_opt(
&self,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
each_limit: u32,
backoff: Backoff,
) -> Result<Vec<KvPair>> {
debug!(self.logger, "invoking raw batch_scan request");
self.batch_scan_inner(ranges, each_limit, false).await
self.batch_scan_inner(ranges, each_limit, false, backoff)
.await
}

/// Create a new 'batch scan' request that only returns the keys.
Expand Down Expand Up @@ -500,10 +581,21 @@ impl<PdC: PdClient> Client<PdC> {
&self,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
each_limit: u32,
) -> Result<Vec<Key>> {
self.batch_scan_keys_opt(ranges, each_limit, DEFAULT_REGION_BACKOFF)
.await
}

/// Same as [`batch_scan_keys`](Client::batch_scan_keys) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn batch_scan_keys_opt(
&self,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
each_limit: u32,
backoff: Backoff,
) -> Result<Vec<Key>> {
debug!(self.logger, "invoking raw batch_scan_keys request");
Ok(self
.batch_scan_inner(ranges, each_limit, true)
.batch_scan_inner(ranges, each_limit, true, backoff)
.await?
.into_iter()
.map(KvPair::into_key)
Expand All @@ -527,6 +619,18 @@ impl<PdC: PdClient> Client<PdC> {
key: impl Into<Key>,
previous_value: impl Into<Option<Value>>,
new_value: impl Into<Value>,
) -> Result<(Option<Value>, bool)> {
self.compare_and_swap_opt(key, previous_value, new_value, DEFAULT_REGION_BACKOFF)
.await
}

/// Same as [`compare_and_swap`](Client::compare_and_swap) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn compare_and_swap_opt(
&self,
key: impl Into<Key>,
previous_value: impl Into<Option<Value>>,
new_value: impl Into<Value>,
backoff: Backoff,
) -> Result<(Option<Value>, bool)> {
debug!(self.logger, "invoking raw compare_and_swap request");
self.assert_atomic()?;
Expand All @@ -537,7 +641,7 @@ impl<PdC: PdClient> Client<PdC> {
self.cf.clone(),
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.merge(CollectSingle)
.post_process_default()
.plan();
Expand All @@ -550,6 +654,25 @@ impl<PdC: PdClient> Client<PdC> {
copr_version_req: impl Into<String>,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
request_builder: impl Fn(metapb::Region, Vec<Range<Key>>) -> Vec<u8> + Send + Sync + 'static,
) -> Result<Vec<(Vec<u8>, Vec<Range<Key>>)>> {
self.coprocessor_opt(
copr_name,
copr_version_req,
ranges,
request_builder,
DEFAULT_REGION_BACKOFF,
)
.await
}

/// Same as [`coprocessor`](Client::coprocessor) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn coprocessor_opt(
&self,
copr_name: impl Into<String>,
copr_version_req: impl Into<String>,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
request_builder: impl Fn(metapb::Region, Vec<Range<Key>>) -> Vec<u8> + Send + Sync + 'static,
backoff: Backoff,
) -> Result<Vec<(Vec<u8>, Vec<Range<Key>>)>> {
let copr_version_req = copr_version_req.into();
semver::VersionReq::from_str(&copr_version_req)?;
Expand All @@ -561,7 +684,7 @@ impl<PdC: PdClient> Client<PdC> {
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
.preserve_shard()
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.post_process_default()
.plan();
plan.execute().await
Expand All @@ -572,6 +695,7 @@ impl<PdC: PdClient> Client<PdC> {
range: impl Into<BoundRange>,
limit: u32,
key_only: bool,
backoff: Backoff,
) -> Result<Vec<KvPair>> {
if limit > MAX_RAW_KV_SCAN_LIMIT {
return Err(Error::MaxScanLimitExceeded {
Expand All @@ -582,7 +706,7 @@ impl<PdC: PdClient> Client<PdC> {

let request = new_raw_scan_request(range.into(), limit, key_only, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.merge(Collect)
.plan();
let res = plan.execute().await;
Expand All @@ -597,6 +721,7 @@ impl<PdC: PdClient> Client<PdC> {
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
each_limit: u32,
key_only: bool,
backoff: Backoff,
) -> Result<Vec<KvPair>> {
if each_limit > MAX_RAW_KV_SCAN_LIMIT {
return Err(Error::MaxScanLimitExceeded {
Expand All @@ -612,7 +737,7 @@ impl<PdC: PdClient> Client<PdC> {
self.cf.clone(),
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.merge(Collect)
.plan();
plan.execute().await
Expand Down