diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 5148d515685bc0..4fc30f4968031b 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -23,6 +23,8 @@ #include "olap/data_dir.h" #include "olap/memtable.h" #include "olap/memtable_flush_executor.h" +#include "olap/rowset/beta_rowset_writer.h" +#include "olap/rowset/rowset_writer_context.h" #include "olap/schema.h" #include "olap/schema_change.h" #include "olap/storage_engine.h" @@ -393,7 +395,8 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes, if (_tablet->enable_unique_key_merge_on_write()) { _storage_engine->txn_manager()->set_txn_related_delete_bitmap( _req.partition_id, _req.txn_id, _tablet->tablet_id(), _tablet->schema_hash(), - _tablet->tablet_uid(), true, _delete_bitmap, _rowset_ids); + _tablet->tablet_uid(), true, _delete_bitmap, _rowset_ids, + dynamic_cast(_rowset_writer.get())->get_num_mow_keys()); } _delta_written_success = true; diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index a18d8c5f67c564..fa2fec2b102d2b 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -831,6 +831,7 @@ void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr rowset_met int64_t total_data_size = 0; int64_t total_index_size = 0; std::vector segments_encoded_key_bounds; + std::unordered_set key_set; { std::lock_guard lock(_segid_statistics_map_mutex); for (const auto& itr : _segid_statistics_map) { @@ -838,8 +839,13 @@ void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr rowset_met total_data_size += itr.second.data_size; total_index_size += itr.second.index_size; segments_encoded_key_bounds.push_back(itr.second.key_bounds); + if (_context.enable_unique_key_merge_on_write) { + DCHECK(itr.second.key_set.get() != nullptr); + key_set.insert(itr.second.key_set->begin(), itr.second.key_set->end()); + } } } + _num_mow_keys = key_set.size(); for (auto itr = _segments_encoded_key_bounds.begin(); itr != _segments_encoded_key_bounds.end(); ++itr) { segments_encoded_key_bounds.push_back(*itr); @@ -985,6 +991,7 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptrget_key_set(); { std::lock_guard lock(_segid_statistics_map_mutex); CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true); diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 6f32b24f0842fd..35d2596763d3b3 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -84,6 +84,8 @@ class BetaRowsetWriter : public RowsetWriter { int32_t get_atomic_num_segment() const override { return _num_segment.load(); } + uint64_t get_num_mow_keys() { return _num_mow_keys; } + private: template Status _add_row(const RowType& row); @@ -166,9 +168,13 @@ class BetaRowsetWriter : public RowsetWriter { int64_t data_size; int64_t index_size; KeyBoundsPB key_bounds; + std::shared_ptr> key_set; }; - std::map _segid_statistics_map; std::mutex _segid_statistics_map_mutex; + std::map _segid_statistics_map; + + // used for check correctness of unique key mow keys. + std::atomic _num_mow_keys; bool _is_pending = false; bool _already_built = false; diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 76c10ae31e3ce4..7f533f0a437bcb 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -148,6 +148,7 @@ Status SegmentWriter::init(const std::vector& col_ids, bool has_key) { _primary_key_index_builder.reset( new PrimaryKeyIndexBuilder(_file_writer, seq_col_length)); RETURN_IF_ERROR(_primary_key_index_builder->init()); + _key_set.reset(new std::unordered_set()); } else { _short_key_index_builder.reset( new ShortKeyIndexBuilder(_segment_id, _opts.num_rows_per_block)); @@ -205,6 +206,8 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po // create primary indexes for (size_t pos = 0; pos < num_rows; pos++) { std::string key = _full_encode_keys(key_columns, pos); + DCHECK(_key_set.get() != nullptr); + _key_set->insert(key); if (_tablet_schema->has_sequence_col()) { _encode_seq_column(seq_column, pos, &key); } diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 43c74eb2c967ef..a5b32ceca1022a 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -56,6 +56,8 @@ class ColumnWriter; extern const char* k_segment_magic; extern const uint32_t k_segment_magic_length; +using KeySetPtr = std::shared_ptr>; + struct SegmentWriterOptions { uint32_t num_rows_per_block = 1024; bool enable_unique_key_merge_on_write = false; @@ -97,6 +99,8 @@ class SegmentWriter { Slice min_encoded_key(); Slice max_encoded_key(); + KeySetPtr get_key_set() { return _key_set; } + DataDir* get_data_dir() { return _data_dir; } bool is_unique_key() { return _tablet_schema->keys_type() == UNIQUE_KEYS; } @@ -150,6 +154,8 @@ class SegmentWriter { const KeyCoder* _seq_coder = nullptr; std::vector _key_index_size; size_t _short_key_row_pos = 0; + // used to check if there's duplicate key in aggregate key and unique key data model + KeySetPtr _key_set; std::vector _column_ids; bool _has_key = true; diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index bb6e0ebe1eeeb9..255c22cce76097 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -57,6 +57,7 @@ #include "olap/tablet_meta.h" #include "olap/tablet_meta_manager.h" #include "olap/tablet_schema.h" +#include "olap/txn_manager.h" #include "segment_loader.h" #include "util/path_util.h" #include "util/pretty_printer.h" @@ -2161,8 +2162,9 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset) return Status::OK(); } -Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, DeleteBitmapPtr delete_bitmap, - const RowsetIdUnorderedSet& pre_rowset_ids) { +Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, const TabletTxnInfo* load_info) { + DeleteBitmapPtr delete_bitmap = load_info->delete_bitmap; + const RowsetIdUnorderedSet& pre_rowset_ids = load_info->rowset_ids; RowsetIdUnorderedSet cur_rowset_ids; RowsetIdUnorderedSet rowset_ids_to_add; RowsetIdUnorderedSet rowset_ids_to_del; @@ -2193,6 +2195,21 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, DeleteBitmapP RETURN_IF_ERROR(calc_delete_bitmap(rowset->rowset_id(), segments, &rowset_ids_to_add, delete_bitmap, cur_version - 1, true)); + // Check the delete_bitmap correctness. + DeleteBitmap rs_bm(tablet_id()); + delete_bitmap->subset({rowset->rowset_id(), 0, 0}, {rowset->rowset_id(), UINT32_MAX, INT64_MAX}, + &rs_bm); + auto num_rows = rowset->num_rows(); + auto bitmap_cardinality = rs_bm.cardinality(); + std::string err_msg = fmt::format( + "The delete bitmap of unique key table may not correct, expect num unique keys: {}, " + "now the num_rows: {}, delete bitmap cardinality: {}, num sgements: {}", + load_info->num_keys, num_rows, bitmap_cardinality, rowset->num_segments()); + DCHECK_EQ(load_info->num_keys, num_rows - bitmap_cardinality) << err_msg; + if (load_info->num_keys != num_rows - bitmap_cardinality) { + return Status::InternalError(err_msg); + } + // update version without write lock, compaction and publish_txn // will update delete bitmap, handle compaction with _rowset_update_lock // and publish_txn runs sequential so no need to lock here diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 242949b3e16538..5bf005865ebd05 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -54,6 +54,8 @@ class CumulativeCompactionPolicy; class CumulativeCompaction; class BaseCompaction; class RowsetWriter; + +struct TabletTxnInfo; struct RowsetWriterContext; using TabletSharedPtr = std::shared_ptr; @@ -351,8 +353,7 @@ class Tablet : public BaseTablet { bool check_pre_segments = false); Status update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset); - Status update_delete_bitmap(const RowsetSharedPtr& rowset, DeleteBitmapPtr delete_bitmap, - const RowsetIdUnorderedSet& pre_rowset_ids); + Status update_delete_bitmap(const RowsetSharedPtr& rowset, const TabletTxnInfo* load_info); uint64_t calc_compaction_output_rowset_delete_bitmap( const std::vector& input_rowsets, const RowIdConversion& rowid_conversion, uint64_t start_version, uint64_t end_version, diff --git a/be/src/olap/tablet_meta.cpp b/be/src/olap/tablet_meta.cpp index 407d97ac4c2a0e..3a916e74910961 100644 --- a/be/src/olap/tablet_meta.cpp +++ b/be/src/olap/tablet_meta.cpp @@ -944,6 +944,14 @@ void DeleteBitmap::merge(const DeleteBitmap& other) { } } +uint64_t DeleteBitmap::cardinality() { + uint64_t cardinality = 0; + for (auto entry : delete_bitmap) { + cardinality += entry.second.cardinality(); + } + return cardinality; +} + // We cannot just copy the underlying memory to construct a string // due to equivalent objects may have different padding bytes. // Reading padding bytes is undefined behavior, neither copy nor diff --git a/be/src/olap/tablet_meta.h b/be/src/olap/tablet_meta.h index 92284c83aa5f53..7f021c11bc4b27 100644 --- a/be/src/olap/tablet_meta.h +++ b/be/src/olap/tablet_meta.h @@ -369,6 +369,8 @@ class DeleteBitmap { */ void merge(const DeleteBitmap& other); + uint64_t cardinality(); + /** * Checks if the given row is marked deleted in bitmap with the condition: * all the bitmaps that diff --git a/be/src/olap/txn_manager.cpp b/be/src/olap/txn_manager.cpp index 5661804a920bcd..e97131cc009427 100644 --- a/be/src/olap/txn_manager.cpp +++ b/be/src/olap/txn_manager.cpp @@ -168,12 +168,10 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac return Status::OK(); } -void TxnManager::set_txn_related_delete_bitmap(TPartitionId partition_id, - TTransactionId transaction_id, TTabletId tablet_id, - SchemaHash schema_hash, TabletUid tablet_uid, - bool unique_key_merge_on_write, - DeleteBitmapPtr delete_bitmap, - const RowsetIdUnorderedSet& rowset_ids) { +void TxnManager::set_txn_related_delete_bitmap( + TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id, + SchemaHash schema_hash, TabletUid tablet_uid, bool unique_key_merge_on_write, + DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids, uint64_t num_keys) { pair key(partition_id, transaction_id); TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid); @@ -190,6 +188,7 @@ void TxnManager::set_txn_related_delete_bitmap(TPartitionId partition_id, load_info.unique_key_merge_on_write = unique_key_merge_on_write; load_info.delete_bitmap = delete_bitmap; load_info.rowset_ids = rowset_ids; + load_info.num_keys = num_keys; } } @@ -320,8 +319,7 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id, if (tablet == nullptr) { return Status::OK(); } - RETURN_IF_ERROR(tablet->update_delete_bitmap( - rowset_ptr, load_info->delete_bitmap, load_info->rowset_ids)); + RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset_ptr, load_info)); std::shared_lock rlock(tablet->get_header_lock()); tablet->save_meta(); } diff --git a/be/src/olap/txn_manager.h b/be/src/olap/txn_manager.h index 197307f589dfc0..7319234b27b939 100644 --- a/be/src/olap/txn_manager.h +++ b/be/src/olap/txn_manager.h @@ -58,6 +58,7 @@ struct TabletTxnInfo { // records rowsets calc in commit txn RowsetIdUnorderedSet rowset_ids; int64_t creation_time; + uint64_t num_keys; TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset) : load_id(load_id), @@ -167,7 +168,7 @@ class TxnManager { TTabletId tablet_id, SchemaHash schema_hash, TabletUid tablet_uid, bool unique_key_merge_on_write, DeleteBitmapPtr delete_bitmap, - const RowsetIdUnorderedSet& rowset_ids); + const RowsetIdUnorderedSet& rowset_ids, uint64_t num_keys); private: using TxnKey = std::pair; // partition_id, transaction_id;