Skip to content

Commit

Permalink
Merge branch 'up/master' into website/branch-2.7.2-chapter-5
Browse files Browse the repository at this point in the history
* up/master:
  [Doc] Add explanations for setting geo-replication at topic level (apache#12633)
  commit chapter Tiered Storage (apache#12592)
  [pulsar-admin] Add remove-subscription-types-enabled command for namespace (apache#12392)
  Enable CLI to publish non-batched messages (apache#12641)
  [Doc] Add doc for tokenSettingPrefix (apache#12662)
  [pulsar-admin] Add corresponding get command for namespace (apache#12322)
  [pulsar-admin] Perfect judgment conditions of pulsar-admin (apache#12315)
  [broker] Avoid unnecessary recalculation of maxSubscriptionsPerTopic in AbstractTopic (apache#12658)
  [Transaction]Stop TB recovering with exception (apache#12636)
  [website][upgrade]feat: docs migration - 2.7.1 / client (apache#12612)
  [website][upgrade]feat: docs migration - 2.7.1 / performance (apache#12611)
  [website][upgrade]feat: docs migration - 2.7.1 / security (apache#12610)
  [Modernizer] Apply Modernizer plugin for pulsar broker common module and fix violation. (apache#12657)
  [Authorization] Support GET_METADATA topic op after enable auth (apache#12656)
  Fix StringIndexOutOfBoundsException in org.apache.pulsar.broker.resources.NamespaceResources#pathIsFromNamespace (apache#12659)

# Conflicts:
#	site2/website-next/versioned_sidebars/version-2.7.2-sidebars.json
  • Loading branch information
Yan Zhang committed Nov 9, 2021
2 parents c84b320 + f9d16ca commit ee04033
Show file tree
Hide file tree
Showing 69 changed files with 9,494 additions and 96 deletions.
1 change: 0 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ flexible messaging model and an intuitive client API.</description>
<j2objc-annotations.version>1.3</j2objc-annotations.version>
<lightproto-maven-plugin.version>0.4</lightproto-maven-plugin.version>
<dependency-check-maven.version>6.1.6</dependency-check-maven.version>
<modernizer-maven-plugin.version>2.3.0</modernizer-maven-plugin.version>

<!-- Used to configure rename.netty.native. Libs -->
<rename.netty.native.libs>rename-netty-native-libs.sh</rename.netty.native.libs>
Expand Down
21 changes: 21 additions & 0 deletions pulsar-broker-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,25 @@
<artifactId>jjwt-jackson</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.gaul</groupId>
<artifactId>modernizer-maven-plugin</artifactId>
<configuration>
<failOnViolations>true</failOnViolations>
<javaVersion>8</javaVersion>
</configuration>
<executions>
<execution>
<id>modernizer</id>
<goals>
<goal>modernizer</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
import lombok.Getter;
import lombok.Setter;
Expand Down Expand Up @@ -1000,7 +1001,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_SERVER,
doc = "List of broker interceptor to load, which is a list of broker interceptor names"
)
private Set<String> brokerInterceptors = Sets.newTreeSet();
private Set<String> brokerInterceptors = new TreeSet<>();

@FieldContext(
category = CATEGORY_SERVER,
Expand Down Expand Up @@ -1034,7 +1035,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_PROTOCOLS,
doc = "List of messaging protocols to load, which is a list of protocol names"
)
private Set<String> messagingProtocols = Sets.newTreeSet();
private Set<String> messagingProtocols = new TreeSet<>();

@FieldContext(
category = CATEGORY_SERVER,
Expand Down Expand Up @@ -1108,13 +1109,13 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Specify the tls protocols the broker will use to negotiate during TLS Handshake.\n\n"
+ "Example:- [TLSv1.3, TLSv1.2]"
)
private Set<String> tlsProtocols = Sets.newTreeSet();
private Set<String> tlsProtocols = new TreeSet<>();
@FieldContext(
category = CATEGORY_TLS,
doc = "Specify the tls cipher the broker will use to negotiate during TLS Handshake.\n\n"
+ "Example:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256]"
)
private Set<String> tlsCiphers = Sets.newTreeSet();
private Set<String> tlsCiphers = new TreeSet<>();
@FieldContext(
category = CATEGORY_TLS,
doc = "Specify whether Client certificates are required for TLS Reject.\n"
Expand All @@ -1131,7 +1132,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_AUTHENTICATION,
doc = "Authentication provider name list, which is a list of class names"
)
private Set<String> authenticationProviders = Sets.newTreeSet();
private Set<String> authenticationProviders = new TreeSet<>();

@FieldContext(
category = CATEGORY_AUTHENTICATION,
Expand All @@ -1156,14 +1157,14 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Role names that are treated as `super-user`, meaning they will be able to"
+ " do all admin operations and publish/consume from all topics"
)
private Set<String> superUserRoles = Sets.newTreeSet();
private Set<String> superUserRoles = new TreeSet<>();

@FieldContext(
category = CATEGORY_AUTHORIZATION,
doc = "Role names that are treated as `proxy roles`. \n\nIf the broker sees"
+ " a request with role as proxyRoles - it will demand to see the original"
+ " client role or certificate.")
private Set<String> proxyRoles = Sets.newTreeSet();
private Set<String> proxyRoles = new TreeSet<>();

@FieldContext(
category = CATEGORY_AUTHORIZATION,
Expand Down Expand Up @@ -2300,15 +2301,15 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "Examples:- [TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256].\n"
+ " used by the internal client to authenticate with Pulsar brokers"
)
private Set<String> brokerClientTlsCiphers = Sets.newTreeSet();
private Set<String> brokerClientTlsCiphers = new TreeSet<>();
@FieldContext(
category = CATEGORY_KEYSTORE_TLS,
doc = "Specify the tls protocols the broker will use to negotiate during TLS handshake"
+ " (a comma-separated list of protocol names).\n\n"
+ "Examples:- [TLSv1.3, TLSv1.2] \n"
+ " used by the internal client to authenticate with Pulsar brokers"
)
private Set<String> brokerClientTlsProtocols = Sets.newTreeSet();
private Set<String> brokerClientTlsProtocols = new TreeSet<>();

/* packages management service configurations (begin) */

Expand Down Expand Up @@ -2349,7 +2350,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
category = CATEGORY_PLUGIN,
doc = "List of broker additional servlet to load, which is a list of broker additional servlet names"
)
private Set<String> additionalServlets = Sets.newTreeSet();
private Set<String> additionalServlets = new TreeSet<>();

/**
* @deprecated See {@link #getConfigurationStoreServers}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class AuthenticationService implements Closeable {
private static final Logger LOG = LoggerFactory.getLogger(AuthenticationService.class);
private final String anonymousUserRole;

private final Map<String, AuthenticationProvider> providers = Maps.newHashMap();
private final Map<String, AuthenticationProvider> providers = new HashMap<>();

public AuthenticationService(ServiceConfiguration conf) throws PulsarServerException {
anonymousUserRole = conf.getAnonymousUserRole();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.broker.authorization;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Objects.requireNonNull;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import com.google.common.base.Function;
import java.io.IOException;
Expand Down Expand Up @@ -72,8 +73,8 @@ public PulsarAuthorizationProvider(ServiceConfiguration conf, PulsarResources re

@Override
public void initialize(ServiceConfiguration conf, PulsarResources pulsarResources) throws IOException {
checkNotNull(conf, "ServiceConfiguration can't be null");
checkNotNull(pulsarResources, "PulsarResources can't be null");
requireNonNull(conf, "ServiceConfiguration can't be null");
requireNonNull(pulsarResources, "PulsarResources can't be null");
this.conf = conf;
this.pulsarResources = pulsarResources;

Expand Down Expand Up @@ -567,6 +568,7 @@ public CompletableFuture<Boolean> allowTopicOperationAsync(TopicName topicName,
switch (operation) {
case LOOKUP:
case GET_STATS:
case GET_METADATA:
isAuthorizedFuture = canLookupAsync(topicName, role, authData);
break;
case PRODUCE:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ public CompletableFuture<Void> setPoliciesAsync(NamespaceName ns, Function<Polic
}

public static boolean pathIsFromNamespace(String path) {
return path.startsWith(BASE_POLICIES_PATH) && path.substring(BASE_POLICIES_PATH.length() + 1).contains("/");
return path.startsWith(BASE_POLICIES_PATH + "/")
&& path.substring(BASE_POLICIES_PATH.length() + 1).contains("/");
}

public static NamespaceName namespaceFromPath(String path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.pulsar.broker.resources;

import com.google.common.collect.Lists;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -82,7 +84,7 @@ public CompletableFuture<Boolean> tenantExistsAsync(String tenantName) {
}

public List<String> getListOfNamespaces(String tenant) throws MetadataStoreException {
List<String> namespaces = Lists.newArrayList();
List<String> namespaces = new ArrayList<>();

// this will return a cluster in v1 and a namespace in v2
for (String clusterOrNamespace : getChildren(joinPath(BASE_POLICIES_PATH, tenant))) {
Expand Down Expand Up @@ -121,7 +123,7 @@ public CompletableFuture<Void> hasActiveNamespace(String tenant) {
activeNamespaceFuture.complete(null);
return;
}
List<CompletableFuture<Void>> activeNamespaceListFuture = Lists.newArrayList();
List<CompletableFuture<Void>> activeNamespaceListFuture = new ArrayList<>();
clusterOrNamespaceList.forEach(clusterOrNamespace -> {
// get list of active V1 namespace
CompletableFuture<Void> checkNs = new CompletableFuture<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;

import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

/**
* Validates multiple listener address configurations.
Expand All @@ -52,7 +55,7 @@ public static Map<String, AdvertisedListener> validateAndAnalysisAdvertisedListe
return Collections.emptyMap();
}
Optional<String> firstListenerName = Optional.empty();
Map<String, List<String>> listeners = Maps.newLinkedHashMap();
Map<String, List<String>> listeners = new LinkedHashMap<>();
for (final String str : StringUtils.split(config.getAdvertisedListeners(), ",")) {
int index = str.indexOf(":");
if (index <= 0) {
Expand All @@ -64,7 +67,7 @@ public static Map<String, AdvertisedListener> validateAndAnalysisAdvertisedListe
firstListenerName = Optional.of(listenerName);
}
String value = StringUtils.trim(str.substring(index + 1));
listeners.computeIfAbsent(listenerName, k -> Lists.newArrayListWithCapacity(2));
listeners.computeIfAbsent(listenerName, k -> new ArrayList<>(2));
listeners.get(listenerName).add(value);
}
if (StringUtils.isBlank(config.getInternalListenerName())) {
Expand All @@ -73,8 +76,8 @@ public static Map<String, AdvertisedListener> validateAndAnalysisAdvertisedListe
if (!listeners.containsKey(config.getInternalListenerName())) {
throw new IllegalArgumentException("the `advertisedListeners` configure do not contain `internalListenerName` entry");
}
final Map<String, AdvertisedListener> result = Maps.newLinkedHashMap();
final Map<String, Set<String>> reverseMappings = Maps.newLinkedHashMap();
final Map<String, AdvertisedListener> result = new LinkedHashMap<>();
final Map<String, Set<String>> reverseMappings = new LinkedHashMap<>();
for (final Map.Entry<String, List<String>> entry : listeners.entrySet()) {
if (entry.getValue().size() > 2) {
throw new IllegalArgumentException("there are redundant configure for listener `" + entry.getKey() + "`");
Expand All @@ -97,7 +100,7 @@ public static Map<String, AdvertisedListener> validateAndAnalysisAdvertisedListe
}
}
String hostPort = String.format("%s:%d", uri.getHost(), uri.getPort());
Set<String> sets = reverseMappings.computeIfAbsent(hostPort, k -> Sets.newTreeSet());
Set<String> sets = reverseMappings.computeIfAbsent(hostPort, k -> new TreeSet<>());
sets.add(entry.getKey());
if (sets.size() > 1) {
throw new IllegalArgumentException("must not specify `" + hostPort + "` to different listener.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.common.configuration;

import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.Objects.requireNonNull;
import static org.apache.pulsar.common.util.FieldParser.update;

import java.io.FileInputStream;
Expand Down Expand Up @@ -52,7 +53,7 @@ public class PulsarConfigurationLoader {
*/
public static <T extends PulsarConfiguration> T create(String configFile,
Class<? extends PulsarConfiguration> clazz) throws IOException, IllegalArgumentException {
checkNotNull(configFile);
requireNonNull(configFile);
try (InputStream inputStream = new FileInputStream(configFile)) {
return create(inputStream, clazz);
}
Expand All @@ -71,7 +72,7 @@ public static <T extends PulsarConfiguration> T create(String configFile,
public static <T extends PulsarConfiguration> T create(InputStream inStream,
Class<? extends PulsarConfiguration> clazz) throws IOException, IllegalArgumentException {
try {
checkNotNull(inStream);
requireNonNull(inStream);
Properties properties = new Properties();
properties.load(inStream);
return (create(properties, clazz));
Expand All @@ -92,7 +93,7 @@ public static <T extends PulsarConfiguration> T create(InputStream inStream,
@SuppressWarnings({ "rawtypes", "unchecked" })
public static <T extends PulsarConfiguration> T create(Properties properties,
Class<? extends PulsarConfiguration> clazz) throws IOException, IllegalArgumentException {
checkNotNull(properties);
requireNonNull(properties);
T configuration = null;
try {
configuration = (T) clazz.getDeclaredConstructor().newInstance();
Expand All @@ -117,7 +118,7 @@ public static <T extends PulsarConfiguration> T create(Properties properties,
* @throws IllegalAccessException
*/
public static boolean isComplete(Object obj) throws IllegalArgumentException {
checkNotNull(obj);
requireNonNull(obj);
Field[] fields = obj.getClass().getDeclaredFields();
StringBuilder error = new StringBuilder();
for (Field field : fields) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pulsar.broker.resources;

import org.junit.Assert;
import org.testng.annotations.Test;


public class NamespaceResourcesTest {
@Test
public void test_pathIsFromNamespace() {
Assert.assertFalse(NamespaceResources.pathIsFromNamespace("/admin/clusters"));
Assert.assertFalse(NamespaceResources.pathIsFromNamespace("/admin/policies"));
Assert.assertFalse(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant"));
Assert.assertTrue(NamespaceResources.pathIsFromNamespace("/admin/policies/my-tenant/my-ns"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,10 @@ public void testConfigurationConverting() throws Exception {
// check whether converting correctly
assertEquals(serviceConfiguration.getZookeeperServers(), "localhost:2181");
assertEquals(serviceConfiguration.getConfigurationStoreServers(), "localhost:2184");
assertEquals(serviceConfiguration.getBrokerServicePort().get(), new Integer(7650));
assertEquals(serviceConfiguration.getBrokerServicePortTls().get(), new Integer(7651));
assertEquals(serviceConfiguration.getWebServicePort().get(), new Integer(9080));
assertEquals(serviceConfiguration.getWebServicePortTls().get(), new Integer(9443));
assertEquals(serviceConfiguration.getBrokerServicePort().get(), Integer.valueOf(7650));
assertEquals(serviceConfiguration.getBrokerServicePortTls().get(), Integer.valueOf((7651)));
assertEquals(serviceConfiguration.getWebServicePort().get(), Integer.valueOf((9080)));
assertEquals(serviceConfiguration.getWebServicePortTls().get(), Integer.valueOf((9443)));

// check whether exception causes
try {
Expand Down Expand Up @@ -119,8 +119,8 @@ public void testPulsarConfiguraitonLoadingStream() throws Exception {
assertEquals(serviceConfig.getBacklogQuotaDefaultLimitGB(), 18);
assertEquals(serviceConfig.getClusterName(), "usc");
assertEquals(serviceConfig.getBrokerClientAuthenticationParameters(), "role:my-role");
assertEquals(serviceConfig.getBrokerServicePort().get(), new Integer(7777));
assertEquals(serviceConfig.getBrokerServicePortTls().get(), new Integer(8777));
assertEquals(serviceConfig.getBrokerServicePort().get(), Integer.valueOf((7777)));
assertEquals(serviceConfig.getBrokerServicePortTls().get(), Integer.valueOf((8777)));
assertFalse(serviceConfig.getWebServicePort().isPresent());
assertFalse(serviceConfig.getWebServicePortTls().isPresent());
assertEquals(serviceConfig.getManagedLedgerDigestType(), DigestType.CRC32C);
Expand Down Expand Up @@ -227,4 +227,4 @@ class TestInCompleteObjectMix {
@FieldContext(minValue = 1, maxValue = 3)
long inValidMax = 4;
}
}
}
Loading

0 comments on commit ee04033

Please sign in to comment.