-
Notifications
You must be signed in to change notification settings - Fork 148
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[#133] feat(netty): integration-test supports netty. #1008
Conversation
@@ -252,7 +252,9 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, int numMaps, Shuff | |||
// get all register info according to coordinator's response | |||
Set<String> assignmentTags = RssSparkShuffleUtils.getAssignmentTags(sparkConf); | |||
ClientUtils.validateClientType(clientType); | |||
assignmentTags.add(clientType); | |||
if (!sparkConf.get(RssSparkConfig.RSS_TEST_MODE_ENABLE)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this? We should reduce similar code. If we don't have better solution, it's acceptable.
You can run the command to fix code style issues
|
Codecov Report
@@ Coverage Diff @@
## master #1008 +/- ##
============================================
+ Coverage 54.76% 55.19% +0.43%
- Complexity 2526 2545 +19
============================================
Files 362 362
Lines 19334 19377 +43
Branches 1799 1803 +4
============================================
+ Hits 10588 10695 +107
+ Misses 8116 8055 -61
+ Partials 630 627 -3
... and 6 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
@@ -105,9 +105,8 @@ public Map runTest(SparkSession spark, String fileName) throws Exception { | |||
map = javaPairRDD.collectAsMap(); | |||
shufflePath = appPath + "/1"; | |||
assertTrue(new File(shufflePath).exists()); | |||
} else { | |||
runCounter++; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When running rssWithNetty, the path with shuffleId=0 has been cleaned up, and an assertion exception will be thrown at this time, assertTrue(fs.exists(new Path(shufflePath)));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zuston Could you take a look at this place?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just keep origin code logic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK.
@@ -150,4 +152,8 @@ public String toString() { | |||
|
|||
return sb.toString(); | |||
} | |||
|
|||
public synchronized void copyDataTo(ByteBuf to) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ReplicateWrite
would cause concurrency decode
, that's why we added synchronized copyDataTo
here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks @leixm
### What changes were proposed in this pull request? Fix [#1008](#1008). It does not actually test `GRPC_NETTY` mode, because it uses `ShuffleServerGrpcClient` everywhere instead of `ShuffleServerGrpcNettyClient`. Setting the shuffle server's tags to `GRPC_NETTY,GRPC` is useless, because we are not using `ShuffleServerGrpcNettyClient` at all. ### Why are the changes needed? It is a sub PR for: #1519 Also, it is a follow-up PR for: #1008 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs.
What changes were proposed in this pull request?
For #133
Why are the changes needed?
Make integration-test support netty.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
UTs.