[CELEBORN-1489] Update Flink support with authentication support #2596
+CC @otterc, @SteNicholas, @RexXiong
+CC @venkata91
LifecycleManager manager = lifecycleManager;
if (null != manager) {
manager.stop();
}
This is an unrelated minor fix for an NPE.
@@ -53,7 +56,7 @@ public TransportClient createClientWithRetry(String remoteHost, int remotePort)
while (retryCount > 0) {
try {
return createClient(remoteHost, remotePort);
- } catch (IOException e) {
+ } catch (Exception e) {
SASL failures are not checked exceptions - we have to retry for those too.
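
To illustrate the point about unchecked failures, here is a minimal sketch of a retry loop that catches `Exception` rather than only `IOException`. It is not the Celeborn implementation: `RetrySketch`, `callWithRetry`, and the simulated failure are hypothetical names used only for illustration.

```java
import java.util.concurrent.Callable;

public class RetrySketch {

    /**
     * Hypothetical helper: runs the action up to maxAttempts times.
     * Catching Exception (not just IOException) means unchecked failures,
     * such as a RuntimeException raised during authentication, are retried too.
     */
    public static <T> T callWithRetry(Callable<T> action, int maxAttempts, long waitMs)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (InterruptedException e) {
                // Do not retry on interruption; restore the flag and give up.
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts) {
                    Thread.sleep(waitMs);
                }
            }
        }
        // All attempts failed; surface the most recent failure.
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = callWithRetry(() -> {
            // Simulated unchecked failure on the first two attempts (assumption).
            if (++calls[0] < 3) {
                throw new IllegalStateException("simulated authentication failure");
            }
            return "connected";
        }, 5, 10L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

With a `catch (IOException e)` clause in place of `catch (Exception e)`, the `IllegalStateException` above would escape on the first attempt and no retry would happen.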
@@ -565,6 +587,15 @@ public void setDataClientFactory(TransportClientFactory dataClientFactory) {
@Override
@VisibleForTesting
public TransportClientFactory getDataClientFactory() {
initializeTransportClientFactoryIfRequired();
`getDataClientFactory` gets invoked without `setupLifecycleManagerRef`, primarily for tests.
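
The general pattern here, initializing on first access when the usual setup call may not have run, can be sketched as follows. This is an assumption about the shape of the fix, not the actual body of `initializeTransportClientFactoryIfRequired`; `LazyFactoryHolder` and its members are hypothetical.

```java
import java.util.function.Supplier;

/** Hypothetical holder that creates its value on first use. */
public class LazyFactoryHolder<T> {
    private final Supplier<T> creator;
    private T factory;

    public LazyFactoryHolder(Supplier<T> creator) {
        this.creator = creator;
    }

    /** Creates the factory only if no earlier call has done so. */
    private synchronized void initializeIfRequired() {
        if (factory == null) {
            factory = creator.get();
        }
    }

    /** Safe to call even when the normal setup path was skipped, e.g. in tests. */
    public synchronized T get() {
        initializeIfRequired();
        return factory;
    }
}
```

Because the getter triggers initialization itself, callers such as tests do not depend on the order in which setup methods were invoked.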
} catch (IOException e) {
throw new RuntimeException("Failed to resolve path " + path, e);
}
}
This is strictly not required for this PR - but `ApiBaseResourceAuthenticationSuite` keeps failing (with maven) without it.
@@ -95,8 +95,10 @@ class WorkerSource(conf: CelebornConf) extends AbstractSource(conf, MetricsSyste
def connectionInactive(client: TransportClient): Unit = {
val applicationIds = appActiveConnections.remove(client.getChannel.id().asLongText())
incCounter(ACTIVE_CONNECTION_COUNT, -1)
- applicationIds.asScala.foreach(applicationId =>
- incCounter(ACTIVE_CONNECTION_COUNT, -1, Map(applicationLabel -> applicationId)))
+ if (null != applicationIds) {
This was causing NPE in the tests
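
The failure mode is that `Map.remove` returns `null` when the key was never registered, so iterating over the result without a check throws. A minimal Java sketch of the guard, using a hypothetical `ConnectionTracker` class rather than the real `WorkerSource`:

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical tracker of per-application connection counts, keyed by channel id. */
public class ConnectionTracker {
    private final Map<String, Set<String>> appsByChannel = new ConcurrentHashMap<>();
    private final Map<String, Integer> activeByApp = new ConcurrentHashMap<>();

    public void connectionActive(String channelId, String appId) {
        Set<String> apps =
            appsByChannel.computeIfAbsent(channelId, k -> ConcurrentHashMap.newKeySet());
        // Count each (channel, application) pair only once.
        if (apps.add(appId)) {
            activeByApp.merge(appId, 1, Integer::sum);
        }
    }

    public void connectionInactive(String channelId) {
        // remove() returns null if this channel was never associated with an application.
        Set<String> apps = appsByChannel.remove(channelId);
        if (apps != null) {
            for (String appId : apps) {
                activeByApp.merge(appId, -1, Integer::sum);
            }
        }
    }

    public int activeFor(String appId) {
        return activeByApp.getOrDefault(appId, 0);
    }
}
```

Without the `apps != null` check, calling `connectionInactive` for a channel that never registered an application would throw a `NullPointerException`.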
Thanks for tagging me here. Will review the PR shortly.
Overall lgtm except for some minor comments. This looks great, thanks for adding SASL auth support for Flink with Celeborn.
...mmon/src/main/java/org/apache/celeborn/plugin/flink/network/FlinkTransportClientFactory.java
...common/src/main/java/org/apache/celeborn/plugin/flink/readclient/FlinkShuffleClientImpl.java
Thanks @venkata91, @RexXiong ... have addressed the comments.
Thanks, LGTM. Merged to main (v0.6.0).
Thanks @venkata91, @RexXiong!
Let me backport this to 0.5 branch as well @RexXiong
Merge conflicts in branch 0.5

Fix authentication support for Apache Flink.

Without these changes, Apache Flink applications fail when Celeborn cluster has authentication enabled.

Fixes authentication support for Apache Flink integration

This is forward port + adaptation of changes we did internally (against 0.4) when testing Apache Flink applications against Celeborn cluster with authentication (and TLS) enabled.
Integration test has been updated to additionally test for Flink applications with authentication enabled in Celeborn cluster.

Closes apache#2596 from mridulm/fix-flink-auth-support.

Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: Shuang <lvshuang.xjs@alibaba-inc.com>
…ckport to 0.5

Backport of #2596 to branch-0.5

Conflicts were in:
* ApiBaseResourceAuthenticationSuite.scala
* ApiBaseResourceSuite.scala

Description from #2596:

### What changes were proposed in this pull request?
Fix authentication support for Apache Flink.

### Why are the changes needed?
Without these changes, Apache Flink applications fail when Celeborn cluster has authentication enabled.

### Does this PR introduce _any_ user-facing change?
Fixes authentication support for Apache Flink integration

### How was this patch tested?
This is forward port + adaptation of changes we did internally (against 0.4) when testing Apache Flink applications against Celeborn cluster with authentication (and TLS) enabled.
Integration test has been updated to additionally test for Flink applications with authentication enabled in Celeborn cluster.

Closes #2603 from mridulm/fix-flink-auth-support-v0.5.

Authored-by: Mridul Muralidharan <mridulatgmail.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
### What changes were proposed in this pull request?
Fix authentication support for Apache Flink.

### Why are the changes needed?
Without these changes, Apache Flink applications fail when the Celeborn cluster has authentication enabled.

### Does this PR introduce _any_ user-facing change?
Yes. It fixes authentication support for the Apache Flink integration.

### How was this patch tested?
This is a forward port and adaptation of changes we made internally (against 0.4) when testing Apache Flink applications against a Celeborn cluster with authentication (and TLS) enabled.
The integration test has been updated to additionally cover Flink applications with authentication enabled in the Celeborn cluster.