From 8fa411f2f4bf3bb881d12120f9ea9452c5ea4d34 Mon Sep 17 00:00:00 2001 From: bobhan1 Date: Wed, 21 Aug 2024 15:36:54 +0800 Subject: [PATCH] [opt](partial update) Extract some common logic in partial update (#39619) --- be/src/olap/base_tablet.cpp | 147 ++++++----------- be/src/olap/base_tablet.h | 15 +- be/src/olap/olap_common.h | 16 +- be/src/olap/partial_update_info.cpp | 150 ++++++++++++++++- be/src/olap/partial_update_info.h | 38 +++++ .../olap/rowset/segment_v2/segment_writer.cpp | 154 ++---------------- .../olap/rowset/segment_v2/segment_writer.h | 5 - .../segment_v2/vertical_segment_writer.cpp | 145 +---------------- .../segment_v2/vertical_segment_writer.h | 5 - be/src/olap/tablet_schema.cpp | 2 +- be/src/olap/tablet_schema.h | 2 +- 11 files changed, 262 insertions(+), 417 deletions(-) diff --git a/be/src/olap/base_tablet.cpp b/be/src/olap/base_tablet.cpp index db1e0283854a39..934b00f56698b8 100644 --- a/be/src/olap/base_tablet.cpp +++ b/be/src/olap/base_tablet.cpp @@ -24,6 +24,7 @@ #include "olap/calc_delete_bitmap_executor.h" #include "olap/delete_bitmap_calculator.h" #include "olap/memtable.h" +#include "olap/partial_update_info.h" #include "olap/primary_key_index.h" #include "olap/rowid_conversion.h" #include "olap/rowset/beta_rowset.h" @@ -56,55 +57,6 @@ bvar::LatencyRecorder g_tablet_update_delete_bitmap_latency("doris_pk", "update_ static bvar::Adder g_total_tablet_num("doris_total_tablet_num"); -// read columns by read plan -// read_index: ori_pos-> block_idx -Status read_columns_by_plan(TabletSchemaSPtr tablet_schema, - const std::vector cids_to_read, - const PartialUpdateReadPlan& read_plan, - const std::map& rsid_to_rowset, - vectorized::Block& block, std::map* read_index, - const signed char* __restrict skip_map = nullptr) { - bool has_row_column = tablet_schema->has_row_store_for_all_columns(); - auto mutable_columns = block.mutate_columns(); - size_t read_idx = 0; - for (auto rs_it : read_plan) { - for (auto seg_it : rs_it.second) { - auto rowset_iter = rsid_to_rowset.find(rs_it.first); - CHECK(rowset_iter != rsid_to_rowset.end()); - std::vector rids; - for (auto [rid, pos] : seg_it.second) { - if (skip_map && skip_map[pos]) { - continue; - } - rids.emplace_back(rid); - (*read_index)[pos] = read_idx++; - } - if (has_row_column) { - auto st = BaseTablet::fetch_value_through_row_column(rowset_iter->second, - *tablet_schema, seg_it.first, - rids, cids_to_read, block); - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value through row column"; - return st; - } - continue; - } - for (size_t cid = 0; cid < mutable_columns.size(); ++cid) { - TabletColumn tablet_column = tablet_schema->column(cids_to_read[cid]); - auto st = BaseTablet::fetch_value_by_rowids(rowset_iter->second, seg_it.first, rids, - tablet_column, mutable_columns[cid]); - // set read value to output block - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value"; - return st; - } - } - } - } - block.set_columns(std::move(mutable_columns)); - return Status::OK(); -} - Status _get_segment_column_iterator(const BetaRowsetSharedPtr& rowset, uint32_t segid, const TabletColumn& target_column, SegmentCacheHandle* segment_cache_handle, @@ -559,27 +511,6 @@ Status BaseTablet::lookup_row_key(const Slice& encoded_key, bool with_seq_col, return Status::Error("can't find key in all rowsets"); } -void BaseTablet::prepare_to_read(const RowLocation& row_location, size_t pos, - PartialUpdateReadPlan* read_plan) { - auto rs_it = read_plan->find(row_location.rowset_id); - if (rs_it == read_plan->end()) { - std::map> segid_to_rid; - std::vector rid_pos; - rid_pos.emplace_back(RidAndPos {row_location.row_id, pos}); - segid_to_rid.emplace(row_location.segment_id, rid_pos); - read_plan->emplace(row_location.rowset_id, segid_to_rid); - return; - } - auto seg_it = rs_it->second.find(row_location.segment_id); - if (seg_it == rs_it->second.end()) { - std::vector rid_pos; - rid_pos.emplace_back(RidAndPos {row_location.row_id, pos}); - rs_it->second.emplace(row_location.segment_id, rid_pos); - return; - } - seg_it->second.emplace_back(RidAndPos {row_location.row_id, pos}); -} - // if user pass a token, then all calculation works will submit to a threadpool, // user can get all delete bitmaps from that token. // if `token` is nullptr, the calculation will run in local, and user can get the result @@ -758,8 +689,8 @@ Status BaseTablet::calc_segment_delete_bitmap(RowsetSharedPtr rowset, // So here we should read version 5's columns and build a new row, which is // consists of version 6's update columns and version 5's origin columns // here we build 2 read plan for ori values and update values - prepare_to_read(loc, pos, &read_plan_ori); - prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos, &read_plan_update); + read_plan_ori.prepare_to_read(loc, pos); + read_plan_update.prepare_to_read(RowLocation {rowset_id, seg->id(), row_id}, pos); rsid_to_rowset[rowset_find->rowset_id()] = rowset_find; ++pos; // delete bitmap will be calculate when memtable flush and @@ -930,6 +861,40 @@ Status BaseTablet::fetch_value_by_rowids(RowsetSharedPtr input_rowset, uint32_t return Status::OK(); } +const signed char* BaseTablet::get_delete_sign_column_data(vectorized::Block& block, + size_t rows_at_least) { + if (const vectorized::ColumnWithTypeAndName* delete_sign_column = + block.try_get_by_name(DELETE_SIGN); + delete_sign_column != nullptr) { + const auto& delete_sign_col = + reinterpret_cast(*(delete_sign_column->column)); + if (delete_sign_col.size() >= rows_at_least) { + return delete_sign_col.get_data().data(); + } + } + return nullptr; +}; + +Status BaseTablet::generate_default_value_block(const TabletSchema& schema, + const std::vector& cids, + const std::vector& default_values, + const vectorized::Block& ref_block, + vectorized::Block& default_value_block) { + auto mutable_default_value_columns = default_value_block.mutate_columns(); + for (auto i = 0; i < cids.size(); ++i) { + const auto& column = schema.column(cids[i]); + if (column.has_default_value()) { + const auto& default_value = default_values[i]; + vectorized::ReadBuffer rb(const_cast(default_value.c_str()), + default_value.size()); + RETURN_IF_ERROR(ref_block.get_by_position(i).type->from_string( + rb, mutable_default_value_columns[i].get())); + } + } + default_value_block.set_columns(std::move(mutable_default_value_columns)); + return Status::OK(); +} + Status BaseTablet::generate_new_block_for_partial_update( TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info, const PartialUpdateReadPlan& read_plan_ori, const PartialUpdateReadPlan& read_plan_update, @@ -947,27 +912,13 @@ Status BaseTablet::generate_new_block_for_partial_update( auto old_block = rowset_schema->create_block_by_cids(missing_cids); auto update_block = rowset_schema->create_block_by_cids(update_cids); - auto get_delete_sign_column_data = [](vectorized::Block& block, - size_t rows) -> const signed char* { - if (const vectorized::ColumnWithTypeAndName* delete_sign_column = - block.try_get_by_name(DELETE_SIGN); - delete_sign_column != nullptr) { - const auto& delete_sign_col = - reinterpret_cast(*(delete_sign_column->column)); - if (delete_sign_col.size() >= rows) { - return delete_sign_col.get_data().data(); - } - } - return nullptr; - }; - // rowid in the final block(start from 0, increase continuously) -> rowid to read in update_block std::map read_index_update; // read current rowset first, if a row in the current rowset has delete sign mark // we don't need to read values from old block - RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, update_cids, read_plan_update, - rsid_to_rowset, update_block, &read_index_update)); + RETURN_IF_ERROR(read_plan_update.read_columns_by_plan( + *rowset_schema, update_cids, rsid_to_rowset, update_block, &read_index_update)); size_t update_rows = read_index_update.size(); for (auto i = 0; i < update_cids.size(); ++i) { for (auto idx = 0; idx < update_rows; ++idx) { @@ -986,27 +937,21 @@ Status BaseTablet::generate_new_block_for_partial_update( // rowid in the final block(start from 0, increase, may not continuous becasue we skip to read some rows) -> rowid to read in old_block std::map read_index_old; - RETURN_IF_ERROR(read_columns_by_plan(rowset_schema, missing_cids, read_plan_ori, rsid_to_rowset, - old_block, &read_index_old, new_block_delete_signs)); + RETURN_IF_ERROR(read_plan_ori.read_columns_by_plan(*rowset_schema, missing_cids, rsid_to_rowset, + old_block, &read_index_old, + new_block_delete_signs)); size_t old_rows = read_index_old.size(); const auto* __restrict old_block_delete_signs = get_delete_sign_column_data(old_block, old_rows); // build default value block auto default_value_block = old_block.clone_empty(); - auto mutable_default_value_columns = default_value_block.mutate_columns(); if (old_block_delete_signs != nullptr || new_block_delete_signs != nullptr) { - for (auto i = 0; i < missing_cids.size(); ++i) { - const auto& column = rowset_schema->column(missing_cids[i]); - if (column.has_default_value()) { - const auto& default_value = partial_update_info->default_values[i]; - vectorized::ReadBuffer rb(const_cast(default_value.c_str()), - default_value.size()); - RETURN_IF_ERROR(old_block.get_by_position(i).type->from_string( - rb, mutable_default_value_columns[i].get())); - } - } + RETURN_IF_ERROR(BaseTablet::generate_default_value_block( + *rowset_schema, missing_cids, partial_update_info->default_values, old_block, + default_value_block)); } + auto mutable_default_value_columns = default_value_block.mutate_columns(); CHECK(update_rows >= old_rows); diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index d329c786fc9781..ab289822df891f 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -24,7 +24,6 @@ #include "common/status.h" #include "olap/iterators.h" #include "olap/olap_common.h" -#include "olap/partial_update_info.h" #include "olap/rowset/segment_v2/segment.h" #include "olap/tablet_fwd.h" #include "olap/tablet_meta.h" @@ -39,6 +38,8 @@ class RowsetWriter; class CalcDeleteBitmapToken; class SegmentCacheHandle; class RowIdConversion; +struct PartialUpdateInfo; +class PartialUpdateReadPlan; struct TabletWithVersion { BaseTabletSPtr tablet; @@ -150,9 +151,6 @@ class BaseTablet { std::vector>& segment_caches, RowsetSharedPtr* rowset = nullptr, bool with_rowid = true); - static void prepare_to_read(const RowLocation& row_location, size_t pos, - PartialUpdateReadPlan* read_plan); - // calc delete bitmap when flush memtable, use a fake version to calc // For example, cur max version is 5, and we use version 6 to calc but // finally this rowset publish version with 8, we should make up data @@ -189,6 +187,15 @@ class BaseTablet { int64_t txn_id, const RowsetIdUnorderedSet& rowset_ids, std::vector* rowsets = nullptr); + static const signed char* get_delete_sign_column_data(vectorized::Block& block, + size_t rows_at_least = 0); + + static Status generate_default_value_block(const TabletSchema& schema, + const std::vector& cids, + const std::vector& default_values, + const vectorized::Block& ref_block, + vectorized::Block& default_value_block); + static Status generate_new_block_for_partial_update( TabletSchemaSPtr rowset_schema, const PartialUpdateInfo* partial_update_info, const PartialUpdateReadPlan& read_plan_ori, diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index b6e336722f3eeb..dac1750c24b54f 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -33,6 +33,7 @@ #include #include #include +#include #include "io/io_common.h" #include "olap/olap_define.h" @@ -508,12 +509,12 @@ class DeleteBitmap; // merge on write context struct MowContext { MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids, - const std::vector& rowset_ptrs, std::shared_ptr db) + std::vector rowset_ptrs, std::shared_ptr db) : max_version(version), txn_id(txnid), rowset_ids(ids), - rowset_ptrs(rowset_ptrs), - delete_bitmap(db) {} + rowset_ptrs(std::move(rowset_ptrs)), + delete_bitmap(std::move(db)) {} int64_t max_version; int64_t txn_id; const RowsetIdUnorderedSet& rowset_ids; @@ -521,15 +522,6 @@ struct MowContext { std::shared_ptr delete_bitmap; }; -// used in mow partial update -struct RidAndPos { - uint32_t rid; - // pos in block - size_t pos; -}; - -using PartialUpdateReadPlan = std::map>>; - // used for controll compaction struct VersionWithTime { std::atomic version; diff --git a/be/src/olap/partial_update_info.cpp b/be/src/olap/partial_update_info.cpp index 5867a77559b36d..bff3f4196369db 100644 --- a/be/src/olap/partial_update_info.cpp +++ b/be/src/olap/partial_update_info.cpp @@ -19,7 +19,14 @@ #include +#include "olap/base_tablet.h" +#include "olap/olap_common.h" +#include "olap/rowset/rowset.h" +#include "olap/rowset/rowset_writer_context.h" #include "olap/tablet_schema.h" +#include "olap/utils.h" +#include "vec/common/assert_cast.h" +#include "vec/core/block.h" namespace doris { @@ -125,24 +132,20 @@ void PartialUpdateInfo::_generate_default_values_for_missing_cids( const auto& column = tablet_schema.column(cur_cid); if (column.has_default_value()) { std::string default_value; - if (UNLIKELY(tablet_schema.column(cur_cid).type() == - FieldType::OLAP_FIELD_TYPE_DATETIMEV2 && - to_lower(tablet_schema.column(cur_cid).default_value()) - .find(to_lower("CURRENT_TIMESTAMP")) != + if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATETIMEV2 && + to_lower(column.default_value()).find(to_lower("CURRENT_TIMESTAMP")) != std::string::npos)) { DateV2Value dtv; dtv.from_unixtime(timestamp_ms / 1000, timezone); default_value = dtv.debug_string(); - } else if (UNLIKELY(tablet_schema.column(cur_cid).type() == - FieldType::OLAP_FIELD_TYPE_DATEV2 && - to_lower(tablet_schema.column(cur_cid).default_value()) - .find(to_lower("CURRENT_DATE")) != + } else if (UNLIKELY(column.type() == FieldType::OLAP_FIELD_TYPE_DATEV2 && + to_lower(column.default_value()).find(to_lower("CURRENT_DATE")) != std::string::npos)) { DateV2Value dv; dv.from_unixtime(timestamp_ms / 1000, timezone); default_value = dv.debug_string(); } else { - default_value = tablet_schema.column(cur_cid).default_value(); + default_value = column.default_value(); } default_values.emplace_back(default_value); } else { @@ -152,4 +155,133 @@ void PartialUpdateInfo::_generate_default_values_for_missing_cids( } CHECK_EQ(missing_cids.size(), default_values.size()); } + +void PartialUpdateReadPlan::prepare_to_read(const RowLocation& row_location, size_t pos) { + plan[row_location.rowset_id][row_location.segment_id].emplace_back(row_location.row_id, pos); +} + +// read columns by read plan +// read_index: ori_pos-> block_idx +Status PartialUpdateReadPlan::read_columns_by_plan( + const TabletSchema& tablet_schema, const std::vector cids_to_read, + const std::map& rsid_to_rowset, vectorized::Block& block, + std::map* read_index, const signed char* __restrict skip_map) const { + bool has_row_column = tablet_schema.has_row_store_for_all_columns(); + auto mutable_columns = block.mutate_columns(); + size_t read_idx = 0; + for (const auto& [rowset_id, segment_row_mappings] : plan) { + for (const auto& [segment_id, mappings] : segment_row_mappings) { + auto rowset_iter = rsid_to_rowset.find(rowset_id); + CHECK(rowset_iter != rsid_to_rowset.end()); + std::vector rids; + for (auto [rid, pos] : mappings) { + if (skip_map && skip_map[pos]) { + continue; + } + rids.emplace_back(rid); + (*read_index)[pos] = read_idx++; + } + if (has_row_column) { + auto st = doris::BaseTablet::fetch_value_through_row_column( + rowset_iter->second, tablet_schema, segment_id, rids, cids_to_read, block); + if (!st.ok()) { + LOG(WARNING) << "failed to fetch value through row column"; + return st; + } + continue; + } + for (size_t cid = 0; cid < mutable_columns.size(); ++cid) { + TabletColumn tablet_column = tablet_schema.column(cids_to_read[cid]); + auto st = doris::BaseTablet::fetch_value_by_rowids( + rowset_iter->second, segment_id, rids, tablet_column, mutable_columns[cid]); + // set read value to output block + if (!st.ok()) { + LOG(WARNING) << "failed to fetch value"; + return st; + } + } + } + } + block.set_columns(std::move(mutable_columns)); + return Status::OK(); +} + +Status PartialUpdateReadPlan::fill_missing_columns( + RowsetWriterContext* rowset_ctx, const std::map& rsid_to_rowset, + const TabletSchema& tablet_schema, vectorized::Block& full_block, + const std::vector& use_default_or_null_flag, bool has_default_or_nullable, + const size_t& segment_start_pos, const vectorized::Block* block) const { + auto mutable_full_columns = full_block.mutate_columns(); + // create old value columns + const auto& missing_cids = rowset_ctx->partial_update_info->missing_cids; + auto old_value_block = tablet_schema.create_block_by_cids(missing_cids); + CHECK_EQ(missing_cids.size(), old_value_block.columns()); + + // record real pos, key is input line num, value is old_block line num + std::map read_index; + RETURN_IF_ERROR(read_columns_by_plan(tablet_schema, missing_cids, rsid_to_rowset, + old_value_block, &read_index, nullptr)); + + const auto* delete_sign_column_data = BaseTablet::get_delete_sign_column_data(old_value_block); + + // build default value columns + auto default_value_block = old_value_block.clone_empty(); + if (has_default_or_nullable || delete_sign_column_data != nullptr) { + RETURN_IF_ERROR(BaseTablet::generate_default_value_block( + tablet_schema, missing_cids, rowset_ctx->partial_update_info->default_values, + old_value_block, default_value_block)); + } + auto mutable_default_value_columns = default_value_block.mutate_columns(); + + // fill all missing value from mutable_old_columns, need to consider default value and null value + for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) { + // `use_default_or_null_flag[idx] == false` doesn't mean that we should read values from the old row + // for the missing columns. For example, if a table has sequence column, the rows with DELETE_SIGN column + // marked will not be marked in delete bitmap(see https://github.com/apache/doris/pull/24011), so it will + // be found in Tablet::lookup_row_key() and `use_default_or_null_flag[idx]` will be false. But we should not + // read values from old rows for missing values in this occasion. So we should read the DELETE_SIGN column + // to check if a row REALLY exists in the table. + auto pos_in_old_block = read_index[idx + segment_start_pos]; + if (use_default_or_null_flag[idx] || (delete_sign_column_data != nullptr && + delete_sign_column_data[pos_in_old_block] != 0)) { + for (auto i = 0; i < missing_cids.size(); ++i) { + // if the column has default value, fill it with default value + // otherwise, if the column is nullable, fill it with null value + const auto& tablet_column = tablet_schema.column(missing_cids[i]); + auto& missing_col = mutable_full_columns[missing_cids[i]]; + // clang-format off + if (tablet_column.has_default_value()) { + missing_col->insert_from(*mutable_default_value_columns[i].get(), 0); + } else if (tablet_column.is_nullable()) { + auto* nullable_column = + assert_cast(missing_col.get()); + nullable_column->insert_null_elements(1); + } else if (tablet_schema.auto_increment_column() == tablet_column.name()) { + const auto& column = + *DORIS_TRY(rowset_ctx->tablet_schema->column(tablet_column.name())); + DCHECK(column.type() == FieldType::OLAP_FIELD_TYPE_BIGINT); + auto* auto_inc_column = + assert_cast(missing_col.get()); + auto_inc_column->insert( + (assert_cast( + block->get_by_name("__PARTIAL_UPDATE_AUTO_INC_COLUMN__").column.get()))->get_element(idx)); + } else { + // If the control flow reaches this branch, the column neither has default value + // nor is nullable. It means that the row's delete sign is marked, and the value + // columns are useless and won't be read. So we can just put arbitary values in the cells + missing_col->insert_default(); + } + // clang-format on + } + continue; + } + for (auto i = 0; i < missing_cids.size(); ++i) { + mutable_full_columns[missing_cids[i]]->insert_from( + *old_value_block.get_columns_with_type_and_name()[i].column.get(), + pos_in_old_block); + } + } + return Status::OK(); +} + } // namespace doris diff --git a/be/src/olap/partial_update_info.h b/be/src/olap/partial_update_info.h index 987f31ec7f7eb9..a99bf7181184f4 100644 --- a/be/src/olap/partial_update_info.h +++ b/be/src/olap/partial_update_info.h @@ -17,13 +17,24 @@ #pragma once #include +#include #include #include #include +#include "common/status.h" +#include "olap/rowset/rowset_fwd.h" +#include "olap/tablet_fwd.h" + namespace doris { class TabletSchema; class PartialUpdateInfoPB; +struct RowLocation; +namespace vectorized { +class Block; +} +struct RowsetWriterContext; +struct RowsetId; struct PartialUpdateInfo { void init(const TabletSchema& tablet_schema, bool partial_update, @@ -55,4 +66,31 @@ struct PartialUpdateInfo { // default values for missing cids std::vector default_values; }; + +// used in mow partial update +struct RidAndPos { + uint32_t rid; + // pos in block + size_t pos; +}; + +class PartialUpdateReadPlan { +public: + void prepare_to_read(const RowLocation& row_location, size_t pos); + Status read_columns_by_plan(const TabletSchema& tablet_schema, + const std::vector cids_to_read, + const std::map& rsid_to_rowset, + vectorized::Block& block, std::map* read_index, + const signed char* __restrict skip_map = nullptr) const; + Status fill_missing_columns(RowsetWriterContext* rowset_ctx, + const std::map& rsid_to_rowset, + const TabletSchema& tablet_schema, vectorized::Block& full_block, + const std::vector& use_default_or_null_flag, + bool has_default_or_nullable, const size_t& segment_start_pos, + const vectorized::Block* block) const; + +private: + std::map>> plan; +}; + } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index a450f8ffd99b9e..2c94942bac08c7 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -37,6 +37,7 @@ #include "olap/data_dir.h" #include "olap/key_coder.h" #include "olap/olap_common.h" +#include "olap/partial_update_info.h" #include "olap/primary_key_index.h" #include "olap/row_cursor.h" // RowCursor // IWYU pragma: keep #include "olap/rowset/rowset_writer_context.h" // RowsetWriterContext @@ -522,16 +523,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* bool has_default_or_nullable = false; std::vector use_default_or_null_flag; use_default_or_null_flag.reserve(num_rows); - const vectorized::Int8* delete_sign_column_data = nullptr; - if (const vectorized::ColumnWithTypeAndName* delete_sign_column = - full_block.try_get_by_name(DELETE_SIGN); - delete_sign_column != nullptr) { - auto& delete_sign_col = - reinterpret_cast(*(delete_sign_column->column)); - if (delete_sign_col.size() >= row_pos + num_rows) { - delete_sign_column_data = delete_sign_col.get_data().data(); - } - } + const auto* delete_sign_column_data = + BaseTablet::get_delete_sign_column_data(full_block, row_pos + num_rows); std::vector specified_rowsets; { @@ -554,6 +547,9 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* } } std::vector> segment_caches(specified_rowsets.size()); + + PartialUpdateReadPlan read_plan; + // locate rows in base data int64_t num_rows_updated = 0; int64_t num_rows_new_added = 0; @@ -638,7 +634,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* // partial update should not contain invisible columns use_default_or_null_flag.emplace_back(false); _rsid_to_rowset.emplace(rowset->rowset_id(), rowset); - _tablet->prepare_to_read(loc, segment_pos, &_rssid_to_rid); + read_plan.prepare_to_read(loc, segment_pos); } if (st.is()) { @@ -662,10 +658,10 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* } // read and fill block - auto mutable_full_columns = full_block.mutate_columns(); - RETURN_IF_ERROR(fill_missing_columns(mutable_full_columns, use_default_or_null_flag, - has_default_or_nullable, segment_start_pos, block)); - full_block.set_columns(std::move(mutable_full_columns)); + RETURN_IF_ERROR(read_plan.fill_missing_columns( + _opts.rowset_ctx, _rsid_to_rowset, *_tablet_schema, full_block, + use_default_or_null_flag, has_default_or_nullable, segment_start_pos, block)); + // convert block to row store format _serialize_block_to_row_column(full_block); @@ -721,134 +717,6 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* return Status::OK(); } -Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, - const std::vector& use_default_or_null_flag, - bool has_default_or_nullable, - const size_t& segment_start_pos, - const vectorized::Block* block) { - if (config::is_cloud_mode()) { - // TODO(plat1ko): cloud mode - return Status::NotSupported("fill_missing_columns"); - } - // create old value columns - const auto& cids_missing = _opts.rowset_ctx->partial_update_info->missing_cids; - auto old_value_block = _tablet_schema->create_block_by_cids(cids_missing); - CHECK_EQ(cids_missing.size(), old_value_block.columns()); - bool has_row_column = _tablet_schema->has_row_store_for_all_columns(); - // record real pos, key is input line num, value is old_block line num - std::map read_index; - size_t read_idx = 0; - for (auto rs_it : _rssid_to_rid) { - for (auto seg_it : rs_it.second) { - auto rowset = _rsid_to_rowset[rs_it.first]; - CHECK(rowset); - std::vector rids; - for (auto [rid, pos] : seg_it.second) { - rids.emplace_back(rid); - read_index[pos] = read_idx++; - } - if (has_row_column) { - auto st = _tablet->fetch_value_through_row_column( - rowset, *_tablet_schema, seg_it.first, rids, cids_missing, old_value_block); - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value through row column"; - return st; - } - continue; - } - auto mutable_old_columns = old_value_block.mutate_columns(); - for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) { - TabletColumn tablet_column = _tablet_schema->column(cids_missing[cid]); - auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column, - mutable_old_columns[cid]); - // set read value to output block - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value by rowids"; - return st; - } - } - old_value_block.set_columns(std::move(mutable_old_columns)); - } - } - // build default value columns - auto default_value_block = old_value_block.clone_empty(); - auto mutable_default_value_columns = default_value_block.mutate_columns(); - - const vectorized::Int8* delete_sign_column_data = nullptr; - if (const vectorized::ColumnWithTypeAndName* delete_sign_column = - old_value_block.try_get_by_name(DELETE_SIGN); - delete_sign_column != nullptr) { - auto& delete_sign_col = - reinterpret_cast(*(delete_sign_column->column)); - delete_sign_column_data = delete_sign_col.get_data().data(); - } - - if (has_default_or_nullable || delete_sign_column_data != nullptr) { - for (auto i = 0; i < cids_missing.size(); ++i) { - const auto& column = _tablet_schema->column(cids_missing[i]); - if (column.has_default_value()) { - const auto& default_value = - _opts.rowset_ctx->partial_update_info->default_values[i]; - vectorized::ReadBuffer rb(const_cast(default_value.c_str()), - default_value.size()); - RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string( - rb, mutable_default_value_columns[i].get())); - } - } - } - - // fill all missing value from mutable_old_columns, need to consider default value and null value - for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) { - // `use_default_or_null_flag[idx] == false` doesn't mean that we should read values from the old row - // for the missing columns. For example, if a table has sequence column, the rows with DELETE_SIGN column - // marked will not be marked in delete bitmap(see https://github.com/apache/doris/pull/24011), so it will - // be found in Tablet::lookup_row_key() and `use_default_or_null_flag[idx]` will be false. But we should not - // read values from old rows for missing values in this occasion. So we should read the DELETE_SIGN column - // to check if a row REALLY exists in the table. - if (use_default_or_null_flag[idx] || - (delete_sign_column_data != nullptr && - delete_sign_column_data[read_index[idx + segment_start_pos]] != 0)) { - for (auto i = 0; i < cids_missing.size(); ++i) { - // if the column has default value, fill it with default value - // otherwise, if the column is nullable, fill it with null value - const auto& tablet_column = _tablet_schema->column(cids_missing[i]); - if (tablet_column.has_default_value()) { - mutable_full_columns[cids_missing[i]]->insert_from( - *mutable_default_value_columns[i].get(), 0); - } else if (tablet_column.is_nullable()) { - auto nullable_column = assert_cast( - mutable_full_columns[cids_missing[i]].get()); - nullable_column->insert_null_elements(1); - } else if (_tablet_schema->auto_increment_column() == tablet_column.name()) { - const auto& column = *DORIS_TRY( - _opts.rowset_ctx->tablet_schema->column(tablet_column.name())); - DCHECK(column.type() == FieldType::OLAP_FIELD_TYPE_BIGINT); - auto auto_inc_column = assert_cast( - mutable_full_columns[cids_missing[i]].get()); - auto_inc_column->insert( - (assert_cast( - block->get_by_name("__PARTIAL_UPDATE_AUTO_INC_COLUMN__") - .column.get())) - ->get_element(idx)); - } else { - // If the control flow reaches this branch, the column neither has default value - // nor is nullable. It means that the row's delete sign is marked, and the value - // columns are useless and won't be read. So we can just put arbitary values in the cells - mutable_full_columns[cids_missing[i]]->insert_default(); - } - } - continue; - } - auto pos_in_old_block = read_index[idx + segment_start_pos]; - for (auto i = 0; i < cids_missing.size(); ++i) { - mutable_full_columns[cids_missing[i]]->insert_from( - *old_value_block.get_columns_with_type_and_name()[i].column.get(), - pos_in_old_block); - } - } - return Status::OK(); -} - Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_pos, size_t num_rows) { if (_opts.rowset_ctx->partial_update_info && diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 3cdb71a45d7b15..c4b571cfc19d9d 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -136,10 +136,6 @@ class SegmentWriter { TabletSchemaSPtr flush_schema() const { return _flush_schema; }; void set_mow_context(std::shared_ptr mow_context); - Status fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, - const std::vector& use_default_or_null_flag, - bool has_default_or_nullable, const size_t& segment_start_pos, - const vectorized::Block* block); private: DISALLOW_COPY_AND_ASSIGN(SegmentWriter); @@ -241,7 +237,6 @@ class SegmentWriter { std::shared_ptr _mow_context; // group every rowset-segment row id to speed up reader - PartialUpdateReadPlan _rssid_to_rid; std::map _rsid_to_rowset; // contains auto generated columns, should be nullptr if no variants's subcolumns TabletSchemaSPtr _flush_schema = nullptr; diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp index c14f3b557d7f2a..891fd8c6a10ce6 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp @@ -36,6 +36,7 @@ #include "inverted_index_fs_directory.h" #include "io/fs/file_writer.h" #include "io/fs/local_file_system.h" +#include "olap/base_tablet.h" #include "olap/data_dir.h" #include "olap/key_coder.h" #include "olap/olap_common.h" @@ -381,16 +382,8 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da bool has_default_or_nullable = false; std::vector use_default_or_null_flag; use_default_or_null_flag.reserve(data.num_rows); - const vectorized::Int8* delete_sign_column_data = nullptr; - if (const vectorized::ColumnWithTypeAndName* delete_sign_column = - full_block.try_get_by_name(DELETE_SIGN); - delete_sign_column != nullptr) { - auto& delete_sign_col = - reinterpret_cast(*(delete_sign_column->column)); - if (delete_sign_col.size() >= data.row_pos + data.num_rows) { - delete_sign_column_data = delete_sign_col.get_data().data(); - } - } + const auto* delete_sign_column_data = + BaseTablet::get_delete_sign_column_data(full_block, data.row_pos + data.num_rows); std::vector specified_rowsets; { @@ -416,6 +409,8 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da } std::vector> segment_caches(specified_rowsets.size()); + PartialUpdateReadPlan read_plan; + // locate rows in base data int64_t num_rows_updated = 0; int64_t num_rows_new_added = 0; @@ -498,7 +493,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da // partial update should not contain invisible columns use_default_or_null_flag.emplace_back(false); _rsid_to_rowset.emplace(rowset->rowset_id(), rowset); - _tablet->prepare_to_read(loc, segment_pos, &_rssid_to_rid); + read_plan.prepare_to_read(loc, segment_pos); } if (st.is()) { @@ -522,9 +517,9 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da } // read and fill block - auto mutable_full_columns = full_block.mutate_columns(); - RETURN_IF_ERROR(_fill_missing_columns(mutable_full_columns, use_default_or_null_flag, - has_default_or_nullable, segment_start_pos, data.block)); + RETURN_IF_ERROR(read_plan.fill_missing_columns( + _opts.rowset_ctx, _rsid_to_rowset, *_tablet_schema, full_block, + use_default_or_null_flag, has_default_or_nullable, segment_start_pos, data.block)); // row column should be filled here // convert block to row store format @@ -582,128 +577,6 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da return Status::OK(); } -Status VerticalSegmentWriter::_fill_missing_columns( - vectorized::MutableColumns& mutable_full_columns, - const std::vector& use_default_or_null_flag, bool has_default_or_nullable, - const size_t& segment_start_pos, const vectorized::Block* block) { - // create old value columns - const auto& missing_cids = _opts.rowset_ctx->partial_update_info->missing_cids; - auto old_value_block = _tablet_schema->create_block_by_cids(missing_cids); - CHECK_EQ(missing_cids.size(), old_value_block.columns()); - auto mutable_old_columns = old_value_block.mutate_columns(); - bool has_row_column = _tablet_schema->has_row_store_for_all_columns(); - // record real pos, key is input line num, value is old_block line num - std::map read_index; - size_t read_idx = 0; - for (auto rs_it : _rssid_to_rid) { - for (auto seg_it : rs_it.second) { - auto rowset = _rsid_to_rowset[rs_it.first]; - CHECK(rowset); - std::vector rids; - for (auto [rid, pos] : seg_it.second) { - rids.emplace_back(rid); - read_index[pos] = read_idx++; - } - if (has_row_column) { - auto st = _tablet->fetch_value_through_row_column( - rowset, *_tablet_schema, seg_it.first, rids, missing_cids, old_value_block); - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value through row column"; - return st; - } - continue; - } - for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) { - TabletColumn tablet_column = _tablet_schema->column(missing_cids[cid]); - auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column, - mutable_old_columns[cid]); - // set read value to output block - if (!st.ok()) { - LOG(WARNING) << "failed to fetch value by rowids"; - return st; - } - } - } - } - // build default value columns - auto default_value_block = old_value_block.clone_empty(); - auto mutable_default_value_columns = default_value_block.mutate_columns(); - - const vectorized::Int8* delete_sign_column_data = nullptr; - if (const vectorized::ColumnWithTypeAndName* delete_sign_column = - old_value_block.try_get_by_name(DELETE_SIGN); - delete_sign_column != nullptr) { - auto& delete_sign_col = - reinterpret_cast(*(delete_sign_column->column)); - delete_sign_column_data = delete_sign_col.get_data().data(); - } - - if (has_default_or_nullable || delete_sign_column_data != nullptr) { - for (auto i = 0; i < missing_cids.size(); ++i) { - const auto& column = _tablet_schema->column(missing_cids[i]); - if (column.has_default_value()) { - const auto& default_value = - _opts.rowset_ctx->partial_update_info->default_values[i]; - vectorized::ReadBuffer rb(const_cast(default_value.c_str()), - default_value.size()); - RETURN_IF_ERROR(old_value_block.get_by_position(i).type->from_string( - rb, mutable_default_value_columns[i].get())); - } - } - } - - // fill all missing value from mutable_old_columns, need to consider default value and null value - for (auto idx = 0; idx < use_default_or_null_flag.size(); idx++) { - // `use_default_or_null_flag[idx] == false` doesn't mean that we should read values from the old row - // for the missing columns. For example, if a table has sequence column, the rows with DELETE_SIGN column - // marked will not be marked in delete bitmap(see https://github.com/apache/doris/pull/24011), so it will - // be found in Tablet::lookup_row_key() and `use_default_or_null_flag[idx]` will be false. But we should not - // read values from old rows for missing values in this occasion. So we should read the DELETE_SIGN column - // to check if a row REALLY exists in the table. - if (use_default_or_null_flag[idx] || - (delete_sign_column_data != nullptr && - delete_sign_column_data[read_index[idx + segment_start_pos]] != 0)) { - for (auto i = 0; i < missing_cids.size(); ++i) { - // if the column has default value, fill it with default value - // otherwise, if the column is nullable, fill it with null value - const auto& tablet_column = _tablet_schema->column(missing_cids[i]); - if (tablet_column.has_default_value()) { - mutable_full_columns[missing_cids[i]]->insert_from( - *mutable_default_value_columns[i].get(), 0); - } else if (tablet_column.is_nullable()) { - auto nullable_column = assert_cast( - mutable_full_columns[missing_cids[i]].get()); - nullable_column->insert_null_elements(1); - } else if (_tablet_schema->auto_increment_column() == tablet_column.name()) { - const auto& column = *DORIS_TRY( - _opts.rowset_ctx->tablet_schema->column(tablet_column.name())); - DCHECK(column.type() == FieldType::OLAP_FIELD_TYPE_BIGINT); - auto auto_inc_column = assert_cast( - mutable_full_columns[missing_cids[i]].get()); - auto_inc_column->insert( - (assert_cast( - block->get_by_name("__PARTIAL_UPDATE_AUTO_INC_COLUMN__") - .column.get())) - ->get_element(idx)); - } else { - // If the control flow reaches this branch, the column neither has default value - // nor is nullable. It means that the row's delete sign is marked, and the value - // columns are useless and won't be read. So we can just put arbitary values in the cells - mutable_full_columns[missing_cids[i]]->insert_default(); - } - } - continue; - } - auto pos_in_old_block = read_index[idx + segment_start_pos]; - for (auto i = 0; i < missing_cids.size(); ++i) { - mutable_full_columns[missing_cids[i]]->insert_from( - *old_value_block.get_columns_with_type_and_name()[i].column.get(), - pos_in_old_block); - } - } - return Status::OK(); -} - Status VerticalSegmentWriter::batch_block(const vectorized::Block* block, size_t row_pos, size_t num_rows) { if (_opts.rowset_ctx->partial_update_info && diff --git a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h index 66525ea4c768d5..d84e08d081f472 100644 --- a/be/src/olap/rowset/segment_v2/vertical_segment_writer.h +++ b/be/src/olap/rowset/segment_v2/vertical_segment_writer.h @@ -160,10 +160,6 @@ class VerticalSegmentWriter { void _serialize_block_to_row_column(vectorized::Block& block); Status _append_block_with_partial_content(RowsInBlock& data, vectorized::Block& full_block); Status _append_block_with_variant_subcolumns(RowsInBlock& data); - Status _fill_missing_columns(vectorized::MutableColumns& mutable_full_columns, - const std::vector& use_default_or_null_flag, - bool has_default_or_nullable, const size_t& segment_start_pos, - const vectorized::Block* block); Status _generate_key_index( RowsInBlock& data, std::vector& key_columns, vectorized::IOlapColumnDataAccessor* seq_column, @@ -230,7 +226,6 @@ class VerticalSegmentWriter { std::shared_ptr _mow_context; // group every rowset-segment row id to speed up reader - PartialUpdateReadPlan _rssid_to_rid; std::map _rsid_to_rowset; std::vector _batched_blocks; diff --git a/be/src/olap/tablet_schema.cpp b/be/src/olap/tablet_schema.cpp index 78854a1534e2d8..095439e4d5b393 100644 --- a/be/src/olap/tablet_schema.cpp +++ b/be/src/olap/tablet_schema.cpp @@ -1476,7 +1476,7 @@ vectorized::Block TabletSchema::create_block(bool ignore_dropped_col) const { return block; } -vectorized::Block TabletSchema::create_block_by_cids(const std::vector& cids) { +vectorized::Block TabletSchema::create_block_by_cids(const std::vector& cids) const { vectorized::Block block; for (const auto& cid : cids) { const auto& col = *_cols[cid]; diff --git a/be/src/olap/tablet_schema.h b/be/src/olap/tablet_schema.h index 290f62743f73a4..251c0b58eacaf7 100644 --- a/be/src/olap/tablet_schema.h +++ b/be/src/olap/tablet_schema.h @@ -477,7 +477,7 @@ class TabletSchema { return str; } - vectorized::Block create_block_by_cids(const std::vector& cids); + vectorized::Block create_block_by_cids(const std::vector& cids) const; std::shared_ptr copy_without_variant_extracted_columns(); InvertedIndexStorageFormatPB get_inverted_index_storage_format() const {