Skip to content

Commit

Permalink
[Admin] Get schema validation enforce add applied. (#12349)
Browse files Browse the repository at this point in the history
now, namespace get schema validation enforce don't return broker config, if namespace policy the schema validation enforce is false and we use --applied, we should return this config in broker level.
  • Loading branch information
congbobo184 authored Oct 18, 2021
1 parent 4bf9d89 commit 69fb802
Show file tree
Hide file tree
Showing 7 changed files with 67 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2387,10 +2387,15 @@ protected void internalSetSchemaCompatibilityStrategy(SchemaCompatibilityStrateg
"schemaCompatibilityStrategy");
}

protected boolean internalGetSchemaValidationEnforced() {
protected boolean internalGetSchemaValidationEnforced(boolean applied) {
validateNamespacePolicyOperation(namespaceName, PolicyName.SCHEMA_COMPATIBILITY_STRATEGY,
PolicyOperation.READ);
return getNamespacePolicies(namespaceName).schema_validation_enforced;
boolean schemaValidationEnforced = getNamespacePolicies(namespaceName).schema_validation_enforced;
if (!schemaValidationEnforced && applied) {
return pulsar().getConfiguration().isSchemaValidationEnforced();
} else {
return schemaValidationEnforced;
}
}

protected void internalSetSchemaValidationEnforced(boolean schemaValidationEnforced) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1626,9 +1626,10 @@ public void setSubscriptionTypesEnabled(
@ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenants or Namespace doesn't exist") })
public boolean getSchemaValidtionEnforced(@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace) {
@PathParam("namespace") String namespace,
@QueryParam("applied") @DefaultValue("false") boolean applied) {
validateNamespaceName(tenant, namespace);
return internalGetSchemaValidationEnforced();
return internalGetSchemaValidationEnforced(applied);
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ public void cleanup() throws Exception {
super.internalCleanup();
}

@Test
public void testGetSchemaValidationEnforcedApplied() throws Exception {
String namespace = "schema-validation-enforced/testApplied";
admin.namespaces().createNamespace(namespace);
this.conf.setSchemaValidationEnforced(true);
assertTrue(admin.namespaces().getSchemaValidationEnforced(namespace, true));
assertFalse(admin.namespaces().getSchemaValidationEnforced(namespace, false));
}

@Test
public void testDisableSchemaValidationEnforcedNoSchema() throws Exception {
admin.namespaces().createNamespace("schema-validation-enforced/default-no-schema");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3539,6 +3539,7 @@ void setSchemaAutoUpdateCompatibilityStrategy(String namespace,

/**
* Get schema validation enforced for namespace.
* @param namespace namespace for this command.
* @return the schema validation enforced flag
* @throws NotAuthorizedException
* Don't have admin permission
Expand All @@ -3547,16 +3548,39 @@ void setSchemaAutoUpdateCompatibilityStrategy(String namespace,
* @throws PulsarAdminException
* Unexpected error
*/
boolean getSchemaValidationEnforced(String namespace)
throws PulsarAdminException;
boolean getSchemaValidationEnforced(String namespace) throws PulsarAdminException;

/**
* Get schema validation enforced for namespace asynchronously.
* @param namespace namespace for this command.
*
* @return the schema validation enforced flag
*/
CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String namespace);

/**
* Get schema validation enforced for namespace.
* @param namespace namespace for this command.
* @param applied applied for this command.
* @return the schema validation enforced flag
* @throws NotAuthorizedException
* Don't have admin permission
* @throws NotFoundException
* Tenant or Namespace does not exist
* @throws PulsarAdminException
* Unexpected error
*/
boolean getSchemaValidationEnforced(String namespace, boolean applied) throws PulsarAdminException;

/**
* Get schema validation enforced for namespace asynchronously.
* @param namespace namespace for this command.
* @param applied applied for this command.
*
* @return the schema validation enforced flag
*/
CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String namespace, boolean applied);

/**
* Set schema validation enforced for namespace.
* if a producer without a schema attempts to produce to a topic with schema in this the namespace, the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3060,11 +3060,21 @@ public void setSchemaAutoUpdateCompatibilityStrategy(String namespace,
}

@Override
public boolean getSchemaValidationEnforced(String namespace)
public boolean getSchemaValidationEnforced(String namespace) throws PulsarAdminException {
return getSchemaValidationEnforced(namespace, false);
}

@Override
public CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String namespace) {
return getSchemaValidationEnforcedAsync(namespace, false);
}

@Override
public boolean getSchemaValidationEnforced(String namespace, boolean applied)
throws PulsarAdminException {
try {
return getSchemaValidationEnforcedAsync(namespace).
get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
return getSchemaValidationEnforcedAsync(namespace, applied)
.get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
} catch (ExecutionException e) {
throw (PulsarAdminException) e.getCause();
} catch (InterruptedException e) {
Expand All @@ -3076,9 +3086,10 @@ public boolean getSchemaValidationEnforced(String namespace)
}

@Override
public CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String namespace) {
public CompletableFuture<Boolean> getSchemaValidationEnforcedAsync(String namespace, boolean applied) {
NamespaceName ns = NamespaceName.get(namespace);
WebTarget path = namespacePath(ns, "schemaValidationEnforced");
path = path.queryParam("applied", applied);
final CompletableFuture<Boolean> future = new CompletableFuture<>();
asyncGetRequest(path,
new InvocationCallback<Boolean>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,9 @@ public void namespaces() throws Exception {
namespaces.run(split("get-subscription-types-enabled myprop/clust/ns1"));
verify(mockNamespaces).getSubscriptionTypesEnabled("myprop/clust/ns1");

namespaces.run(split("get-schema-validation-enforce myprop/clust/ns1 -ap"));
verify(mockNamespaces).getSchemaValidationEnforced("myprop/clust/ns1", true);

namespaces
.run(split("set-bookie-affinity-group myprop/clust/ns1 --primary-group test1 --secondary-group test2"));
verify(mockNamespaces).setBookieAffinityGroup("myprop/clust/ns1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1877,11 +1877,14 @@ private class GetSchemaValidationEnforced extends CliCommand {
@Parameter(description = "tenant/namespace", required = true)
private java.util.List<String> params;

@Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the namespace")
private boolean applied = false;

@Override
void run() throws PulsarAdminException {
String namespace = validateNamespace(params);

System.out.println(getAdmin().namespaces().getSchemaValidationEnforced(namespace));
System.out.println(getAdmin().namespaces().getSchemaValidationEnforced(namespace, applied));
}
}

Expand Down

0 comments on commit 69fb802

Please sign in to comment.