[SPARK-24252][SQL] Add v2 catalog plugin system
## What changes were proposed in this pull request?

This adds a v2 API for adding new catalog plugins to Spark.

* Catalog implementations extend `CatalogPlugin` and are loaded via reflection, similar to data sources
* `Catalogs` loads and initializes catalogs using configuration from a `SQLConf`
* `CaseInsensitiveStringMap` is used to pass configuration to `CatalogPlugin` via `initialize`

Catalogs are configured by adding config properties starting with `spark.sql.catalog.(name)`. The `spark.sql.catalog.(name)` property itself must specify a class that implements `CatalogPlugin`. Other properties under the namespace (`spark.sql.catalog.(name).(prop)`) are passed to the provider during initialization along with the catalog name.
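
For example, a catalog named `mycat` could be registered and configured when building a session (a sketch; `com.example.MyCatalogPlugin` and the `url` property are illustrative, not part of this commit):

```java
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder()
    .master("local[*]")
    // The base property names the class implementing CatalogPlugin.
    .config("spark.sql.catalog.mycat", "com.example.MyCatalogPlugin")
    // Properties under the prefix are passed to initialize() with the prefix removed.
    .config("spark.sql.catalog.mycat.url", "jdbc:postgresql://localhost/db")
    .getOrCreate();
```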

This replaces #21306, which will now be implemented in multiple parts: the catalog plugin system (this commit) and the specific catalog APIs, like `TableCatalog`.

## How was this patch tested?

Added test suites for `CaseInsensitiveStringMap` and for catalog loading.

Closes #23915 from rdblue/SPARK-24252-add-v2-catalog-plugins.

Authored-by: Ryan Blue <blue@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
rdblue authored and cloud-fan committed Mar 8, 2019
1 parent 2036074 commit 6170e40
Showing 6 changed files with 544 additions and 0 deletions.
org/apache/spark/sql/catalog/v2/CatalogPlugin.java
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

/**
* A marker interface to provide a catalog implementation for Spark.
* <p>
* Implementations can provide catalog functions by implementing additional interfaces for tables,
* views, and functions.
* <p>
* Catalog implementations must implement this marker interface to be loaded by
* {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the
* required public no-arg constructor. After creating an instance, it will be configured by calling
* {@link #initialize(String, CaseInsensitiveStringMap)}.
* <p>
* Catalog implementations are registered to a name by adding a configuration option to Spark:
* {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties
* in the Spark configuration that share the catalog name prefix,
* {@code spark.sql.catalog.catalog-name.(key)=(value)}, will be passed in the case-insensitive
* string map of options to {@link #initialize(String, CaseInsensitiveStringMap)}, with the
* prefix removed. The {@code name} argument is also passed and is the catalog's name; in this
* example, "catalog-name".
*/
@Experimental
public interface CatalogPlugin {
/**
* Called to initialize configuration.
* <p>
* This method is called once, just after the provider is instantiated.
*
* @param name the name used to identify and load this catalog
* @param options a case-insensitive string map of configuration
*/
void initialize(String name, CaseInsensitiveStringMap options);

/**
* Called to get this catalog's name.
* <p>
* This method is only called after {@link #initialize(String, CaseInsensitiveStringMap)} is
* called to pass the catalog's name.
*/
String name();
}
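
For illustration, a minimal implementation of the interface above might look like this (a hypothetical sketch; `ExampleCatalog` and its `default-namespace` option are not part of this commit):

```java
import org.apache.spark.sql.catalog.v2.CatalogPlugin;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class ExampleCatalog implements CatalogPlugin {
  private String name = null;
  private String defaultNamespace = null;

  @Override
  public void initialize(String name, CaseInsensitiveStringMap options) {
    // Called once by Catalogs.load, just after the no-arg constructor runs.
    this.name = name;
    // Options arrive with the spark.sql.catalog.(name). prefix already stripped.
    this.defaultNamespace = options.getOrDefault("default-namespace", "default");
  }

  @Override
  public String name() {
    return name;
  }
}
```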
org/apache/spark/sql/catalog/v2/Catalogs.java
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.SparkException;
import org.apache.spark.annotation.Private;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.Utils;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static scala.collection.JavaConverters.mapAsJavaMapConverter;

@Private
public class Catalogs {
private Catalogs() {
}

/**
* Load and configure a catalog by name.
* <p>
* This loads, instantiates, and initializes the catalog plugin for each call; it does not cache
* or reuse instances.
*
* @param name a String catalog name
* @param conf a SQLConf
* @return an initialized CatalogPlugin
* @throws SparkException If the plugin class cannot be found or instantiated
*/
public static CatalogPlugin load(String name, SQLConf conf) throws SparkException {
String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null);
if (pluginClassName == null) {
throw new SparkException(String.format(
"Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name));
}

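// Prefer the thread context class loader, if set, so user-provided catalog
// jars are visible; otherwise fall back to the Spark class loader.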
ClassLoader loader = Utils.getContextOrSparkClassLoader();

try {
Class<?> pluginClass = loader.loadClass(pluginClassName);

if (!CatalogPlugin.class.isAssignableFrom(pluginClass)) {
throw new SparkException(String.format(
"Plugin class for catalog '%s' does not implement CatalogPlugin: %s",
name, pluginClassName));
}

CatalogPlugin plugin = CatalogPlugin.class.cast(pluginClass.newInstance());

plugin.initialize(name, catalogOptions(name, conf));

return plugin;

} catch (ClassNotFoundException e) {
throw new SparkException(String.format(
"Cannot find catalog plugin class for catalog '%s': %s", name, pluginClassName));

} catch (IllegalAccessException e) {
throw new SparkException(String.format(
"Failed to call public no-arg constructor for catalog '%s': %s", name, pluginClassName),
e);

} catch (InstantiationException e) {
throw new SparkException(String.format(
"Failed while instantiating plugin for catalog '%s': %s", name, pluginClassName),
e.getCause());
}
}

/**
* Extracts a named catalog's configuration from a SQLConf.
*
* @param name a catalog name
* @param conf a SQLConf
* @return a case-insensitive string map of options read from keys starting with
*         spark.sql.catalog.(name)., with that prefix removed
*/
private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) {
Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava();
Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)");

CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
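// Copy every conf under the catalog's prefix into the options, keyed with the prefix stripped.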
for (Map.Entry<String, String> entry : allConfs.entrySet()) {
Matcher matcher = prefix.matcher(entry.getKey());
if (matcher.matches() && matcher.groupCount() > 0) {
options.put(matcher.group(1), entry.getValue());
}
}

return options;
}
}
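
Loading a catalog through this class could then look like the following (a sketch; the catalog name "example" and the hypothetical `ExampleCatalog` from the earlier sketch are illustrative):

```java
import org.apache.spark.SparkException;
import org.apache.spark.sql.catalog.v2.CatalogPlugin;
import org.apache.spark.sql.catalog.v2.Catalogs;
import org.apache.spark.sql.internal.SQLConf;

public class CatalogLoadingExample {
  public static void main(String[] args) throws SparkException {
    SQLConf conf = new SQLConf();
    // Register the plugin class under the catalog name "example".
    conf.setConfString("spark.sql.catalog.example", ExampleCatalog.class.getName());
    // Passed to initialize() as "default-namespace", with the prefix removed.
    conf.setConfString("spark.sql.catalog.example.default-namespace", "db1");

    CatalogPlugin plugin = Catalogs.load("example", conf);
    System.out.println(plugin.name()); // prints "example"
  }
}
```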
org/apache/spark/sql/util/CaseInsensitiveStringMap.java
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.util;

import org.apache.spark.annotation.Experimental;

import java.util.Collection;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/**
* Case-insensitive map of string keys to string values.
* <p>
* This is used to pass options to v2 implementations to ensure consistent case insensitivity.
* <p>
* Methods that return keys in this map, like {@link #entrySet()} and {@link #keySet()}, return
* keys converted to lower case.
*/
@Experimental
public class CaseInsensitiveStringMap implements Map<String, String> {

public static CaseInsensitiveStringMap empty() {
return new CaseInsensitiveStringMap();
}

private final Map<String, String> delegate;

private CaseInsensitiveStringMap() {
this.delegate = new HashMap<>();
}

@Override
public int size() {
return delegate.size();
}

@Override
public boolean isEmpty() {
return delegate.isEmpty();
}

@Override
public boolean containsKey(Object key) {
return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT));
}

@Override
public boolean containsValue(Object value) {
return delegate.containsValue(value);
}

@Override
public String get(Object key) {
return delegate.get(key.toString().toLowerCase(Locale.ROOT));
}

@Override
public String put(String key, String value) {
return delegate.put(key.toLowerCase(Locale.ROOT), value);
}

@Override
public String remove(Object key) {
return delegate.remove(key.toString().toLowerCase(Locale.ROOT));
}

@Override
public void putAll(Map<? extends String, ? extends String> m) {
for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) {
put(entry.getKey(), entry.getValue());
}
}

@Override
public void clear() {
delegate.clear();
}

@Override
public Set<String> keySet() {
return delegate.keySet();
}

@Override
public Collection<String> values() {
return delegate.values();
}

@Override
public Set<Map.Entry<String, String>> entrySet() {
return delegate.entrySet();
}
}
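
A short sketch of the case-insensitive behavior (the `OptionsExample` class and the keys used are illustrative):

```java
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class OptionsExample {
  public static void main(String[] args) {
    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
    options.put("PageSize", "500"); // stored under the lower-case key "pagesize"

    System.out.println(options.get("pagesize"));         // 500
    System.out.println(options.get("PAGESIZE"));         // 500
    System.out.println(options.containsKey("pageSize")); // true
    System.out.println(options.keySet());                // [pagesize]
  }
}
```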