Skip to content

Commit

Permalink
fix previous tests, add assert
Browse files Browse the repository at this point in the history
  • Loading branch information
rickyma committed Feb 6, 2024
1 parent a4db16e commit 6fad457
Show file tree
Hide file tree
Showing 10 changed files with 140 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,20 @@
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class DiskErrorToleranceTest extends ShuffleReadWriteBase {
Expand Down Expand Up @@ -128,11 +131,11 @@ private void diskErrorTest(boolean isNettyMode) throws Exception {
RssSendShuffleDataRequest rs1 = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
RssSendCommitRequest rc1 = new RssSendCommitRequest(appId, 0);
RssFinishShuffleRequest rf1 = new RssFinishShuffleRequest(appId, 0);
if (isNettyMode) {
shuffleServerNettyClient.sendShuffleData(rs1);
} else {
shuffleServerClient.sendShuffleData(rs1);
}
RssSendShuffleDataResponse response =
isNettyMode
? shuffleServerNettyClient.sendShuffleData(rs1)
: shuffleServerClient.sendShuffleData(rs1);
assertSame(StatusCode.SUCCESS, response.getStatusCode());
shuffleServerClient.sendCommit(rc1);
shuffleServerClient.finishShuffle(rf1);
ShuffleReadClientImpl readClient =
Expand Down Expand Up @@ -172,11 +175,12 @@ private void diskErrorTest(boolean isNettyMode) throws Exception {
partitionToBlocks.put(0, blocks2);
shuffleToBlocks.put(0, partitionToBlocks);
rs1 = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
if (isNettyMode) {
shuffleServerNettyClient.sendShuffleData(rs1);
} else {
shuffleServerClient.sendShuffleData(rs1);
}
response =
isNettyMode
? shuffleServerNettyClient.sendShuffleData(rs1)
: shuffleServerClient.sendShuffleData(rs1);
assertSame(StatusCode.SUCCESS, response.getStatusCode());

shuffleServerClient.sendCommit(rc1);
shuffleServerClient.finishShuffle(rf1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,17 @@
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.storage.util.StorageType;

import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

public abstract class HybridStorageFaultToleranceBase extends ShuffleReadWriteBase {
Expand Down Expand Up @@ -81,7 +84,8 @@ public void fallbackTest() throws Exception {
}

private void fallbackTest(boolean isNettyMode) throws Exception {
String appId = "fallback_test_" + this.getClass().getSimpleName() + "_isNettyMode_" + isNettyMode;
String appId =
"fallback_test_" + this.getClass().getSimpleName() + "_isNettyMode_" + isNettyMode;
Map<Long, byte[]> expectedData = Maps.newHashMap();
Map<Integer, List<Integer>> map = Maps.newHashMap();
map.put(0, Lists.newArrayList(0));
Expand Down Expand Up @@ -122,11 +126,11 @@ private void sendSinglePartitionToShuffleServer(
RssSendShuffleDataRequest rs = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
RssSendCommitRequest rc = new RssSendCommitRequest(appId, shuffle);
RssFinishShuffleRequest rf = new RssFinishShuffleRequest(appId, shuffle);
if (isNettyMode) {
shuffleServerNettyClient.sendShuffleData(rs);
} else {
shuffleServerClient.sendShuffleData(rs);
}
RssSendShuffleDataResponse response =
isNettyMode
? shuffleServerNettyClient.sendShuffleData(rs)
: shuffleServerClient.sendShuffleData(rs);
assertSame(StatusCode.SUCCESS, response.getStatusCode());
shuffleServerClient.sendCommit(rc);
shuffleServerClient.finishShuffle(rf);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,20 @@
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataDistributionType;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;

import static org.apache.uniffle.common.util.Constants.SHUFFLE_DATA_FILE_SUFFIX;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;

public class ShuffleServerConcurrentWriteOfHadoopTest extends ShuffleServerWithHadoopTest {
private static final int MAX_CONCURRENCY = 3;
Expand Down Expand Up @@ -86,7 +89,8 @@ private static Stream<Arguments> clientConcurrencyAndExpectedProvider() {
public void testConcurrentWrite2Hadoop(
int clientSpecifiedConcurrency, int expectedConcurrency, boolean isNettyMode)
throws Exception {
String appId = "testConcurrentWrite2Hadoop_" + new Random().nextInt() + "_isNettyMode_" + isNettyMode;
String appId =
"testConcurrentWrite2Hadoop_" + new Random().nextInt() + "_isNettyMode_" + isNettyMode;
String dataBasePath = HDFS_URI + "rss/test";
RssRegisterShuffleRequest rrsr =
new RssRegisterShuffleRequest(
Expand Down Expand Up @@ -120,11 +124,11 @@ public void testConcurrentWrite2Hadoop(
shuffleToBlocks.put(0, partitionToBlocks);
RssSendShuffleDataRequest rssdr =
new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
if (isNettyMode) {
shuffleServerNettyClient.sendShuffleData(rssdr);
} else {
shuffleServerClient.sendShuffleData(rssdr);
}
RssSendShuffleDataResponse response =
isNettyMode
? shuffleServerNettyClient.sendShuffleData(rssdr)
: shuffleServerClient.sendShuffleData(rssdr);
assertSame(StatusCode.SUCCESS, response.getStatusCode());
});

RssSendCommitRequest rscr = new RssSendCommitRequest(appId, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,8 @@ public void testReadFaultTolerance() throws Exception {
}

private void testReadFaultTolerance(boolean isNettyMode) throws Exception {
String testAppId = "ShuffleServerFaultToleranceTest.testReadFaultTolerance" + "_isNettyMode_" + isNettyMode;
String testAppId =
"ShuffleServerFaultToleranceTest.testReadFaultTolerance" + "_isNettyMode_" + isNettyMode;
int shuffleId = 0;
int partitionId = 0;
RssRegisterShuffleRequest rrsr =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,21 @@
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
import org.apache.uniffle.storage.util.StorageType;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ShuffleServerWithHadoopTest extends ShuffleReadWriteBase {
Expand Down Expand Up @@ -129,11 +132,11 @@ private void hadoopWriteReadTest(boolean isNettyMode) {

RssSendShuffleDataRequest rssdr =
new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
if (isNettyMode) {
shuffleServerNettyClient.sendShuffleData(rssdr);
} else {
shuffleServerClient.sendShuffleData(rssdr);
}
RssSendShuffleDataResponse response =
isNettyMode
? shuffleServerNettyClient.sendShuffleData(rssdr)
: shuffleServerClient.sendShuffleData(rssdr);
assertSame(StatusCode.SUCCESS, response.getStatusCode());
assertEquals(456, shuffleServers.get(0).getShuffleBufferManager().getUsedMemory());
assertEquals(0, shuffleServers.get(0).getShuffleBufferManager().getPreAllocatedSize());
RssSendCommitRequest rscr = new RssSendCommitRequest(appId, 0);
Expand All @@ -158,11 +161,11 @@ private void hadoopWriteReadTest(boolean isNettyMode) {
shuffleToBlocks.clear();
shuffleToBlocks.put(0, partitionToBlocks);
rssdr = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
if (isNettyMode) {
shuffleServerNettyClient.sendShuffleData(rssdr);
} else {
shuffleServerClient.sendShuffleData(rssdr);
}
response =
isNettyMode
? shuffleServerNettyClient.sendShuffleData(rssdr)
: shuffleServerClient.sendShuffleData(rssdr);
assertSame(StatusCode.SUCCESS, response.getStatusCode());
assertEquals(0, shuffleServers.get(0).getShuffleBufferManager().getPreAllocatedSize());
rscr = new RssSendCommitRequest(appId, 0);
shuffleServerClient.sendCommit(rscr);
Expand All @@ -174,11 +177,11 @@ private void hadoopWriteReadTest(boolean isNettyMode) {
shuffleToBlocks.clear();
shuffleToBlocks.put(0, partitionToBlocks);
rssdr = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
if (isNettyMode) {
shuffleServerNettyClient.sendShuffleData(rssdr);
} else {
shuffleServerClient.sendShuffleData(rssdr);
}
response =
isNettyMode
? shuffleServerNettyClient.sendShuffleData(rssdr)
: shuffleServerClient.sendShuffleData(rssdr);
assertSame(StatusCode.SUCCESS, response.getStatusCode());
rscr = new RssSendCommitRequest(appId, 0);
shuffleServerClient.sendCommit(rscr);
rfsr = new RssFinishShuffleRequest(appId, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.CompressedShuffleBlock;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.KerberizedHadoopBase;
import org.apache.uniffle.common.PartitionRange;
Expand All @@ -52,6 +53,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.RssUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.CoordinatorServer;
Expand All @@ -62,6 +64,7 @@
import static org.apache.uniffle.test.ShuffleReadWriteBase.mockSSI;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ShuffleServerWithKerberizedHadoopTest extends KerberizedHadoopBase {
Expand Down Expand Up @@ -253,11 +256,11 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception {

RssSendShuffleDataRequest rssdr =
new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
if (isNettyMode) {
shuffleServerNettyClient.sendShuffleData(rssdr);
} else {
shuffleServerClient.sendShuffleData(rssdr);
}
RssSendShuffleDataResponse response =
isNettyMode
? shuffleServerNettyClient.sendShuffleData(rssdr)
: shuffleServerClient.sendShuffleData(rssdr);
assertSame(StatusCode.SUCCESS, response.getStatusCode());

assertEquals(456, shuffleServer.getShuffleBufferManager().getUsedMemory());
assertEquals(0, shuffleServer.getShuffleBufferManager().getPreAllocatedSize());
Expand All @@ -282,11 +285,11 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception {
shuffleToBlocks.clear();
shuffleToBlocks.put(0, partitionToBlocks);
rssdr = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
if (isNettyMode) {
shuffleServerNettyClient.sendShuffleData(rssdr);
} else {
shuffleServerClient.sendShuffleData(rssdr);
}
response =
isNettyMode
? shuffleServerNettyClient.sendShuffleData(rssdr)
: shuffleServerClient.sendShuffleData(rssdr);
assertSame(StatusCode.SUCCESS, response.getStatusCode());
assertEquals(0, shuffleServer.getShuffleBufferManager().getPreAllocatedSize());
rscr = new RssSendCommitRequest(appId, 0);
shuffleServerClient.sendCommit(rscr);
Expand All @@ -298,11 +301,11 @@ private void hadoopWriteReadTest(boolean isNettyMode) throws Exception {
shuffleToBlocks.clear();
shuffleToBlocks.put(0, partitionToBlocks);
rssdr = new RssSendShuffleDataRequest(appId, 3, 1000, shuffleToBlocks);
if (isNettyMode) {
shuffleServerNettyClient.sendShuffleData(rssdr);
} else {
shuffleServerClient.sendShuffleData(rssdr);
}
response =
isNettyMode
? shuffleServerNettyClient.sendShuffleData(rssdr)
: shuffleServerClient.sendShuffleData(rssdr);
assertSame(StatusCode.SUCCESS, response.getStatusCode());
rscr = new RssSendCommitRequest(appId, 0);
shuffleServerClient.sendCommit(rscr);
rfsr = new RssFinishShuffleRequest(appId, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.client.util.DefaultIdHelper;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ClientType;
Expand All @@ -51,6 +52,7 @@
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.segment.LocalOrderSegmentSplitter;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
Expand All @@ -59,6 +61,7 @@

import static org.apache.uniffle.common.ShuffleDataDistributionType.LOCAL_ORDER;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

Expand Down Expand Up @@ -190,11 +193,11 @@ private void testWriteAndReadWithSpecifiedMapRange(boolean isNettyMode) throws E

RssSendShuffleDataRequest rssdr =
new RssSendShuffleDataRequest(testAppId, 3, 1000, shuffleToBlocks);
if (isNettyMode) {
shuffleServerNettyClient.sendShuffleData(rssdr);
} else {
shuffleServerClient.sendShuffleData(rssdr);
}
RssSendShuffleDataResponse response =
isNettyMode
? shuffleServerNettyClient.sendShuffleData(rssdr)
: shuffleServerClient.sendShuffleData(rssdr);
assertSame(StatusCode.SUCCESS, response.getStatusCode());

// Flush the data to file
RssSendCommitRequest rscr = new RssSendCommitRequest(testAppId, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@
import org.apache.uniffle.client.request.RssRegisterShuffleRequest;
import org.apache.uniffle.client.request.RssSendCommitRequest;
import org.apache.uniffle.client.request.RssSendShuffleDataRequest;
import org.apache.uniffle.client.response.RssSendShuffleDataResponse;
import org.apache.uniffle.common.BufferSegment;
import org.apache.uniffle.common.ClientType;
import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.ShuffleBlockInfo;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.server.ShuffleServerConf;
Expand All @@ -55,6 +57,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class ShuffleServerWithLocalTest extends ShuffleReadWriteBase {
Expand Down Expand Up @@ -133,11 +136,11 @@ private void localWriteReadTest(boolean isNettyMode) throws Exception {

RssSendShuffleDataRequest rssdr =
new RssSendShuffleDataRequest(testAppId, 3, 1000, shuffleToBlocks);
if (isNettyMode) {
shuffleServerNettyClient.sendShuffleData(rssdr);
} else {
shuffleServerClient.sendShuffleData(rssdr);
}
RssSendShuffleDataResponse response =
isNettyMode
? shuffleServerNettyClient.sendShuffleData(rssdr)
: shuffleServerClient.sendShuffleData(rssdr);
assertSame(StatusCode.SUCCESS, response.getStatusCode());
RssSendCommitRequest rscr = new RssSendCommitRequest(testAppId, 0);
shuffleServerClient.sendCommit(rscr);
RssFinishShuffleRequest rfsr = new RssFinishShuffleRequest(testAppId, 0);
Expand Down
Loading

0 comments on commit 6fad457

Please sign in to comment.