Skip to content

Commit

Permalink
txn: check data constraint after lock
Browse files Browse the repository at this point in the history
Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>
  • Loading branch information
youjiali1995 committed Dec 8, 2020
1 parent 4482d82 commit c08c7ca
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 12 deletions.
5 changes: 2 additions & 3 deletions src/storage/kv/rocksdb_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,13 +269,13 @@ impl Snapshot for RocksSnapshot {

fn get(&self, key: &Key) -> Result<Option<Value>> {
trace!("RocksSnapshot: get"; "key" => %key);
let v = box_try!(self.get_value(key.as_encoded()));
let v = self.get_value(key.as_encoded())?;
Ok(v.map(|v| v.to_vec()))
}

fn get_cf(&self, cf: CfName, key: &Key) -> Result<Option<Value>> {
trace!("RocksSnapshot: get_cf"; "cf" => cf, "key" => %key);
let v = box_try!(self.get_value_cf(cf, key.as_encoded()));
let v = self.get_value_cf(cf, key.as_encoded())?;
Ok(v.map(|v| v.to_vec()))
}

Expand Down Expand Up @@ -439,5 +439,4 @@ mod tests {
iter.prev(&mut statistics).unwrap();
assert_eq!(perf_statistics.delta().internal_delete_skipped_count, 3);
}

}
57 changes: 48 additions & 9 deletions src/storage/mvcc/txn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,6 +447,8 @@ impl<S: Snapshot> MvccTxn<S> {
};

{
// Check write conflict first to make the transaction fail or retry fast.
let mut prev_write: Option<(u64, Write)> = None;
if !options.skip_constraint_check {
if let Some((commit_ts, write)) = self.reader.seek_write(&key, u64::max_value())? {
// Abort on writes after our start timestamp ...
Expand All @@ -463,15 +465,11 @@ impl<S: Snapshot> MvccTxn<S> {
primary: primary.to_vec(),
});
}
self.check_data_constraint(should_not_exist, &write, commit_ts, &key)?;
prev_write = Some((commit_ts, write));
}
}

if should_not_write {
return Ok(());
}

// ... or locks at any timestamp.
// Check whether the current key is locked at any timestamp.
if let Some(lock) = self.reader.load_lock(&key)? {
if lock.ts != self.start_ts {
return Err(Error::KeyIsLocked(lock.into_lock_info(key.into_raw()?)));
Expand All @@ -488,6 +486,18 @@ impl<S: Snapshot> MvccTxn<S> {
MVCC_DUPLICATE_CMD_COUNTER_VEC.prewrite.inc();
return Ok(());
}

if !options.skip_constraint_check {
// Should check it when no lock exists, otherwise it can report error when there is
// a lock belonging to a committed transaction which deletes the key.
if let Some((commit_ts, write)) = prev_write {
self.check_data_constraint(should_not_exist, &write, commit_ts, &key)?;
}
}

if should_not_write {
return Ok(());
}
}

self.prewrite_key_value(
Expand Down Expand Up @@ -768,7 +778,7 @@ mod tests {
use crate::storage::kv::Engine;
use crate::storage::mvcc::tests::*;
use crate::storage::mvcc::WriteType;
use crate::storage::mvcc::{Error, MvccReader, MvccTxn};
use crate::storage::mvcc::{Error, MvccReader, MvccTxn, Result};
use crate::storage::{
Key, Mutation, Options, ScanMode, TestEngineBuilder, SHORT_VALUE_MAX_LEN,
};
Expand Down Expand Up @@ -882,11 +892,37 @@ mod tests {
must_prewrite_put(&engine, k1, v1, k1, 1);
must_commit(&engine, k1, 1, 2);

fn expect_error<T, F>(x: Result<T>, err_matcher: F)
where
F: FnOnce(Error) + Send + 'static,
{
match x {
Err(e) => err_matcher(e),
_ => panic!("expect result to be an error"),
}
}

// "k1" already exist, returns AlreadyExist error.
assert!(try_prewrite_insert(&engine, k1, v2, k1, 3).is_err());
expect_error(try_prewrite_insert(&engine, k1, v2, k1, 3), |e| match e {
Error::AlreadyExist { .. } => (),
_ => panic!("unexpected error: {:?}", e),
});

// Delete "k1"
must_prewrite_delete(&engine, k1, k1, 4);

// There is a lock, returns KeyIsLocked error.
expect_error(try_prewrite_insert(&engine, k1, v2, k1, 6), |e| match e {
Error::KeyIsLocked(_) => (),
_ => panic!("unexpected error: {:?}", e),
});

// Check write conflict before lock.
expect_error(try_prewrite_insert(&engine, k1, v2, k1, 1), |e| match e {
Error::WriteConflict { .. } => (),
_ => panic!("unexpected error: {:?}", e),
});

must_commit(&engine, k1, 4, 5);

// After delete "k1", insert returns ok.
Expand All @@ -897,7 +933,10 @@ mod tests {
must_prewrite_put(&engine, k1, v3, k1, 8);
must_rollback(&engine, k1, 8);

assert!(try_prewrite_insert(&engine, k1, v3, k1, 9).is_err());
expect_error(try_prewrite_insert(&engine, k1, v3, k1, 9), |e| match e {
Error::AlreadyExist { .. } => (),
_ => panic!("unexpected error: {:?}", e),
});

// Delete "k1" again
must_prewrite_delete(&engine, k1, k1, 10);
Expand Down

0 comments on commit c08c7ca

Please sign in to comment.