Skip to content

Commit

Permalink
Introduce a new CoordinatorPlugin interface for native clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
pdabre12 authored and Pratik Joseph Dabre committed Jul 23, 2024
1 parent 5ed9da6 commit 0f7141b
Show file tree
Hide file tree
Showing 10 changed files with 181 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package com.facebook.presto.server;

import com.facebook.presto.spi.CoordinatorPlugin;
import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -42,11 +43,10 @@
final class PluginDiscovery
{
private static final String CLASS_FILE_SUFFIX = ".class";
private static final String SERVICES_FILE = "META-INF/services/" + Plugin.class.getName();

private PluginDiscovery() {}

public static Set<String> discoverPlugins(Artifact artifact, ClassLoader classLoader)
public static Set<String> discoverPlugins(Artifact artifact, ClassLoader classLoader, String servicesFile, boolean isNativeExecutionEnabled)
throws IOException
{
if (!artifact.getExtension().equals("presto-plugin")) {
Expand All @@ -61,19 +61,22 @@ public static Set<String> discoverPlugins(Artifact artifact, ClassLoader classLo
throw new RuntimeException("Main artifact file is not a directory: " + file);
}

if (new File(file, SERVICES_FILE).exists()) {
if (new File(file, servicesFile).exists()) {
return ImmutableSet.of();
}

String targetClassName = isNativeExecutionEnabled ? CoordinatorPlugin.class.getName() :
Plugin.class.getName();

return listClasses(file.toPath()).stream()
.filter(name -> classInterfaces(name, classLoader).contains(Plugin.class.getName()))
.filter(name -> classInterfaces(name, classLoader).contains(targetClassName))
.collect(toImmutableSet());
}

public static void writePluginServices(Iterable<String> plugins, File root)
public static void writePluginServices(Iterable<String> plugins, File root, String servicesFile)
throws IOException
{
Path path = root.toPath().resolve(SERVICES_FILE);
Path path = root.toPath().resolve(servicesFile);
createDirectories(path.getParent());
try (Writer out = Files.newBufferedWriter(path, UTF_8)) {
for (String plugin : plugins) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.security.AccessControlManager;
import com.facebook.presto.server.security.PasswordAuthenticatorManager;
import com.facebook.presto.spi.CoordinatorPlugin;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.analyzer.AnalyzerProvider;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
Expand All @@ -45,6 +46,7 @@
import com.facebook.presto.spi.ttl.ClusterTtlProviderFactory;
import com.facebook.presto.spi.ttl.NodeTtlFetcherFactory;
import com.facebook.presto.sql.analyzer.AnalyzerProviderManager;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.storage.TempStorageManager;
import com.facebook.presto.tracing.TracerProviderManager;
import com.facebook.presto.ttl.clusterttlprovidermanagers.ClusterTtlProviderManager;
Expand Down Expand Up @@ -125,6 +127,8 @@ public class PluginManager
private final TracerProviderManager tracerProviderManager;
private final AnalyzerProviderManager analyzerProviderManager;
private final NodeStatusNotificationManager nodeStatusNotificationManager;
private final String servicesFile;
private final boolean isNativeExecutionEnabled;

@Inject
public PluginManager(
Expand All @@ -145,7 +149,8 @@ public PluginManager(
ClusterTtlProviderManager clusterTtlProviderManager,
HistoryBasedPlanStatisticsManager historyBasedPlanStatisticsManager,
TracerProviderManager tracerProviderManager,
NodeStatusNotificationManager nodeStatusNotificationManager)
NodeStatusNotificationManager nodeStatusNotificationManager,
FeaturesConfig featuresConfig)
{
requireNonNull(nodeInfo, "nodeInfo is null");
requireNonNull(config, "config is null");
Expand Down Expand Up @@ -176,6 +181,9 @@ public PluginManager(
this.tracerProviderManager = requireNonNull(tracerProviderManager, "tracerProviderManager is null");
this.analyzerProviderManager = requireNonNull(analyzerProviderManager, "analyzerProviderManager is null");
this.nodeStatusNotificationManager = requireNonNull(nodeStatusNotificationManager, "nodeStatusNotificationManager is null");
this.isNativeExecutionEnabled = featuresConfig.isNativeExecutionEnabled();
this.servicesFile = "META-INF/services/" + (isNativeExecutionEnabled ?
CoordinatorPlugin.class.getName() : Plugin.class.getName());
}

public void loadPlugins()
Expand Down Expand Up @@ -206,23 +214,32 @@ private void loadPlugin(String plugin)
log.info("-- Loading plugin %s --", plugin);
URLClassLoader pluginClassLoader = buildClassLoader(plugin);
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(pluginClassLoader)) {
loadPlugin(pluginClassLoader);
Class<?> pluginClass = isNativeExecutionEnabled ? CoordinatorPlugin.class : Plugin.class;
loadPlugin(pluginClassLoader, pluginClass);
}
log.info("-- Finished loading plugin %s --", plugin);
}

private void loadPlugin(URLClassLoader pluginClassLoader)
private void loadPlugin(URLClassLoader pluginClassLoader, Class<?> clazz)
{
ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader);
List<Plugin> plugins = ImmutableList.copyOf(serviceLoader);
ServiceLoader<?> serviceLoader = ServiceLoader.load(clazz, pluginClassLoader);
List<?> plugins = ImmutableList.copyOf(serviceLoader);

if (plugins.isEmpty()) {
log.warn("No service providers of type %s", Plugin.class.getName());
log.warn("No service providers of type %s", clazz.getName());
}

for (Plugin plugin : plugins) {
for (Object plugin : plugins) {
log.info("Installing %s", plugin.getClass().getName());
installPlugin(plugin);
if (plugin instanceof Plugin) {
installPlugin((Plugin) plugin);
}
else if (plugin instanceof CoordinatorPlugin) {
installCoordinatorPlugin((CoordinatorPlugin) plugin);
}
else {
log.warn("Unknown plugin type: %s", plugin.getClass().getName());
}
}
}

Expand Down Expand Up @@ -328,6 +345,14 @@ public void installPlugin(Plugin plugin)
}
}

public void installCoordinatorPlugin(CoordinatorPlugin plugin)
{
for (FunctionNamespaceManagerFactory functionNamespaceManagerFactory : plugin.getFunctionNamespaceManagerFactories()) {
log.info("Registering function namespace manager %s", functionNamespaceManagerFactory.getName());
metadata.getFunctionAndTypeManager().addFunctionNamespaceFactory(functionNamespaceManagerFactory);
}
}

private URLClassLoader buildClassLoader(String plugin)
throws Exception
{
Expand All @@ -348,9 +373,9 @@ private URLClassLoader buildClassLoaderFromPom(File pomFile)
URLClassLoader classLoader = createClassLoader(artifacts, pomFile.getPath());

Artifact artifact = artifacts.get(0);
Set<String> plugins = discoverPlugins(artifact, classLoader);
Set<String> plugins = discoverPlugins(artifact, classLoader, servicesFile, isNativeExecutionEnabled);
if (!plugins.isEmpty()) {
writePluginServices(plugins, artifact.getFile());
writePluginServices(plugins, artifact.getFile(), servicesFile);
}

return classLoader;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import com.facebook.presto.server.ShutdownAction;
import com.facebook.presto.server.security.ServerSecurityModule;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.CoordinatorPlugin;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.eventlistener.EventListener;
Expand Down Expand Up @@ -475,11 +476,20 @@ public void close()
}
}

public PluginManager getPluginManager()
{
return pluginManager;
}
public void installPlugin(Plugin plugin)
{
pluginManager.installPlugin(plugin);
}

public void installCoordinatorPlugin(CoordinatorPlugin plugin)
{
pluginManager.installCoordinatorPlugin(plugin);
}

public DispatchManager getDispatchManager()
{
return dispatchManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,8 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
new ThrowingClusterTtlProviderManager(),
historyBasedPlanStatisticsManager,
new TracerProviderManager(new TracingConfig()),
new NodeStatusNotificationManager());
new NodeStatusNotificationManager(),
featuresConfig);

connectorManager.addConnectorFactory(globalSystemConnectorFactory);
connectorManager.createConnection(GlobalSystemConnector.NAME, GlobalSystemConnector.NAME, ImmutableMap.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.common.type.Type;
import com.facebook.presto.cost.StatsCalculator;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.CoordinatorPlugin;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.eventlistener.EventListener;
Expand Down Expand Up @@ -84,6 +85,11 @@ default Plan createPlan(Session session, @Language("SQL") String sql, WarningCol

void installPlugin(Plugin plugin);

default void installCoordinatorPlugin(CoordinatorPlugin plugin)
{
throw new UnsupportedOperationException();
}

void createCatalog(String catalogName, String connectorName, Map<String, String> properties);

void loadFunctionNamespaceManager(String functionNamespaceManagerName, String catalogName, Map<String, String> properties);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spi;

import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory;

import static java.util.Collections.emptyList;

public interface CoordinatorPlugin
{
default Iterable<FunctionNamespaceManagerFactory> getFunctionNamespaceManagerFactories()
{
return emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.server.testing.TestingPrestoServer;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.CoordinatorPlugin;
import com.facebook.presto.spi.NodePoolType;
import com.facebook.presto.spi.NodeState;
import com.facebook.presto.spi.Plugin;
Expand Down Expand Up @@ -586,6 +587,12 @@ public void installPlugin(Plugin plugin)
installPlugin(plugin, false);
}

@Override
public void installCoordinatorPlugin(CoordinatorPlugin plugin)
{
installCoordinatorPlugin(plugin, false);
}

public void createCatalog(String catalogName, String connectorName)
{
createCatalog(catalogName, connectorName, ImmutableMap.of());
Expand Down Expand Up @@ -852,6 +859,18 @@ private void installPlugin(Plugin plugin, boolean coordinatorOnly)
log.info("Installed plugin %s in %s", plugin.getClass().getSimpleName(), nanosSince(start).convertToMostSuccinctTimeUnit());
}

private void installCoordinatorPlugin(CoordinatorPlugin plugin, boolean coordinatorOnly)
{
long start = nanoTime();
for (TestingPrestoServer server : servers) {
if (coordinatorOnly && !server.isCoordinator()) {
continue;
}
server.installCoordinatorPlugin(plugin);
}
log.info("Installed plugin %s in %s", plugin.getClass().getSimpleName(), nanosSince(start).convertToMostSuccinctTimeUnit());
}

private static void closeUnchecked(AutoCloseable closeable)
{
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.tests;

import com.facebook.presto.spi.CoordinatorPlugin;
import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory;

public class TestingCoordinatorPlugin
implements CoordinatorPlugin
{
@Override
public Iterable<FunctionNamespaceManagerFactory> getFunctionNamespaceManagerFactories()
{
throw new RuntimeException(String.format("This exception confirms that %s was called.", this.getClass().getSimpleName()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.facebook.presto.tests.TestingCoordinatorPlugin
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server;

import com.facebook.presto.server.testing.TestingPrestoServer;
import com.facebook.presto.sql.parser.SqlParserOptions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

public class TestCoordinatorPluginIntegration
{
private final TestingPrestoServer testingPrestoServer;

private TestCoordinatorPluginIntegration()
throws Exception
{
this.testingPrestoServer = new TestingPrestoServer(
true,
ImmutableMap.<String, String>builder()
.put("native-execution-enabled", "true")
.put("plugin.bundles", "../presto-tests/target/")
.build(),
null, null, new SqlParserOptions(), ImmutableList.of());
}

@Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp =
"This exception confirms that TestingCoordinatorPlugin was called.")
public void testCoordinatorPluginIntegration()
throws Exception
{
PluginManager pluginManager = testingPrestoServer.getPluginManager();
pluginManager.loadPlugins();
}
}

0 comments on commit 0f7141b

Please sign in to comment.