Skip to content

Commit

Permalink
[enhancement](merge-on-write) add delete bitmap correctness check for…
Browse files Browse the repository at this point in the history
… single load (apache#17168)

cherry-pick apache#17147
  • Loading branch information
zhannngchen authored Feb 27, 2023
1 parent 33c538a commit af8fca0
Show file tree
Hide file tree
Showing 11 changed files with 67 additions and 15 deletions.
5 changes: 4 additions & 1 deletion be/src/olap/delta_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "olap/data_dir.h"
#include "olap/memtable.h"
#include "olap/memtable_flush_executor.h"
#include "olap/rowset/beta_rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/schema.h"
#include "olap/schema_change.h"
#include "olap/storage_engine.h"
Expand Down Expand Up @@ -393,7 +395,8 @@ Status DeltaWriter::close_wait(const PSlaveTabletNodes& slave_tablet_nodes,
if (_tablet->enable_unique_key_merge_on_write()) {
_storage_engine->txn_manager()->set_txn_related_delete_bitmap(
_req.partition_id, _req.txn_id, _tablet->tablet_id(), _tablet->schema_hash(),
_tablet->tablet_uid(), true, _delete_bitmap, _rowset_ids);
_tablet->tablet_uid(), true, _delete_bitmap, _rowset_ids,
dynamic_cast<BetaRowsetWriter*>(_rowset_writer.get())->get_num_mow_keys());
}

_delta_written_success = true;
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/rowset/beta_rowset_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -831,15 +831,21 @@ void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met
int64_t total_data_size = 0;
int64_t total_index_size = 0;
std::vector<KeyBoundsPB> segments_encoded_key_bounds;
std::unordered_set<std::string> key_set;
{
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
for (const auto& itr : _segid_statistics_map) {
num_rows_written += itr.second.row_num;
total_data_size += itr.second.data_size;
total_index_size += itr.second.index_size;
segments_encoded_key_bounds.push_back(itr.second.key_bounds);
if (_context.enable_unique_key_merge_on_write) {
DCHECK(itr.second.key_set.get() != nullptr);
key_set.insert(itr.second.key_set->begin(), itr.second.key_set->end());
}
}
}
_num_mow_keys = key_set.size();
for (auto itr = _segments_encoded_key_bounds.begin(); itr != _segments_encoded_key_bounds.end();
++itr) {
segments_encoded_key_bounds.push_back(*itr);
Expand Down Expand Up @@ -985,6 +991,7 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::Segme
segstat.data_size = segment_size;
segstat.index_size = index_size;
segstat.key_bounds = key_bounds;
segstat.key_set = (*writer)->get_key_set();
{
std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true);
Expand Down
8 changes: 7 additions & 1 deletion be/src/olap/rowset/beta_rowset_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ class BetaRowsetWriter : public RowsetWriter {

int32_t get_atomic_num_segment() const override { return _num_segment.load(); }

uint64_t get_num_mow_keys() { return _num_mow_keys; }

private:
template <typename RowType>
Status _add_row(const RowType& row);
Expand Down Expand Up @@ -166,9 +168,13 @@ class BetaRowsetWriter : public RowsetWriter {
int64_t data_size;
int64_t index_size;
KeyBoundsPB key_bounds;
std::shared_ptr<std::unordered_set<std::string>> key_set;
};
std::map<uint32_t, Statistics> _segid_statistics_map;
std::mutex _segid_statistics_map_mutex;
std::map<uint32_t, Statistics> _segid_statistics_map;

// used for check correctness of unique key mow keys.
std::atomic<uint64_t> _num_mow_keys;

bool _is_pending = false;
bool _already_built = false;
Expand Down
3 changes: 3 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ Status SegmentWriter::init(const std::vector<uint32_t>& col_ids, bool has_key) {
_primary_key_index_builder.reset(
new PrimaryKeyIndexBuilder(_file_writer, seq_col_length));
RETURN_IF_ERROR(_primary_key_index_builder->init());
_key_set.reset(new std::unordered_set<std::string>());
} else {
_short_key_index_builder.reset(
new ShortKeyIndexBuilder(_segment_id, _opts.num_rows_per_block));
Expand Down Expand Up @@ -205,6 +206,8 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po
// create primary indexes
for (size_t pos = 0; pos < num_rows; pos++) {
std::string key = _full_encode_keys(key_columns, pos);
DCHECK(_key_set.get() != nullptr);
_key_set->insert(key);
if (_tablet_schema->has_sequence_col()) {
_encode_seq_column(seq_column, pos, &key);
}
Expand Down
6 changes: 6 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ class ColumnWriter;
extern const char* k_segment_magic;
extern const uint32_t k_segment_magic_length;

using KeySetPtr = std::shared_ptr<std::unordered_set<std::string>>;

struct SegmentWriterOptions {
uint32_t num_rows_per_block = 1024;
bool enable_unique_key_merge_on_write = false;
Expand Down Expand Up @@ -97,6 +99,8 @@ class SegmentWriter {
Slice min_encoded_key();
Slice max_encoded_key();

KeySetPtr get_key_set() { return _key_set; }

DataDir* get_data_dir() { return _data_dir; }
bool is_unique_key() { return _tablet_schema->keys_type() == UNIQUE_KEYS; }

Expand Down Expand Up @@ -150,6 +154,8 @@ class SegmentWriter {
const KeyCoder* _seq_coder = nullptr;
std::vector<uint16_t> _key_index_size;
size_t _short_key_row_pos = 0;
// used to check if there's duplicate key in aggregate key and unique key data model
KeySetPtr _key_set;

std::vector<uint32_t> _column_ids;
bool _has_key = true;
Expand Down
21 changes: 19 additions & 2 deletions be/src/olap/tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
#include "olap/tablet_meta.h"
#include "olap/tablet_meta_manager.h"
#include "olap/tablet_schema.h"
#include "olap/txn_manager.h"
#include "segment_loader.h"
#include "util/path_util.h"
#include "util/pretty_printer.h"
Expand Down Expand Up @@ -2161,8 +2162,9 @@ Status Tablet::update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset)
return Status::OK();
}

Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& pre_rowset_ids) {
Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, const TabletTxnInfo* load_info) {
DeleteBitmapPtr delete_bitmap = load_info->delete_bitmap;
const RowsetIdUnorderedSet& pre_rowset_ids = load_info->rowset_ids;
RowsetIdUnorderedSet cur_rowset_ids;
RowsetIdUnorderedSet rowset_ids_to_add;
RowsetIdUnorderedSet rowset_ids_to_del;
Expand Down Expand Up @@ -2193,6 +2195,21 @@ Status Tablet::update_delete_bitmap(const RowsetSharedPtr& rowset, DeleteBitmapP
RETURN_IF_ERROR(calc_delete_bitmap(rowset->rowset_id(), segments, &rowset_ids_to_add,
delete_bitmap, cur_version - 1, true));

// Check the delete_bitmap correctness.
DeleteBitmap rs_bm(tablet_id());
delete_bitmap->subset({rowset->rowset_id(), 0, 0}, {rowset->rowset_id(), UINT32_MAX, INT64_MAX},
&rs_bm);
auto num_rows = rowset->num_rows();
auto bitmap_cardinality = rs_bm.cardinality();
std::string err_msg = fmt::format(
"The delete bitmap of unique key table may not correct, expect num unique keys: {}, "
"now the num_rows: {}, delete bitmap cardinality: {}, num sgements: {}",
load_info->num_keys, num_rows, bitmap_cardinality, rowset->num_segments());
DCHECK_EQ(load_info->num_keys, num_rows - bitmap_cardinality) << err_msg;
if (load_info->num_keys != num_rows - bitmap_cardinality) {
return Status::InternalError(err_msg);
}

// update version without write lock, compaction and publish_txn
// will update delete bitmap, handle compaction with _rowset_update_lock
// and publish_txn runs sequential so no need to lock here
Expand Down
5 changes: 3 additions & 2 deletions be/src/olap/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class CumulativeCompactionPolicy;
class CumulativeCompaction;
class BaseCompaction;
class RowsetWriter;

struct TabletTxnInfo;
struct RowsetWriterContext;

using TabletSharedPtr = std::shared_ptr<Tablet>;
Expand Down Expand Up @@ -351,8 +353,7 @@ class Tablet : public BaseTablet {
bool check_pre_segments = false);

Status update_delete_bitmap_without_lock(const RowsetSharedPtr& rowset);
Status update_delete_bitmap(const RowsetSharedPtr& rowset, DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& pre_rowset_ids);
Status update_delete_bitmap(const RowsetSharedPtr& rowset, const TabletTxnInfo* load_info);
uint64_t calc_compaction_output_rowset_delete_bitmap(
const std::vector<RowsetSharedPtr>& input_rowsets,
const RowIdConversion& rowid_conversion, uint64_t start_version, uint64_t end_version,
Expand Down
8 changes: 8 additions & 0 deletions be/src/olap/tablet_meta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -944,6 +944,14 @@ void DeleteBitmap::merge(const DeleteBitmap& other) {
}
}

uint64_t DeleteBitmap::cardinality() {
uint64_t cardinality = 0;
for (auto entry : delete_bitmap) {
cardinality += entry.second.cardinality();
}
return cardinality;
}

// We cannot just copy the underlying memory to construct a string
// due to equivalent objects may have different padding bytes.
// Reading padding bytes is undefined behavior, neither copy nor
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/tablet_meta.h
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,8 @@ class DeleteBitmap {
*/
void merge(const DeleteBitmap& other);

uint64_t cardinality();

/**
* Checks if the given row is marked deleted in bitmap with the condition:
* all the bitmaps that
Expand Down
14 changes: 6 additions & 8 deletions be/src/olap/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,10 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac
return Status::OK();
}

void TxnManager::set_txn_related_delete_bitmap(TPartitionId partition_id,
TTransactionId transaction_id, TTabletId tablet_id,
SchemaHash schema_hash, TabletUid tablet_uid,
bool unique_key_merge_on_write,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids) {
void TxnManager::set_txn_related_delete_bitmap(
TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id,
SchemaHash schema_hash, TabletUid tablet_uid, bool unique_key_merge_on_write,
DeleteBitmapPtr delete_bitmap, const RowsetIdUnorderedSet& rowset_ids, uint64_t num_keys) {
pair<int64_t, int64_t> key(partition_id, transaction_id);
TabletInfo tablet_info(tablet_id, schema_hash, tablet_uid);

Expand All @@ -190,6 +188,7 @@ void TxnManager::set_txn_related_delete_bitmap(TPartitionId partition_id,
load_info.unique_key_merge_on_write = unique_key_merge_on_write;
load_info.delete_bitmap = delete_bitmap;
load_info.rowset_ids = rowset_ids;
load_info.num_keys = num_keys;
}
}

Expand Down Expand Up @@ -320,8 +319,7 @@ Status TxnManager::publish_txn(OlapMeta* meta, TPartitionId partition_id,
if (tablet == nullptr) {
return Status::OK();
}
RETURN_IF_ERROR(tablet->update_delete_bitmap(
rowset_ptr, load_info->delete_bitmap, load_info->rowset_ids));
RETURN_IF_ERROR(tablet->update_delete_bitmap(rowset_ptr, load_info));
std::shared_lock rlock(tablet->get_header_lock());
tablet->save_meta();
}
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/txn_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ struct TabletTxnInfo {
// records rowsets calc in commit txn
RowsetIdUnorderedSet rowset_ids;
int64_t creation_time;
uint64_t num_keys;

TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
: load_id(load_id),
Expand Down Expand Up @@ -167,7 +168,7 @@ class TxnManager {
TTabletId tablet_id, SchemaHash schema_hash,
TabletUid tablet_uid, bool unique_key_merge_on_write,
DeleteBitmapPtr delete_bitmap,
const RowsetIdUnorderedSet& rowset_ids);
const RowsetIdUnorderedSet& rowset_ids, uint64_t num_keys);

private:
using TxnKey = std::pair<int64_t, int64_t>; // partition_id, transaction_id;
Expand Down

0 comments on commit af8fca0

Please sign in to comment.