Skip to content

Commit

Permalink
add TE properties loader (#578)
Browse files Browse the repository at this point in the history
  • Loading branch information
Andyz26 authored Nov 6, 2023
1 parent 7d098a8 commit d09cb9a
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 50 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.sampullara.cli.Args;
import com.sampullara.cli.Argument;
import io.mantisrx.common.properties.DefaultMantisPropertiesLoader;
import io.mantisrx.common.properties.MantisPropertiesLoader;
import io.mantisrx.runtime.loader.RuntimeTask;
import io.mantisrx.runtime.loader.config.WorkerConfiguration;
import io.mantisrx.server.agent.config.ConfigurationFactory;
Expand All @@ -44,12 +46,13 @@ public class AgentV2Main implements Service {
private final TaskExecutorStarter taskExecutorStarter;
private final AtomicBoolean stopping = new AtomicBoolean(false);

public AgentV2Main(ConfigurationFactory configFactory) throws Exception {
public AgentV2Main(ConfigurationFactory configFactory, MantisPropertiesLoader propertiesLoader) throws Exception {
WorkerConfiguration workerConfiguration = configFactory.getConfig();

this.taskExecutorStarter =
TaskExecutorStarter.builder(workerConfiguration)
.taskFactory(new SingleTaskOnlyFactory())
.propertiesLoader(propertiesLoader)
.addListener(
new TaskExecutor.Listener() {
@Override
Expand Down Expand Up @@ -103,7 +106,8 @@ public static void main(String[] args) {
props.putAll(System.getProperties());
props.putAll(loadProperties(propFile));
StaticPropertiesConfigurationFactory factory = new StaticPropertiesConfigurationFactory(props);
AgentV2Main agent = new AgentV2Main(factory);
DefaultMantisPropertiesLoader propertiesLoader = new DefaultMantisPropertiesLoader(props);
AgentV2Main agent = new AgentV2Main(factory, propertiesLoader);
agent.start(); // blocks until shutdown hook (ctrl-c)
} catch (Exception e) {
// unexpected to get a RuntimeException, will exit
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.mantisrx.common.JsonSerializer;
import io.mantisrx.common.WorkerPorts;
import io.mantisrx.common.metrics.netty.MantisNettyEventsListenerFactory;
import io.mantisrx.common.properties.DefaultMantisPropertiesLoader;
import io.mantisrx.common.properties.MantisPropertiesLoader;
import io.mantisrx.runtime.MachineDefinition;
import io.mantisrx.runtime.MantisJobState;
import io.mantisrx.runtime.loader.ClassLoaderHandle;
Expand Down Expand Up @@ -91,6 +93,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
@Getter
private final ClusterID clusterID;
private final WorkerConfiguration workerConfiguration;
private final MantisPropertiesLoader dynamicPropertiesLoader;
private final HighAvailabilityServices highAvailabilityServices;
private final ClassLoaderHandle classLoaderHandle;
private final TaskExecutorRegistration taskExecutorRegistration;
Expand Down Expand Up @@ -121,12 +124,18 @@ public TaskExecutor(
WorkerConfiguration workerConfiguration,
HighAvailabilityServices highAvailabilityServices,
ClassLoaderHandle classLoaderHandle) {
this(rpcService, workerConfiguration, highAvailabilityServices, classLoaderHandle, null);
this(rpcService,
workerConfiguration,
new DefaultMantisPropertiesLoader(System.getProperties()),
highAvailabilityServices,
classLoaderHandle,
null);
}

public TaskExecutor(
RpcService rpcService,
WorkerConfiguration workerConfiguration,
MantisPropertiesLoader propertiesLoader,
HighAvailabilityServices highAvailabilityServices,
ClassLoaderHandle classLoaderHandle,
@Nullable TaskFactory taskFactory) {
Expand All @@ -139,6 +148,7 @@ public TaskExecutor(
.orElseGet(TaskExecutorID::generate);
this.clusterID = ClusterID.of(workerConfiguration.getClusterId());
this.workerConfiguration = workerConfiguration;
this.dynamicPropertiesLoader = propertiesLoader;
this.highAvailabilityServices = highAvailabilityServices;
this.classLoaderHandle = classLoaderHandle;
WorkerPorts workerPorts = new WorkerPorts(workerConfiguration.getMetricsPort(),
Expand Down Expand Up @@ -309,6 +319,7 @@ private ResourceManagerGatewayCxn newResourceManagerCxn() {
ResourceClusterGateway resourceManagerGateway = resourceClusterGatewaySupplier.getCurrent();

// let's register ourselves with the resource manager
// todo: move timeout/retry to apply values from this.dynamicPropertiesLoader
return new ResourceManagerGatewayCxn(
resourceManagerCxnIdx++,
taskExecutorRegistration,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.mantisrx.server.agent;

import com.mantisrx.common.utils.Services;
import io.mantisrx.common.properties.MantisPropertiesLoader;
import io.mantisrx.runtime.loader.ClassLoaderHandle;
import io.mantisrx.runtime.loader.TaskFactory;
import io.mantisrx.runtime.loader.config.WorkerConfiguration;
Expand Down Expand Up @@ -96,6 +97,7 @@ public static TaskExecutorStarterBuilder builder(WorkerConfiguration workerConfi
public static class TaskExecutorStarterBuilder {
private final WorkerConfiguration workerConfiguration;
private Configuration configuration;
private MantisPropertiesLoader propertiesLoader;
@Nullable
private RpcSystem rpcSystem;
@Nullable
Expand Down Expand Up @@ -179,11 +181,17 @@ public TaskExecutorStarterBuilder addListener(TaskExecutor.Listener listener, Ex
return this;
}

public TaskExecutorStarterBuilder propertiesLoader(MantisPropertiesLoader propertiesLoader) {
this.propertiesLoader = propertiesLoader;
return this;
}

public TaskExecutorStarter build() throws Exception {
final TaskExecutor taskExecutor =
new TaskExecutor(
getRpcService(),
workerConfiguration,
Preconditions.checkNotNull(workerConfiguration, "WorkerConfiguration for TaskExecutor is null"),
Preconditions.checkNotNull(propertiesLoader, "propertiesLoader for TaskExecutor is null"),
highAvailabilityServices,
getClassLoaderHandle(),
this.taskFactory);
Expand Down

0 comments on commit d09cb9a

Please sign in to comment.