Skip to content

Commit

Permalink
[improve][broker] Make cluster metadata init command support metadata…
Browse files Browse the repository at this point in the history
… config path (apache#23269)
  • Loading branch information
Demogorgon314 authored Sep 9, 2024
1 parent ca0fb44 commit 46f99b9
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,15 @@ private static class Arguments {
hidden = false)
private String configurationMetadataStore;

@Option(names = {"-mscp",
"--metadata-store-config-path"}, description = "Metadata Store config path", hidden = false)
private String metadataStoreConfigPath;

@Option(names = {"-cmscp",
"--configuration-metadata-store-config-path"}, description = "Configuration Metadata Store config path",
hidden = false)
private String configurationStoreConfigPath;

@Option(names = {
"--initial-num-stream-storage-containers"
}, description = "Num storage containers of BookKeeper stream storage")
Expand Down Expand Up @@ -283,9 +292,11 @@ private static void initializeCluster(Arguments arguments, int bundleNumberForDe
log.info("Setting up cluster {} with metadata-store={} configuration-metadata-store={}", arguments.cluster,
arguments.metadataStoreUrl, arguments.configurationMetadataStore);

MetadataStoreExtended localStore =
initLocalMetadataStore(arguments.metadataStoreUrl, arguments.zkSessionTimeoutMillis);
MetadataStoreExtended localStore = initLocalMetadataStore(arguments.metadataStoreUrl,
arguments.metadataStoreConfigPath,
arguments.zkSessionTimeoutMillis);
MetadataStoreExtended configStore = initConfigMetadataStore(arguments.configurationMetadataStore,
arguments.configurationStoreConfigPath,
arguments.zkSessionTimeoutMillis);

final String metadataStoreUrlNoIdentifer = MetadataStoreFactoryImpl
Expand Down Expand Up @@ -464,9 +475,17 @@ static void createPartitionedTopic(MetadataStore configStore, TopicName topicNam
}
}

public static MetadataStoreExtended initLocalMetadataStore(String connection, int sessionTimeout) throws Exception {
public static MetadataStoreExtended initLocalMetadataStore(String connection,
int sessionTimeout) throws Exception {
return initLocalMetadataStore(connection, null, sessionTimeout);
}

public static MetadataStoreExtended initLocalMetadataStore(String connection,
String configPath,
int sessionTimeout) throws Exception {
MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder()
.sessionTimeoutMillis(sessionTimeout)
.configFilePath(configPath)
.metadataStoreName(MetadataStoreConfig.METADATA_STORE)
.build());
if (store instanceof MetadataStoreLifecycle) {
Expand All @@ -475,10 +494,19 @@ public static MetadataStoreExtended initLocalMetadataStore(String connection, in
return store;
}

public static MetadataStoreExtended initConfigMetadataStore(String connection, int sessionTimeout)
public static MetadataStoreExtended initConfigMetadataStore(String connection,
int sessionTimeout)
throws Exception {
return initConfigMetadataStore(connection, null, sessionTimeout);
}

public static MetadataStoreExtended initConfigMetadataStore(String connection,
String configPath,
int sessionTimeout)
throws Exception {
MetadataStoreExtended store = MetadataStoreExtended.create(connection, MetadataStoreConfig.builder()
.sessionTimeoutMillis(sessionTimeout)
.configFilePath(configPath)
.metadataStoreName(MetadataStoreConfig.CONFIGURATION_METADATA_STORE)
.build());
if (store instanceof MetadataStoreLifecycle) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ private static class Arguments {
"--configuration-store" }, description = "Configuration Store connection string", required = true)
private String configurationStore;

@Option(names = {"-cmscp",
"--configuration-metadata-store-config-path"}, description = "Configuration Metadata Store config path",
hidden = false)
private String configurationStoreConfigPath;

@Option(names = {
"--zookeeper-session-timeout-ms"
}, description = "Local zookeeper session timeout ms")
Expand Down Expand Up @@ -85,8 +90,10 @@ public static int doMain(String[] args) throws Exception {
return 1;
}

try (MetadataStore configStore = PulsarClusterMetadataSetup
.initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
try (MetadataStore configStore = PulsarClusterMetadataSetup.initConfigMetadataStore(
arguments.configurationStore,
arguments.configurationStoreConfigPath,
arguments.zkSessionTimeoutMillis)) {
PulsarResources pulsarResources = new PulsarResources(null, configStore);
for (String namespace : arguments.namespaces) {
NamespaceName namespaceName = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ private static class Arguments {
"--configuration-store" }, description = "Configuration Store connection string", required = true)
private String configurationStore;

@Option(names = {"-cmscp",
"--configuration-metadata-store-config-path"}, description = "Configuration Metadata Store config path",
hidden = false)
private String configurationStoreConfigPath;

@Option(names = {
"--zookeeper-session-timeout-ms"
}, description = "Local zookeeper session timeout ms")
Expand Down Expand Up @@ -92,8 +97,10 @@ public static void main(String[] args) throws Exception {
System.exit(1);
}

try (MetadataStoreExtended configStore = PulsarClusterMetadataSetup
.initConfigMetadataStore(arguments.configurationStore, arguments.zkSessionTimeoutMillis)) {
try (MetadataStoreExtended configStore = PulsarClusterMetadataSetup.initConfigMetadataStore(
arguments.configurationStore,
arguments.configurationStoreConfigPath,
arguments.zkSessionTimeoutMillis)) {
PulsarResources pulsarResources = new PulsarResources(null, configStore);
// Create system tenant
PulsarClusterMetadataSetup
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,11 @@ public void testReSetupClusterMetadata() throws Exception {
"--cluster", "testReSetupClusterMetadata-cluster",
"--zookeeper", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-store", "127.0.0.1:" + localZkS.getZookeeperPort(),
"--configuration-metadata-store-config-path", "src/test/resources/conf/zk_client_enable_sasl.conf",
"--web-service-url", "http://127.0.0.1:8080",
"--web-service-url-tls", "https://127.0.0.1:8443",
"--broker-service-url", "pulsar://127.0.0.1:6650",
"--broker-service-url-tls","pulsar+ssl://127.0.0.1:6651"
"--broker-service-url-tls", "pulsar+ssl://127.0.0.1:6651"
};
PulsarClusterMetadataSetup.main(args);
SortedMap<String, String> data1 = localZkS.dumpData();
Expand Down
20 changes: 20 additions & 0 deletions pulsar-broker/src/test/resources/conf/zk_client_enable_sasl.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

zookeeper.sasl.client=true

0 comments on commit 46f99b9

Please sign in to comment.