Skip to content

Commit

Permalink
Custom backoff support for raw api (#350)
Browse files Browse the repository at this point in the history
Signed-off-by: Andrey Koshchiy <roguepnz@gmail.com>

Co-authored-by: ekexium <ekexium@fastmail.com>
  • Loading branch information
Andrey Koshchiy and ekexium authored Jun 21, 2022
1 parent b524bc6 commit e9d0dcd
Showing 1 changed file with 139 additions and 14 deletions.
153 changes: 139 additions & 14 deletions src/raw/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,10 +188,15 @@ impl<PdC: PdClient> Client<PdC> {
/// # });
/// ```
pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
self.get_opt(key, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`get`](Client::get) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn get_opt(&self, key: impl Into<Key>, backoff: Backoff) -> Result<Option<Value>> {
debug!(self.logger, "invoking raw get request");
let request = new_raw_get_request(key.into(), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.merge(CollectSingle)
.post_process_default()
.plan();
Expand Down Expand Up @@ -219,11 +224,20 @@ impl<PdC: PdClient> Client<PdC> {
pub async fn batch_get(
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
) -> Result<Vec<KvPair>> {
self.batch_get_opt(keys, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`batch_get`](Client::batch_get) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn batch_get_opt(
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
backoff: Backoff,
) -> Result<Vec<KvPair>> {
debug!(self.logger, "invoking raw batch_get request");
let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.merge(Collect)
.plan();
plan.execute()
Expand All @@ -248,10 +262,20 @@ impl<PdC: PdClient> Client<PdC> {
/// # });
/// ```
pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
self.put_opt(key, value, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`put`](Client::put) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn put_opt(
&self,
key: impl Into<Key>,
value: impl Into<Value>,
backoff: Backoff,
) -> Result<()> {
debug!(self.logger, "invoking raw put request");
let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.merge(CollectSingle)
.extract_error()
.plan();
Expand Down Expand Up @@ -279,6 +303,15 @@ impl<PdC: PdClient> Client<PdC> {
pub async fn batch_put(
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
) -> Result<()> {
self.batch_put_opt(pairs, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`batch_put`](Client::batch_put) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn batch_put_opt(
&self,
pairs: impl IntoIterator<Item = impl Into<KvPair>>,
backoff: Backoff,
) -> Result<()> {
debug!(self.logger, "invoking raw batch_put request");
let request = new_raw_batch_put_request(
Expand All @@ -287,7 +320,7 @@ impl<PdC: PdClient> Client<PdC> {
self.atomic,
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.extract_error()
.plan();
plan.execute().await?;
Expand All @@ -312,10 +345,15 @@ impl<PdC: PdClient> Client<PdC> {
/// # });
/// ```
pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
self.delete_opt(key, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`delete`](Client::delete) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn delete_opt(&self, key: impl Into<Key>, backoff: Backoff) -> Result<()> {
debug!(self.logger, "invoking raw delete request");
let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.merge(CollectSingle)
.extract_error()
.plan();
Expand All @@ -341,12 +379,21 @@ impl<PdC: PdClient> Client<PdC> {
/// # });
/// ```
pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
self.batch_delete_opt(keys, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`batch_delete`](Client::batch_delete) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn batch_delete_opt(
&self,
keys: impl IntoIterator<Item = impl Into<Key>>,
backoff: Backoff,
) -> Result<()> {
debug!(self.logger, "invoking raw batch_delete request");
self.assert_non_atomic()?;
let request =
new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.extract_error()
.plan();
plan.execute().await?;
Expand All @@ -372,6 +419,7 @@ impl<PdC: PdClient> Client<PdC> {
self.delete_range_opt(range, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`delete_range`](Client::delete_range) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn delete_range_opt(
&self,
range: impl Into<BoundRange>,
Expand Down Expand Up @@ -408,8 +456,18 @@ impl<PdC: PdClient> Client<PdC> {
/// # });
/// ```
pub async fn scan(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<KvPair>> {
self.scan_opt(range, limit, DEFAULT_REGION_BACKOFF).await
}

/// Same as [`scan`](Client::scan) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn scan_opt(
&self,
range: impl Into<BoundRange>,
limit: u32,
backoff: Backoff,
) -> Result<Vec<KvPair>> {
debug!(self.logger, "invoking raw scan request");
self.scan_inner(range.into(), limit, false).await
self.scan_inner(range.into(), limit, false, backoff).await
}

/// Create a new 'scan' request that only returns the keys.
Expand All @@ -432,9 +490,20 @@ impl<PdC: PdClient> Client<PdC> {
/// # });
/// ```
pub async fn scan_keys(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<Key>> {
self.scan_keys_opt(range, limit, DEFAULT_REGION_BACKOFF)
.await
}

/// Same as [`scan_keys`](Client::scan_keys) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn scan_keys_opt(
&self,
range: impl Into<BoundRange>,
limit: u32,
backoff: Backoff,
) -> Result<Vec<Key>> {
debug!(self.logger, "invoking raw scan_keys request");
Ok(self
.scan_inner(range, limit, true)
.scan_inner(range, limit, true, backoff)
.await?
.into_iter()
.map(KvPair::into_key)
Expand Down Expand Up @@ -468,9 +537,21 @@ impl<PdC: PdClient> Client<PdC> {
&self,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
each_limit: u32,
) -> Result<Vec<KvPair>> {
self.batch_scan_opt(ranges, each_limit, DEFAULT_REGION_BACKOFF)
.await
}

/// Same as [`batch_scan`](Client::batch_scan) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn batch_scan_opt(
&self,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
each_limit: u32,
backoff: Backoff,
) -> Result<Vec<KvPair>> {
debug!(self.logger, "invoking raw batch_scan request");
self.batch_scan_inner(ranges, each_limit, false).await
self.batch_scan_inner(ranges, each_limit, false, backoff)
.await
}

/// Create a new 'batch scan' request that only returns the keys.
Expand Down Expand Up @@ -500,10 +581,21 @@ impl<PdC: PdClient> Client<PdC> {
&self,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
each_limit: u32,
) -> Result<Vec<Key>> {
self.batch_scan_keys_opt(ranges, each_limit, DEFAULT_REGION_BACKOFF)
.await
}

/// Same as [`batch_scan_keys`](Client::batch_scan_keys) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn batch_scan_keys_opt(
&self,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
each_limit: u32,
backoff: Backoff,
) -> Result<Vec<Key>> {
debug!(self.logger, "invoking raw batch_scan_keys request");
Ok(self
.batch_scan_inner(ranges, each_limit, true)
.batch_scan_inner(ranges, each_limit, true, backoff)
.await?
.into_iter()
.map(KvPair::into_key)
Expand All @@ -527,6 +619,18 @@ impl<PdC: PdClient> Client<PdC> {
key: impl Into<Key>,
previous_value: impl Into<Option<Value>>,
new_value: impl Into<Value>,
) -> Result<(Option<Value>, bool)> {
self.compare_and_swap_opt(key, previous_value, new_value, DEFAULT_REGION_BACKOFF)
.await
}

/// Same as [`compare_and_swap`](Client::compare_and_swap) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn compare_and_swap_opt(
&self,
key: impl Into<Key>,
previous_value: impl Into<Option<Value>>,
new_value: impl Into<Value>,
backoff: Backoff,
) -> Result<(Option<Value>, bool)> {
debug!(self.logger, "invoking raw compare_and_swap request");
self.assert_atomic()?;
Expand All @@ -537,7 +641,7 @@ impl<PdC: PdClient> Client<PdC> {
self.cf.clone(),
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.merge(CollectSingle)
.post_process_default()
.plan();
Expand All @@ -550,6 +654,25 @@ impl<PdC: PdClient> Client<PdC> {
copr_version_req: impl Into<String>,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
request_builder: impl Fn(metapb::Region, Vec<Range<Key>>) -> Vec<u8> + Send + Sync + 'static,
) -> Result<Vec<(Vec<u8>, Vec<Range<Key>>)>> {
self.coprocessor_opt(
copr_name,
copr_version_req,
ranges,
request_builder,
DEFAULT_REGION_BACKOFF,
)
.await
}

/// Same as [`coprocessor`](Client::coprocessor) but with custom [`backoff`](crate::Backoff) strategy.
pub async fn coprocessor_opt(
&self,
copr_name: impl Into<String>,
copr_version_req: impl Into<String>,
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
request_builder: impl Fn(metapb::Region, Vec<Range<Key>>) -> Vec<u8> + Send + Sync + 'static,
backoff: Backoff,
) -> Result<Vec<(Vec<u8>, Vec<Range<Key>>)>> {
let copr_version_req = copr_version_req.into();
semver::VersionReq::from_str(&copr_version_req)?;
Expand All @@ -561,7 +684,7 @@ impl<PdC: PdClient> Client<PdC> {
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
.preserve_shard()
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.post_process_default()
.plan();
plan.execute().await
Expand All @@ -572,6 +695,7 @@ impl<PdC: PdClient> Client<PdC> {
range: impl Into<BoundRange>,
limit: u32,
key_only: bool,
backoff: Backoff,
) -> Result<Vec<KvPair>> {
if limit > MAX_RAW_KV_SCAN_LIMIT {
return Err(Error::MaxScanLimitExceeded {
Expand All @@ -582,7 +706,7 @@ impl<PdC: PdClient> Client<PdC> {

let request = new_raw_scan_request(range.into(), limit, key_only, self.cf.clone());
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.merge(Collect)
.plan();
let res = plan.execute().await;
Expand All @@ -597,6 +721,7 @@ impl<PdC: PdClient> Client<PdC> {
ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
each_limit: u32,
key_only: bool,
backoff: Backoff,
) -> Result<Vec<KvPair>> {
if each_limit > MAX_RAW_KV_SCAN_LIMIT {
return Err(Error::MaxScanLimitExceeded {
Expand All @@ -612,7 +737,7 @@ impl<PdC: PdClient> Client<PdC> {
self.cf.clone(),
);
let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
.retry_multi_region(DEFAULT_REGION_BACKOFF)
.retry_multi_region(backoff)
.merge(Collect)
.plan();
plan.execute().await
Expand Down

0 comments on commit e9d0dcd

Please sign in to comment.