Skip to content

Commit 168031e

Browse files
committed
[improve](group commit) Group commit support max filter ratio when rows is less than value in config
1 parent 869fffd commit 168031e

19 files changed

+455
-26
lines changed

be/src/common/config.cpp

+2-1
Original file line numberDiff line numberDiff line change
@@ -1091,10 +1091,11 @@ DEFINE_Int16(bitmap_serialize_version, "1");
10911091
DEFINE_String(group_commit_replay_wal_dir, "./wal");
10921092
DEFINE_Int32(group_commit_replay_wal_retry_num, "10");
10931093
DEFINE_Int32(group_commit_replay_wal_retry_interval_seconds, "5");
1094-
DEFINE_Bool(wait_internal_group_commit_finish, "false");
10951094

10961095
// the count of thread to group commit insert
10971096
DEFINE_Int32(group_commit_insert_threads, "10");
1097+
DEFINE_Int32(group_commit_memory_rows_for_max_filter_ratio, "10000");
1098+
DEFINE_Bool(wait_internal_group_commit_finish, "false");
10981099

10991100
DEFINE_mInt32(scan_thread_nice_value, "0");
11001101
DEFINE_mInt32(tablet_schema_cache_recycle_interval, "86400");

be/src/common/config.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -1166,10 +1166,11 @@ DECLARE_Int16(bitmap_serialize_version);
11661166
DECLARE_String(group_commit_replay_wal_dir);
11671167
DECLARE_Int32(group_commit_replay_wal_retry_num);
11681168
DECLARE_Int32(group_commit_replay_wal_retry_interval_seconds);
1169-
DECLARE_Bool(wait_internal_group_commit_finish);
11701169

11711170
// This config can be set to limit thread number in group commit insert thread pool.
11721171
DECLARE_mInt32(group_commit_insert_threads);
1172+
DECLARE_mInt32(group_commit_memory_rows_for_max_filter_ratio);
1173+
DECLARE_Bool(wait_internal_group_commit_finish);
11731174

11741175
// The configuration item is used to lower the priority of the scanner thread,
11751176
// typically employed to ensure CPU scheduling for write operations.

be/src/runtime/group_commit_mgr.cpp

+1-3
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,6 @@ void LoadBlockQueue::cancel(const Status& st) {
130130

131131
Status GroupCommitTable::get_first_block_load_queue(
132132
int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
133-
std::shared_ptr<vectorized::Block> block,
134133
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
135134
DCHECK(table_id == _table_id);
136135
{
@@ -418,7 +417,6 @@ void GroupCommitMgr::stop() {
418417

419418
Status GroupCommitMgr::get_first_block_load_queue(
420419
int64_t db_id, int64_t table_id, int64_t base_schema_version, const UniqueId& load_id,
421-
std::shared_ptr<vectorized::Block> block,
422420
std::shared_ptr<LoadBlockQueue>& load_block_queue) {
423421
std::shared_ptr<GroupCommitTable> group_commit_table;
424422
{
@@ -431,7 +429,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
431429
group_commit_table = _table_map[table_id];
432430
}
433431
return group_commit_table->get_first_block_load_queue(table_id, base_schema_version, load_id,
434-
block, load_block_queue);
432+
load_block_queue);
435433
}
436434

437435
Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& instance_id,

be/src/runtime/group_commit_mgr.h

-2
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,6 @@ class GroupCommitTable {
9494
_all_block_queues_bytes(all_block_queue_bytes) {};
9595
Status get_first_block_load_queue(int64_t table_id, int64_t base_schema_version,
9696
const UniqueId& load_id,
97-
std::shared_ptr<vectorized::Block> block,
9897
std::shared_ptr<LoadBlockQueue>& load_block_queue);
9998
Status get_load_block_queue(const TUniqueId& instance_id,
10099
std::shared_ptr<LoadBlockQueue>& load_block_queue);
@@ -134,7 +133,6 @@ class GroupCommitMgr {
134133
std::shared_ptr<LoadBlockQueue>& load_block_queue);
135134
Status get_first_block_load_queue(int64_t db_id, int64_t table_id, int64_t base_schema_version,
136135
const UniqueId& load_id,
137-
std::shared_ptr<vectorized::Block> block,
138136
std::shared_ptr<LoadBlockQueue>& load_block_queue);
139137

140138
private:

be/src/runtime/stream_load/stream_load_executor.cpp

+9
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,15 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
101101
ctx->number_loaded_rows);
102102
}
103103
} else {
104+
if (ctx->group_commit && status->is<DATA_QUALITY_ERROR>()) {
105+
ctx->number_total_rows = state->num_rows_load_total();
106+
ctx->number_loaded_rows = state->num_rows_load_success();
107+
ctx->number_filtered_rows = state->num_rows_load_filtered();
108+
ctx->number_unselected_rows = state->num_rows_load_unselected();
109+
if (ctx->number_filtered_rows > 0 && !state->get_error_log_file_path().empty()) {
110+
ctx->error_url = to_load_error_http_path(state->get_error_log_file_path());
111+
}
112+
}
104113
LOG(WARNING) << "fragment execute failed"
105114
<< ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
106115
<< ", err_msg=" << status->to_string() << ", " << ctx->brief();

be/src/vec/sink/group_commit_block_sink.cpp

+43-11
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ Status GroupCommitBlockSink::init(const TDataSink& t_sink) {
4949
_base_schema_version = table_sink.base_schema_version;
5050
_group_commit_mode = table_sink.group_commit_mode;
5151
_load_id = table_sink.load_id;
52+
_max_filter_ratio = table_sink.max_filter_ratio;
5253
return Status::OK();
5354
}
5455

@@ -84,18 +85,28 @@ Status GroupCommitBlockSink::open(RuntimeState* state) {
8485
}
8586

8687
Status GroupCommitBlockSink::close(RuntimeState* state, Status close_status) {
87-
if (_load_block_queue) {
88-
_load_block_queue->remove_load_id(_load_id);
89-
}
9088
RETURN_IF_ERROR(DataSink::close(state, close_status));
9189
RETURN_IF_ERROR(close_status);
92-
// wait to wal
9390
int64_t total_rows = state->num_rows_load_total();
9491
int64_t loaded_rows = state->num_rows_load_total();
95-
state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + total_rows -
96-
loaded_rows);
9792
state->set_num_rows_load_total(loaded_rows + state->num_rows_load_unselected() +
9893
state->num_rows_load_filtered());
94+
state->update_num_rows_load_filtered(_block_convertor->num_filtered_rows() + total_rows -
95+
loaded_rows);
96+
if (!_is_block_appended) {
97+
// if not meet the max_filter_ratio, we should return error status directly
98+
int64_t num_selected_rows =
99+
state->num_rows_load_total() - state->num_rows_load_unselected();
100+
if (num_selected_rows > 0 &&
101+
(double)state->num_rows_load_filtered() / num_selected_rows > _max_filter_ratio) {
102+
return Status::DataQualityError("too many filtered rows");
103+
}
104+
RETURN_IF_ERROR(_add_blocks());
105+
}
106+
if (_load_block_queue) {
107+
_load_block_queue->remove_load_id(_load_id);
108+
}
109+
// wait to wal
99110
auto st = Status::OK();
100111
if (_load_block_queue && (_load_block_queue->wait_internal_group_commit_finish ||
101112
_group_commit_mode == TGroupCommitMode::SYNC_MODE)) {
@@ -137,6 +148,8 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
137148
if (block->rows() == 0) {
138149
return Status::OK();
139150
}
151+
// the insert group commit tvf always accept nullable columns, so we should convert
152+
// the non-nullable columns to nullable columns
140153
for (int i = 0; i < block->columns(); ++i) {
141154
if (block->get_by_position(i).type->is_nullable()) {
142155
continue;
@@ -155,16 +168,35 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
155168
}
156169
std::shared_ptr<vectorized::Block> output_block = vectorized::Block::create_shared();
157170
output_block->swap(cur_mutable_block->to_block());
171+
if (!_is_block_appended && state->num_rows_load_total() + state->num_rows_load_unselected() +
172+
state->num_rows_load_filtered() <=
173+
config::group_commit_memory_rows_for_max_filter_ratio) {
174+
_blocks.emplace_back(output_block);
175+
} else {
176+
if (!_is_block_appended) {
177+
RETURN_IF_ERROR(_add_blocks());
178+
}
179+
RETURN_IF_ERROR(_load_block_queue->add_block(output_block));
180+
}
181+
return Status::OK();
182+
}
183+
184+
Status GroupCommitBlockSink::_add_blocks() {
185+
DCHECK(_is_block_appended == false);
158186
TUniqueId load_id;
159187
load_id.__set_hi(_load_id.hi);
160188
load_id.__set_lo(_load_id.lo);
161189
if (_load_block_queue == nullptr) {
162-
RETURN_IF_ERROR(state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
163-
_db_id, _table_id, _base_schema_version, load_id, block, _load_block_queue));
164-
state->set_import_label(_load_block_queue->label);
165-
state->set_wal_id(_load_block_queue->txn_id);
190+
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
191+
_db_id, _table_id, _base_schema_version, load_id, _load_block_queue));
192+
_state->set_import_label(_load_block_queue->label);
193+
_state->set_wal_id(_load_block_queue->txn_id);
194+
}
195+
for (auto it = _blocks.begin(); it != _blocks.end(); ++it) {
196+
RETURN_IF_ERROR(_load_block_queue->add_block(*it));
166197
}
167-
RETURN_IF_ERROR(_load_block_queue->add_block(output_block));
198+
_is_block_appended = true;
199+
_blocks.clear();
168200
return Status::OK();
169201
}
170202

be/src/vec/sink/group_commit_block_sink.h

+5
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ class GroupCommitBlockSink : public DataSink {
4747

4848
private:
4949
Status _add_block(RuntimeState* state, std::shared_ptr<vectorized::Block> block);
50+
Status _add_blocks();
5051

5152
vectorized::VExprContextSPtrs _output_vexpr_ctxs;
5253

@@ -65,6 +66,10 @@ class GroupCommitBlockSink : public DataSink {
6566
TGroupCommitMode::type _group_commit_mode;
6667
UniqueId _load_id;
6768
std::shared_ptr<LoadBlockQueue> _load_block_queue;
69+
// used to calculate if meet the max filter ratio
70+
std::vector<std::shared_ptr<vectorized::Block>> _blocks;
71+
bool _is_block_appended = false;
72+
double _max_filter_ratio = 0.0;
6873
};
6974

7075
} // namespace vectorized

fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -970,7 +970,7 @@ private DataSink createDataSink() throws AnalysisException {
970970
if (isGroupCommitStreamLoadSql) {
971971
sink = new GroupCommitBlockSink((OlapTable) targetTable, olapTuple,
972972
targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert(),
973-
ConnectContext.get().getSessionVariable().getGroupCommit());
973+
ConnectContext.get().getSessionVariable().getGroupCommit(), 0);
974974
} else {
975975
sink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds,
976976
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());

fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitBlockSink.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,13 @@
2929

3030
public class GroupCommitBlockSink extends OlapTableSink {
3131
private String groupCommit;
32+
private double maxFilterRatio;
3233

3334
public GroupCommitBlockSink(OlapTable dstTable, TupleDescriptor tupleDescriptor, List<Long> partitionIds,
34-
boolean singleReplicaLoad, String groupCommit) {
35+
boolean singleReplicaLoad, String groupCommit, double maxFilterRatio) {
3536
super(dstTable, tupleDescriptor, partitionIds, singleReplicaLoad);
3637
this.groupCommit = groupCommit;
38+
this.maxFilterRatio = maxFilterRatio;
3739
}
3840

3941
protected TDataSinkType getDataSinkType() {
@@ -45,6 +47,7 @@ protected TDataSink toThrift() {
4547
TGroupCommitMode groupCommitMode = parseGroupCommit(groupCommit);
4648
Preconditions.checkNotNull(groupCommitMode, "Group commit is: " + groupCommit);
4749
tDataSink.olap_table_sink.setGroupCommitMode(groupCommitMode);
50+
tDataSink.olap_table_sink.setMaxFilterRatio(maxFilterRatio);
4851
return tDataSink;
4952
}
5053

fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitPlanner.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -98,11 +98,12 @@ public GroupCommitPlanner(Database db, OlapTable table, List<String> targetColum
9898
}
9999
streamLoadPutRequest
100100
.setDb(db.getFullName())
101-
.setMaxFilterRatio(1)
101+
.setMaxFilterRatio(ConnectContext.get().getSessionVariable().enableInsertStrict ? 0 : 1)
102102
.setTbl(table.getName())
103103
.setFileType(TFileType.FILE_STREAM).setFormatType(TFileFormatType.FORMAT_CSV_PLAIN)
104104
.setMergeType(TMergeType.APPEND).setThriftRpcTimeoutMs(5000).setLoadId(queryId)
105-
.setTrimDoubleQuotes(true).setGroupCommitMode(groupCommit);
105+
.setTrimDoubleQuotes(true).setGroupCommitMode(groupCommit)
106+
.setStrictMode(ConnectContext.get().getSessionVariable().enableInsertStrict);
106107
StreamLoadTask streamLoadTask = StreamLoadTask.fromTStreamLoadPutRequest(streamLoadPutRequest);
107108
StreamLoadPlanner planner = new StreamLoadPlanner(db, table, streamLoadTask);
108109
// Will using load id as query id in fragment

fe/fe-core/src/main/java/org/apache/doris/planner/StreamLoadPlanner.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,8 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde
261261
OlapTableSink olapTableSink;
262262
if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) {
263263
olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
264-
Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit());
264+
Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit(),
265+
taskInfo.getMaxFilterRatio());
265266
} else {
266267
olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
267268
}
@@ -481,7 +482,8 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentIns
481482
OlapTableSink olapTableSink;
482483
if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) {
483484
olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
484-
Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit());
485+
Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit(),
486+
taskInfo.getMaxFilterRatio());
485487
} else {
486488
olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
487489
}

fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,7 @@
175175
import com.google.common.collect.Maps;
176176
import com.google.common.collect.Sets;
177177
import com.google.protobuf.ByteString;
178+
import com.google.protobuf.ProtocolStringList;
178179
import lombok.Setter;
179180
import org.apache.logging.log4j.LogManager;
180181
import org.apache.logging.log4j.Logger;
@@ -1887,7 +1888,9 @@ private void handleInsertStmt() throws Exception {
18871888
List<InternalService.PDataRow> rows = groupCommitPlanner.getRows(nativeInsertStmt);
18881889
PGroupCommitInsertResponse response = groupCommitPlanner.executeGroupCommitInsert(context, rows);
18891890
TStatusCode code = TStatusCode.findByValue(response.getStatus().getStatusCode());
1890-
if (code == TStatusCode.DATA_QUALITY_ERROR) {
1891+
ProtocolStringList errorMsgsList = response.getStatus().getErrorMsgsList();
1892+
if (code == TStatusCode.DATA_QUALITY_ERROR && !errorMsgsList.isEmpty() && errorMsgsList.get(0)
1893+
.contains("schema version not match")) {
18911894
LOG.info("group commit insert failed. stmt: {}, backend id: {}, status: {}, "
18921895
+ "schema version: {}, retry: {}", insertStmt.getOrigStmt().originStmt,
18931896
groupCommitPlanner.getBackend().getId(),

gensrc/thrift/DataSinks.thrift

+1
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ struct TOlapTableSink {
266266
// used by GroupCommitBlockSink
267267
21: optional i64 base_schema_version
268268
22: optional TGroupCommitMode group_commit_mode
269+
23: optional double max_filter_ratio
269270
}
270271

271272
struct TDataSink {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
-- This file is automatically generated. You should know what you did if you want to edit this
2+
-- !sql --
3+
1 a 10
4+
2 \N -1
5+
3 a 10
6+
9 a \N
7+
8+
-- !sql --
9+
1 a 10
10+
2 \N -1
11+
3 a 10
12+
6 a \N
13+
7 a \N
14+
9 a \N
15+
16+
-- !sql --
17+
1 a 21
18+
1 a 21
19+
2 b 22
20+
2 b 22
21+
3 c 23
22+
3 c 23
23+
4 d \N
24+
25+
-- !sql --
26+
1 a 21
27+
1 a 21
28+
2 b 22
29+
2 b 22
30+
3 c 23
31+
3 c 23
32+
4 d \N
33+
4 d \N
34+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
1,a,21
2+
2,b,22
3+
3,c,23
4+
4,d,a
Binary file not shown.

0 commit comments

Comments
 (0)