Skip to content

Commit

Permalink
[opt](partial update) Extract some common logic in partial update (ap…
Browse files Browse the repository at this point in the history
  • Loading branch information
bobhan1 authored Aug 21, 2024
1 parent 5b55f4b commit 8fa411f
Show file tree
Hide file tree
Showing 11 changed files with 262 additions and 417 deletions.
147 changes: 46 additions & 101 deletions be/src/olap/base_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "olap/calc_delete_bitmap_executor.h"
#include "olap/delete_bitmap_calculator.h"
#include "olap/memtable.h"
#include "olap/partial_update_info.h"
#include "olap/primary_key_index.h"
#include "olap/rowid_conversion.h"
#include "olap/rowset/beta_rowset.h"
Expand Down Expand Up @@ -56,55 +57,6 @@ bvar::LatencyRecorder g_tablet_update_delete_bitmap_latency("doris_pk", "update_

static bvar::Adder<size_t> g_total_tablet_num("doris_total_tablet_num");

// read columns by read plan
// read_index: ori_pos-> block_idx
Status read_columns_by_plan(TabletSchemaSPtr tablet_schema,
const std::vector<uint32_t> cids_to_read,
const PartialUpdateReadPlan& read_plan,
const std::map<RowsetId, RowsetSharedPtr>& rsid_to_rowset,
vectorized::Block& block, std::map<uint32_t, uint32_t>* read_index,
const signed char* __restrict skip_map = nullptr) {
bool has_row_column = tablet_schema->has_row_store_for_all_columns();
auto mutable_columns = block.mutate_columns();
size_t read_idx = 0;
for (auto rs_it : read_plan) {
for (auto seg_it : rs_it.second) {
auto rowset_iter = rsid_to_rowset.find(rs_it.first);
CHECK(rowset_iter != rsid_to_rowset.end());
std::vector<uint32_t> rids;
for (auto [rid, pos] : seg_it.second) {
if (skip_map && skip_map[pos]) {
continue;
}
rids.emplace_back(rid);
(*read_index)[pos] = read_idx++;
}
if (has_row_column) {
auto st = BaseTablet::fetch_value_through_row_column(rowset_iter->second,
*tablet_schema, seg_it.first,
rids, cids_to_read, block);
if (!st.ok()) {
LOG(WARNING) << "failed to fetch value through row column";
return st;
}
continue;
}
for (size_t cid = 0; cid < mutable_columns.size(); ++cid) {
TabletColumn tablet_column = tablet_schema->column(cids_to_read[cid]);
auto st = BaseTablet::fetch_value_by_rowids(rowset_iter->second, seg_it.first, rids,
tablet_column, mutable_columns[cid]);
// set read value to output block
if (!st.ok()) {
LOG(WARNING) << "failed to fetch value";
return st;
}
}
}
}
block.set_columns(std::move(mutable_columns));
return Status::OK();
}

Status _get_segment_column_iterator(const BetaRowsetSharedPtr& rowset, uint32_t segid,
const TabletColumn& target_column,
SegmentCacheHandle* segment_cache_handle,
Expand Down Expand Up @@ -559,27 +511,6 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, bool with_seq_col,
return Status::Error<ErrorCode::KEY_NOT_FOUND>("can't find key in all rowsets");
}

void BaseTablet::prepare_to_read(const RowLocation& row_location, size_t pos,
PartialUpdateReadPlan* read_plan) {
auto rs_it = read_plan->find(row_location.rowset_id);
if (rs_it == read_plan->end()) {
std::map<uint32_t, std::vector<RidAndPos>> segid_to_rid;
std::vector<RidAndPos> rid_pos;
rid_pos.emplace_back(RidAndPos {row_location.row_id, pos});
segid_to_rid.emplace(row_location.segment_id, rid_pos);
read_plan->emplace(row_location.rowset_id, segid_to_rid);
return;
}
auto seg_it = rs_it->second.find(row_location.segment_id);
if (seg_it == rs_it->second.end()) {
std::vector<RidAndPos> rid_pos;
rid_pos.emplace_back(RidAndPos {row_location.row_id, pos});
rs_it->second.emplace(row_location.segment_id, rid_pos);
return;
}
seg_it->second.emplace_back(RidAndPos {row_location.row_id, pos});
}

// if user pass a token, then all calculation works will submit to a threadpool,
// user can get all delete bitmaps from that token.
// if `token` is nullptr, the calculation will run in local, and user can get the result
Expand Down Expand Up @@ -758,8 +689,8 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset,
// So here we should read version 5's columns and build a new row, which is
// consists of version 6's update columns and version 5's origin columns
// here we build 2 read plan for ori values and update values
prepare_to_read(loc, pos, &read_plan_ori);
prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos, &read_plan_update);
read_plan_ori.prepare_to_read(loc, pos);
read_plan_update.prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos);
rsid_to_rowset[rowset_find->rowset_id()] = rowset_find;
++pos;
// delete bitmap will be calculate when memtable flush and
Expand Down Expand Up @@ -930,6 +861,40 @@ Status BaseTablet::fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t
return Status::OK();
}

const signed char* BaseTablet::get_delete_sign_column_data(vectorized::Block& block,
size_t rows_at_least) {
if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
block.try_get_by_name(DELETE_SIGN);
delete_sign_column != nullptr) {
const auto& delete_sign_col =
reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
if (delete_sign_col.size() >= rows_at_least) {
return delete_sign_col.get_data().data();
}
}
return nullptr;
};

Status BaseTablet::generate_default_value_block(const TabletSchema& schema,
const std::vector<uint32_t>& cids,
const std::vector<std::string>& default_values,
const vectorized::Block& ref_block,
vectorized::Block& default_value_block) {
auto mutable_default_value_columns = default_value_block.mutate_columns();
for (auto i = 0; i < cids.size(); ++i) {
const auto& column = schema.column(cids[i]);
if (column.has_default_value()) {
const auto& default_value = default_values[i];
vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
default_value.size());
RETURN_IF_ERROR(ref_block.get_by_position(i).type->from_string(
rb, mutable_default_value_columns[i].get()));
}
}
default_value_block.set_columns(std::move(mutable_default_value_columns));
return Status::OK();
}

Status BaseTablet::generate_new_block_for_partial_update(
TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update,
Expand All @@ -947,27 +912,13 @@ Status BaseTablet::generate_new_block_for_partial_update(
auto old_block = rowset_schema->create_block_by_cids(missing_cids);
auto update_block = rowset_schema->create_block_by_cids(update_cids);

auto get_delete_sign_column_data = [](vectorized::Block& block,
size_t rows) -> const signed char* {
if (const vectorized::ColumnWithTypeAndName* delete_sign_column =
block.try_get_by_name(DELETE_SIGN);
delete_sign_column != nullptr) {
const auto& delete_sign_col =
reinterpret_cast<const vectorized::ColumnInt8&>(*(delete_sign_column->column));
if (delete_sign_col.size() >= rows) {
return delete_sign_col.get_data().data();
}
}
return nullptr;
};

// rowid in the final block(start from 0, increase continuously) -> rowid to read in update_block
std::map<uint32_t, uint32_t> read_index_update;

// read current rowset first, if a row in the current rowset has delete sign mark
// we don't need to read values from old block
RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids, read_plan_update,
rsid_to_rowset, update_block, &read_index_update));
RETURN_IF_ERROR(read_plan_update.read_columns_by_plan(
*rowset_schema, update_cids, rsid_to_rowset, update_block, &read_index_update));
size_t update_rows = read_index_update.size();
for (auto i = 0; i < update_cids.size(); ++i) {
for (auto idx = 0; idx < update_rows; ++idx) {
Expand All @@ -986,27 +937,21 @@ Status BaseTablet::generate_new_block_for_partial_update(

// rowid in the final block(start from 0, increase, may not continuous becasue we skip to read some rows) -> rowid to read in old_block
std::map<uint32_t, uint32_t> read_index_old;
RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids, read_plan_ori, rsid_to_rowset,
old_block, &read_index_old, new_block_delete_signs));
RETURN_IF_ERROR(read_plan_ori.read_columns_by_plan(*rowset_schema, missing_cids, rsid_to_rowset,
old_block, &read_index_old,
new_block_delete_signs));
size_t old_rows = read_index_old.size();
const auto* __restrict old_block_delete_signs =
get_delete_sign_column_data(old_block, old_rows);

// build default value block
auto default_value_block = old_block.clone_empty();
auto mutable_default_value_columns = default_value_block.mutate_columns();
if (old_block_delete_signs != nullptr || new_block_delete_signs != nullptr) {
for (auto i = 0; i < missing_cids.size(); ++i) {
const auto& column = rowset_schema->column(missing_cids[i]);
if (column.has_default_value()) {
const auto& default_value = partial_update_info->default_values[i];
vectorized::ReadBuffer rb(const_cast<char*>(default_value.c_str()),
default_value.size());
RETURN_IF_ERROR(old_block.get_by_position(i).type->from_string(
rb, mutable_default_value_columns[i].get()));
}
}
RETURN_IF_ERROR(BaseTablet::generate_default_value_block(
*rowset_schema, missing_cids, partial_update_info->default_values, old_block,
default_value_block));
}
auto mutable_default_value_columns = default_value_block.mutate_columns();

CHECK(update_rows >= old_rows);

Expand Down
15 changes: 11 additions & 4 deletions be/src/olap/base_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "common/status.h"
#include "olap/iterators.h"
#include "olap/olap_common.h"
#include "olap/partial_update_info.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/tablet_fwd.h"
#include "olap/tablet_meta.h"
Expand All @@ -39,6 +38,8 @@ class RowsetWriter;
class CalcDeleteBitmapToken;
class SegmentCacheHandle;
class RowIdConversion;
struct PartialUpdateInfo;
class PartialUpdateReadPlan;

struct TabletWithVersion {
BaseTabletSPtr tablet;
Expand Down Expand Up @@ -150,9 +151,6 @@ class BaseTablet {
std::vector<std::unique_ptr<SegmentCacheHandle>>& segment_caches,
RowsetSharedPtr* rowset = nullptr, bool with_rowid = true);

static void prepare_to_read(const RowLocation& row_location, size_t pos,
PartialUpdateReadPlan* read_plan);

// calc delete bitmap when flush memtable, use a fake version to calc
// For example, cur max version is 5, and we use version 6 to calc but
// finally this rowset publish version with 8, we should make up data
Expand Down Expand Up @@ -189,6 +187,15 @@ class BaseTablet {
int64_t txn_id, const RowsetIdUnorderedSet& rowset_ids,
std::vector<RowsetSharedPtr>* rowsets = nullptr);

static const signed char* get_delete_sign_column_data(vectorized::Block& block,
size_t rows_at_least = 0);

static Status generate_default_value_block(const TabletSchema& schema,
const std::vector<uint32_t>& cids,
const std::vector<std::string>& default_values,
const vectorized::Block& ref_block,
vectorized::Block& default_value_block);

static Status generate_new_block_for_partial_update(
TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info,
const PartialUpdateReadPlan& read_plan_ori,
Expand Down
16 changes: 4 additions & 12 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include <typeinfo>
#include <unordered_map>
#include <unordered_set>
#include <utility>

#include "io/io_common.h"
#include "olap/olap_define.h"
Expand Down Expand Up @@ -508,28 +509,19 @@ class DeleteBitmap;
// merge on write context
struct MowContext {
MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids,
const std::vector<RowsetSharedPtr>& rowset_ptrs, std::shared_ptr<DeleteBitmap> db)
std::vector<RowsetSharedPtr> rowset_ptrs, std::shared_ptr<DeleteBitmap> db)
: max_version(version),
txn_id(txnid),
rowset_ids(ids),
rowset_ptrs(rowset_ptrs),
delete_bitmap(db) {}
rowset_ptrs(std::move(rowset_ptrs)),
delete_bitmap(std::move(db)) {}
int64_t max_version;
int64_t txn_id;
const RowsetIdUnorderedSet& rowset_ids;
std::vector<RowsetSharedPtr> rowset_ptrs;
std::shared_ptr<DeleteBitmap> delete_bitmap;
};

// used in mow partial update
struct RidAndPos {
uint32_t rid;
// pos in block
size_t pos;
};

using PartialUpdateReadPlan = std::map<RowsetId, std::map<uint32_t, std::vector<RidAndPos>>>;

// used for controll compaction
struct VersionWithTime {
std::atomic<int64_t> version;
Expand Down
Loading

0 comments on commit 8fa411f

Please sign in to comment.