Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#133] feat(netty): integration-test supports netty. #1008

Merged
merged 3 commits into from
Jul 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,17 @@ public List<AddBlockEvent> buildBlockEvents(List<ShuffleBlockInfo> shuffleBlockI
+ " bytes");
// Use final temporary variables for closures
final long _memoryUsed = memoryUsed;
final List<ShuffleBlockInfo> finalShuffleBlockInfosPerEvent = shuffleBlockInfoList;
events.add(
new AddBlockEvent(
taskId, shuffleBlockInfosPerEvent, () -> freeAllocatedMemory(_memoryUsed)));
taskId,
shuffleBlockInfosPerEvent,
() -> {
freeAllocatedMemory(_memoryUsed);
for (ShuffleBlockInfo shuffleBlockInfo : finalShuffleBlockInfosPerEvent) {
shuffleBlockInfo.getData().release();
}
}));
}
return events;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import org.apache.uniffle.common.util.ByteBufUtils;

public class ShuffleBlockInfo {

private int partitionId;
Expand Down Expand Up @@ -150,4 +152,8 @@ public String toString() {

return sb.toString();
}

public synchronized void copyDataTo(ByteBuf to) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReplicateWrite can cause concurrent decoding of the same data, which is why we added the synchronized copyDataTo method here.

ByteBufUtils.copyByteBuf(data, to);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;

import org.apache.uniffle.common.util.UnitConverter;
Expand Down Expand Up @@ -665,4 +666,9 @@ public String toString() {
/**
 * Looks up an environment variable of the current process.
 *
 * <p>Thin wrapper around {@link System#getenv(String)}.
 *
 * @param key name of the environment variable to read
 * @return the value returned by {@code System.getenv(key)}
 */
public String getEnv(String key) {
return System.getenv(key);
}

/**
 * Removes the entry stored under {@code key} from the {@code settings} field.
 *
 * <p>Annotated {@code @VisibleForTesting}: exposed for use by tests.
 *
 * @param key the key whose entry should be removed from {@code settings}
 */
@VisibleForTesting
public void remove(String key) {
this.settings.remove(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
} catch (Exception e) {
LOG.error("Unexpected exception during process encode!", e);
byteBuf.release();
throw e;
}
ctx.writeAndFlush(byteBuf);
// do transferTo send data after encode buffer send.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public ChannelFuture sendRpc(Message message, RpcResponseCallback callback) {
if (logger.isTraceEnabled()) {
logger.trace("Pushing data to {}", NettyUtils.getRemoteAddress(channel));
}
long requestId = requestId();
long requestId = message.getRequestId();
handler.addResponseCallback(requestId, callback);
RpcChannelListener listener = new RpcChannelListener(requestId, callback);
return channel.writeAndFlush(message).addListener(listener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ public static void encodeShuffleBlockInfo(ShuffleBlockInfo shuffleBlockInfo, Byt
byteBuf.writeLong(shuffleBlockInfo.getCrc());
byteBuf.writeLong(shuffleBlockInfo.getTaskAttemptId());
// todo: avoid copy
ByteBufUtils.copyByteBuf(shuffleBlockInfo.getData(), byteBuf);
shuffleBlockInfo.getData().release();
shuffleBlockInfo.copyDataTo(byteBuf);
List<ShuffleServerInfo> shuffleServerInfoList = shuffleBlockInfo.getShuffleServerInfos();
byteBuf.writeInt(shuffleServerInfoList.size());
for (ShuffleServerInfo shuffleServerInfo : shuffleServerInfoList) {
Expand All @@ -64,7 +63,8 @@ public static int encodeLengthOfShuffleBlockInfo(ShuffleBlockInfo shuffleBlockIn
int encodeLength =
4 * Long.BYTES
+ 4 * Integer.BYTES
+ ByteBufUtils.encodedLength(shuffleBlockInfo.getData())
+ Integer.BYTES
+ shuffleBlockInfo.getLength()
+ Integer.BYTES;
for (ShuffleServerInfo shuffleServerInfo : shuffleBlockInfo.getShuffleServerInfos()) {
encodeLength += encodeLengthOfShuffleServerInfo(shuffleServerInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,6 @@ public static Message decode(Type msgType, ByteBuf in) {
throw new IllegalArgumentException("Unexpected message type: " + msgType);
}
}

/**
 * Returns the request id associated with this message.
 *
 * <p>NOTE(review): presumably the same id a sender uses to register its response callback;
 * confirm against the concrete subclasses.
 *
 * @return the request id
 */
public abstract long getRequestId();
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public void testSendShuffleDataRequest() {
1,
1,
1,
10,
data.length,
123,
Unpooled.wrappedBuffer(data).retain(),
shuffleServerInfoList,
Expand All @@ -61,7 +61,7 @@ public void testSendShuffleDataRequest() {
1,
1,
1,
10,
data.length,
123,
Unpooled.wrappedBuffer(data).retain(),
shuffleServerInfoList,
Expand All @@ -74,7 +74,7 @@ public void testSendShuffleDataRequest() {
1,
2,
1,
10,
data.length,
123,
Unpooled.wrappedBuffer(data).retain(),
shuffleServerInfoList,
Expand All @@ -85,7 +85,7 @@ public void testSendShuffleDataRequest() {
1,
1,
2,
10,
data.length,
123,
Unpooled.wrappedBuffer(data).retain(),
shuffleServerInfoList,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public static void setupServers() throws Exception {
coordinatorConf.setLong("rss.coordinator.server.heartbeat.timeout", 3000);
createCoordinatorServer(coordinatorConf);
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
shuffleServerConf.remove(ShuffleServerConf.NETTY_SERVER_PORT.key());
createShuffleServer(shuffleServerConf);
shuffleServerConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + 1);
shuffleServerConf.setInteger("rss.jetty.http.port", 18081);
Expand Down Expand Up @@ -155,6 +156,7 @@ public void getShuffleAssignmentsTest() throws Exception {
withEnvironmentVariables("RSS_ENV_KEY", storageTypeJsonSource)
.execute(
() -> {
shuffleServerConf.remove(ShuffleServerConf.NETTY_SERVER_PORT.key());
ShuffleServer ss = new ShuffleServer((ShuffleServerConf) shuffleServerConf);
ss.start();
shuffleServers.set(0, ss);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.nio.file.Files;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.collect.Lists;
import org.junit.jupiter.api.AfterAll;
Expand Down Expand Up @@ -60,6 +61,9 @@ public abstract class IntegrationTestBase extends HadoopTestBase {
protected static List<ShuffleServer> shuffleServers = Lists.newArrayList();
protected static List<CoordinatorServer> coordinators = Lists.newArrayList();

protected static final int NETTY_PORT = 21000;
protected static AtomicInteger nettyPortCounter = new AtomicInteger();

public static void startServers() throws Exception {
for (CoordinatorServer coordinator : coordinators) {
coordinator.start();
Expand Down Expand Up @@ -123,6 +127,9 @@ protected static ShuffleServerConf getShuffleServerConf() throws Exception {
serverConf.setBoolean("rss.server.health.check.enable", false);
serverConf.setBoolean(ShuffleServerConf.RSS_TEST_MODE_ENABLE, true);
serverConf.set(ShuffleServerConf.SERVER_TRIGGER_FLUSH_CHECK_INTERVAL, 500L);
serverConf.setInteger(
ShuffleServerConf.NETTY_SERVER_PORT, NETTY_PORT + nettyPortCounter.getAndIncrement());
serverConf.setString("rss.server.tags", "GRPC,GRPC_NETTY");
return serverConf;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ public Map runTest(SparkSession spark, String fileName) throws Exception {
map = javaPairRDD.collectAsMap();
shufflePath = appPath + "/1";
assertTrue(fs.exists(new Path(shufflePath)));
} else {
runCounter++;
}
runCounter++;
return map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,8 @@ public Map runTest(SparkSession spark, String fileName) throws Exception {
map = javaPairRDD.collectAsMap();
shufflePath = appPath + "/1";
assertTrue(new File(shufflePath).exists());
} else {
runCounter++;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When running with rssWithNetty, the path for shuffleId=0 has already been cleaned up, so the assertion assertTrue(fs.exists(new Path(shufflePath))) would throw an exception at that point.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zuston Could you take a look at this place?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just kept the original code logic.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK.

}
runCounter++;
return map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,23 @@ public void run() throws Exception {
updateSparkConfCustomer(sparkConf);
start = System.currentTimeMillis();
Map resultWithRss = runSparkApp(sparkConf, fileName);
long durationWithRss = System.currentTimeMillis() - start;
final long durationWithRss = System.currentTimeMillis() - start;

updateSparkConfWithRssNetty(sparkConf);
start = System.currentTimeMillis();
Map resultWithRssNetty = runSparkApp(sparkConf, fileName);
final long durationWithRssNetty = System.currentTimeMillis() - start;
verifyTestResult(resultWithoutRss, resultWithRss);
verifyTestResult(resultWithoutRss, resultWithRssNetty);

LOG.info(
"Test: durationWithoutRss["
+ durationWithoutRss
+ "], durationWithRss["
+ durationWithRss
+ "]"
+ "], durationWithRssNetty["
+ durationWithRssNetty
+ "]");
}

Expand Down Expand Up @@ -110,6 +118,10 @@ public void updateSparkConfWithRss(SparkConf sparkConf) {
sparkConf.set(RssSparkConfig.RSS_TEST_MODE_ENABLE.key(), "true");
}

/**
 * Sets {@code RssSparkConfig.RSS_CLIENT_TYPE} to {@code "GRPC_NETTY"} on the given Spark
 * configuration.
 *
 * @param sparkConf the Spark configuration to modify in place
 */
public void updateSparkConfWithRssNetty(SparkConf sparkConf) {
sparkConf.set(RssSparkConfig.RSS_CLIENT_TYPE, "GRPC_NETTY");
}

protected void verifyTestResult(Map expected, Map actual) {
assertEquals(expected.size(), actual.size());
for (Object expectedKey : expected.keySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ private static void createShuffleServers() throws Exception {
ShuffleServerConf serverConf = new ShuffleServerConf();
dataFolder.deleteOnExit();
serverConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + i);
serverConf.setInteger("rss.server.netty.port", NETTY_PORT + i);
serverConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE_HDFS.name());
serverConf.setString("rss.storage.basePath", dataFolder.getAbsolutePath());
serverConf.setString("rss.server.buffer.capacity", String.valueOf(671088640 - i));
Expand All @@ -94,6 +95,7 @@ private static void createShuffleServers() throws Exception {
serverConf.setString("rss.server.hadoop.dfs.replication", "2");
serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L * 1024L);
serverConf.setBoolean("rss.server.health.check.enable", false);
serverConf.setString("rss.server.tags", "GRPC,GRPC_NETTY");
createMockedShuffleServer(serverConf);
}
enableRecordGetShuffleResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ private static void createShuffleServers() throws Exception {
ShuffleServerConf serverConf = new ShuffleServerConf();
dataFolder.deleteOnExit();
serverConf.setInteger("rss.rpc.server.port", SHUFFLE_SERVER_PORT + i);
serverConf.setInteger("rss.server.netty.port", NETTY_PORT + i);
serverConf.setString("rss.storage.type", StorageType.MEMORY_LOCALFILE_HDFS.name());
serverConf.setString("rss.storage.basePath", dataFolder.getAbsolutePath());
serverConf.setString("rss.server.buffer.capacity", "671088640");
Expand All @@ -98,6 +99,7 @@ private static void createShuffleServers() throws Exception {
serverConf.setString("rss.server.hadoop.dfs.replication", "2");
serverConf.setLong("rss.server.disk.capacity", 10L * 1024L * 1024L * 1024L);
serverConf.setBoolean("rss.server.health.check.enable", false);
serverConf.setString("rss.server.tags", "GRPC,GRPC_NETTY");
createMockedShuffleServer(serverConf);
}
enableRecordGetShuffleResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public CoordinatorClientFactory(ClientType clientType) {
}

public CoordinatorClient createCoordinatorClient(String host, int port) {
if (clientType.equals(ClientType.GRPC)) {
if (clientType.equals(ClientType.GRPC) || clientType.equals(ClientType.GRPC_NETTY)) {
return new CoordinatorGrpcClient(host, port);
} else {
throw new UnsupportedOperationException("Unsupported client type " + clientType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData
responseMessage = errorMsg;
rpcResponse =
new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, responseMessage);
client.sendRpcSync(rpcResponse, RPC_TIMEOUT);
client.getChannel().writeAndFlush(rpcResponse);
return;
}
final long start = System.currentTimeMillis();
Expand Down Expand Up @@ -209,7 +209,7 @@ public void handleSendShuffleDataRequest(TransportClient client, SendShuffleData
new RpcResponse(req.getRequestId(), StatusCode.INTERNAL_ERROR, "No data in request");
}

client.sendRpcSync(rpcResponse, RPC_TIMEOUT);
client.getChannel().writeAndFlush(rpcResponse);
}

public void handleGetMemoryShuffleDataRequest(
Expand Down Expand Up @@ -292,7 +292,7 @@ public void handleGetMemoryShuffleDataRequest(
new GetMemoryShuffleDataResponse(
req.getRequestId(), status, msg, Lists.newArrayList(), Unpooled.EMPTY_BUFFER);
}
client.sendRpcSync(response, RPC_TIMEOUT);
client.getChannel().writeAndFlush(response);
}

public void handleGetLocalShuffleIndexRequest(
Expand Down Expand Up @@ -374,7 +374,7 @@ public void handleGetLocalShuffleIndexRequest(
new GetLocalShuffleIndexResponse(
req.getRequestId(), status, msg, Unpooled.EMPTY_BUFFER, 0L);
}
client.sendRpcSync(response, RPC_TIMEOUT);
client.getChannel().writeAndFlush(response);
}

public void handleGetLocalShuffleData(TransportClient client, GetLocalShuffleDataRequest req) {
Expand Down Expand Up @@ -471,7 +471,7 @@ public void handleGetLocalShuffleData(TransportClient client, GetLocalShuffleDat
new GetLocalShuffleDataResponse(
req.getRequestId(), status, msg, new NettyManagedBuffer(Unpooled.EMPTY_BUFFER));
}
client.sendRpcSync(response, RPC_TIMEOUT);
client.getChannel().writeAndFlush(response);
}

private List<ShufflePartitionedData> toPartitionedData(SendShuffleDataRequest req) {
Expand Down