diff --git a/src/yb/docdb/conflict_resolution.cc b/src/yb/docdb/conflict_resolution.cc index 5c9b76233104..2e57bff631cf 100644 --- a/src/yb/docdb/conflict_resolution.cc +++ b/src/yb/docdb/conflict_resolution.cc @@ -1000,7 +1000,7 @@ class TransactionConflictResolverContext : public ConflictResolverContextBase { /* intent_value */ Slice(), [&intent_processor, intent_types]( auto ancestor_doc_key, dockv::FullDocKey full_doc_key, auto, auto intent_key, - auto) { + auto, auto) { intent_processor.Process(ancestor_doc_key, full_doc_key, intent_key, intent_types); return Status::OK(); }, @@ -1010,13 +1010,16 @@ class TransactionConflictResolverContext : public ConflictResolverContextBase { } const auto& pairs = write_batch_.read_pairs(); - intent_types = dockv::GetIntentTypesForRead(metadata_.isolation, row_mark); if (!pairs.empty()) { RETURN_NOT_OK(EnumerateIntents( pairs, - [&intent_processor, intent_types] ( - auto ancestor_doc_key, dockv::FullDocKey full_doc_key, auto, auto intent_key, auto) { - intent_processor.Process(ancestor_doc_key, full_doc_key, intent_key, intent_types); + [&intent_processor, + intent_types = dockv::GetIntentTypesForRead(metadata_.isolation, row_mark)] ( + auto ancestor_doc_key, auto full_doc_key, auto, auto* intent_key, auto, + auto is_row_lock) { + intent_processor.Process( + ancestor_doc_key, full_doc_key, intent_key, + GetIntentTypes(intent_types, is_row_lock)); return Status::OK(); }, resolver->partial_range_key_intents())); @@ -1213,7 +1216,7 @@ class OperationConflictResolverContext : public ConflictResolverContextBase { /* intent_value */ Slice(), [&intent_processor, intent_types]( auto ancestor_doc_key, dockv::FullDocKey full_doc_key, auto, auto intent_key, - auto) { + auto, auto) { intent_processor.Process(ancestor_doc_key, full_doc_key, intent_key, intent_types); return Status::OK(); }, diff --git a/src/yb/docdb/docdb.cc b/src/yb/docdb/docdb.cc index 42a285e8e7bf..cf2e4e691fd8 100644 --- a/src/yb/docdb/docdb.cc +++ b/src/yb/docdb/docdb.cc @@ -169,15 +169,14 @@ Result DetermineKeysToLock( } if (!read_pairs.empty()) { - const auto read_intent_types = dockv::GetIntentTypesForRead(isolation_level, row_mark_type); RETURN_NOT_OK(EnumerateIntents( read_pairs, - [&result, &read_intent_types]( - dockv::AncestorDocKey ancestor_doc_key, dockv::FullDocKey, Slice, KeyBytes* key, - dockv::LastKey) { + [&result, intent_types = dockv:: GetIntentTypesForRead(isolation_level, row_mark_type)]( + auto ancestor_doc_key, auto, auto, auto* key, auto, auto is_row_lock) { + auto actual_intents = GetIntentTypes(intent_types, is_row_lock); return ApplyIntent( RefCntPrefix(key->AsSlice()), - ancestor_doc_key ? MakeWeak(read_intent_types) : read_intent_types, + ancestor_doc_key ? MakeWeak(actual_intents) : actual_intents, &result.lock_batch); }, partial_range_key_intents)); } diff --git a/src/yb/docdb/pgsql_operation.cc b/src/yb/docdb/pgsql_operation.cc index a82fe5fa1147..94e483da98a4 100644 --- a/src/yb/docdb/pgsql_operation.cc +++ b/src/yb/docdb/pgsql_operation.cc @@ -178,20 +178,68 @@ class DocKeyColumnPathBuilderHolder { YB_STRONGLY_TYPED_BOOL(KeyOnlyRequested); +YB_DEFINE_ENUM(IntentFeature, (kRegularRead)(kKeyOnlyRead)(kRowLock)); + +// Helper class to describe what kind of intents must be created on particular doc key: +// - regular_read - intent for regular read is required +// - key_only_read - intent for key column only read is required +// - row_lock - intent for row lock is required. +// Note: regular_read and key_only_read can't be both equaled true. +class IntentMode { + public: + IntentMode(IsolationLevel level, RowMarkType row_mark, KeyOnlyRequested key_only_requested) { + auto intent_types = dockv::GetIntentTypesForRead(level, row_mark); + const auto& read_intents = intent_types.read; + auto& lock_intents = intent_types.row_mark; + if (!read_intents.None()) { + auto all_read_intents = MakeWeak(read_intents); + if (key_only_requested) { + features_.Set(IntentFeature::kKeyOnlyRead); + } else { + all_read_intents |= read_intents; + features_.Set(IntentFeature::kRegularRead); + } + lock_intents &= ~(all_read_intents); + } + if (!lock_intents.None()) { + features_.Set(IntentFeature::kRowLock); + } + } + + [[nodiscard]] bool regular_read() const { + return features_.Test(IntentFeature::kRegularRead); + } + + [[nodiscard]] bool key_only_read() const { + return features_.Test(IntentFeature::kKeyOnlyRead); + } + + [[nodiscard]] bool row_lock() const { + return features_.Test(IntentFeature::kRowLock); + } + + private: + EnumBitSet features_; +}; + class IntentInserter { public: explicit IntentInserter(LWKeyValueWriteBatchPB* out) : out_(*out) {} - void Add(Slice encoded_key, KeyOnlyRequested key_only_requested) { - if (!key_only_requested) { + void Add(Slice encoded_key, const IntentMode& mode) { + if (mode.regular_read()) { Add(encoded_key); - return; + } else if (mode.key_only_read()) { + auto& buf = buffer(); + buf.Clear(); + DocKeyColumnPathBuilder doc_key_builder(&buf, encoded_key); + Add(doc_key_builder.Build(dockv::SystemColumnIds::kLivenessColumn)); + } + + if (mode.row_lock()) { + Add(encoded_key, /*row_lock=*/ true); } - auto& buf = buffer(); - buf.Clear(); - DocKeyColumnPathBuilder doc_key_builder(&buf, encoded_key); - Add(doc_key_builder.Build(dockv::SystemColumnIds::kLivenessColumn)); } private: @@ -202,10 +250,13 @@ class IntentInserter { return *buffer_; } - void Add(Slice encoded_key) { + void Add(Slice encoded_key, bool is_row_lock = false) { auto& pair = *out_.add_read_pairs(); pair.dup_key(encoded_key); - pair.dup_value(Slice(&dockv::ValueEntryTypeAsChar::kNullLow, 1)); + pair.dup_value(Slice( + is_row_lock ? &dockv::ValueEntryTypeAsChar::kRowLock + : &dockv::ValueEntryTypeAsChar::kNullLow, + 1)); } std::optional buffer_; @@ -1990,10 +2041,10 @@ Status PgsqlReadOperation::PopulateAggregate(WriteBuffer *result_buffer) { return Status::OK(); } -Status PgsqlReadOperation::GetIntents(const Schema& schema, LWKeyValueWriteBatchPB* out) { - const auto has_row_mark = IsValidRowMarkType( - request_.has_row_mark_type() ? request_.row_mark_type() : ROW_MARK_ABSENT); - if (has_row_mark) { +Status PgsqlReadOperation::GetIntents( + const Schema& schema, IsolationLevel level, LWKeyValueWriteBatchPB* out) { + const auto row_mark = request_.has_row_mark_type() ? request_.row_mark_type() : ROW_MARK_ABSENT; + if (IsValidRowMarkType(row_mark)) { RSTATUS_DCHECK(request_.has_wait_policy(), IllegalState, "wait policy is expected"); out->set_wait_policy(request_.wait_policy()); } @@ -2003,21 +2054,19 @@ Status PgsqlReadOperation::GetIntents(const Schema& schema, LWKeyValueWriteBatch DocKeyAccessor accessor(schema); if (!(has_batch_arguments || request_.has_ybctid_column_value())) { - inserter.Add(VERIFY_RESULT(accessor.GetEncoded(request_)), KeyOnlyRequested::kFalse); + inserter.Add( + VERIFY_RESULT(accessor.GetEncoded(request_)), {level, row_mark, KeyOnlyRequested::kFalse}); return Status::OK(); } - // TODO(dmitry): the '!has_row_mark' requirement will be removed in context of fix for #16212. - const KeyOnlyRequested key_only_requested{ - !has_row_mark && IsOnlyKeyColumnsRequested(schema, request_)}; + const IntentMode mode{ + level, row_mark, KeyOnlyRequested(IsOnlyKeyColumnsRequested(schema, request_))}; for (const auto& batch_argument : request_.batch_arguments()) { - inserter.Add( - VERIFY_RESULT(accessor.GetEncoded(batch_argument.ybctid())), key_only_requested); + inserter.Add(VERIFY_RESULT(accessor.GetEncoded(batch_argument.ybctid())), mode); } if (!has_batch_arguments) { DCHECK(request_.has_ybctid_column_value()); - inserter.Add( - VERIFY_RESULT(accessor.GetEncoded(request_.ybctid_column_value())), key_only_requested); + inserter.Add(VERIFY_RESULT(accessor.GetEncoded(request_.ybctid_column_value())), mode); } return Status::OK(); } diff --git a/src/yb/docdb/pgsql_operation.h b/src/yb/docdb/pgsql_operation.h index 48ee2411f676..71e7c93ee59d 100644 --- a/src/yb/docdb/pgsql_operation.h +++ b/src/yb/docdb/pgsql_operation.h @@ -182,7 +182,7 @@ class PgsqlReadOperation : public DocExprExecutor { Status GetSpecialColumn(ColumnIdRep column_id, QLValuePB* result); - Status GetIntents(const Schema& schema, LWKeyValueWriteBatchPB* out); + Status GetIntents(const Schema& schema, IsolationLevel level, LWKeyValueWriteBatchPB* out); private: // Execute a READ operator for a given scalar argument. diff --git a/src/yb/docdb/rocksdb_writer.cc b/src/yb/docdb/rocksdb_writer.cc index 927d05aa8559..99ddb0076fce 100644 --- a/src/yb/docdb/rocksdb_writer.cc +++ b/src/yb/docdb/rocksdb_writer.cc @@ -241,18 +241,29 @@ Status TransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) { LOG(WARNING) << "Performing a write with row lock " << RowMarkType_Name(row_mark_) << " when only reads are expected"; } - intent_types_ = dockv::GetIntentTypesForWrite(isolation_level_); // We cannot recover from failures here, because it means that we cannot apply replicated // operation. RETURN_NOT_OK(EnumerateIntents( - put_batch_.write_pairs(), std::ref(*this), partial_range_key_intents_)); + put_batch_.write_pairs(), + [this, intent_types = dockv::GetIntentTypesForWrite(isolation_level_)]( + auto ancestor_doc_key, auto full_doc_key, auto value, auto* key, auto last_key, auto) { + return (*this)(intent_types, ancestor_doc_key, full_doc_key, value, key, last_key); + }, + partial_range_key_intents_)); } if (!put_batch_.read_pairs().empty()) { - intent_types_ = dockv::GetIntentTypesForRead(isolation_level_, row_mark_); RETURN_NOT_OK(EnumerateIntents( - put_batch_.read_pairs(), std::ref(*this), partial_range_key_intents_)); + put_batch_.read_pairs(), + [this, intent_types = dockv::GetIntentTypesForRead(isolation_level_, row_mark_)]( + auto ancestor_doc_key, auto full_doc_key, + const auto& value, auto* key, auto last_key, auto is_row_lock) { + return (*this)( + dockv::GetIntentTypes(intent_types, is_row_lock), ancestor_doc_key, + full_doc_key, value, key, last_key); + }, + partial_range_key_intents_)); } return Finish(); @@ -260,16 +271,14 @@ Status TransactionalWriter::Apply(rocksdb::DirectWriteHandler* handler) { // Using operator() to pass this object conveniently to EnumerateIntents. Status TransactionalWriter::operator()( - dockv::AncestorDocKey ancestor_doc_key, dockv::FullDocKey, Slice value_slice, - dockv::KeyBytes* key, dockv::LastKey last_key) { + dockv::IntentTypeSet intent_types, dockv::AncestorDocKey ancestor_doc_key, dockv::FullDocKey, + Slice intent_value, dockv::KeyBytes* key, dockv::LastKey last_key) { if (ancestor_doc_key) { - weak_intents_[key->data()] |= MakeWeak(intent_types_); + weak_intents_[key->data()] |= MakeWeak(intent_types); return Status::OK(); } - const auto transaction_value_type = ValueEntryTypeAsChar::kTransactionId; const auto write_id_value_type = ValueEntryTypeAsChar::kWriteId; - const auto row_lock_value_type = ValueEntryTypeAsChar::kRowLock; IntraTxnWriteId big_endian_write_id = BigEndian::FromHost32(intra_txn_write_id_); const auto subtransaction_value_type = KeyEntryTypeAsChar::kSubTransactionId; @@ -291,17 +300,13 @@ Status TransactionalWriter::operator()( subtransaction_id, Slice(&write_id_value_type, 1), Slice::FromPod(&big_endian_write_id), - value_slice, + intent_value, }}; - // Store a row lock indicator rather than data (in value_slice) for row lock intents. - if (IsValidRowMarkType(row_mark_)) { - value.back() = Slice(&row_lock_value_type, 1); - } ++intra_txn_write_id_; char intent_type[2] = { KeyEntryTypeAsChar::kIntentTypeSet, - static_cast(intent_types_.ToUIntPtr()) }; + static_cast(intent_types.ToUIntPtr()) }; DocHybridTimeBuffer doc_ht_buffer; diff --git a/src/yb/docdb/rocksdb_writer.h b/src/yb/docdb/rocksdb_writer.h index 79ce2ee9a9b0..494ec5e9fd23 100644 --- a/src/yb/docdb/rocksdb_writer.h +++ b/src/yb/docdb/rocksdb_writer.h @@ -80,11 +80,12 @@ class TransactionalWriter : public rocksdb::DirectWriter { metadata_to_store_ = value; } + private: Status operator()( - dockv::AncestorDocKey ancestor_doc_key, dockv::FullDocKey full_doc_key, Slice value_slice, - dockv::KeyBytes* key, dockv::LastKey last_key); + dockv::IntentTypeSet intent_types, dockv::AncestorDocKey ancestor_doc_key, + dockv::FullDocKey full_doc_key, Slice value_slice, dockv::KeyBytes* key, + dockv::LastKey last_key); - private: Status Finish(); Status AddWeakIntent( const std::pair& intent_and_types, @@ -107,7 +108,6 @@ class TransactionalWriter : public rocksdb::DirectWriter { rocksdb::DirectWriteHandler* handler_; RowMarkType row_mark_; SubTransactionId subtransaction_id_; - dockv::IntentTypeSet intent_types_; std::unordered_map weak_intents_; }; diff --git a/src/yb/dockv/doc_key-test.cc b/src/yb/dockv/doc_key-test.cc index b21feb53d1bc..f6adfe0f7a44 100644 --- a/src/yb/dockv/doc_key-test.cc +++ b/src/yb/dockv/doc_key-test.cc @@ -418,7 +418,8 @@ class IntentCollector { FullDocKey full_doc_key, Slice value, KeyBytes* key, - LastKey) { + LastKey, + IsRowLock) { out_->push_back(CollectedIntent{ .ancestor_doc_key = ancestor_doc_key, .full_doc_key = full_doc_key, diff --git a/src/yb/dockv/intent.cc b/src/yb/dockv/intent.cc index d84d029d797e..d43d6f07d1fc 100644 --- a/src/yb/dockv/intent.cc +++ b/src/yb/dockv/intent.cc @@ -30,47 +30,15 @@ namespace yb::dockv { namespace { -inline IntentTypeSet GetIntentTypeSet( - IsolationLevel level, RowMarkType row_mark, bool is_write) { - if (IsValidRowMarkType(row_mark)) { - // Mapping of postgres locking levels to DocDB intent types is described in details by the - // following comment https://github.com/yugabyte/yugabyte-db/issues/1199#issuecomment-501041018 - switch (row_mark) { - case RowMarkType::ROW_MARK_EXCLUSIVE: - // FOR UPDATE: strong read + strong write lock on the DocKey, - // as if we're replacing or deleting the entire row in DocDB. - return IntentTypeSet({IntentType::kStrongRead, IntentType::kStrongWrite}); - case RowMarkType::ROW_MARK_NOKEYEXCLUSIVE: - // FOR NO KEY UPDATE: strong read + weak write lock on the DocKey, as if we're reading - // the entire row and then writing only a subset of columns in DocDB. - return IntentTypeSet({IntentType::kStrongRead, IntentType::kWeakWrite}); - case RowMarkType::ROW_MARK_SHARE: - // FOR SHARE: strong read on the DocKey, as if we're reading the entire row in DocDB. - return IntentTypeSet({IntentType::kStrongRead}); - case RowMarkType::ROW_MARK_KEYSHARE: - // FOR KEY SHARE: weak read lock on the DocKey, preventing the entire row from being - // replaced / deleted, as if we're simply reading some of the column. - // This is the type of locking that is used by foreign keys, so this will - // prevent the referenced row from disappearing. The reason it does not - // conflict with the FOR NO KEY UPDATE above is conceptually the following: - // an operation that reads the entire row and then writes a subset of columns - // (FOR NO KEY UPDATE) does not have to conflict with an operation that could - // be reading a different subset of columns (FOR KEY SHARE). - return IntentTypeSet({IntentType::kWeakRead}); - default: - // We shouldn't get here because other row lock types are disabled at the postgres level. - LOG(DFATAL) << "Unsupported row lock of type " << RowMarkType_Name(row_mark); - break; - } - } - +inline IntentTypeSet GetIntentTypes(IsolationLevel level, bool is_for_write = false) { switch (level) { case IsolationLevel::READ_COMMITTED: case IsolationLevel::SNAPSHOT_ISOLATION: - return IntentTypeSet({IntentType::kStrongRead, IntentType::kStrongWrite}); + return is_for_write + ? IntentTypeSet({IntentType::kStrongRead, IntentType::kStrongWrite}) : IntentTypeSet(); case IsolationLevel::SERIALIZABLE_ISOLATION: - return is_write ? IntentTypeSet({IntentType::kStrongWrite}) - : IntentTypeSet({IntentType::kStrongRead}); + return is_for_write + ? IntentTypeSet({IntentType::kStrongWrite}) : IntentTypeSet({IntentType::kStrongRead}); case IsolationLevel::NON_TRANSACTIONAL: LOG(DFATAL) << "GetStrongIntentTypeSet invoked for non transactional isolation"; return IntentTypeSet(); @@ -78,6 +46,47 @@ inline IntentTypeSet GetIntentTypeSet( FATAL_INVALID_ENUM_VALUE(IsolationLevel, level); } +inline IntentTypeSet GetIntentTypes(RowMarkType row_mark) { + if (!IsValidRowMarkType(row_mark)) { + return IntentTypeSet(); + } + + // Mapping of postgres locking levels to DocDB intent types is described in details by the + // following comment https://github.com/yugabyte/yugabyte-db/issues/1199#issuecomment-501041018 + switch (row_mark) { + case RowMarkType::ROW_MARK_EXCLUSIVE: + // FOR UPDATE: strong read + strong write lock on the DocKey, + // as if we're replacing or deleting the entire row in DocDB. + return IntentTypeSet({IntentType::kStrongRead, IntentType::kStrongWrite}); + case RowMarkType::ROW_MARK_NOKEYEXCLUSIVE: + // FOR NO KEY UPDATE: strong read + weak write lock on the DocKey, as if we're reading + // the entire row and then writing only a subset of columns in DocDB. + return IntentTypeSet({IntentType::kStrongRead, IntentType::kWeakWrite}); + case RowMarkType::ROW_MARK_SHARE: + // FOR SHARE: strong read on the DocKey, as if we're reading the entire row in DocDB. + return IntentTypeSet({IntentType::kStrongRead}); + case RowMarkType::ROW_MARK_KEYSHARE: + // FOR KEY SHARE: weak read lock on the DocKey, preventing the entire row from being + // replaced / deleted, as if we're simply reading some of the column. + // This is the type of locking that is used by foreign keys, so this will + // prevent the referenced row from disappearing. The reason it does not + // conflict with the FOR NO KEY UPDATE above is conceptually the following: + // an operation that reads the entire row and then writes a subset of columns + // (FOR NO KEY UPDATE) does not have to conflict with an operation that could + // be reading a different subset of columns (FOR KEY SHARE). + return IntentTypeSet({IntentType::kWeakRead}); + + case ROW_MARK_REFERENCE: [[fallthrough]]; + case ROW_MARK_COPY: [[fallthrough]]; + case ROW_MARK_ABSENT: + // We shouldn't get here because other row lock types are disabled at the postgres level. + LOG(DFATAL) << "Unsupported row lock of type " << RowMarkType_Name(row_mark); + return IntentTypeSet(); + } + + FATAL_INVALID_ENUM_VALUE(RowMarkType, row_mark); +} + } // namespace Status RemoveGroupEndSuffix(RefCntPrefix* key) { @@ -155,20 +164,17 @@ Result DecodeTransactionIdFromIntentValue(Slice* intent_value) { return DecodeTransactionId(intent_value); } -IntentTypeSet AllStrongIntents() { - return IntentTypeSet({IntentType::kStrongRead, IntentType::kStrongWrite}); -} - -IntentTypeSet GetIntentTypesForRead(IsolationLevel level, RowMarkType row_mark) { - return GetIntentTypeSet(level, row_mark, /*is_write=*/ false); +ReadIntentTypeSets GetIntentTypesForRead(IsolationLevel level, RowMarkType row_mark) { + return {.read = GetIntentTypes(level), .row_mark = GetIntentTypes(row_mark)}; } IntentTypeSet GetIntentTypesForWrite(IsolationLevel level) { - return GetIntentTypeSet(level, RowMarkType::ROW_MARK_ABSENT, /*is_write=*/ true); + return GetIntentTypes(level, /*is_for_write=*/true); } bool HasStrong(IntentTypeSet inp) { - return (inp & AllStrongIntents()).Any(); + static const IntentTypeSet all_strong_intents{IntentType::kStrongRead, IntentType::kStrongWrite}; + return (inp & all_strong_intents).Any(); } #define INTENT_VALUE_SCHECK(lhs, op, rhs, msg) \ @@ -417,23 +423,27 @@ Status EnumerateWeakIntents( } // namespace Status EnumerateIntents( - Slice key, const Slice& intent_value, const EnumerateIntentsCallback& functor, + Slice key, Slice intent_value, const EnumerateIntentsCallback& functor, KeyBytes* encoded_key_buffer, PartialRangeKeyIntents partial_range_key_intents, LastKey last_key) { + static const Slice kRowLockValue{&dockv::ValueEntryTypeAsChar::kRowLock, 1}; + const IsRowLock is_row_lock{intent_value == kRowLockValue}; RETURN_NOT_OK(EnumerateWeakIntents( key, - [&functor](FullDocKey full_doc_key, KeyBytes* encoded_key_buffer) { + [&functor, is_row_lock](FullDocKey full_doc_key, KeyBytes* encoded_key_buffer) { return functor( AncestorDocKey::kTrue, full_doc_key, Slice() /* intent_value */, encoded_key_buffer, - LastKey::kFalse); + LastKey::kFalse, + is_row_lock); }, encoded_key_buffer, partial_range_key_intents)); return functor( - AncestorDocKey::kFalse, FullDocKey::kTrue, intent_value, encoded_key_buffer, last_key); + AncestorDocKey::kFalse, FullDocKey::kTrue, intent_value, encoded_key_buffer, + last_key, is_row_lock); } } // namespace yb::dockv diff --git a/src/yb/dockv/intent.h b/src/yb/dockv/intent.h index 6357be329038..8e0b3d706ad8 100644 --- a/src/yb/dockv/intent.h +++ b/src/yb/dockv/intent.h @@ -118,11 +118,23 @@ Result DecodeIntentValue( // Decodes transaction ID from intent value. Consumes it from intent_value slice. Result DecodeTransactionIdFromIntentValue(Slice* intent_value); -IntentTypeSet GetIntentTypesForRead(IsolationLevel level, RowMarkType row_mark); +struct ReadIntentTypeSets { + IntentTypeSet read; + IntentTypeSet row_mark; +}; + +[[nodiscard]] ReadIntentTypeSets GetIntentTypesForRead(IsolationLevel level, RowMarkType row_mark); + +[[nodiscard]] IntentTypeSet GetIntentTypesForWrite(IsolationLevel level); -IntentTypeSet GetIntentTypesForWrite(IsolationLevel level); +YB_STRONGLY_TYPED_BOOL(IsRowLock); + +[[nodiscard]] inline IntentTypeSet GetIntentTypes( + const ReadIntentTypeSets& intents, IsRowLock is_row_lock) { + return is_row_lock ? intents.row_mark : intents.read; +} -inline IntentTypeSet MakeWeak(IntentTypeSet inp) { +[[nodiscard]] inline IntentTypeSet MakeWeak(IntentTypeSet inp) { static constexpr auto kWeakIntentMask = (1 << kStrongIntentFlag) - 1; const auto value = inp.ToUIntPtr(); @@ -163,10 +175,10 @@ YB_STRONGLY_TYPED_BOOL(FullDocKey); // So, we use boost::function which doesn't have such issue: // http://www.boost.org/doc/libs/1_65_1/doc/html/function/misc.html using EnumerateIntentsCallback = boost::function< - Status(AncestorDocKey, FullDocKey, Slice, KeyBytes*, LastKey)>; + Status(AncestorDocKey, FullDocKey, Slice, KeyBytes*, LastKey, IsRowLock)>; Status EnumerateIntents( - Slice key, const Slice& intent_value, const EnumerateIntentsCallback& functor, + Slice key, Slice intent_value, const EnumerateIntentsCallback& functor, KeyBytes* encoded_key_buffer, PartialRangeKeyIntents partial_range_key_intents, LastKey last_key = LastKey::kFalse); diff --git a/src/yb/tablet/tablet.cc b/src/yb/tablet/tablet.cc index 07a9626eb958..d44fa0ce652e 100644 --- a/src/yb/tablet/tablet.cc +++ b/src/yb/tablet/tablet.cc @@ -3628,6 +3628,7 @@ Result Tablet::CreateTransactionOperationContext( } Status Tablet::CreateReadIntents( + IsolationLevel level, const TransactionMetadataPB& transaction_metadata, const SubTransactionMetadataPB& subtransaction_metadata, const google::protobuf::RepeatedPtrField& ql_batch, @@ -3649,7 +3650,7 @@ Status Tablet::CreateReadIntents( table_info = VERIFY_RESULT(metadata_->GetTableInfo(pgsql_read.table_id())); } docdb::PgsqlReadOperation doc_op(pgsql_read, txn_op_ctx); - RETURN_NOT_OK(doc_op.GetIntents(table_info->schema(), write_batch)); + RETURN_NOT_OK(doc_op.GetIntents(table_info->schema(), level, write_batch)); } return Status::OK(); diff --git a/src/yb/tablet/tablet.h b/src/yb/tablet/tablet.h index 02f13a4ab467..c4da2c5d20b4 100644 --- a/src/yb/tablet/tablet.h +++ b/src/yb/tablet/tablet.h @@ -647,6 +647,7 @@ class Tablet : public AbstractTablet, Result TEST_CountRegularDBRecords(); Status CreateReadIntents( + IsolationLevel level, const TransactionMetadataPB& transaction_metadata, const SubTransactionMetadataPB& subtransaction_metadata, const google::protobuf::RepeatedPtrField& ql_batch, diff --git a/src/yb/tserver/read_query.cc b/src/yb/tserver/read_query.cc index dd9920b36bfe..c2aba705e68c 100644 --- a/src/yb/tserver/read_query.cc +++ b/src/yb/tserver/read_query.cc @@ -80,6 +80,28 @@ void HandleRedisReadRequestAsync( status_cb(tablet->HandleRedisReadRequest(read_operation_data, redis_read_request, response)); } +Result GetIsolationLevel( + const ReadRequestPB& req, TabletServerIf* server, TabletPeerTablet* peer_tablet) { + if (!req.has_transaction()) { + return IsolationLevel::NON_TRANSACTIONAL; + } + + // Unfortunately, determining the isolation level is not as straightforward as it seems. All but + // the first request to a given tablet by a particular transaction assume that the tablet already + // has the transaction metadata, including the isolation level, and those requests expect us to + // retrieve the isolation level from that metadata. Failure to do so was the cause of a + // serialization anomaly tested by TestOneOrTwoAdmins + // (https://github.com/yugabyte/yugabyte-db/issues/1572). + + const auto& transaction = req.transaction(); + if (transaction.has_isolation()) { + // This must be the first request to this tablet by this particular transaction. + return transaction.isolation(); + } + *peer_tablet = VERIFY_RESULT(LookupTabletPeer(server->tablet_peer_lookup(), req.tablet_id())); + return peer_tablet->tablet->GetIsolationLevel(transaction); +} + class ReadQuery : public std::enable_shared_from_this, public rpc::ThreadPoolTask { public: ReadQuery( @@ -212,27 +234,10 @@ Status ReadQuery::DoPerform() { TRACE("Start Read"); TRACE_EVENT1("tserver", "TabletServiceImpl::Read", "tablet_id", req_->tablet_id()); VLOG(2) << "Received Read RPC: " << req_->DebugString(); - // Unfortunately, determining the isolation level is not as straightforward as it seems. All but - // the first request to a given tablet by a particular transaction assume that the tablet already - // has the transaction metadata, including the isolation level, and those requests expect us to - // retrieve the isolation level from that metadata. Failure to do so was the cause of a - // serialization anomaly tested by TestOneOrTwoAdmins - // (https://github.com/yugabyte/yugabyte-db/issues/1572). - bool serializable_isolation = false; TabletPeerTablet peer_tablet; - if (req_->has_transaction()) { - IsolationLevel isolation_level; - if (req_->transaction().has_isolation()) { - // This must be the first request to this tablet by this particular transaction. - isolation_level = req_->transaction().isolation(); - } else { - peer_tablet = VERIFY_RESULT(LookupTabletPeer( - server_.tablet_peer_lookup(), req_->tablet_id())); - isolation_level = VERIFY_RESULT(peer_tablet.tablet->GetIsolationLevelFromPB(*req_)); - } - serializable_isolation = isolation_level == IsolationLevel::SERIALIZABLE_ISOLATION; - + const auto isolation_level = VERIFY_RESULT(GetIsolationLevel(*req_, &server_, &peer_tablet)); + if (isolation_level != IsolationLevel::NON_TRANSACTIONAL) { if (PREDICT_FALSE(FLAGS_TEST_transactional_read_delay_ms > 0)) { LOG(INFO) << "Delaying transactional read for " << FLAGS_TEST_transactional_read_delay_ms << " ms."; @@ -248,13 +253,15 @@ Status ReadQuery::DoPerform() { #endif } + const auto serializable_isolation = isolation_level == IsolationLevel::SERIALIZABLE_ISOLATION; + // Get the most restrictive row mark present in the batch of PostgreSQL requests. // TODO: rather handle individual row marks once we start batching read requests (issue #2495) - RowMarkType batch_row_mark = RowMarkType::ROW_MARK_ABSENT; + auto batch_row_mark = RowMarkType::ROW_MARK_ABSENT; CatalogVersionChecker catalog_version_checker(server_); for (const auto& pg_req : req_->pgsql_batch()) { RETURN_NOT_OK(catalog_version_checker(pg_req)); - RowMarkType current_row_mark = GetRowMarkTypeFromPB(pg_req); + auto current_row_mark = GetRowMarkTypeFromPB(pg_req); if (IsValidRowMarkType(current_row_mark)) { if (!req_->has_transaction()) { return STATUS( @@ -264,7 +271,7 @@ Status ReadQuery::DoPerform() { batch_row_mark = GetStrongestRowMarkType({current_row_mark, batch_row_mark}); } } - const bool has_row_mark = IsValidRowMarkType(batch_row_mark); + const auto has_row_mark = IsValidRowMarkType(batch_row_mark); LeaderTabletPeer leader_peer; auto tablet_peer = peer_tablet.tablet_peer; @@ -391,8 +398,8 @@ Status ReadQuery::DoPerform() { // TODO(dtxn) write request id RETURN_NOT_OK(leader_peer.tablet->CreateReadIntents( - req_->transaction(), req_->subtransaction(), req_->ql_batch(), req_->pgsql_batch(), - &write_batch)); + isolation_level, req_->transaction(), req_->subtransaction(), + req_->ql_batch(), req_->pgsql_batch(), &write_batch)); query->AdjustYsqlQueryTransactionality(req_->pgsql_batch_size()); diff --git a/src/yb/util/enums.h b/src/yb/util/enums.h index 4436594960cc..ba9a1cf2a43a 100644 --- a/src/yb/util/enums.h +++ b/src/yb/util/enums.h @@ -378,6 +378,12 @@ class EnumBitSet { return impl_.to_ullong() > rhs.impl_.to_ullong(); } + EnumBitSet operator~() const { + EnumBitSet result; + result.impl_ = ~impl_; + return result; + } + private: std::bitset(nullptr))> impl_; diff --git a/src/yb/yql/pgwrapper/pg_misc_conflicts-test.cc b/src/yb/yql/pgwrapper/pg_misc_conflicts-test.cc index 371ba1fa8b8e..b7e176e5e9e7 100644 --- a/src/yb/yql/pgwrapper/pg_misc_conflicts-test.cc +++ b/src/yb/yql/pgwrapper/pg_misc_conflicts-test.cc @@ -11,10 +11,15 @@ // under the License. // +#include + #include #include "yb/gutil/dynamic_annotations.h" +#include "yb/util/enums.h" +#include "yb/util/result.h" +#include "yb/util/tostring.h" #include "yb/util/tsan_util.h" #include "yb/yql/pgwrapper/libpq_utils.h" @@ -27,7 +32,21 @@ DECLARE_int32(ysql_max_write_restart_attempts); namespace yb::pgwrapper { +using namespace std::literals; + +namespace { + +YB_DEFINE_ENUM(ExplicitRowLock, (kNoLock)(kForUpdate)(kForNoKeyUpdate)(kForShare)(kForKeyShare)); + +template +std::string TestParamToString(const testing::TestParamInfo& param_info) { + return ToString(param_info.param); +} + +} // namespace + class PgMiscConflictsTest : public PgMiniTestBase { + protected: void SetUp() override { ANNOTATE_UNPROTECTED_WRITE(FLAGS_enable_wait_queues) = false; ANNOTATE_UNPROTECTED_WRITE(FLAGS_ysql_max_write_restart_attempts) = 0; @@ -90,7 +109,7 @@ TEST_F(PgMiscConflictsTest, TablegroupFKDelete) { // Test checks absence of conflict with concurrent update in case only key columns are read by the // query and all key columns are specified. -TEST_F(PgMiscConflictsTest, SerializableInsolationWeakReadIntent) { +TEST_F(PgMiscConflictsTest, SerializableIsolationWeakReadIntent) { constexpr auto* kTable = "tbl"; auto conn = ASSERT_RESULT(SetHighPriTxn(Connect())); @@ -108,14 +127,72 @@ TEST_F(PgMiscConflictsTest, SerializableInsolationWeakReadIntent) { ASSERT_OK(aux_conn.FetchFormat("SELECT k FROM $0 WHERE k = 1", kTable)); ASSERT_OK(aux_conn.CommitTransaction()); - ASSERT_OK(aux_conn.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION)); + auto serialize_access_conflict_checker = [&aux_conn](auto&&... args) { + ASSERT_OK(aux_conn.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION)); + auto status = aux_conn.ExecuteFormat(std::forward(args)...); + ASSERT_TRUE(IsSerializeAccessError(status)) << status; + ASSERT_OK(aux_conn.RollbackTransaction()); + }; + // Conflict. Because not only key columns are read and StrongRead intent is created on // entire k = 1 row - auto status = aux_conn.ExecuteFormat("SELECT * FROM $0 WHERE k = 1", kTable); - ASSERT_TRUE(IsSerializeAccessError(status)) << status; - ASSERT_OK(aux_conn.RollbackTransaction()); + serialize_access_conflict_checker("SELECT * FROM $0 WHERE k = 1", kTable); + + // Conflict due to scan (non single row read). + serialize_access_conflict_checker("SELECT * FROM $0", kTable); + + // Conflict due to scan (non single row read). + serialize_access_conflict_checker("SELECT k FROM $0", kTable); ASSERT_OK(conn.CommitTransaction()); } +class PgRowLockTest : public PgMiscConflictsTest, + public testing::WithParamInterface { + protected: + void SetUp() override { + PgMiscConflictsTest::SetUp(); + auto conn = ASSERT_RESULT(Connect()); + ASSERT_OK(conn.ExecuteFormat("CREATE TABLE $0 (k INT PRIMARY KEY, v INT)", kTable)); + ASSERT_OK(conn.ExecuteFormat("INSERT INTO $0 VALUES (1, 10)", kTable)); + } + + // Test checks that that serializable read locks are taken even when an explicit row locking + // clause is specified. + void DoSerializableTxnUpdate() { + auto conn = ASSERT_RESULT(SetHighPriTxn(Connect())); + auto aux_conn = ASSERT_RESULT(SetLowPriTxn(Connect())); + ASSERT_OK(conn.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION)); + ASSERT_OK(conn.FetchFormat( + "SELECT * FROM $0 WHERE k = 1 $1", kTable, ToQueryString(GetParam()))); + ASSERT_OK(aux_conn.StartTransaction(IsolationLevel::SERIALIZABLE_ISOLATION)); + const auto status = aux_conn.ExecuteFormat("UPDATE $0 SET v = 100 WHERE k = 1", kTable); + ASSERT_TRUE(IsSerializeAccessError(status)) << "Unexpected status " << status; + ASSERT_OK(aux_conn.RollbackTransaction()); + ASSERT_OK(conn.CommitTransaction()); + } + + static constexpr auto kTable = "tbl"sv; + + private: + std::string_view ToQueryString(ExplicitRowLock row_lock) { + switch(row_lock) { + case ExplicitRowLock::kNoLock: return std::string_view(); + case ExplicitRowLock::kForUpdate: return "FOR UPDATE"sv; + case ExplicitRowLock::kForNoKeyUpdate: return "FOR NO KEY UPDATE"sv; + case ExplicitRowLock::kForShare: return "FOR SHARE"sv; + case ExplicitRowLock::kForKeyShare: return "FOR KEY SHARE"sv; + } + FATAL_INVALID_ENUM_VALUE(ExplicitRowLock, row_lock); + } +}; + +INSTANTIATE_TEST_CASE_P( + PgMiscConflictsTest, PgRowLockTest, + testing::ValuesIn(kExplicitRowLockArray), TestParamToString); + +TEST_P(PgRowLockTest, SerializableTxnUpdate) { + DoSerializableTxnUpdate(); +} + } // namespace yb::pgwrapper diff --git a/src/yb/yql/pgwrapper/pg_row_lock-test.cc b/src/yb/yql/pgwrapper/pg_row_lock-test.cc index d9f52e075352..dbd3f34f555b 100644 --- a/src/yb/yql/pgwrapper/pg_row_lock-test.cc +++ b/src/yb/yql/pgwrapper/pg_row_lock-test.cc @@ -464,44 +464,44 @@ class PgMiniTestTxnHelper : public PgMiniTestNoTxnRetry { // Transaction 1. ASSERT_OK(StartTxn(&conn)); - RowLock(&conn, "SELECT * FROM t WHERE k = 1 FOR UPDATE", cur_name); + RowLock(&conn, "SELECT k FROM t WHERE k = 1 FOR UPDATE", cur_name); - ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR UPDATE")); - ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR NO KEY UPDATE")); - ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR SHARE")); - ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE")); + ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT k FROM t WHERE k = 1 FOR UPDATE")); + ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT k FROM t WHERE k = 1 FOR NO KEY UPDATE")); + ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT k FROM t WHERE k = 1 FOR SHARE")); + ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT k FROM t WHERE k = 1 FOR KEY SHARE")); ASSERT_OK(conn.Execute("COMMIT")); // Transaction 2. ASSERT_OK(StartTxn(&conn)); - RowLock(&conn, "SELECT * FROM t WHERE k = 1 FOR NO KEY UPDATE", cur_name); + RowLock(&conn, "SELECT k FROM t WHERE k = 1 FOR NO KEY UPDATE", cur_name); - ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR UPDATE")); - ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR NO KEY UPDATE")); - ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR SHARE")); - ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE")); + ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT k FROM t WHERE k = 1 FOR UPDATE")); + ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT k FROM t WHERE k = 1 FOR NO KEY UPDATE")); + ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT k FROM t WHERE k = 1 FOR SHARE")); + ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT k FROM t WHERE k = 1 FOR KEY SHARE")); ASSERT_OK(conn.Execute("COMMIT")); // Transaction 3. ASSERT_OK(StartTxn(&conn)); - RowLock(&conn, "SELECT * FROM t WHERE k = 1 FOR SHARE", cur_name); + RowLock(&conn, "SELECT k FROM t WHERE k = 1 FOR SHARE", cur_name); - ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR UPDATE")); - ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR NO KEY UPDATE")); - ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR SHARE")); - ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE")); + ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT k FROM t WHERE k = 1 FOR UPDATE")); + ASSERT_NOK(FetchInTxn(&extra_conn, "SELECT k FROM t WHERE k = 1 FOR NO KEY UPDATE")); + ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT k FROM t WHERE k = 1 FOR SHARE")); + ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT k FROM t WHERE k = 1 FOR KEY SHARE")); ASSERT_OK(conn.Execute("COMMIT")); // Transaction 4. ASSERT_OK(StartTxn(&conn)); - RowLock(&conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE", cur_name); + RowLock(&conn, "SELECT k FROM t WHERE k = 1 FOR KEY SHARE", cur_name); - ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR NO KEY UPDATE")); - ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR SHARE")); - ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE")); + ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT k FROM t WHERE k = 1 FOR NO KEY UPDATE")); + ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT k FROM t WHERE k = 1 FOR SHARE")); + ASSERT_RESULT(FetchInTxn(&extra_conn, "SELECT k FROM t WHERE k = 1 FOR KEY SHARE")); ASSERT_OK(conn.Execute("COMMIT")); @@ -509,9 +509,9 @@ class PgMiniTestTxnHelper : public PgMiniTestNoTxnRetry { // Check FOR KEY SHARE + FOR UPDATE conflict separately // as FOR KEY SHARE uses regular and FOR UPDATE uses high txn priority. ASSERT_OK(StartTxn(&conn)); - RowLock(&conn, "SELECT * FROM t WHERE k = 1 FOR KEY SHARE", cur_name); + RowLock(&conn, "SELECT k FROM t WHERE k = 1 FOR KEY SHARE", cur_name); - ASSERT_OK(FetchInTxn(&extra_conn, "SELECT * FROM t WHERE k = 1 FOR UPDATE")); + ASSERT_OK(FetchInTxn(&extra_conn, "SELECT k FROM t WHERE k = 1 FOR UPDATE")); ASSERT_NOK(conn.Execute("COMMIT")); }