From d09cb9aea5ac759731c34bfb1a59e591a33bbfe1 Mon Sep 17 00:00:00 2001 From: Andy Zhang <87735571+Andyz26@users.noreply.github.com> Date: Mon, 6 Nov 2023 09:53:57 -0800 Subject: [PATCH] add TE properties loader (#578) --- .../properties/MantisPropertiesService.java | 46 ------------------- .../io/mantisrx/server/agent/AgentV2Main.java | 8 +++- .../mantisrx/server/agent/TaskExecutor.java | 13 +++++- .../server/agent/TaskExecutorStarter.java | 10 +++- 4 files changed, 27 insertions(+), 50 deletions(-) delete mode 100644 mantis-common/src/main/java/io/mantisrx/common/properties/MantisPropertiesService.java diff --git a/mantis-common/src/main/java/io/mantisrx/common/properties/MantisPropertiesService.java b/mantis-common/src/main/java/io/mantisrx/common/properties/MantisPropertiesService.java deleted file mode 100644 index e43b968d0..000000000 --- a/mantis-common/src/main/java/io/mantisrx/common/properties/MantisPropertiesService.java +++ /dev/null @@ -1,46 +0,0 @@ -/* - * Copyright 2019 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package io.mantisrx.common.properties; - -public class MantisPropertiesService implements MantisPropertiesLoader { - - private MantisPropertiesLoader mantisPropertiesLoader; - - public MantisPropertiesService(MantisPropertiesLoader mantisProps) { - this.mantisPropertiesLoader = mantisProps; - } - - @Override - public void initalize() { - - mantisPropertiesLoader.initalize(); - - } - - @Override - public String getStringValue(String name, String defaultVal) { - - return mantisPropertiesLoader.getStringValue(name, defaultVal); - } - - @Override - public void shutdown() { - // TODO Auto-generated method stub - - } - -} diff --git a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/AgentV2Main.java b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/AgentV2Main.java index bb5d87a93..44b055af9 100644 --- a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/AgentV2Main.java +++ b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/AgentV2Main.java @@ -18,6 +18,8 @@ import com.sampullara.cli.Args; import com.sampullara.cli.Argument; +import io.mantisrx.common.properties.DefaultMantisPropertiesLoader; +import io.mantisrx.common.properties.MantisPropertiesLoader; import io.mantisrx.runtime.loader.RuntimeTask; import io.mantisrx.runtime.loader.config.WorkerConfiguration; import io.mantisrx.server.agent.config.ConfigurationFactory; @@ -44,12 +46,13 @@ public class AgentV2Main implements Service { private final TaskExecutorStarter taskExecutorStarter; private final AtomicBoolean stopping = new AtomicBoolean(false); - public AgentV2Main(ConfigurationFactory configFactory) throws Exception { + public AgentV2Main(ConfigurationFactory configFactory, MantisPropertiesLoader propertiesLoader) throws Exception { WorkerConfiguration workerConfiguration = configFactory.getConfig(); this.taskExecutorStarter = TaskExecutorStarter.builder(workerConfiguration) .taskFactory(new SingleTaskOnlyFactory()) + .propertiesLoader(propertiesLoader) .addListener( new TaskExecutor.Listener() { @Override @@ -103,7 +106,8 @@ public static void main(String[] args) { props.putAll(System.getProperties()); props.putAll(loadProperties(propFile)); StaticPropertiesConfigurationFactory factory = new StaticPropertiesConfigurationFactory(props); - AgentV2Main agent = new AgentV2Main(factory); + DefaultMantisPropertiesLoader propertiesLoader = new DefaultMantisPropertiesLoader(props); + AgentV2Main agent = new AgentV2Main(factory, propertiesLoader); agent.start(); // blocks until shutdown hook (ctrl-c) } catch (Exception e) { // unexpected to get a RuntimeException, will exit diff --git a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java index bf8cf7d12..ea121d1d0 100644 --- a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java +++ b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutor.java @@ -22,6 +22,8 @@ import io.mantisrx.common.JsonSerializer; import io.mantisrx.common.WorkerPorts; import io.mantisrx.common.metrics.netty.MantisNettyEventsListenerFactory; +import io.mantisrx.common.properties.DefaultMantisPropertiesLoader; +import io.mantisrx.common.properties.MantisPropertiesLoader; import io.mantisrx.runtime.MachineDefinition; import io.mantisrx.runtime.MantisJobState; import io.mantisrx.runtime.loader.ClassLoaderHandle; @@ -91,6 +93,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { @Getter private final ClusterID clusterID; private final WorkerConfiguration workerConfiguration; + private final MantisPropertiesLoader dynamicPropertiesLoader; private final HighAvailabilityServices highAvailabilityServices; private final ClassLoaderHandle classLoaderHandle; private final TaskExecutorRegistration taskExecutorRegistration; @@ -121,12 +124,18 @@ public TaskExecutor( WorkerConfiguration workerConfiguration, HighAvailabilityServices highAvailabilityServices, ClassLoaderHandle classLoaderHandle) { - this(rpcService, workerConfiguration, highAvailabilityServices, classLoaderHandle, null); + this(rpcService, + workerConfiguration, + new DefaultMantisPropertiesLoader(System.getProperties()), + highAvailabilityServices, + classLoaderHandle, + null); } public TaskExecutor( RpcService rpcService, WorkerConfiguration workerConfiguration, + MantisPropertiesLoader propertiesLoader, HighAvailabilityServices highAvailabilityServices, ClassLoaderHandle classLoaderHandle, @Nullable TaskFactory taskFactory) { @@ -139,6 +148,7 @@ public TaskExecutor( .orElseGet(TaskExecutorID::generate); this.clusterID = ClusterID.of(workerConfiguration.getClusterId()); this.workerConfiguration = workerConfiguration; + this.dynamicPropertiesLoader = propertiesLoader; this.highAvailabilityServices = highAvailabilityServices; this.classLoaderHandle = classLoaderHandle; WorkerPorts workerPorts = new WorkerPorts(workerConfiguration.getMetricsPort(), @@ -309,6 +319,7 @@ private ResourceManagerGatewayCxn newResourceManagerCxn() { ResourceClusterGateway resourceManagerGateway = resourceClusterGatewaySupplier.getCurrent(); // let's register ourselves with the resource manager + // todo: move timeout/retry to apply values from this.dynamicPropertiesLoader return new ResourceManagerGatewayCxn( resourceManagerCxnIdx++, taskExecutorRegistration, diff --git a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java index 00fb39e4a..26a44d25c 100644 --- a/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java +++ b/mantis-server/mantis-server-agent/src/main/java/io/mantisrx/server/agent/TaskExecutorStarter.java @@ -17,6 +17,7 @@ package io.mantisrx.server.agent; import com.mantisrx.common.utils.Services; +import io.mantisrx.common.properties.MantisPropertiesLoader; import io.mantisrx.runtime.loader.ClassLoaderHandle; import io.mantisrx.runtime.loader.TaskFactory; import io.mantisrx.runtime.loader.config.WorkerConfiguration; @@ -96,6 +97,7 @@ public static TaskExecutorStarterBuilder builder(WorkerConfiguration workerConfi public static class TaskExecutorStarterBuilder { private final WorkerConfiguration workerConfiguration; private Configuration configuration; + private MantisPropertiesLoader propertiesLoader; @Nullable private RpcSystem rpcSystem; @Nullable @@ -179,11 +181,17 @@ public TaskExecutorStarterBuilder addListener(TaskExecutor.Listener listener, Ex return this; } + public TaskExecutorStarterBuilder propertiesLoader(MantisPropertiesLoader propertiesLoader) { + this.propertiesLoader = propertiesLoader; + return this; + } + public TaskExecutorStarter build() throws Exception { final TaskExecutor taskExecutor = new TaskExecutor( getRpcService(), - workerConfiguration, + Preconditions.checkNotNull(workerConfiguration, "WorkerConfiguration for TaskExecutor is null"), + Preconditions.checkNotNull(propertiesLoader, "propertiesLoader for TaskExecutor is null"), highAvailabilityServices, getClassLoaderHandle(), this.taskFactory);