diff --git a/src/raw/client.rs b/src/raw/client.rs
index 6f140ad7..da67c56a 100644
--- a/src/raw/client.rs
+++ b/src/raw/client.rs
@@ -188,10 +188,15 @@ impl Client {
     /// # });
     /// ```
     pub async fn get(&self, key: impl Into<Key>) -> Result<Option<Value>> {
+        self.get_opt(key, DEFAULT_REGION_BACKOFF).await
+    }
+
+    /// Same as [`get`](Client::get) but with custom [`backoff`](crate::Backoff) strategy.
+    pub async fn get_opt(&self, key: impl Into<Key>, backoff: Backoff) -> Result<Option<Value>> {
         debug!(self.logger, "invoking raw get request");
         let request = new_raw_get_request(key.into(), self.cf.clone());
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .retry_multi_region(DEFAULT_REGION_BACKOFF)
+            .retry_multi_region(backoff)
             .merge(CollectSingle)
             .post_process_default()
             .plan();
@@ -219,11 +224,20 @@ impl Client {
     pub async fn batch_get(
         &self,
         keys: impl IntoIterator<Item = impl Into<Key>>,
+    ) -> Result<Vec<KvPair>> {
+        self.batch_get_opt(keys, DEFAULT_REGION_BACKOFF).await
+    }
+
+    /// Same as [`batch_get`](Client::batch_get) but with custom [`backoff`](crate::Backoff) strategy.
+    pub async fn batch_get_opt(
+        &self,
+        keys: impl IntoIterator<Item = impl Into<Key>>,
+        backoff: Backoff,
     ) -> Result<Vec<KvPair>> {
         debug!(self.logger, "invoking raw batch_get request");
         let request = new_raw_batch_get_request(keys.into_iter().map(Into::into), self.cf.clone());
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .retry_multi_region(DEFAULT_REGION_BACKOFF)
+            .retry_multi_region(backoff)
             .merge(Collect)
             .plan();
         plan.execute()
@@ -248,10 +262,20 @@ impl Client {
     /// # });
     /// ```
     pub async fn put(&self, key: impl Into<Key>, value: impl Into<Value>) -> Result<()> {
+        self.put_opt(key, value, DEFAULT_REGION_BACKOFF).await
+    }
+
+    /// Same as [`put`](Client::put) but with custom [`backoff`](crate::Backoff) strategy.
+    pub async fn put_opt(
+        &self,
+        key: impl Into<Key>,
+        value: impl Into<Value>,
+        backoff: Backoff,
+    ) -> Result<()> {
         debug!(self.logger, "invoking raw put request");
         let request = new_raw_put_request(key.into(), value.into(), self.cf.clone(), self.atomic);
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .retry_multi_region(DEFAULT_REGION_BACKOFF)
+            .retry_multi_region(backoff)
             .merge(CollectSingle)
             .extract_error()
             .plan();
@@ -279,6 +303,15 @@ impl Client {
     pub async fn batch_put(
         &self,
         pairs: impl IntoIterator<Item = impl Into<KvPair>>,
+    ) -> Result<()> {
+        self.batch_put_opt(pairs, DEFAULT_REGION_BACKOFF).await
+    }
+
+    /// Same as [`batch_put`](Client::batch_put) but with custom [`backoff`](crate::Backoff) strategy.
+    pub async fn batch_put_opt(
+        &self,
+        pairs: impl IntoIterator<Item = impl Into<KvPair>>,
+        backoff: Backoff,
     ) -> Result<()> {
         debug!(self.logger, "invoking raw batch_put request");
         let request = new_raw_batch_put_request(
@@ -287,7 +320,7 @@ impl Client {
             pairs.into_iter().map(Into::into),
             self.cf.clone(),
             self.atomic,
         );
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .retry_multi_region(DEFAULT_REGION_BACKOFF)
+            .retry_multi_region(backoff)
             .extract_error()
             .plan();
         plan.execute().await?;
@@ -312,10 +345,15 @@ impl Client {
     /// # });
     /// ```
     pub async fn delete(&self, key: impl Into<Key>) -> Result<()> {
+        self.delete_opt(key, DEFAULT_REGION_BACKOFF).await
+    }
+
+    /// Same as [`delete`](Client::delete) but with custom [`backoff`](crate::Backoff) strategy.
+    pub async fn delete_opt(&self, key: impl Into<Key>, backoff: Backoff) -> Result<()> {
         debug!(self.logger, "invoking raw delete request");
         let request = new_raw_delete_request(key.into(), self.cf.clone(), self.atomic);
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .retry_multi_region(DEFAULT_REGION_BACKOFF)
+            .retry_multi_region(backoff)
             .merge(CollectSingle)
             .extract_error()
             .plan();
@@ -341,12 +379,21 @@ impl Client {
     /// # });
     /// ```
     pub async fn batch_delete(&self, keys: impl IntoIterator<Item = impl Into<Key>>) -> Result<()> {
+        self.batch_delete_opt(keys, DEFAULT_REGION_BACKOFF).await
+    }
+
+    /// Same as [`batch_delete`](Client::batch_delete) but with custom [`backoff`](crate::Backoff) strategy.
+    pub async fn batch_delete_opt(
+        &self,
+        keys: impl IntoIterator<Item = impl Into<Key>>,
+        backoff: Backoff,
+    ) -> Result<()> {
         debug!(self.logger, "invoking raw batch_delete request");
         self.assert_non_atomic()?;
         let request =
             new_raw_batch_delete_request(keys.into_iter().map(Into::into), self.cf.clone());
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .retry_multi_region(DEFAULT_REGION_BACKOFF)
+            .retry_multi_region(backoff)
             .extract_error()
             .plan();
         plan.execute().await?;
@@ -372,6 +419,7 @@ impl Client {
         self.delete_range_opt(range, DEFAULT_REGION_BACKOFF).await
     }
 
+    /// Same as [`delete_range`](Client::delete_range) but with custom [`backoff`](crate::Backoff) strategy.
     pub async fn delete_range_opt(
         &self,
         range: impl Into<BoundRange>,
@@ -408,8 +456,18 @@ impl Client {
     /// # });
     /// ```
     pub async fn scan(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<KvPair>> {
+        self.scan_opt(range, limit, DEFAULT_REGION_BACKOFF).await
+    }
+
+    /// Same as [`scan`](Client::scan) but with custom [`backoff`](crate::Backoff) strategy.
+    pub async fn scan_opt(
+        &self,
+        range: impl Into<BoundRange>,
+        limit: u32,
+        backoff: Backoff,
+    ) -> Result<Vec<KvPair>> {
         debug!(self.logger, "invoking raw scan request");
-        self.scan_inner(range.into(), limit, false).await
+        self.scan_inner(range.into(), limit, false, backoff).await
     }
 
     /// Create a new 'scan' request that only returns the keys.
@@ -432,9 +490,20 @@ impl Client {
     /// # });
     /// ```
     pub async fn scan_keys(&self, range: impl Into<BoundRange>, limit: u32) -> Result<Vec<Key>> {
+        self.scan_keys_opt(range, limit, DEFAULT_REGION_BACKOFF)
+            .await
+    }
+
+    /// Same as [`scan_keys`](Client::scan_keys) but with custom [`backoff`](crate::Backoff) strategy.
+    pub async fn scan_keys_opt(
+        &self,
+        range: impl Into<BoundRange>,
+        limit: u32,
+        backoff: Backoff,
+    ) -> Result<Vec<Key>> {
         debug!(self.logger, "invoking raw scan_keys request");
         Ok(self
-            .scan_inner(range, limit, true)
+            .scan_inner(range, limit, true, backoff)
            .await?
             .into_iter()
             .map(KvPair::into_key)
@@ -468,9 +537,21 @@ impl Client {
         &self,
         ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
         each_limit: u32,
+    ) -> Result<Vec<KvPair>> {
+        self.batch_scan_opt(ranges, each_limit, DEFAULT_REGION_BACKOFF)
+            .await
+    }
+
+    /// Same as [`batch_scan`](Client::batch_scan) but with custom [`backoff`](crate::Backoff) strategy.
+    pub async fn batch_scan_opt(
+        &self,
+        ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
+        each_limit: u32,
+        backoff: Backoff,
     ) -> Result<Vec<KvPair>> {
         debug!(self.logger, "invoking raw batch_scan request");
-        self.batch_scan_inner(ranges, each_limit, false).await
+        self.batch_scan_inner(ranges, each_limit, false, backoff)
+            .await
     }
 
     /// Create a new 'batch scan' request that only returns the keys.
@@ -500,10 +581,21 @@ impl Client {
         &self,
         ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
         each_limit: u32,
+    ) -> Result<Vec<Key>> {
+        self.batch_scan_keys_opt(ranges, each_limit, DEFAULT_REGION_BACKOFF)
+            .await
+    }
+
+    /// Same as [`batch_scan_keys`](Client::batch_scan_keys) but with custom [`backoff`](crate::Backoff) strategy.
+    pub async fn batch_scan_keys_opt(
+        &self,
+        ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
+        each_limit: u32,
+        backoff: Backoff,
     ) -> Result<Vec<Key>> {
         debug!(self.logger, "invoking raw batch_scan_keys request");
         Ok(self
-            .batch_scan_inner(ranges, each_limit, true)
+            .batch_scan_inner(ranges, each_limit, true, backoff)
             .await?
             .into_iter()
             .map(KvPair::into_key)
             .collect())
@@ -527,6 +619,18 @@ impl Client {
         key: impl Into<Key>,
         previous_value: impl Into<Option<Value>>,
         new_value: impl Into<Value>,
+    ) -> Result<(Option<Value>, bool)> {
+        self.compare_and_swap_opt(key, previous_value, new_value, DEFAULT_REGION_BACKOFF)
+            .await
+    }
+
+    /// Same as [`compare_and_swap`](Client::compare_and_swap) but with custom [`backoff`](crate::Backoff) strategy.
+    pub async fn compare_and_swap_opt(
+        &self,
+        key: impl Into<Key>,
+        previous_value: impl Into<Option<Value>>,
+        new_value: impl Into<Value>,
+        backoff: Backoff,
     ) -> Result<(Option<Value>, bool)> {
         debug!(self.logger, "invoking raw compare_and_swap request");
         self.assert_atomic()?;
@@ -537,7 +641,7 @@ impl Client {
         let req = new_cas_request(
             key.into(),
             new_value.into(),
             previous_value.into(),
             self.cf.clone(),
         );
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
-            .retry_multi_region(DEFAULT_REGION_BACKOFF)
+            .retry_multi_region(backoff)
             .merge(CollectSingle)
             .post_process_default()
             .plan();
@@ -550,6 +654,25 @@ impl Client {
         copr_version_req: impl Into<String>,
         ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
         request_builder: impl Fn(metapb::Region, Vec<Range<Key>>) -> Vec<u8> + Send + Sync + 'static,
+    ) -> Result<Vec<(Vec<u8>, Vec<Range<Key>>)>> {
+        self.coprocessor_opt(
+            copr_name,
+            copr_version_req,
+            ranges,
+            request_builder,
+            DEFAULT_REGION_BACKOFF,
+        )
+        .await
+    }
+
+    /// Same as [`coprocessor`](Client::coprocessor) but with custom [`backoff`](crate::Backoff) strategy.
+    pub async fn coprocessor_opt(
+        &self,
+        copr_name: impl Into<String>,
+        copr_version_req: impl Into<String>,
+        ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
+        request_builder: impl Fn(metapb::Region, Vec<Range<Key>>) -> Vec<u8> + Send + Sync + 'static,
+        backoff: Backoff,
     ) -> Result<Vec<(Vec<u8>, Vec<Range<Key>>)>> {
         let copr_version_req = copr_version_req.into();
         semver::VersionReq::from_str(&copr_version_req)?;
@@ -561,7 +684,7 @@ impl Client {
         );
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), req)
             .preserve_shard()
-            .retry_multi_region(DEFAULT_REGION_BACKOFF)
+            .retry_multi_region(backoff)
             .post_process_default()
             .plan();
         plan.execute().await
@@ -572,6 +695,7 @@ impl Client {
         range: impl Into<BoundRange>,
         limit: u32,
         key_only: bool,
+        backoff: Backoff,
     ) -> Result<Vec<KvPair>> {
         if limit > MAX_RAW_KV_SCAN_LIMIT {
             return Err(Error::MaxScanLimitExceeded {
@@ -582,7 +706,7 @@ impl Client {
 
         let request = new_raw_scan_request(range.into(), limit, key_only, self.cf.clone());
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .retry_multi_region(DEFAULT_REGION_BACKOFF)
+            .retry_multi_region(backoff)
             .merge(Collect)
             .plan();
         let res = plan.execute().await;
@@ -597,6 +721,7 @@ impl Client {
         ranges: impl IntoIterator<Item = impl Into<BoundRange>>,
         each_limit: u32,
         key_only: bool,
+        backoff: Backoff,
     ) -> Result<Vec<KvPair>> {
         if each_limit > MAX_RAW_KV_SCAN_LIMIT {
             return Err(Error::MaxScanLimitExceeded {
@@ -612,7 +737,7 @@ impl Client {
             self.cf.clone(),
         );
         let plan = crate::request::PlanBuilder::new(self.rpc.clone(), request)
-            .retry_multi_region(DEFAULT_REGION_BACKOFF)
+            .retry_multi_region(backoff)
             .merge(Collect)
             .plan();
         plan.execute().await
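
Usage note (not part of the patch): a minimal sketch of how a caller could combine the new *_opt variants with a custom retry strategy. The PD endpoint, the #[tokio::main] runtime, and the exact RawClient::new signature (some releases also take an optional logger) are illustrative assumptions; the retry policy uses the crate's Backoff::decorrelated_jitter_backoff constructor.

use tikv_client::{Backoff, RawClient, Result};

#[tokio::main]
async fn main() -> Result<()> {
    // Constructor shown for illustration; adjust to the client version in use.
    let client = RawClient::new(vec!["127.0.0.1:2379".to_owned()]).await?;

    // Existing methods keep the built-in DEFAULT_REGION_BACKOFF behaviour.
    client.put("k1".to_owned(), "v1".to_owned()).await?;

    // The *_opt variants take a caller-supplied Backoff: here decorrelated
    // jitter with a 2 ms base delay, a 500 ms cap, and at most 5 attempts.
    client
        .put_opt(
            "k2".to_owned(),
            "v2".to_owned(),
            Backoff::decorrelated_jitter_backoff(2, 500, 5),
        )
        .await?;
    let value = client
        .get_opt("k2".to_owned(), Backoff::decorrelated_jitter_backoff(2, 500, 5))
        .await?;
    println!("k2 = {:?}", value);

    Ok(())
}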