Skip to content

Commit a7402d6

Browse files
committed
some modify
1 parent 3e2709e commit a7402d6

File tree

5 files changed

+73
-10
lines changed

5 files changed

+73
-10
lines changed

fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/OlapGroupCommitInsertExecutor.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.doris.nereids.NereidsPlanner;
3333
import org.apache.doris.nereids.analyzer.UnboundTableSink;
3434
import org.apache.doris.nereids.exceptions.AnalysisException;
35+
import org.apache.doris.nereids.trees.plans.Plan;
3536
import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
3637
import org.apache.doris.nereids.trees.plans.commands.PrepareCommand;
3738
import org.apache.doris.nereids.trees.plans.logical.LogicalInlineTable;
@@ -131,11 +132,14 @@ protected static void analyzeGroupCommit(ConnectContext ctx, TableIf table, Logi
131132
() -> "not allowModifyMTMVData"));
132133
conditions.add(Pair.of(() -> !(insertCtx.isPresent() && insertCtx.get() instanceof OlapInsertCommandContext
133134
&& ((OlapInsertCommandContext) insertCtx.get()).isOverwrite()), () -> "is overwrite command"));
135+
Plan tableSinkChild = tableSink.child();
134136
conditions.add(Pair.of(
135-
() -> tableSink.child() instanceof OneRowRelation || tableSink.child() instanceof LogicalUnion
136-
|| tableSink.child() instanceof LogicalInlineTable,
137-
() -> "not one row relation or union or inline table, class: " + tableSink.child().getClass()
138-
.getName()));
137+
() -> tableSinkChild instanceof OneRowRelation || (tableSinkChild instanceof LogicalUnion
138+
&& tableSinkChild.getExpressions().size() > 0)
139+
|| tableSinkChild instanceof LogicalInlineTable,
140+
() -> "should be one row relation or union or inline table, class: "
141+
+ tableSinkChild.getClass().getName() + (tableSinkChild instanceof LogicalUnion
142+
? ", expression size is 0" : "")));
139143
ctx.setGroupCommit(conditions.stream().allMatch(p -> p.first.getAsBoolean()));
140144
if (!ctx.isGroupCommit() && LOG.isDebugEnabled()) {
141145
for (Pair<BooleanSupplier, Supplier<String>> pair : conditions) {

regression-test/data/insert_p0/insert_group_commit_with_large_data.out

+12
Original file line numberDiff line numberDiff line change
@@ -13,3 +13,15 @@
1313
13 23 36.0
1414
14 24 38.0
1515

16+
-- !select_cte1 --
17+
11111 11111
18+
19+
-- !select_cte2 --
20+
1111111111 1111111111
21+
22+
-- !select_cte3 --
23+
11111 11111
24+
11111 11111
25+
1111111111 1111111111
26+
1111111111 1111111111
27+

regression-test/suites/ddl_p0/test_create_table_properties.groovy

+2-1
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,8 @@ suite("test_create_table_properties") {
300300
assertTrue(false, "should not be able to execute")
301301
}
302302
catch (Exception ex) {
303-
assertTrue(ex.getMessage().contains("Insert has filtered data in strict mode"))
303+
def exception_str = isGroupCommitMode() ? "too many filtered rows" : "Insert has filtered data in strict mode"
304+
assertTrue(ex.getMessage().contains(exception_str))
304305
} finally {
305306
}
306307
// alter table add default partition

regression-test/suites/ddl_p0/test_ctas.groovy

+1-1
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ suite("test_ctas") {
106106

107107
test {
108108
sql """show load from ${dbname}"""
109-
rowNum 6
109+
rowNum isGroupCommitMode() ? 4: 6
110110
}
111111

112112
sql """

regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy

+50-4
Original file line numberDiff line numberDiff line change
@@ -101,9 +101,10 @@ suite("insert_group_commit_with_large_data") {
101101
sql """ drop table if exists ${testTable}; """
102102
sql """create table ${testTable}(a int,b int,c double generated always as (abs(a+b)) not null)
103103
DISTRIBUTED BY HASH(a) PROPERTIES("replication_num" = "1", "group_commit_interval_ms" = "40");"""
104-
sql "INSERT INTO ${testTable} values(6,7,default);"
105-
sql "INSERT INTO ${testTable}(a,b) values(1,2);"
106-
sql "INSERT INTO ${testTable} values(3,5,default);"
104+
sql " set group_commit = async_mode; "
105+
group_commit_insert "INSERT INTO ${testTable} values(6,7,default);", 1
106+
group_commit_insert "INSERT INTO ${testTable}(a,b) values(1,2);", 1
107+
group_commit_insert "INSERT INTO ${testTable} values(3,5,default);", 1
107108
getRowCount(3)
108109
qt_select1 "select * from ${testTable} order by 1,2,3;"
109110

@@ -123,12 +124,57 @@ suite("insert_group_commit_with_large_data") {
123124
if (exception != null) {
124125
throw exception
125126
}
126-
log.info("Stream load result: ${result}".toString())
127127
def json = parseJson(result)
128128
assertEquals("success", json.Status.toLowerCase())
129129
assertEquals(4, json.NumberTotalRows)
130130
}
131131
}
132132
getRowCount(7)
133133
qt_select2 "select * from ${testTable} order by 1,2,3;"
134+
135+
try {
136+
sql """set group_commit = off_mode;"""
137+
sql "drop table if exists gc_ctas1"
138+
sql "drop table if exists gc_ctas2"
139+
sql "drop table if exists gc_ctas3"
140+
sql '''
141+
CREATE TABLE IF NOT EXISTS `gc_ctas1` (
142+
`k1` varchar(5) NULL,
143+
`k2` varchar(5) NULL
144+
) ENGINE=OLAP
145+
DUPLICATE KEY(`k1`)
146+
DISTRIBUTED BY HASH(`k1`) BUCKETS 10
147+
PROPERTIES (
148+
"replication_allocation" = "tag.location.default: 1"
149+
);
150+
'''
151+
sql '''
152+
CREATE TABLE IF NOT EXISTS `gc_ctas2` (
153+
`k1` varchar(10) NULL,
154+
`k2` varchar(10) NULL
155+
) ENGINE=OLAP
156+
DUPLICATE KEY(`k1`)
157+
DISTRIBUTED BY HASH(`k1`) BUCKETS 10
158+
PROPERTIES (
159+
"replication_allocation" = "tag.location.default: 1"
160+
);
161+
'''
162+
sql ''' insert into gc_ctas1 values('11111','11111'); '''
163+
sql ''' insert into gc_ctas2 values('1111111111','1111111111'); '''
164+
sql "sync"
165+
order_qt_select_cte1 """ select * from gc_ctas1; """
166+
order_qt_select_cte2 """ select * from gc_ctas2; """
167+
sql """set group_commit = async_mode;"""
168+
sql '''
169+
create table `gc_ctas3`(k1, k2)
170+
PROPERTIES("replication_num" = "1")
171+
as select * from gc_ctas1
172+
union all
173+
select * from gc_ctas2;
174+
'''
175+
sql " insert into gc_ctas3 select * from gc_ctas1 union all select * from gc_ctas2;"
176+
sql "sync"
177+
order_qt_select_cte3 """ select * from gc_ctas3; """
178+
} finally {
179+
}
134180
}

0 commit comments

Comments
 (0)