Skip to content

Commit

Permalink
[SPARK-32918][SHUFFLE] RPC implementation to support control plane co…
Browse files Browse the repository at this point in the history
…ordination for push-based shuffle

### What changes were proposed in this pull request?
This is one of the patches for SPIP SPARK-30602 which is needed for push-based shuffle.
Summary of changes:
This PR introduces a new RPC to be called within Driver. When the expected shuffle push wait time reaches, Driver will call this RPC to facilitate coordination of shuffle map/reduce stages and notify external shuffle services to finalize shuffle block merge for a given shuffle. Shuffle services also respond back the metadata about a merged shuffle partition back to the caller.

### Why are the changes needed?
Refer to the SPIP in SPARK-30602.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
This code snippets won't be called by any existing code and will be tested after the coordinated driver changes gets merged in SPARK-32920.

Lead-authored-by: Min Shen mshenlinkedin.com

Closes #30163 from zhouyejoe/SPARK-32918.

Lead-authored-by: Ye Zhou <yezhou@linkedin.com>
Co-authored-by: Min Shen <mshen@linkedin.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
  • Loading branch information
2 people authored and wakun committed Jul 30, 2022
1 parent 4b4e1b3 commit 3fef6a6
Show file tree
Hide file tree
Showing 3 changed files with 94 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ public void onFailure(Throwable t) {
* @param blockIds block ids to be pushed
* @param buffers buffers to be pushed
* @param listener the listener to receive block push status.
*
* @since 3.1.0
*/
public void pushBlocks(
String host,
Expand All @@ -168,4 +170,24 @@ public void pushBlocks(
BlockFetchingListener listener) {
throw new UnsupportedOperationException();
}

/**
* Invoked by Spark driver to notify external shuffle services to finalize the shuffle merge
* for a given shuffle. This allows the driver to start the shuffle reducer stage after properly
* finishing the shuffle merge process associated with the shuffle mapper stage.
*
* @param host host of shuffle server
* @param port port of shuffle server.
* @param shuffleId shuffle ID of the shuffle to be finalized
* @param listener the listener to receive MergeStatuses
*
* @since 3.1.0
*/
public void finalizeShuffleMerge(
String host,
int port,
int shuffleId,
MergeFinalizerListener listener) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,35 @@ public void pushBlocks(
}
}

@Override
public void finalizeShuffleMerge(
String host,
int port,
int shuffleId,
MergeFinalizerListener listener) {
checkInit();
try {
TransportClient client = clientFactory.createClient(host, port);
ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer();
client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
@Override
public void onSuccess(ByteBuffer response) {
listener.onShuffleMergeSuccess(
(MergeStatuses) BlockTransferMessage.Decoder.fromByteBuffer(response));
}

@Override
public void onFailure(Throwable e) {
listener.onShuffleMergeFailure(e);
}
});
} catch (Exception e) {
logger.error("Exception while sending finalizeShuffleMerge request to {}:{}",
host, port, e);
listener.onShuffleMergeFailure(e);
}
}

@Override
public MetricSet shuffleMetrics() {
checkInit();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.network.shuffle;

import java.util.EventListener;

import org.apache.spark.network.shuffle.protocol.MergeStatuses;

/**
* :: DeveloperApi ::
*
* Listener providing a callback function to invoke when driver receives the response for the
* finalize shuffle merge request sent to remote shuffle service.
*
* @since 3.1.0
*/
public interface MergeFinalizerListener extends EventListener {
/**
* Called once upon successful response on finalize shuffle merge on a remote shuffle service.
* The returned {@link MergeStatuses} is passed to the listener for further processing
*/
void onShuffleMergeSuccess(MergeStatuses statuses);

/**
* Called once upon failure response on finalize shuffle merge on a remote shuffle service.
*/
void onShuffleMergeFailure(Throwable e);
}

0 comments on commit 3fef6a6

Please sign in to comment.