-
Notifications
You must be signed in to change notification settings - Fork 13.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-17036: KIP-919 supports for createAcls, deleteAcls, describeAcls #16493
Conversation
tools/src/test/java/org/apache/kafka/tools/ClusterToolTest.java
Outdated
Show resolved
Hide resolved
5ea3dc6
to
d2741e6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 thanks for this patch. Could you please make testAclOperations
run with controller also?
kafka/core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
Line 1121 in 42f267a
def testAclOperations(quorum: String): Unit = { |
d2741e6
to
50a61b7
Compare
Hi @chia7712, I addressed all comments. Thanks for your review. |
50a61b7
to
d166017
Compare
d166017
to
1fc3eeb
Compare
1fc3eeb
to
107261b
Compare
@FrankYang0529 Could you please move all tests to |
@FrankYang0529 any updates? |
107261b
to
2bfcf53
Compare
Hi @chia7712, updated it. Could you take a look? Thank you. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 thanks for this nice patch. one one small comment is left
.map(server -> server.authorizer().get()).collect(Collectors.toList())); | ||
allAuthorizers.addAll(clusterInstance.controllers().values().stream() | ||
.map(server -> server.authorizer().get()).collect(Collectors.toList())); | ||
allAuthorizers.forEach(authorizer -> kafka.utils.TestUtils.waitAndVerifyAcls( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please add those code to ClusterInstance
as a helper method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, add a method authorizers
to ClusterInstance
. Thanks.
2bfcf53
to
cbac335
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 thanks for updated PR
@@ -204,4 +207,13 @@ default void waitForTopic(String topic, int partitions) throws InterruptedExcept | |||
60000L, "Timeout waiting for controller metadata propagating to brokers"); | |||
} | |||
} | |||
|
|||
default List<Authorizer> authorizers() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could you please check the existence of Option
?
@@ -204,4 +207,13 @@ default void waitForTopic(String topic, int partitions) throws InterruptedExcept | |||
60000L, "Timeout waiting for controller metadata propagating to brokers"); | |||
} | |||
} | |||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
for another, please add waitAcls
. for example:
default void waitAcls(AclBindingFilter filter, Collection<AccessControlEntry> entries) throws InterruptedException {
for (Authorizer authorizer : authorizers()) {
TestUtils.waitForCondition(() -> {
Assertions.assertIterableEquals(authorizer.acls(filter), entries);
return true;
}, "except: ");
}
}
cbac335
to
a76f07d
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 thanks for this patch
|
||
if (options.has(authorizerPropertiesOpt) && options.has(bootstrapServerOpt)) | ||
if (options.has(authorizerPropertiesOpt) && (hasServerOrController)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: if (options.has(authorizerPropertiesOpt) && hasServerOrController)
authorizer.acls(filter).forEach(aclBinding -> accessControlEntrySet.add(aclBinding.entry())); | ||
actualEntries.set(accessControlEntrySet); | ||
return accessControlEntrySet.containsAll(entries) && entries.containsAll(accessControlEntrySet); | ||
}, () -> "except acls: " + entries + ", actual acls: " + actualEntries.get()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`expected acls:
a76f07d
to
cafa09d
Compare
Hi @chia7712, thanks for the review. I address all comments. |
@FrankYang0529 could you please fix conflicts? |
cafa09d
to
e7217a5
Compare
Fixed it. Thank you. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 thanks for this patch.
e7217a5
to
120f824
Compare
Hi @chia7712, I addressed all comments. Could you take a look again? Thank you. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@FrankYang0529 overall LGTM. two small comments are left.
.filter(e -> e.getLevel().equals(Level.WARN.toString())) | ||
.filter(e -> e.getThrowableClassName().filter(name -> name.equals(InstanceAlreadyExistsException.class.getName())).isPresent()) | ||
.count(), "There should be no warnings about multiple registration of mbeans"); | ||
} catch (InterruptedException | IOException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we don't need to catch the exception. Instead, we can add the exception to the method signature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @chia7712, thanks for your review. I add exceptions to the method signature.
.filter(e -> e.getLevel().equals(Level.WARN.toString())) | ||
.filter(e -> e.getThrowableClassName().filter(name -> name.equals(InstanceAlreadyExistsException.class.getName())).isPresent()) | ||
.count(), "There should be no warnings about multiple registration of mbeans"); | ||
} catch (InterruptedException | IOException e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
120f824
to
195ce91
Compare
Signed-off-by: PoAn Yang <payang@apache.org>
195ce91
to
e393678
Compare
Hi @chia7712, could you help me review this when you have time? Thank you. |
LeastLoadedBrokerOrActiveKController
fordescribeAcls
,createAcls
, anddeleteAcls
APIs.--bootstrap-controller
toAclCommand
.AclCommandTest
.Committer Checklist (excluded from commit message)