Merge branch 'master' into fix-delete
felixwluo authored Jun 25, 2024
2 parents d99c8ed + d01d5c7 commit 1c7cd36
Showing 238 changed files with 7,064 additions and 1,579 deletions.
1 change: 1 addition & 0 deletions .licenserc.yaml
@@ -82,6 +82,7 @@ header:
- "docker/thirdparties/docker-compose/hive/scripts/create_tpch1_orc.hql"
- "docker/thirdparties/docker-compose/hive/scripts/create_tpch1_parquet.hql"
- "docker/thirdparties/docker-compose/hive/scripts/preinstalled_data/"
- "docker/thirdparties/docker-compose/hive/scripts/suites/**"
- "docker/thirdparties/docker-compose/iceberg/spark-defaults.conf.tpl"
- "conf/mysql_ssl_default_certificate/*"
- "conf/mysql_ssl_default_certificate/client_certificate/ca.pem"
4 changes: 4 additions & 0 deletions be/src/cloud/pb_convert.cpp
@@ -275,6 +275,7 @@ void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, const TabletSchemaPB
out->set_skip_write_index_on_load(in.skip_write_index_on_load());
out->mutable_cluster_key_idxes()->CopyFrom(in.cluster_key_idxes());
out->set_is_dynamic_schema(in.is_dynamic_schema());
out->mutable_row_store_column_unique_ids()->CopyFrom(in.row_store_column_unique_ids());
}

void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, TabletSchemaPB&& in) {
@@ -299,6 +300,7 @@ void doris_tablet_schema_to_cloud(TabletSchemaCloudPB* out, TabletSchemaPB&& in)
out->set_skip_write_index_on_load(in.skip_write_index_on_load());
out->mutable_cluster_key_idxes()->Swap(in.mutable_cluster_key_idxes());
out->set_is_dynamic_schema(in.is_dynamic_schema());
out->mutable_row_store_column_unique_ids()->Swap(in.mutable_row_store_column_unique_ids());
}

TabletSchemaPB cloud_tablet_schema_to_doris(const TabletSchemaCloudPB& in) {
@@ -336,6 +338,7 @@ void cloud_tablet_schema_to_doris(TabletSchemaPB* out, const TabletSchemaCloudPB
out->set_skip_write_index_on_load(in.skip_write_index_on_load());
out->mutable_cluster_key_idxes()->CopyFrom(in.cluster_key_idxes());
out->set_is_dynamic_schema(in.is_dynamic_schema());
out->mutable_row_store_column_unique_ids()->CopyFrom(in.row_store_column_unique_ids());
}

void cloud_tablet_schema_to_doris(TabletSchemaPB* out, TabletSchemaCloudPB&& in) {
@@ -361,6 +364,7 @@ void cloud_tablet_schema_to_doris(TabletSchemaPB* out, TabletSchemaCloudPB&& in)
out->set_skip_write_index_on_load(in.skip_write_index_on_load());
out->mutable_cluster_key_idxes()->Swap(in.mutable_cluster_key_idxes());
out->set_is_dynamic_schema(in.is_dynamic_schema());
out->mutable_row_store_column_unique_ids()->Swap(in.mutable_row_store_column_unique_ids());
}

TabletMetaCloudPB doris_tablet_meta_to_cloud(const TabletMetaPB& in) {
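The new repeated field `row_store_column_unique_ids` is carried across in the same way as the other repeated fields in these converters: `CopyFrom` in the const-reference overloads and `Swap` in the rvalue-reference overloads, so the move path hands over the list without copying it. A minimal sketch of that copy-versus-swap split, using `std::vector` in place of the generated protobuf `RepeatedField` API (the struct and field names below are illustrative stand-ins, not the real generated types):

```cpp
#include <cstdint>
#include <vector>

// Stand-ins for the generated protobuf messages.
struct SchemaDorisPB { std::vector<int32_t> row_store_column_unique_ids; };
struct SchemaCloudPB { std::vector<int32_t> row_store_column_unique_ids; };

// Const-reference overload: the source must remain valid, so copy (CopyFrom).
void to_cloud(SchemaCloudPB* out, const SchemaDorisPB& in) {
    out->row_store_column_unique_ids = in.row_store_column_unique_ids;
}

// Rvalue overload: the source is about to be discarded, so steal its storage (Swap).
void to_cloud(SchemaCloudPB* out, SchemaDorisPB&& in) {
    out->row_store_column_unique_ids.swap(in.row_store_column_unique_ids);
}
```

Omitting the new field from any one of the converters would silently drop it during cloud/doris meta conversion, which is presumably why the same line is added to all four functions.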
93 changes: 54 additions & 39 deletions be/src/http/action/stream_load.cpp
@@ -33,6 +33,7 @@
#include <time.h>

#include <algorithm>
#include <functional>
#include <future>
#include <map>
#include <sstream>
@@ -108,20 +109,67 @@ void StreamLoadAction::handle(HttpRequest* req) {

// status already set to fail
if (ctx->status.ok()) {
ctx->status = _handle(ctx);
ctx->status = _handle(ctx, req);
if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
<< ", errmsg=" << ctx->status;
_send_reply(ctx, req);
}
}
}

Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
<< ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
return Status::InternalError("receive body don't equal with body bytes");
}

// if we use non-streaming, MessageBodyFileSink.finish will close the file
RETURN_IF_ERROR(ctx->body_sink->finish());
if (!ctx->use_streaming) {
// we need to close file first, then execute_plan_fragment here
ctx->body_sink.reset();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(
ctx,
[req, this](std::shared_ptr<StreamLoadContext> ctx) { _on_finish(ctx, req); }));
}

return Status::OK();
}

void StreamLoadAction::_on_finish(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
ctx->status = ctx->future.get();
if (ctx->status.ok()) {
if (ctx->group_commit) {
LOG(INFO) << "skip commit because this is group commit, pipe_id="
<< ctx->id.to_string();
} else if (ctx->two_phase_commit) {
int64_t pre_commit_start_time = MonotonicNanos();
ctx->status = _exec_env->stream_load_executor()->pre_commit_txn(ctx.get());
ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
} else {
// If put file success we need commit this load
int64_t commit_and_publish_start_time = MonotonicNanos();
ctx->status = _exec_env->stream_load_executor()->commit_txn(ctx.get());
ctx->commit_and_publish_txn_cost_nanos =
MonotonicNanos() - commit_and_publish_start_time;
}
}
_send_reply(ctx, req);
}

void StreamLoadAction::_send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req) {
ctx->load_cost_millis = UnixMillis() - ctx->start_millis;

if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
<< ", errmsg=" << ctx->status;
if (ctx->need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
ctx->need_rollback = false;
}
if (ctx->body_sink.get() != nullptr) {
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(ctx->status.to_string());
}
}
@@ -141,42 +189,6 @@ void StreamLoadAction::handle(HttpRequest* req) {
streaming_load_duration_ms->increment(ctx->load_cost_millis);
}

Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
LOG(WARNING) << "recevie body don't equal with body bytes, body_bytes=" << ctx->body_bytes
<< ", receive_bytes=" << ctx->receive_bytes << ", id=" << ctx->id;
return Status::InternalError("receive body don't equal with body bytes");
}

// if we use non-streaming, MessageBodyFileSink.finish will close the file
RETURN_IF_ERROR(ctx->body_sink->finish());
if (!ctx->use_streaming) {
// we need to close file first, then execute_plan_fragment here
ctx->body_sink.reset();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx));
}

// wait stream load finish
RETURN_IF_ERROR(ctx->future.get());

if (ctx->group_commit) {
LOG(INFO) << "skip commit because this is group commit, pipe_id=" << ctx->id.to_string();
return Status::OK();
}

if (ctx->two_phase_commit) {
int64_t pre_commit_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - pre_commit_start_time;
} else {
// If put file success we need commit this load
int64_t commit_and_publish_start_time = MonotonicNanos();
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - commit_and_publish_start_time;
}
return Status::OK();
}

int StreamLoadAction::on_header(HttpRequest* req) {
streaming_load_current_processing->increment(1);

@@ -681,7 +693,10 @@ Status StreamLoadAction::_process_put(HttpRequest* http_req,
return Status::OK();
}

return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
return _exec_env->stream_load_executor()->execute_plan_fragment(
ctx, [http_req, this](std::shared_ptr<StreamLoadContext> ctx) {
_on_finish(ctx, http_req);
});
}

Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* file_path) {
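Taken together, these hunks change the stream-load path from wait-then-reply to callback-driven: the old `_handle` blocked on `ctx->future.get()` and then committed, while the new code hands `execute_plan_fragment` a callback, and `_on_finish` runs the pre-commit/commit logic and `_send_reply` only once the plan fragment completes. A stripped-down sketch of that pattern with stand-in types (not the actual Doris executor or context classes):

```cpp
#include <chrono>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <string>
#include <thread>

// Stand-in for StreamLoadContext: the execution side fulfills the promise.
struct Context {
    std::promise<std::string> promise;
    std::future<std::string> future = promise.get_future();
};

// Stand-in executor: runs the "plan fragment" asynchronously and invokes the
// caller-supplied callback on completion, instead of making the HTTP handler
// thread block on future.get().
struct Executor {
    void execute_plan_fragment(std::shared_ptr<Context> ctx,
                               std::function<void(std::shared_ptr<Context>)> on_finish) {
        std::thread([ctx, cb = std::move(on_finish)] {
            ctx->promise.set_value("OK");  // fragment finished
            cb(ctx);                       // commit + reply happen in the callback
        }).detach();
    }
};

int main() {
    Executor exec;
    auto ctx = std::make_shared<Context>();
    exec.execute_plan_fragment(ctx, [](std::shared_ptr<Context> c) {
        // Rough equivalent of _on_finish: read the result, then "send the reply".
        std::cout << "status=" << c->future.get() << ", reply sent" << std::endl;
    });
    std::this_thread::sleep_for(std::chrono::seconds(1));  // keep the demo process alive
    return 0;
}
```

Note that rollback and body-sink cancellation still live in `_send_reply`, which both the success path (`_on_finish`) and the early-failure path in `handle` reach.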
4 changes: 3 additions & 1 deletion be/src/http/action/stream_load.h
@@ -46,11 +46,13 @@ class StreamLoadAction : public HttpHandler {

private:
Status _on_header(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
Status _handle(std::shared_ptr<StreamLoadContext> ctx);
Status _handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
Status _data_saved_path(HttpRequest* req, std::string* file_path);
Status _process_put(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, const std::string& str);
Status _handle_group_commit(HttpRequest* http_req, std::shared_ptr<StreamLoadContext> ctx);
void _on_finish(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
void _send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);

private:
ExecEnv* _exec_env;
20 changes: 9 additions & 11 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -467,7 +467,6 @@ void SegmentWriter::_serialize_block_to_row_column(vectorized::Block& block) {
// 3. set columns to data convertor and then write all columns
Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* block,
size_t row_pos, size_t num_rows) {
auto* tablet = static_cast<Tablet*>(_tablet.get());
if (block->columns() <= _tablet_schema->num_key_columns() ||
block->columns() >= _tablet_schema->num_columns()) {
return Status::InvalidArgument(
@@ -533,7 +532,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*

std::vector<RowsetSharedPtr> specified_rowsets;
{
std::shared_lock rlock(tablet->get_header_lock());
std::shared_lock rlock(_tablet->get_header_lock());
specified_rowsets = _mow_context->rowset_ptrs;
if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
// Only when this is a strict mode partial update that missing rowsets here will lead to problems.
@@ -586,8 +585,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
RowLocation loc;
// save rowset shared ptr so this rowset wouldn't delete
RowsetSharedPtr rowset;
auto st = tablet->lookup_row_key(key, have_input_seq_column, specified_rowsets, &loc,
_mow_context->max_version, segment_caches, &rowset);
auto st = _tablet->lookup_row_key(key, have_input_seq_column, specified_rowsets, &loc,
_mow_context->max_version, segment_caches, &rowset);
if (st.is<KEY_NOT_FOUND>()) {
if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
++num_rows_filtered;
@@ -636,7 +635,7 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
// partial update should not contain invisible columns
use_default_or_null_flag.emplace_back(false);
_rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
tablet->prepare_to_read(loc, segment_pos, &_rssid_to_rid);
_tablet->prepare_to_read(loc, segment_pos, &_rssid_to_rid);
}

if (st.is<KEY_ALREADY_EXISTS>()) {
@@ -655,8 +654,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block*
CHECK_EQ(use_default_or_null_flag.size(), num_rows);

if (config::enable_merge_on_write_correctness_check) {
tablet->add_sentinel_mark_to_delete_bitmap(_mow_context->delete_bitmap.get(),
_mow_context->rowset_ids);
_tablet->add_sentinel_mark_to_delete_bitmap(_mow_context->delete_bitmap.get(),
_mow_context->rowset_ids);
}

// read and fill block
@@ -728,7 +727,6 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
// TODO(plat1ko): cloud mode
return Status::NotSupported("fill_missing_columns");
}
auto tablet = static_cast<Tablet*>(_tablet.get());
// create old value columns
const auto& cids_missing = _opts.rowset_ctx->partial_update_info->missing_cids;
auto old_value_block = _tablet_schema->create_block_by_cids(cids_missing);
@@ -747,7 +745,7 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
read_index[id_and_pos.pos] = read_idx++;
}
if (has_row_column) {
auto st = tablet->fetch_value_through_row_column(
auto st = _tablet->fetch_value_through_row_column(
rowset, *_tablet_schema, seg_it.first, rids, cids_missing, old_value_block);
if (!st.ok()) {
LOG(WARNING) << "failed to fetch value through row column";
@@ -758,8 +756,8 @@ Status SegmentWriter::fill_missing_columns(vectorized::MutableColumns& mutable_f
auto mutable_old_columns = old_value_block.mutate_columns();
for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
TabletColumn tablet_column = _tablet_schema->column(cids_missing[cid]);
auto st = tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column,
mutable_old_columns[cid]);
auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column,
mutable_old_columns[cid]);
// set read value to output block
if (!st.ok()) {
LOG(WARNING) << "failed to fetch value by rowids";
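The repeated edit in this file (and in `vertical_segment_writer.cpp` below) is the removal of the `static_cast<Tablet*>(_tablet.get())` downcast: `lookup_row_key`, `prepare_to_read`, `add_sentinel_mark_to_delete_bitmap`, and the value-fetch helpers are now called through `_tablet` directly, which suggests those methods are reachable from the tablet type the writer already holds (presumably a shared base class). A generic sketch of that kind of refactor, with made-up class names:

```cpp
#include <memory>

// Before: the member held a base pointer, but the needed API lived only on the
// derived class, forcing a static_cast at every call site.
// After: the API is part of the base interface, so the cast disappears.
class BaseTablet {
public:
    virtual ~BaseTablet() = default;
    virtual int lookup_row_key(int key) const = 0;  // hoisted interface
};

class LocalTablet : public BaseTablet {
public:
    int lookup_row_key(int key) const override { return key * 2; }
};

class Writer {
public:
    explicit Writer(std::shared_ptr<BaseTablet> tablet) : _tablet(std::move(tablet)) {}

    int append(int key) {
        // Old style:  auto* t = static_cast<LocalTablet*>(_tablet.get());
        //             return t->lookup_row_key(key);
        return _tablet->lookup_row_key(key);  // new style: no downcast needed
    }

private:
    std::shared_ptr<BaseTablet> _tablet;
};

int main() {
    Writer w(std::make_shared<LocalTablet>());
    return w.append(21) == 42 ? 0 : 1;
}
```

Hoisting the interface removes the assumption, baked into every call site, that the pointer always refers to one particular concrete subclass.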
20 changes: 9 additions & 11 deletions be/src/olap/rowset/segment_v2/vertical_segment_writer.cpp
@@ -311,7 +311,6 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
DCHECK(_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write);
DCHECK(_opts.rowset_ctx->partial_update_info != nullptr);

auto tablet = static_cast<Tablet*>(_tablet.get());
// create full block and fill with input columns
full_block = _tablet_schema->create_block();
const auto& including_cids = _opts.rowset_ctx->partial_update_info->update_cids;
@@ -364,7 +363,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
{
DBUG_EXECUTE_IF("VerticalSegmentWriter._append_block_with_partial_content.sleep",
{ sleep(60); })
std::shared_lock rlock(tablet->get_header_lock());
std::shared_lock rlock(_tablet->get_header_lock());
specified_rowsets = _mow_context->rowset_ptrs;
if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
// Only when this is a strict mode partial update that missing rowsets here will lead to problems.
@@ -417,8 +416,8 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
RowLocation loc;
// save rowset shared ptr so this rowset wouldn't delete
RowsetSharedPtr rowset;
auto st = tablet->lookup_row_key(key, have_input_seq_column, specified_rowsets, &loc,
_mow_context->max_version, segment_caches, &rowset);
auto st = _tablet->lookup_row_key(key, have_input_seq_column, specified_rowsets, &loc,
_mow_context->max_version, segment_caches, &rowset);
if (st.is<KEY_NOT_FOUND>()) {
if (_opts.rowset_ctx->partial_update_info->is_strict_mode) {
++num_rows_filtered;
@@ -466,7 +465,7 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
// partial update should not contain invisible columns
use_default_or_null_flag.emplace_back(false);
_rsid_to_rowset.emplace(rowset->rowset_id(), rowset);
tablet->prepare_to_read(loc, segment_pos, &_rssid_to_rid);
_tablet->prepare_to_read(loc, segment_pos, &_rssid_to_rid);
}

if (st.is<KEY_ALREADY_EXISTS>()) {
@@ -485,8 +484,8 @@ Status VerticalSegmentWriter::_append_block_with_partial_content(RowsInBlock& da
CHECK_EQ(use_default_or_null_flag.size(), data.num_rows);

if (config::enable_merge_on_write_correctness_check) {
tablet->add_sentinel_mark_to_delete_bitmap(_mow_context->delete_bitmap.get(),
_mow_context->rowset_ids);
_tablet->add_sentinel_mark_to_delete_bitmap(_mow_context->delete_bitmap.get(),
_mow_context->rowset_ids);
}

// read and fill block
Expand Down Expand Up @@ -554,7 +553,6 @@ Status VerticalSegmentWriter::_fill_missing_columns(
vectorized::MutableColumns& mutable_full_columns,
const std::vector<bool>& use_default_or_null_flag, bool has_default_or_nullable,
const size_t& segment_start_pos, const vectorized::Block* block) {
auto tablet = static_cast<Tablet*>(_tablet.get());
// create old value columns
const auto& missing_cids = _opts.rowset_ctx->partial_update_info->missing_cids;
auto old_value_block = _tablet_schema->create_block_by_cids(missing_cids);
@@ -574,7 +572,7 @@ Status VerticalSegmentWriter::_fill_missing_columns(
read_index[id_and_pos.pos] = read_idx++;
}
if (has_row_column) {
auto st = tablet->fetch_value_through_row_column(
auto st = _tablet->fetch_value_through_row_column(
rowset, *_tablet_schema, seg_it.first, rids, missing_cids, old_value_block);
if (!st.ok()) {
LOG(WARNING) << "failed to fetch value through row column";
@@ -584,8 +582,8 @@ Status VerticalSegmentWriter::_fill_missing_columns(
}
for (size_t cid = 0; cid < mutable_old_columns.size(); ++cid) {
TabletColumn tablet_column = _tablet_schema->column(missing_cids[cid]);
auto st = tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column,
mutable_old_columns[cid]);
auto st = _tablet->fetch_value_by_rowids(rowset, seg_it.first, rids, tablet_column,
mutable_old_columns[cid]);
// set read value to output block
if (!st.ok()) {
LOG(WARNING) << "failed to fetch value by rowids";
7 changes: 5 additions & 2 deletions be/src/pipeline/exec/group_commit_scan_operator.cpp
@@ -33,7 +33,8 @@ Status GroupCommitOperatorX::get_block(RuntimeState* state, vectorized::Block* b
auto& local_state = get_local_state(state);
bool find_node = false;
while (!find_node && !*eos) {
RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, &find_node, eos));
RETURN_IF_ERROR(local_state.load_block_queue->get_block(state, block, &find_node, eos,
local_state._get_block_dependency));
}
return Status::OK();
}
@@ -42,8 +43,10 @@ Status GroupCommitLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(ScanLocalState<GroupCommitLocalState>::init(state, info));
SCOPED_TIMER(_init_timer);
auto& p = _parent->cast<GroupCommitOperatorX>();
_get_block_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"GroupCommitGetBlockDependency", true);
return state->exec_env()->group_commit_mgr()->get_load_block_queue(
p._table_id, state->fragment_instance_id(), load_block_queue);
p._table_id, state->fragment_instance_id(), load_block_queue, _get_block_dependency);
}

Status GroupCommitLocalState::_process_conjuncts(RuntimeState* state) {
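The group-commit scan now creates a `GroupCommitGetBlockDependency` and passes it into both `get_block` and `get_load_block_queue`, presumably so the pipeline engine can park this operator until the load block queue has data instead of spinning in the `while` loop. A toy sketch of a dependency object with that ready/block shape (the real pipeline `Dependency` notifies the scheduler rather than blocking a thread):

```cpp
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

// Toy stand-in for the pipeline Dependency: the scan side blocks on it until
// the group-commit side marks it ready.
class Dependency {
public:
    void block() {
        std::unique_lock<std::mutex> lk(_mu);
        _cv.wait(lk, [this] { return _ready; });
    }
    void set_ready() {
        { std::lock_guard<std::mutex> lk(_mu); _ready = true; }
        _cv.notify_all();
    }
private:
    std::mutex _mu;
    std::condition_variable _cv;
    bool _ready = false;
};

int main() {
    Dependency get_block_dependency;
    std::thread producer([&] { get_block_dependency.set_ready(); });  // data arrived in the queue
    get_block_dependency.block();  // scan operator waits until a block is available
    std::cout << "block available, scan resumes" << std::endl;
    producer.join();
    return 0;
}
```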