Skip to content

Commit

Permalink
Introduce a new CoordinatorPlugin interface for native clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
pdabre12 authored and Pratik Joseph Dabre committed Aug 1, 2024
1 parent 5ed9da6 commit 9466f35
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
*/
package com.facebook.presto.server;

import com.facebook.presto.spi.Plugin;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import org.objectweb.asm.ClassReader;
Expand Down Expand Up @@ -42,11 +41,10 @@
final class PluginDiscovery
{
private static final String CLASS_FILE_SUFFIX = ".class";
private static final String SERVICES_FILE = "META-INF/services/" + Plugin.class.getName();

private PluginDiscovery() {}

public static Set<String> discoverPlugins(Artifact artifact, ClassLoader classLoader)
public static Set<String> discoverPlugins(Artifact artifact, ClassLoader classLoader, String servicesFile, String targetClassName)
throws IOException
{
if (!artifact.getExtension().equals("presto-plugin")) {
Expand All @@ -61,19 +59,19 @@ public static Set<String> discoverPlugins(Artifact artifact, ClassLoader classLo
throw new RuntimeException("Main artifact file is not a directory: " + file);
}

if (new File(file, SERVICES_FILE).exists()) {
if (new File(file, servicesFile).exists()) {
return ImmutableSet.of();
}

return listClasses(file.toPath()).stream()
.filter(name -> classInterfaces(name, classLoader).contains(Plugin.class.getName()))
.filter(name -> classInterfaces(name, classLoader).contains(targetClassName))
.collect(toImmutableSet());
}

public static void writePluginServices(Iterable<String> plugins, File root)
public static void writePluginServices(Iterable<String> plugins, File root, String servicesFile)
throws IOException
{
Path path = root.toPath().resolve(SERVICES_FILE);
Path path = root.toPath().resolve(servicesFile);
createDirectories(path.getParent());
try (Writer out = Files.newBufferedWriter(path, UTF_8)) {
for (String plugin : plugins) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.security.AccessControlManager;
import com.facebook.presto.server.security.PasswordAuthenticatorManager;
import com.facebook.presto.spi.CoordinatorPlugin;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.analyzer.AnalyzerProvider;
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
Expand Down Expand Up @@ -101,8 +102,10 @@ public class PluginManager
.add("com.facebook.drift.TApplicationException")
.build();

// TODO: To make CoordinatorPlugin loading compulsory when native execution is enabled.
private static final String COORDINATOR_PLUGIN_SERVICES_FILE = "META-INF/services/" + CoordinatorPlugin.class.getName();
private static final String PLUGIN_SERVICES_FILE = "META-INF/services/" + Plugin.class.getName();
private static final Logger log = Logger.get(PluginManager.class);

private final ConnectorManager connectorManager;
private final Metadata metadata;
private final ResourceGroupManager<?> resourceGroupManager;
Expand Down Expand Up @@ -206,23 +209,32 @@ private void loadPlugin(String plugin)
log.info("-- Loading plugin %s --", plugin);
URLClassLoader pluginClassLoader = buildClassLoader(plugin);
try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(pluginClassLoader)) {
loadPlugin(pluginClassLoader);
loadPlugin(pluginClassLoader, CoordinatorPlugin.class);
loadPlugin(pluginClassLoader, Plugin.class);
}
log.info("-- Finished loading plugin %s --", plugin);
}

private void loadPlugin(URLClassLoader pluginClassLoader)
private void loadPlugin(URLClassLoader pluginClassLoader, Class<?> clazz)
{
ServiceLoader<Plugin> serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader);
List<Plugin> plugins = ImmutableList.copyOf(serviceLoader);
ServiceLoader<?> serviceLoader = ServiceLoader.load(clazz, pluginClassLoader);
List<?> plugins = ImmutableList.copyOf(serviceLoader);

if (plugins.isEmpty()) {
log.warn("No service providers of type %s", Plugin.class.getName());
log.warn("No service providers of type %s", clazz.getName());
}

for (Plugin plugin : plugins) {
for (Object plugin : plugins) {
log.info("Installing %s", plugin.getClass().getName());
installPlugin(plugin);
if (plugin instanceof Plugin) {
installPlugin((Plugin) plugin);
}
else if (plugin instanceof CoordinatorPlugin) {
installCoordinatorPlugin((CoordinatorPlugin) plugin);
}
else {
log.warn("Unknown plugin type: %s", plugin.getClass().getName());
}
}
}

Expand Down Expand Up @@ -328,6 +340,14 @@ public void installPlugin(Plugin plugin)
}
}

public void installCoordinatorPlugin(CoordinatorPlugin plugin)
{
for (FunctionNamespaceManagerFactory functionNamespaceManagerFactory : plugin.getFunctionNamespaceManagerFactories()) {
log.info("Registering function namespace manager %s", functionNamespaceManagerFactory.getName());
metadata.getFunctionAndTypeManager().addFunctionNamespaceFactory(functionNamespaceManagerFactory);
}
}

private URLClassLoader buildClassLoader(String plugin)
throws Exception
{
Expand All @@ -348,10 +368,9 @@ private URLClassLoader buildClassLoaderFromPom(File pomFile)
URLClassLoader classLoader = createClassLoader(artifacts, pomFile.getPath());

Artifact artifact = artifacts.get(0);
Set<String> plugins = discoverPlugins(artifact, classLoader);
if (!plugins.isEmpty()) {
writePluginServices(plugins, artifact.getFile());
}

processPlugins(artifact, classLoader, COORDINATOR_PLUGIN_SERVICES_FILE, CoordinatorPlugin.class.getName());
processPlugins(artifact, classLoader, PLUGIN_SERVICES_FILE, Plugin.class.getName());

return classLoader;
}
Expand Down Expand Up @@ -416,4 +435,13 @@ private static List<Artifact> sortedArtifacts(List<Artifact> artifacts)
Collections.sort(list, Ordering.natural().nullsLast().onResultOf(Artifact::getFile));
return list;
}

private void processPlugins(Artifact artifact, ClassLoader classLoader, String servicesFile, String className)
throws IOException
{
Set<String> plugins = discoverPlugins(artifact, classLoader, servicesFile, className);
if (!plugins.isEmpty()) {
writePluginServices(plugins, artifact.getFile(), servicesFile);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import com.facebook.presto.server.ShutdownAction;
import com.facebook.presto.server.security.ServerSecurityModule;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.CoordinatorPlugin;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.spi.eventlistener.EventListener;
Expand Down Expand Up @@ -475,11 +476,20 @@ public void close()
}
}

public PluginManager getPluginManager()
{
return pluginManager;
}
public void installPlugin(Plugin plugin)
{
pluginManager.installPlugin(plugin);
}

public void installCoordinatorPlugin(CoordinatorPlugin plugin)
{
pluginManager.installCoordinatorPlugin(plugin);
}

public DispatchManager getDispatchManager()
{
return dispatchManager;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.common.type.Type;
import com.facebook.presto.cost.StatsCalculator;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.spi.CoordinatorPlugin;
import com.facebook.presto.spi.Plugin;
import com.facebook.presto.spi.WarningCollector;
import com.facebook.presto.spi.eventlistener.EventListener;
Expand Down Expand Up @@ -84,6 +85,11 @@ default Plan createPlan(Session session, @Language("SQL") String sql, WarningCol

void installPlugin(Plugin plugin);

default void installCoordinatorPlugin(CoordinatorPlugin plugin)
{
throw new UnsupportedOperationException();
}

void createCatalog(String catalogName, String connectorName, Map<String, String> properties);

void loadFunctionNamespaceManager(String functionNamespaceManagerName, String catalogName, Map<String, String> properties);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spi;

import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory;

import static java.util.Collections.emptyList;

/**
* Introduces a new {@link CoordinatorPlugin} interface for native C++ clusters.
* Certain elements of the {@link Plugin} SPI are not used in Prestissimo deployments, or may result in inconsistencies.
* The {@link CoordinatorPlugin} includes only the interfaces relevant to native C++ clusters and
* is a successor to {@link Plugin} and will eventually replace it.
* It contains only interfaces that are valid and used in a coordinator.
*/
public interface CoordinatorPlugin
{
default Iterable<FunctionNamespaceManagerFactory> getFunctionNamespaceManagerFactories()
{
return emptyList();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.facebook.presto.server.BasicQueryInfo;
import com.facebook.presto.server.testing.TestingPrestoServer;
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.CoordinatorPlugin;
import com.facebook.presto.spi.NodePoolType;
import com.facebook.presto.spi.NodeState;
import com.facebook.presto.spi.Plugin;
Expand Down Expand Up @@ -586,6 +587,12 @@ public void installPlugin(Plugin plugin)
installPlugin(plugin, false);
}

@Override
public void installCoordinatorPlugin(CoordinatorPlugin plugin)
{
installCoordinatorPlugin(plugin, false);
}

public void createCatalog(String catalogName, String connectorName)
{
createCatalog(catalogName, connectorName, ImmutableMap.of());
Expand Down Expand Up @@ -852,6 +859,18 @@ private void installPlugin(Plugin plugin, boolean coordinatorOnly)
log.info("Installed plugin %s in %s", plugin.getClass().getSimpleName(), nanosSince(start).convertToMostSuccinctTimeUnit());
}

private void installCoordinatorPlugin(CoordinatorPlugin plugin, boolean coordinatorOnly)
{
long start = nanoTime();
for (TestingPrestoServer server : servers) {
if (coordinatorOnly && !server.isCoordinator()) {
continue;
}
server.installCoordinatorPlugin(plugin);
}
log.info("Installed plugin %s in %s", plugin.getClass().getSimpleName(), nanosSince(start).convertToMostSuccinctTimeUnit());
}

private static void closeUnchecked(AutoCloseable closeable)
{
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.tests;

import com.facebook.presto.spi.CoordinatorPlugin;
import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory;

public class TestingCoordinatorPlugin
implements CoordinatorPlugin
{
@Override
public Iterable<FunctionNamespaceManagerFactory> getFunctionNamespaceManagerFactories()
{
throw new RuntimeException(String.format("This exception confirms that %s was called.", this.getClass().getSimpleName()));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.facebook.presto.tests.TestingCoordinatorPlugin
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.server;

import com.facebook.presto.server.testing.TestingPrestoServer;
import com.facebook.presto.sql.parser.SqlParserOptions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.testng.annotations.Test;

public class TestCoordinatorPluginIntegration
{
private final TestingPrestoServer testingPrestoServer;

private TestCoordinatorPluginIntegration()
throws Exception
{
this.testingPrestoServer = new TestingPrestoServer(
true,
ImmutableMap.<String, String>builder()
.put("plugin.bundles", "../presto-tests/target/")
.build(),
null, null, new SqlParserOptions(), ImmutableList.of());
}

@Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp =
"This exception confirms that TestingCoordinatorPlugin was called.")
public void testCoordinatorPluginIntegration()
throws Exception
{
PluginManager pluginManager = testingPrestoServer.getPluginManager();
pluginManager.loadPlugins();
}
}

0 comments on commit 9466f35

Please sign in to comment.