Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][admin] PIP-369 Change default value of unload-scope in `n… #3

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -778,15 +778,16 @@ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String clus
} catch (PulsarServerException e) {
return FutureUtil.failedFuture(e);
}
// compile regex patterns once
List<Pattern> namespacePatterns = policyData.getNamespaces().stream().map(Pattern::compile).toList();
// TODO for 4.x, we should include both old and new namespace regex pattern for unload `all_matching` option
Set<String> combinedNamespaces = new HashSet<>(policyData.getNamespaces());
final List<String> oldNamespaces = new ArrayList<>();
if (oldPolicy != null) {
oldNamespaces.addAll(oldPolicy.getNamespaces());
combinedNamespaces.addAll(oldNamespaces);
}
return adminClient.tenants().getTenantsAsync().thenCompose(tenants -> {
List<CompletableFuture<List<String>>> filteredNamespacesForEachTenant = tenants.stream()
.map(tenant -> adminClient.namespaces().getNamespacesAsync(tenant).thenCompose(namespaces -> {
List<CompletableFuture<String>> namespaceNamesInCluster = namespaces.stream()
.filter(namespaceName -> namespacePatterns.stream()
.anyMatch(pattern -> pattern.matcher(namespaceName).matches()))
.map(namespaceName -> adminClient.namespaces().getPoliciesAsync(namespaceName)
.thenApply(policies -> policies.replication_clusters.contains(cluster)
? namespaceName : null))
Expand All @@ -802,46 +803,44 @@ private CompletableFuture<Void> filterAndUnloadMatchedNamespaceAsync(String clus
.map(CompletableFuture::join)
.flatMap(List::stream)
.collect(Collectors.toList()));
}).thenCompose(shouldUnloadNamespaces -> {
if (CollectionUtils.isEmpty(shouldUnloadNamespaces)) {
}).thenCompose(clusterLocalNamespaces -> {
if (CollectionUtils.isEmpty(clusterLocalNamespaces)) {
return CompletableFuture.completedFuture(null);
}
// If unload type is 'changed', we need to figure out a further subset of namespaces whose placement might
// actually have been changed.

log.debug("Old policy: {} ; new policy: {}", oldPolicy, policyData);
if (oldPolicy != null && NamespaceIsolationPolicyUnloadScope.changed.equals(policyData.getUnloadScope())) {
// We also compare that the previous primary broker list is same as current, in case all namespaces need
// to be placed again anyway.
if (CollectionUtils.isEqualCollection(oldPolicy.getPrimary(), policyData.getPrimary())) {
// list is same, so we continue finding the changed namespaces.

// We create a union regex list contains old + new regexes
Set<String> combinedNamespaces = new HashSet<>(oldPolicy.getNamespaces());
combinedNamespaces.addAll(policyData.getNamespaces());
// We create a intersection of the old and new regexes. These won't need to be unloaded
Set<String> commonNamespaces = new HashSet<>(oldPolicy.getNamespaces());
commonNamespaces.retainAll(policyData.getNamespaces());
boolean unloadAllNamespaces = false;
// We also compare that the previous primary broker list is same as current, in case all namespaces need
// to be placed again anyway.
if (NamespaceIsolationPolicyUnloadScope.all_matching.equals(policyData.getUnloadScope())
|| (oldPolicy != null
&& !CollectionUtils.isEqualCollection(oldPolicy.getPrimary(), policyData.getPrimary()))) {
unloadAllNamespaces = true;
}
// list is same, so we continue finding the changed namespaces.

log.debug("combined regexes: {}; common regexes:{}", combinedNamespaces, combinedNamespaces);
// We create a intersection of the old and new regexes. These won't need to be unloaded.
Set<String> commonNamespaces = new HashSet<>(policyData.getNamespaces());
commonNamespaces.retainAll(oldNamespaces);

// Find the changed regexes (new - new ∩ old). TODO for 4.x, make this (new U old - new ∩ old)
combinedNamespaces.removeAll(commonNamespaces);
log.debug("combined regexes: {}; common regexes:{}", combinedNamespaces, commonNamespaces);

log.debug("changed regexes: {}", commonNamespaces);
if (!unloadAllNamespaces) {
// Find the changed regexes ((new U old) - (new ∩ old)).
combinedNamespaces.removeAll(commonNamespaces);
log.debug("changed regexes: {}", commonNamespaces);
}

// Now we further filter the filtered namespaces based on this combinedNamespaces set
shouldUnloadNamespaces = shouldUnloadNamespaces.stream()
.filter(name -> combinedNamespaces.stream()
.map(Pattern::compile)
.anyMatch(pattern -> pattern.matcher(name).matches())
).toList();
// Now we further filter the filtered namespaces based on this combinedNamespaces set
List<Pattern> namespacePatterns = combinedNamespaces.stream().map(Pattern::compile).toList();
clusterLocalNamespaces = clusterLocalNamespaces.stream()
.filter(name -> namespacePatterns.stream().anyMatch(pattern -> pattern.matcher(name).matches()))
.toList();

}
}
// unload type is either null or not in (changed, none), so we proceed to unload all namespaces
// TODO - default in 4.x should become `changed`
List<CompletableFuture<Void>> futures = shouldUnloadNamespaces.stream()
List<CompletableFuture<Void>> futures = clusterLocalNamespaces.stream()
.map(namespaceName -> adminClient.namespaces().unloadAsync(namespaceName))
.collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenAccept(__ -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3488,17 +3488,19 @@ private NamespaceIsolationData createPolicyData(NamespaceIsolationPolicyUnloadSc
parameters1.put("usage_threshold", "100");
List<String> nsRegexList = new ArrayList<>(namespaces);

return NamespaceIsolationData.builder()
NamespaceIsolationData.Builder build = NamespaceIsolationData.builder()
// "prop-ig/ns1" is present in test cluster, policy set on test2 should work
.namespaces(nsRegexList)
.primary(primaryBrokers)
.secondary(Collections.singletonList(""))
.autoFailoverPolicy(AutoFailoverPolicyData.builder()
.policyType(AutoFailoverPolicyType.min_available)
.parameters(parameters1)
.build())
.unloadScope(scope)
.build();
.build());
if (scope != null) {
build.unloadScope(scope);
}
return build.build();
}

private boolean allTopicsUnloaded(List<String> topics) {
Expand Down Expand Up @@ -3624,18 +3626,42 @@ public void testIsolationPolicyUnloadsNSWithAllScope(final String topicType) thr
testIsolationPolicyUnloadsNSWithScope(
topicType, "policy-all", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
all_matching, List.of(".*-unload-test-c.*"), List.of("b1", "b2"),
Collections.singletonList(".*")
);
}

@Test(dataProvider = "topicType")
public void testIsolationPolicyUnloadsNSWithChangedScope1(final String topicType) throws Exception {
String nsPrefix1 = newUniqueName(defaultTenant + "/") + "-unload-test-";
// Addition case
testIsolationPolicyUnloadsNSWithScope(
topicType, "policy-changed1", nsPrefix1, List.of("a1", "a2", "b1", "b2", "c1"),
all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2"),
Collections.singletonList(".*")
);
}

@Test(dataProvider = "topicType")
public void testIsolationPolicyUnloadsNSWithChangedScope2(final String topicType) throws Exception {
String nsPrefix2 = newUniqueName(defaultTenant + "/") + "-unload-test-";
// removal case
testIsolationPolicyUnloadsNSWithScope(
topicType, "policy-changed2", nsPrefix2, List.of("a1", "a2", "b1", "b2", "c1"),
all_matching, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("b1", "b2"),
changed, List.of(".*-unload-test-c.*"), List.of("b1", "b2", "c1"),
Collections.singletonList(".*")
);
}

@Test(dataProvider = "topicType")
public void testIsolationPolicyUnloadsNSWithChangedScope(final String topicType) throws Exception {
public void testIsolationPolicyUnloadsNSWithScopeMissing(final String topicType) throws Exception {
String nsPrefix = newUniqueName(defaultTenant + "/") + "-unload-test-";
testIsolationPolicyUnloadsNSWithScope(
topicType, "policy-changed", nsPrefix, List.of("a1", "a2", "b1", "b2", "c1"),
all_matching, List.of(".*-unload-test-a.*"), List.of("b1", "b2", "c1"),
changed, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2"),
null, List.of(".*-unload-test-a.*", ".*-unload-test-c.*"), List.of("a1", "a2", "b1", "b2"),
Collections.singletonList(".*")
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ private class SetPolicy extends CliCommand {
private Map<String, String> autoFailoverPolicyParams;

@Option(names = "--unload-scope", description = "configure the type of unload to do -"
+ " ['all_matching', 'none', 'changed'] namespaces. By default, all namespaces matching the namespaces"
+ " regex will be unloaded and placed again. You can choose to not unload any namespace while setting"
+ " this new policy by choosing `none` or choose to unload only the namespaces whose placement will"
+ " actually change. If you chose 'none', you will need to manually unload the namespaces for them to"
+ " be placed correctly, or wait till some namespaces get load balanced automatically based on load"
+ " shedding configurations.")
+ " ['all_matching', 'none', 'changed'] namespaces. By default, only namespaces whose placement will"
+ " actually change would be unloaded and placed again. You can choose to not unload any namespace"
+ " while setting this new policy by choosing `none` or choose to unload all namespaces matching"
+ " old (if any) and new namespace regex. If you chose 'none', you will need to manually unload the"
+ " namespaces for them to be placed correctly, or wait till some namespaces get load balanced"
+ " automatically based on load shedding configurations.")
private NamespaceIsolationPolicyUnloadScope unloadScope;

void run() throws PulsarAdminException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ public class NamespaceIsolationDataImpl implements NamespaceIsolationData {
@ApiModelProperty(
name = "unload_scope",
value = "The type of unload to perform while applying the new isolation policy.",
example = "'all_matching' (default) for unloading all matching namespaces. 'none' for not unloading "
+ "any namespace. 'changed' for unloading only the namespaces whose placement is actually changing"
example = "'changed' (default) for unloading only the namespaces whose placement is actually changing. "
+ "'all_matching' for unloading all matching namespaces. 'none' for not unloading any namespaces."
)
@JsonProperty("unload_scope")
private NamespaceIsolationPolicyUnloadScope unloadScope;
Expand Down