From 9466f352b1dd8305864aba7ab68b8429ee526c2f Mon Sep 17 00:00:00 2001 From: Pratik Joseph Dabre Date: Thu, 1 Aug 2024 10:53:04 -0700 Subject: [PATCH] Introduce a new CoordinatorPlugin interface for native clusters --- .../presto/server/PluginDiscovery.java | 12 ++--- .../facebook/presto/server/PluginManager.java | 52 ++++++++++++++----- .../server/testing/TestingPrestoServer.java | 10 ++++ .../facebook/presto/testing/QueryRunner.java | 6 +++ .../presto/spi/CoordinatorPlugin.java | 33 ++++++++++++ .../presto/tests/DistributedQueryRunner.java | 19 +++++++ .../tests/TestingCoordinatorPlugin.java | 27 ++++++++++ .../com.facebook.presto.spi.CoordinatorPlugin | 1 + .../TestCoordinatorPluginIntegration.java | 45 ++++++++++++++++ 9 files changed, 186 insertions(+), 19 deletions(-) create mode 100644 presto-spi/src/main/java/com/facebook/presto/spi/CoordinatorPlugin.java create mode 100644 presto-tests/src/main/java/com/facebook/presto/tests/TestingCoordinatorPlugin.java create mode 100644 presto-tests/src/main/resources/META-INF/services/com.facebook.presto.spi.CoordinatorPlugin create mode 100644 presto-tests/src/test/java/com/facebook/presto/server/TestCoordinatorPluginIntegration.java diff --git a/presto-main/src/main/java/com/facebook/presto/server/PluginDiscovery.java b/presto-main/src/main/java/com/facebook/presto/server/PluginDiscovery.java index c34162d7306c..6c0d1f4fe9ef 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/PluginDiscovery.java +++ b/presto-main/src/main/java/com/facebook/presto/server/PluginDiscovery.java @@ -13,7 +13,6 @@ */ package com.facebook.presto.server; -import com.facebook.presto.spi.Plugin; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import org.objectweb.asm.ClassReader; @@ -42,11 +41,10 @@ final class PluginDiscovery { private static final String CLASS_FILE_SUFFIX = ".class"; - private static final String SERVICES_FILE = "META-INF/services/" + Plugin.class.getName(); private PluginDiscovery() {} - public static Set discoverPlugins(Artifact artifact, ClassLoader classLoader) + public static Set discoverPlugins(Artifact artifact, ClassLoader classLoader, String servicesFile, String targetClassName) throws IOException { if (!artifact.getExtension().equals("presto-plugin")) { @@ -61,19 +59,19 @@ public static Set discoverPlugins(Artifact artifact, ClassLoader classLo throw new RuntimeException("Main artifact file is not a directory: " + file); } - if (new File(file, SERVICES_FILE).exists()) { + if (new File(file, servicesFile).exists()) { return ImmutableSet.of(); } return listClasses(file.toPath()).stream() - .filter(name -> classInterfaces(name, classLoader).contains(Plugin.class.getName())) + .filter(name -> classInterfaces(name, classLoader).contains(targetClassName)) .collect(toImmutableSet()); } - public static void writePluginServices(Iterable plugins, File root) + public static void writePluginServices(Iterable plugins, File root, String servicesFile) throws IOException { - Path path = root.toPath().resolve(SERVICES_FILE); + Path path = root.toPath().resolve(servicesFile); createDirectories(path.getParent()); try (Writer out = Files.newBufferedWriter(path, UTF_8)) { for (String plugin : plugins) { diff --git a/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java b/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java index 0f72c608bb3b..8593c32cf899 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java +++ b/presto-main/src/main/java/com/facebook/presto/server/PluginManager.java @@ -27,6 +27,7 @@ import com.facebook.presto.metadata.Metadata; import com.facebook.presto.security.AccessControlManager; import com.facebook.presto.server.security.PasswordAuthenticatorManager; +import com.facebook.presto.spi.CoordinatorPlugin; import com.facebook.presto.spi.Plugin; import com.facebook.presto.spi.analyzer.AnalyzerProvider; import com.facebook.presto.spi.classloader.ThreadContextClassLoader; @@ -101,8 +102,10 @@ public class PluginManager .add("com.facebook.drift.TApplicationException") .build(); + // TODO: To make CoordinatorPlugin loading compulsory when native execution is enabled. + private static final String COORDINATOR_PLUGIN_SERVICES_FILE = "META-INF/services/" + CoordinatorPlugin.class.getName(); + private static final String PLUGIN_SERVICES_FILE = "META-INF/services/" + Plugin.class.getName(); private static final Logger log = Logger.get(PluginManager.class); - private final ConnectorManager connectorManager; private final Metadata metadata; private final ResourceGroupManager resourceGroupManager; @@ -206,23 +209,32 @@ private void loadPlugin(String plugin) log.info("-- Loading plugin %s --", plugin); URLClassLoader pluginClassLoader = buildClassLoader(plugin); try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(pluginClassLoader)) { - loadPlugin(pluginClassLoader); + loadPlugin(pluginClassLoader, CoordinatorPlugin.class); + loadPlugin(pluginClassLoader, Plugin.class); } log.info("-- Finished loading plugin %s --", plugin); } - private void loadPlugin(URLClassLoader pluginClassLoader) + private void loadPlugin(URLClassLoader pluginClassLoader, Class clazz) { - ServiceLoader serviceLoader = ServiceLoader.load(Plugin.class, pluginClassLoader); - List plugins = ImmutableList.copyOf(serviceLoader); + ServiceLoader serviceLoader = ServiceLoader.load(clazz, pluginClassLoader); + List plugins = ImmutableList.copyOf(serviceLoader); if (plugins.isEmpty()) { - log.warn("No service providers of type %s", Plugin.class.getName()); + log.warn("No service providers of type %s", clazz.getName()); } - for (Plugin plugin : plugins) { + for (Object plugin : plugins) { log.info("Installing %s", plugin.getClass().getName()); - installPlugin(plugin); + if (plugin instanceof Plugin) { + installPlugin((Plugin) plugin); + } + else if (plugin instanceof CoordinatorPlugin) { + installCoordinatorPlugin((CoordinatorPlugin) plugin); + } + else { + log.warn("Unknown plugin type: %s", plugin.getClass().getName()); + } } } @@ -328,6 +340,14 @@ public void installPlugin(Plugin plugin) } } + public void installCoordinatorPlugin(CoordinatorPlugin plugin) + { + for (FunctionNamespaceManagerFactory functionNamespaceManagerFactory : plugin.getFunctionNamespaceManagerFactories()) { + log.info("Registering function namespace manager %s", functionNamespaceManagerFactory.getName()); + metadata.getFunctionAndTypeManager().addFunctionNamespaceFactory(functionNamespaceManagerFactory); + } + } + private URLClassLoader buildClassLoader(String plugin) throws Exception { @@ -348,10 +368,9 @@ private URLClassLoader buildClassLoaderFromPom(File pomFile) URLClassLoader classLoader = createClassLoader(artifacts, pomFile.getPath()); Artifact artifact = artifacts.get(0); - Set plugins = discoverPlugins(artifact, classLoader); - if (!plugins.isEmpty()) { - writePluginServices(plugins, artifact.getFile()); - } + + processPlugins(artifact, classLoader, COORDINATOR_PLUGIN_SERVICES_FILE, CoordinatorPlugin.class.getName()); + processPlugins(artifact, classLoader, PLUGIN_SERVICES_FILE, Plugin.class.getName()); return classLoader; } @@ -416,4 +435,13 @@ private static List sortedArtifacts(List artifacts) Collections.sort(list, Ordering.natural().nullsLast().onResultOf(Artifact::getFile)); return list; } + + private void processPlugins(Artifact artifact, ClassLoader classLoader, String servicesFile, String className) + throws IOException + { + Set plugins = discoverPlugins(artifact, classLoader, servicesFile, className); + if (!plugins.isEmpty()) { + writePluginServices(plugins, artifact.getFile(), servicesFile); + } + } } diff --git a/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java b/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java index 24993f8006d9..dbd12609cc56 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java +++ b/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java @@ -60,6 +60,7 @@ import com.facebook.presto.server.ShutdownAction; import com.facebook.presto.server.security.ServerSecurityModule; import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.CoordinatorPlugin; import com.facebook.presto.spi.Plugin; import com.facebook.presto.spi.QueryId; import com.facebook.presto.spi.eventlistener.EventListener; @@ -475,11 +476,20 @@ public void close() } } + public PluginManager getPluginManager() + { + return pluginManager; + } public void installPlugin(Plugin plugin) { pluginManager.installPlugin(plugin); } + public void installCoordinatorPlugin(CoordinatorPlugin plugin) + { + pluginManager.installCoordinatorPlugin(plugin); + } + public DispatchManager getDispatchManager() { return dispatchManager; diff --git a/presto-main/src/main/java/com/facebook/presto/testing/QueryRunner.java b/presto-main/src/main/java/com/facebook/presto/testing/QueryRunner.java index e02f0aa3f3e7..2cae8bfa14d5 100644 --- a/presto-main/src/main/java/com/facebook/presto/testing/QueryRunner.java +++ b/presto-main/src/main/java/com/facebook/presto/testing/QueryRunner.java @@ -18,6 +18,7 @@ import com.facebook.presto.common.type.Type; import com.facebook.presto.cost.StatsCalculator; import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.spi.CoordinatorPlugin; import com.facebook.presto.spi.Plugin; import com.facebook.presto.spi.WarningCollector; import com.facebook.presto.spi.eventlistener.EventListener; @@ -84,6 +85,11 @@ default Plan createPlan(Session session, @Language("SQL") String sql, WarningCol void installPlugin(Plugin plugin); + default void installCoordinatorPlugin(CoordinatorPlugin plugin) + { + throw new UnsupportedOperationException(); + } + void createCatalog(String catalogName, String connectorName, Map properties); void loadFunctionNamespaceManager(String functionNamespaceManagerName, String catalogName, Map properties); diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/CoordinatorPlugin.java b/presto-spi/src/main/java/com/facebook/presto/spi/CoordinatorPlugin.java new file mode 100644 index 000000000000..f855e6a63e2d --- /dev/null +++ b/presto-spi/src/main/java/com/facebook/presto/spi/CoordinatorPlugin.java @@ -0,0 +1,33 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.spi; + +import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory; + +import static java.util.Collections.emptyList; + +/** + * Introduces a new {@link CoordinatorPlugin} interface for native C++ clusters. + * Certain elements of the {@link Plugin} SPI are not used in Prestissimo deployments, or may result in inconsistencies. + * The {@link CoordinatorPlugin} includes only the interfaces relevant to native C++ clusters and + * is a successor to {@link Plugin} and will eventually replace it. + * It contains only interfaces that are valid and used in a coordinator. + */ +public interface CoordinatorPlugin +{ + default Iterable getFunctionNamespaceManagerFactories() + { + return emptyList(); + } +} diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java b/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java index 4db85b08f468..82652138b716 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java @@ -33,6 +33,7 @@ import com.facebook.presto.server.BasicQueryInfo; import com.facebook.presto.server.testing.TestingPrestoServer; import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.spi.CoordinatorPlugin; import com.facebook.presto.spi.NodePoolType; import com.facebook.presto.spi.NodeState; import com.facebook.presto.spi.Plugin; @@ -586,6 +587,12 @@ public void installPlugin(Plugin plugin) installPlugin(plugin, false); } + @Override + public void installCoordinatorPlugin(CoordinatorPlugin plugin) + { + installCoordinatorPlugin(plugin, false); + } + public void createCatalog(String catalogName, String connectorName) { createCatalog(catalogName, connectorName, ImmutableMap.of()); @@ -852,6 +859,18 @@ private void installPlugin(Plugin plugin, boolean coordinatorOnly) log.info("Installed plugin %s in %s", plugin.getClass().getSimpleName(), nanosSince(start).convertToMostSuccinctTimeUnit()); } + private void installCoordinatorPlugin(CoordinatorPlugin plugin, boolean coordinatorOnly) + { + long start = nanoTime(); + for (TestingPrestoServer server : servers) { + if (coordinatorOnly && !server.isCoordinator()) { + continue; + } + server.installCoordinatorPlugin(plugin); + } + log.info("Installed plugin %s in %s", plugin.getClass().getSimpleName(), nanosSince(start).convertToMostSuccinctTimeUnit()); + } + private static void closeUnchecked(AutoCloseable closeable) { try { diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/TestingCoordinatorPlugin.java b/presto-tests/src/main/java/com/facebook/presto/tests/TestingCoordinatorPlugin.java new file mode 100644 index 000000000000..fabb46649f07 --- /dev/null +++ b/presto-tests/src/main/java/com/facebook/presto/tests/TestingCoordinatorPlugin.java @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.tests; + +import com.facebook.presto.spi.CoordinatorPlugin; +import com.facebook.presto.spi.function.FunctionNamespaceManagerFactory; + +public class TestingCoordinatorPlugin + implements CoordinatorPlugin +{ + @Override + public Iterable getFunctionNamespaceManagerFactories() + { + throw new RuntimeException(String.format("This exception confirms that %s was called.", this.getClass().getSimpleName())); + } +} diff --git a/presto-tests/src/main/resources/META-INF/services/com.facebook.presto.spi.CoordinatorPlugin b/presto-tests/src/main/resources/META-INF/services/com.facebook.presto.spi.CoordinatorPlugin new file mode 100644 index 000000000000..99697e92b5ee --- /dev/null +++ b/presto-tests/src/main/resources/META-INF/services/com.facebook.presto.spi.CoordinatorPlugin @@ -0,0 +1 @@ +com.facebook.presto.tests.TestingCoordinatorPlugin diff --git a/presto-tests/src/test/java/com/facebook/presto/server/TestCoordinatorPluginIntegration.java b/presto-tests/src/test/java/com/facebook/presto/server/TestCoordinatorPluginIntegration.java new file mode 100644 index 000000000000..8d3ef30917cb --- /dev/null +++ b/presto-tests/src/test/java/com/facebook/presto/server/TestCoordinatorPluginIntegration.java @@ -0,0 +1,45 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.server; + +import com.facebook.presto.server.testing.TestingPrestoServer; +import com.facebook.presto.sql.parser.SqlParserOptions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +public class TestCoordinatorPluginIntegration +{ + private final TestingPrestoServer testingPrestoServer; + + private TestCoordinatorPluginIntegration() + throws Exception + { + this.testingPrestoServer = new TestingPrestoServer( + true, + ImmutableMap.builder() + .put("plugin.bundles", "../presto-tests/target/") + .build(), + null, null, new SqlParserOptions(), ImmutableList.of()); + } + + @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = + "This exception confirms that TestingCoordinatorPlugin was called.") + public void testCoordinatorPluginIntegration() + throws Exception + { + PluginManager pluginManager = testingPrestoServer.getPluginManager(); + pluginManager.loadPlugins(); + } +}