Skip to content

Commit 5aa90a3

Browse files
authored
[pipelineX](local shuffle) Fix bucket hash shuffle (apache#28202)
1 parent 61379b1 commit 5aa90a3

File tree

1 file changed

+8
-1
lines changed

1 file changed

+8
-1
lines changed

fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java

+8-1
Original file line numberDiff line numberDiff line change
@@ -3596,6 +3596,7 @@ Map<TNetworkAddress, TPipelineFragmentParams> toTPipelineParams(int backendNum)
35963596
}
35973597

35983598
Map<TNetworkAddress, TPipelineFragmentParams> res = new HashMap();
3599+
Map<TNetworkAddress, Integer> instanceIdx = new HashMap();
35993600
for (int i = 0; i < instanceExecParams.size(); ++i) {
36003601
final FInstanceExecParam instanceExecParam = instanceExecParams.get(i);
36013602
if (!res.containsKey(instanceExecParam.host)) {
@@ -3625,10 +3626,16 @@ Map<TNetworkAddress, TPipelineFragmentParams> toTPipelineParams(int backendNum)
36253626
params.setNumBuckets(fragment.getBucketNum());
36263627
res.put(instanceExecParam.host, params);
36273628
res.get(instanceExecParam.host).setBucketSeqToInstanceIdx(new HashMap<Integer, Integer>());
3629+
instanceIdx.put(instanceExecParam.host, 0);
36283630
}
3631+
// Set each bucket belongs to which instance on this BE.
3632+
// This is used for LocalExchange(BUCKET_HASH_SHUFFLE).
3633+
int instanceId = instanceIdx.get(instanceExecParam.host);
36293634
for (int bucket : instanceExecParam.bucketSeqSet) {
3630-
res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket, i);
3635+
res.get(instanceExecParam.host).getBucketSeqToInstanceIdx().put(bucket, instanceId);
3636+
36313637
}
3638+
instanceIdx.replace(instanceExecParam.host, ++instanceId);
36323639
TPipelineFragmentParams params = res.get(instanceExecParam.host);
36333640
TPipelineInstanceParams localParams = new TPipelineInstanceParams();
36343641

0 commit comments

Comments
 (0)