Skip to content

Commit 619d500

Browse files
authored
[bug](pipelineX) Fix pipelineX bug on multiple BE (#28792)
1 parent a16680c commit 619d500

File tree

4 files changed

+31
-10
lines changed

4 files changed

+31
-10
lines changed

fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java

+27-9
Original file line numberDiff line numberDiff line change
@@ -1585,10 +1585,14 @@ private void computeFragmentExecParams() throws Exception {
15851585
dest.server = dummyServer;
15861586
dest.setBrpcServer(dummyServer);
15871587

1588-
int parallelTasksNum = destParams.ignoreDataDistribution
1589-
? destParams.parallelTasksNum : destParams.instanceExecParams.size();
1590-
for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) {
1588+
Set<TNetworkAddress> hostSet = new HashSet<>();
1589+
for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) {
15911590
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
1591+
if (destParams.ignoreDataDistribution
1592+
&& hostSet.contains(instanceExecParams.host)) {
1593+
continue;
1594+
}
1595+
hostSet.add(instanceExecParams.host);
15921596
if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
15931597
dest.fragment_instance_id = instanceExecParams.instanceId;
15941598
dest.server = toRpcHost(instanceExecParams.host);
@@ -1623,10 +1627,14 @@ private void computeFragmentExecParams() throws Exception {
16231627
}
16241628
});
16251629
} else {
1630+
Set<TNetworkAddress> hostSet = new HashSet<>();
16261631
// add destination host to this fragment's destination
1627-
int parallelTasksNum = destParams.ignoreDataDistribution
1628-
? destParams.parallelTasksNum : destParams.instanceExecParams.size();
1629-
for (int j = 0; j < parallelTasksNum; ++j) {
1632+
for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
1633+
if (destParams.ignoreDataDistribution
1634+
&& hostSet.contains(destParams.instanceExecParams.get(j).host)) {
1635+
continue;
1636+
}
1637+
hostSet.add(destParams.instanceExecParams.get(j).host);
16301638
TPlanFragmentDestination dest = new TPlanFragmentDestination();
16311639
dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
16321640
dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);
@@ -1698,10 +1706,14 @@ private void computeMultiCastFragmentParams() throws Exception {
16981706
dest.server = dummyServer;
16991707
dest.setBrpcServer(dummyServer);
17001708

1701-
int parallelTasksNum = destParams.ignoreDataDistribution
1702-
? destParams.parallelTasksNum : destParams.instanceExecParams.size();
1703-
for (int insIdx = 0; insIdx < parallelTasksNum; insIdx++) {
1709+
Set<TNetworkAddress> hostSet = new HashSet<>();
1710+
for (int insIdx = 0; insIdx < destParams.instanceExecParams.size(); insIdx++) {
17041711
FInstanceExecParam instanceExecParams = destParams.instanceExecParams.get(insIdx);
1712+
if (destParams.ignoreDataDistribution
1713+
&& hostSet.contains(instanceExecParams.host)) {
1714+
continue;
1715+
}
1716+
hostSet.add(instanceExecParams.host);
17051717
if (instanceExecParams.bucketSeqSet.contains(bucketSeq)) {
17061718
dest.fragment_instance_id = instanceExecParams.instanceId;
17071719
dest.server = toRpcHost(instanceExecParams.host);
@@ -1736,7 +1748,13 @@ private void computeMultiCastFragmentParams() throws Exception {
17361748
}
17371749
});
17381750
} else {
1751+
Set<TNetworkAddress> hostSet = new HashSet<>();
17391752
for (int j = 0; j < destParams.instanceExecParams.size(); ++j) {
1753+
if (destParams.ignoreDataDistribution
1754+
&& hostSet.contains(destParams.instanceExecParams.get(j).host)) {
1755+
continue;
1756+
}
1757+
hostSet.add(destParams.instanceExecParams.get(j).host);
17401758
TPlanFragmentDestination dest = new TPlanFragmentDestination();
17411759
dest.fragment_instance_id = destParams.instanceExecParams.get(j).instanceId;
17421760
dest.server = toRpcHost(destParams.instanceExecParams.get(j).host);

fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -3175,7 +3175,8 @@ public boolean isMaterializedViewRewriteEnableContainForeignTable() {
31753175
}
31763176

31773177
public boolean isIgnoreStorageDataDistribution() {
3178-
return ignoreStorageDataDistribution && getEnablePipelineXEngine() && enableLocalShuffle;
3178+
return ignoreStorageDataDistribution && getEnablePipelineXEngine() && enableLocalShuffle
3179+
&& enableNereidsPlanner;
31793180
}
31803181

31813182
public void setIgnoreStorageDataDistribution(boolean ignoreStorageDataDistribution) {

regression-test/suites/correctness_p0/test_runtime_filter.groovy

+1
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ suite("test_runtime_filter") {
6161
sql "set enable_fallback_to_original_planner=false"
6262
sql "set disable_join_reorder=true"
6363

64+
sql "set ignore_storage_data_distribution=false"
6465
explain{
6566
sql ("""select * from rf_tblA join rf_tblB on a < b""")
6667
contains "runtime filters: RF000[max] -> a"

regression-test/suites/query_p0/join/test_bitmap_filter_nereids.groovy

+1
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@ suite("test_bitmap_filter_nereids") {
8181
exception "Doris hll, bitmap, array, map, struct, jsonb, variant column must use with specific function, and don't support filter"
8282
}
8383

84+
sql "set ignore_storage_data_distribution=false"
8485
explain{
8586
sql "select k1, k2 from ${tbl1} where k1 in (select k2 from ${tbl2}) order by k1;"
8687
contains "RF000[bitmap]"

0 commit comments

Comments
 (0)