Skip to content

Commit

Permalink
trigger actions
Browse files Browse the repository at this point in the history
Signed-off-by: pingyu <yuping@pingcap.com>
  • Loading branch information
pingyu committed Feb 17, 2022
1 parent ee61fcb commit e9e556c
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 25 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,4 @@ jobs:
- name: start tiup playground
run: /home/runner/.tiup/bin/tiup playground nightly --mode tikv-slim --kv 3 --without-monitor --kv.config /home/runner/work/client-rust/client-rust/config/tikv.toml --pd.config /home/runner/work/client-rust/client-rust/config/pd.toml &
- name: integration test
run: make integration-test
run: MULTI_REGION=1 make integration-test
30 changes: 20 additions & 10 deletions src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ pub struct RetryableMultiRegion<P: Plan, PdC: PdClient> {
pub(super) inner: P,
pub pd_client: Arc<PdC>,
pub backoff: Backoff,
pub preserve_results: bool,

/// Preserve all regions' results for other downstream plans to handle.
/// If true, return Ok and preserve all regions' results, even if some of them are Err.
/// Otherwise, return the first Err if there is any.
pub preserve_region_results: bool,
}

impl<P: Plan + Shardable, PdC: PdClient> RetryableMultiRegion<P, PdC>
Expand All @@ -78,7 +82,7 @@ where
current_plan: P,
backoff: Backoff,
permits: Arc<Semaphore>,
preserve_results: bool,
preserve_region_results: bool,
) -> Result<<Self as Plan>::Result> {
let shards = current_plan.shards(&pd_client).collect::<Vec<_>>().await;
let mut handles = Vec::new();
Expand All @@ -92,13 +96,13 @@ where
region_store,
backoff.clone(),
permits.clone(),
preserve_results,
preserve_region_results,
));
handles.push(handle);
}

let results = try_join_all(handles).await?;
if preserve_results {
if preserve_region_results {
Ok(results
.into_iter()
.flat_map_ok(|x| x)
Expand All @@ -124,7 +128,7 @@ where
region_store: RegionStore,
mut backoff: Backoff,
permits: Arc<Semaphore>,
preserve_results: bool,
preserve_region_results: bool,
) -> Result<<Self as Plan>::Result> {
// limit concurrent requests
let permit = permits.acquire().await.unwrap();
Expand All @@ -142,8 +146,14 @@ where
if !region_error_resolved {
futures_timer::Delay::new(duration).await;
}
Self::single_plan_handler(pd_client, plan, backoff, permits, preserve_results)
.await
Self::single_plan_handler(
pd_client,
plan,
backoff,
permits,
preserve_region_results,
)
.await
}
None => Err(Error::RegionError(e)),
}
Expand Down Expand Up @@ -260,7 +270,7 @@ impl<P: Plan, PdC: PdClient> Clone for RetryableMultiRegion<P, PdC> {
inner: self.inner.clone(),
pd_client: self.pd_client.clone(),
backoff: self.backoff.clone(),
preserve_results: self.preserve_results,
preserve_region_results: self.preserve_region_results,
}
}
}
Expand All @@ -282,7 +292,7 @@ where
self.inner.clone(),
self.backoff.clone(),
concurrency_permits.clone(),
self.preserve_results,
self.preserve_region_results,
)
.await
}
Expand Down Expand Up @@ -576,7 +586,7 @@ mod test {
},
pd_client: Arc::new(MockPdClient::default()),
backoff: Backoff::no_backoff(),
preserve_results: false,
preserve_region_results: false,
};
assert!(plan.execute().await.is_err())
}
Expand Down
4 changes: 2 additions & 2 deletions src/request/plan_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,15 +129,15 @@ where
fn make_retry_multi_region(
self,
backoff: Backoff,
preserve_results: bool,
preserve_region_results: bool,
) -> PlanBuilder<PdC, RetryableMultiRegion<P, PdC>, Targetted> {
PlanBuilder {
pd_client: self.pd_client.clone(),
plan: RetryableMultiRegion {
inner: self.plan,
pd_client: self.pd_client,
backoff,
preserve_results,
preserve_region_results,
},
phantom: PhantomData,
}
Expand Down
41 changes: 41 additions & 0 deletions src/transaction/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,19 @@ impl Buffer {
}
}

/// Unlock the given key if locked.
pub fn unlock(&mut self, key: &Key) {
if let Some(value) = self.entry_map.get_mut(key) {
if let BufferEntry::Locked(v) = value {
if let Some(v) = v {
*value = BufferEntry::Cached(v.take());
} else {
self.entry_map.remove(key);
}
}
}
}

/// Put a value into the buffer (does not write through).
pub fn put(&mut self, key: Key, value: Value) {
let mut entry = self.entry_map.entry(key.clone());
Expand Down Expand Up @@ -485,6 +498,12 @@ mod tests {
};
}

macro_rules! assert_entry_none {
($key: ident) => {
assert!(matches!(buffer.entry_map.get(&$key), None,))
};
}

// Insert + Delete = CheckNotExists
let key: Key = b"key1".to_vec().into();
buffer.insert(key.clone(), b"value1".to_vec());
Expand All @@ -510,5 +529,27 @@ mod tests {
buffer.delete(key.clone());
buffer.insert(key.clone(), b"value1".to_vec());
assert_entry!(key, BufferEntry::Put(_));

// Lock + Unlock = None
let key: Key = b"key4".to_vec().into();
buffer.lock(key.clone());
buffer.unlock(&key);
assert_entry_none!(key);

// Cached + Lock + Unlock = Cached
let key: Key = b"key5".to_vec().into();
let val: Value = b"value5".to_vec();
let val_ = val.clone();
let r = block_on(buffer.get_or_else(key.clone(), move |_| ready(Ok(Some(val_)))));
assert_eq!(r.unwrap().unwrap(), val);
buffer.lock(key.clone());
buffer.unlock(&key);
assert_entry!(key, BufferEntry::Cached(Some(_)));
assert_eq!(
block_on(buffer.get_or_else(key, move |_| ready(Err(internal_err!("")))))
.unwrap()
.unwrap(),
val
);
}
}
10 changes: 5 additions & 5 deletions src/transaction/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,10 @@ impl Merge<ResponseWithShard<kvrpcpb::PessimisticLockResponse, Vec<kvrpcpb::Muta
Result<ResponseWithShard<kvrpcpb::PessimisticLockResponse, Vec<kvrpcpb::Mutation>>>,
>,
) -> Result<Self::Out> {
let (success, mut errors): (Vec<_>, Vec<_>) = input.into_iter().partition(Result::is_ok);
if let Some(first_err) = errors.pop() {
if input.iter().any(Result::is_err) {
let (success, mut errors): (Vec<_>, Vec<_>) =
input.into_iter().partition(Result::is_ok);
let first_err = errors.pop().unwrap();
let success_keys = success
.into_iter()
.map(Result::unwrap)
Expand All @@ -394,7 +396,7 @@ impl Merge<ResponseWithShard<kvrpcpb::PessimisticLockResponse, Vec<kvrpcpb::Muta
success_keys,
})
} else {
Ok(success
Ok(input
.into_iter()
.map(Result::unwrap)
.flat_map(|ResponseWithShard(mut resp, mutations)| {
Expand Down Expand Up @@ -739,7 +741,6 @@ mod tests {
Ok(resp_not_found.clone()),
];
let result = merger.merge(input);
println!("result: {:?}", result);

assert_eq!(
result.unwrap(),
Expand All @@ -757,7 +758,6 @@ mod tests {
Ok(resp_not_found),
];
let result = merger.merge(input);
println!("result: {:?}", result);

if let PessimisticLockError {
inner,
Expand Down
18 changes: 16 additions & 2 deletions src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -784,19 +784,33 @@ impl<PdC: PdClient> Transaction<PdC> {

/// Rollback pessimistic lock
async fn pessimistic_lock_rollback(
&self,
&mut self,
keys: impl Iterator<Item = Key>,
start_version: Timestamp,
for_update_ts: Timestamp,
) -> Result<()> {
debug!(self.logger, "rollback pessimistic lock");
let req = new_pessimistic_rollback_request(keys, start_version, for_update_ts);

let keys: Vec<_> = keys.into_iter().collect();
if keys.is_empty() {
return Ok(());
}

let req = new_pessimistic_rollback_request(
keys.clone().into_iter(),
start_version,
for_update_ts,
);
let plan = PlanBuilder::new(self.rpc.clone(), req)
.resolve_lock(self.options.retry_options.lock_backoff.clone())
.retry_multi_region(self.options.retry_options.region_backoff.clone())
.extract_error()
.plan();
plan.execute().await?;

for key in keys {
self.buffer.unlock(&key);
}
Ok(())
}

Expand Down
15 changes: 10 additions & 5 deletions tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -679,12 +679,17 @@ async fn txn_lock_keys() -> Result<()> {
async fn txn_lock_keys_error_handle() -> Result<()> {
init().await?;
let client = TransactionClient::new_with_config(pd_addrs(), Default::default(), None).await?;
let mut rng = thread_rng();

let k = gen_u32_keys(4, &mut rng)
.iter()
.cloned()
.collect::<Vec<_>>();
// Keys in `k` should locate in different regions. See `init()` for boundary of regions.
let k: Vec<Key> = vec![
0x00000000_u32,
0x40000000_u32,
0x80000000_u32,
0xC0000000_u32,
]
.into_iter()
.map(|x| x.to_be_bytes().to_vec().into())
.collect();

let mut t1 = client.begin_pessimistic().await?;
let mut t2 = client.begin_pessimistic().await?;
Expand Down

0 comments on commit e9e556c

Please sign in to comment.