Skip to content

Commit f2a779c

Browse files
committed
[fix](group commit) fix some group commit problems (apache#48621)
Problem Summary: 1. Fix stream load with `unique_key_update_mode`. 2. Fix `insert into t select * from ctas1 union all select * from ctas2;`.
1 parent 3e2709e commit f2a779c

File tree

9 files changed

+140
-11
lines changed

9 files changed

+140
-11
lines changed

be/src/http/action/stream_load.cpp

+6-1
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,12 @@ Status StreamLoadAction::_handle_group_commit(HttpRequest* req,
797797
iequal(req->header(HTTP_PARTIAL_COLUMNS), "true");
798798
auto temp_partitions = !req->header(HTTP_TEMP_PARTITIONS).empty();
799799
auto partitions = !req->header(HTTP_PARTITIONS).empty();
800-
if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit) {
800+
auto update_mode =
801+
!req->header(HTTP_UNIQUE_KEY_UPDATE_MODE).empty() &&
802+
(iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE), "UPDATE_FIXED_COLUMNS") ||
803+
iequal(req->header(HTTP_UNIQUE_KEY_UPDATE_MODE), "UPDATE_FLEXIBLE_COLUMNS"));
804+
if (!partial_columns && !partitions && !temp_partitions && !ctx->two_phase_commit &&
805+
!update_mode) {
801806
if (!config::wait_internal_group_commit_finish && !ctx->label.empty()) {
802807
return Status::InvalidArgument("label and group_commit can't be set at the same time");
803808
}

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.doris.nereids.NereidsPlanner;
3333
import org.apache.doris.nereids.analyzer.UnboundTableSink;
3434
import org.apache.doris.nereids.exceptions.AnalysisException;
35+
import org.apache.doris.nereids.trees.plans.Plan;
3536
import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
3637
import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
3738
import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
@@ -131,11 +132,14 @@ protected static void analyzeGroupCommit(ConnectContext ctx, TableIf table, Logi
131132
() -> "not allowModifyMTMVData"));
132133
conditions.add(Pair.of(() -> !(insertCtx.isPresent() && insertCtx.get() instanceof OlapInsertCommandContext
133134
&& ((OlapInsertCommandContext) insertCtx.get()).isOverwrite()), () -> "is overwrite command"));
135+
Plan tableSinkChild = tableSink.child();
134136
conditions.add(Pair.of(
135-
() -> tableSink.child() instanceof OneRowRelation || tableSink.child() instanceof LogicalUnion
136-
|| tableSink.child() instanceof LogicalInlineTable,
137-
() -> "not one row relation or union or inline table, class: " + tableSink.child().getClass()
138-
.getName()));
137+
() -> tableSinkChild instanceof OneRowRelation || (tableSinkChild instanceof LogicalUnion
138+
&& tableSinkChild.getExpressions().size() > 0)
139+
|| tableSinkChild instanceof LogicalInlineTable,
140+
() -> "should be one row relation or union or inline table, class: "
141+
+ tableSinkChild.getClass().getName() + (tableSinkChild instanceof LogicalUnion
142+
? ", expression size is 0" : "")));
139143
ctx.setGroupCommit(conditions.stream().allMatch(p -> p.first.getAsBoolean()));
140144
if (!ctx.isGroupCommit() && LOG.isDebugEnabled()) {
141145
for (Pair<BooleanSupplier, Supplier<String>> pair : conditions) {

regression-test/data/insert_p0/insert_group_commit_with_large_data.out

+12
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,15 @@
1313
13 23 36.0
1414
14 24 38.0
1515

16+
-- !select_cte1 --
17+
11111 11111
18+
19+
-- !select_cte2 --
20+
1111111111 1111111111
21+
22+
-- !select_cte3 --
23+
11111 11111
24+
11111 11111
25+
1111111111 1111111111
26+
1111111111 1111111111
27+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
{"k": 1, "v1": 10}
2+
{"k": 2, "v2": 20, "v5": 25}
3+
{"k": 3, "v3": 30}
4+
{"k": 4, "v4": 20, "v1": 43, "v3": 99}
5+
{"k": 5, "v5": null}
6+
{"k": 6, "v1": 999, "v3": 777}
7+
{"k": 2, "v4": 222}
8+
{"k": 1, "v2": 111, "v3": 111}

regression-test/data/load_p0/stream_load/test_group_commit_stream_load.out

+17
Original file line numberDiff line numberDiff line change
@@ -25,3 +25,20 @@
2525
-- !sql --
2626
1201144
2727

28+
-- !sql --
29+
0 0 0 0 0 0
30+
1 1 1 1 1 1
31+
2 2 2 2 2 2
32+
3 3 3 3 3 3
33+
4 4 4 4 4 4
34+
5 5 5 5 5 5
35+
36+
-- !read_json_by_line --
37+
0 0 0 0 0 0
38+
1 10 111 111 1 1 4,5,6
39+
2 2 20 2 222 25 1,3,6
40+
3 3 3 30 3 3 1,2,4,5,6
41+
4 43 4 99 20 4 2,5,6
42+
5 5 5 5 5 \N 1,2,3,4,6
43+
6 999 9876 777 1234 \N 2,4,5,6
44+

regression-test/suites/ddl_p0/test_create_table_properties.groovy

+2-1
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,8 @@ suite("test_create_table_properties") {
300300
assertTrue(false, "should not be able to execute")
301301
}
302302
catch (Exception ex) {
303-
assertTrue(ex.getMessage().contains("Insert has filtered data in strict mode"))
303+
def exception_str = isGroupCommitMode() ? "too many filtered rows" : "Insert has filtered data in strict mode"
304+
assertTrue(ex.getMessage().contains(exception_str))
304305
} finally {
305306
}
306307
// alter table add default partition

regression-test/suites/ddl_p0/test_ctas.groovy

+1-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ suite("test_ctas") {
106106

107107
test {
108108
sql """show load from ${dbname}"""
109-
rowNum 6
109+
rowNum isGroupCommitMode() ? 4: 6
110110
}
111111

112112
sql """

regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy

+50-4
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,10 @@ suite("insert_group_commit_with_large_data") {
101101
sql """ drop table if exists ${testTable}; """
102102
sql """create table ${testTable}(a int,b int,c double generated always as (abs(a+b)) not null)
103103
DISTRIBUTED BY HASH(a) PROPERTIES("replication_num" = "1", "group_commit_interval_ms" = "40");"""
104-
sql "INSERT INTO ${testTable} values(6,7,default);"
105-
sql "INSERT INTO ${testTable}(a,b) values(1,2);"
106-
sql "INSERT INTO ${testTable} values(3,5,default);"
104+
sql " set group_commit = async_mode; "
105+
group_commit_insert "INSERT INTO ${testTable} values(6,7,default);", 1
106+
group_commit_insert "INSERT INTO ${testTable}(a,b) values(1,2);", 1
107+
group_commit_insert "INSERT INTO ${testTable} values(3,5,default);", 1
107108
getRowCount(3)
108109
qt_select1 "select * from ${testTable} order by 1,2,3;"
109110

@@ -123,12 +124,57 @@ suite("insert_group_commit_with_large_data") {
123124
if (exception != null) {
124125
throw exception
125126
}
126-
log.info("Stream load result: ${result}".toString())
127127
def json = parseJson(result)
128128
assertEquals("success", json.Status.toLowerCase())
129129
assertEquals(4, json.NumberTotalRows)
130130
}
131131
}
132132
getRowCount(7)
133133
qt_select2 "select * from ${testTable} order by 1,2,3;"
134+
135+
try {
136+
sql """set group_commit = off_mode;"""
137+
sql "drop table if exists gc_ctas1"
138+
sql "drop table if exists gc_ctas2"
139+
sql "drop table if exists gc_ctas3"
140+
sql '''
141+
CREATE TABLE IF NOT EXISTS `gc_ctas1` (
142+
`k1` varchar(5) NULL,
143+
`k2` varchar(5) NULL
144+
) ENGINE=OLAP
145+
DUPLICATE KEY(`k1`)
146+
DISTRIBUTED BY HASH(`k1`) BUCKETS 10
147+
PROPERTIES (
148+
"replication_allocation" = "tag.location.default: 1"
149+
);
150+
'''
151+
sql '''
152+
CREATE TABLE IF NOT EXISTS `gc_ctas2` (
153+
`k1` varchar(10) NULL,
154+
`k2` varchar(10) NULL
155+
) ENGINE=OLAP
156+
DUPLICATE KEY(`k1`)
157+
DISTRIBUTED BY HASH(`k1`) BUCKETS 10
158+
PROPERTIES (
159+
"replication_allocation" = "tag.location.default: 1"
160+
);
161+
'''
162+
sql ''' insert into gc_ctas1 values('11111','11111'); '''
163+
sql ''' insert into gc_ctas2 values('1111111111','1111111111'); '''
164+
sql "sync"
165+
order_qt_select_cte1 """ select * from gc_ctas1; """
166+
order_qt_select_cte2 """ select * from gc_ctas2; """
167+
sql """set group_commit = async_mode;"""
168+
sql '''
169+
create table `gc_ctas3`(k1, k2)
170+
PROPERTIES("replication_num" = "1")
171+
as select * from gc_ctas1
172+
union all
173+
select * from gc_ctas2;
174+
'''
175+
sql " insert into gc_ctas3 select * from gc_ctas1 union all select * from gc_ctas2;"
176+
sql "sync"
177+
order_qt_select_cte3 """ select * from gc_ctas3; """
178+
} finally {
179+
}
134180
}

regression-test/suites/load_p0/stream_load/test_group_commit_stream_load.groovy

+36
Original file line numberDiff line numberDiff line change
@@ -306,4 +306,40 @@ suite("test_group_commit_stream_load") {
306306
} finally {
307307
// try_sql("DROP TABLE ${tableName}")
308308
}
309+
310+
// stream load with unique_key_update_mode
311+
tableName = "test_group_commit_stream_load_update"
312+
sql """ DROP TABLE IF EXISTS ${tableName} """
313+
sql """ CREATE TABLE ${tableName} (
314+
`k` int(11) NULL,
315+
`v1` BIGINT NULL,
316+
`v2` BIGINT NULL DEFAULT "9876",
317+
`v3` BIGINT NOT NULL,
318+
`v4` BIGINT NOT NULL DEFAULT "1234",
319+
`v5` BIGINT NULL
320+
) UNIQUE KEY(`k`) DISTRIBUTED BY HASH(`k`) BUCKETS 1
321+
PROPERTIES(
322+
"replication_num" = "1",
323+
"enable_unique_key_merge_on_write" = "true",
324+
"light_schema_change" = "true",
325+
"enable_unique_key_skip_bitmap_column" = "true"); """
326+
327+
def show_res = sql "show create table ${tableName}"
328+
assertTrue(show_res.toString().contains('"enable_unique_key_skip_bitmap_column" = "true"'))
329+
sql """insert into ${tableName} select number, number, number, number, number, number from numbers("number" = "6"); """
330+
qt_sql "select k,v1,v2,v3,v4,v5,BITMAP_TO_STRING(__DORIS_SKIP_BITMAP_COL__) from ${tableName} order by k;"
331+
332+
streamLoad {
333+
table "${tableName}"
334+
set 'group_commit', 'async_mode'
335+
set 'format', 'json'
336+
set 'read_json_by_line', 'true'
337+
set 'strict_mode', 'false'
338+
set 'unique_key_update_mode', 'update_FLEXIBLE_COLUMNS'
339+
file "test1.json"
340+
time 20000
341+
unset 'label'
342+
}
343+
qt_read_json_by_line "select k,v1,v2,v3,v4,v5,BITMAP_TO_STRING(__DORIS_SKIP_BITMAP_COL__) from ${tableName} order by k;"
344+
309345
}

0 commit comments

Comments
 (0)