diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java index 3fc73ac39d425..f4f4d17b3470c 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java @@ -159,6 +159,8 @@ public void onFailure(Throwable t) { * @param blockIds block ids to be pushed * @param buffers buffers to be pushed * @param listener the listener to receive block push status. + * + * @since 3.1.0 */ public void pushBlocks( String host, @@ -168,4 +170,24 @@ public void pushBlocks( BlockFetchingListener listener) { throw new UnsupportedOperationException(); } + + /** + * Invoked by Spark driver to notify external shuffle services to finalize the shuffle merge + * for a given shuffle. This allows the driver to start the shuffle reducer stage after properly + * finishing the shuffle merge process associated with the shuffle mapper stage. + * + * @param host host of shuffle server + * @param port port of shuffle server. + * @param shuffleId shuffle ID of the shuffle to be finalized + * @param listener the listener to receive MergeStatuses + * + * @since 3.1.0 + */ + public void finalizeShuffleMerge( + String host, + int port, + int shuffleId, + MergeFinalizerListener listener) { + throw new UnsupportedOperationException(); + } } diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java index f439b6485c810..b6e25fc8db2f7 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java @@ -208,6 +208,35 @@ public void pushBlocks( } } + @Override + public void finalizeShuffleMerge( + String host, + int port, + int shuffleId, + MergeFinalizerListener listener) { + checkInit(); + try { + TransportClient client = clientFactory.createClient(host, port); + ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer(); + client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() { + @Override + public void onSuccess(ByteBuffer response) { + listener.onShuffleMergeSuccess( + (MergeStatuses) BlockTransferMessage.Decoder.fromByteBuffer(response)); + } + + @Override + public void onFailure(Throwable e) { + listener.onShuffleMergeFailure(e); + } + }); + } catch (Exception e) { + logger.error("Exception while sending finalizeShuffleMerge request to {}:{}", + host, port, e); + listener.onShuffleMergeFailure(e); + } + } + @Override public MetricSet shuffleMetrics() { checkInit(); diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java new file mode 100644 index 0000000000000..08e13eea9f40d --- /dev/null +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergeFinalizerListener.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.shuffle; + +import java.util.EventListener; + +import org.apache.spark.network.shuffle.protocol.MergeStatuses; + +/** + * :: DeveloperApi :: + * + * Listener providing a callback function to invoke when driver receives the response for the + * finalize shuffle merge request sent to remote shuffle service. + * + * @since 3.1.0 + */ +public interface MergeFinalizerListener extends EventListener { + /** + * Called once upon successful response on finalize shuffle merge on a remote shuffle service. + * The returned {@link MergeStatuses} is passed to the listener for further processing + */ + void onShuffleMergeSuccess(MergeStatuses statuses); + + /** + * Called once upon failure response on finalize shuffle merge on a remote shuffle service. + */ + void onShuffleMergeFailure(Throwable e); +}