[SPARK-24252][SQL] Add catalog registration and table catalog APIs. #21306

Closed
CaseInsensitiveStringMap.java (new file)
@@ -0,0 +1,107 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2;

import java.util.Collection;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/**
 * Case-insensitive map of string keys to string values.
 * <p>
 * This is used to pass options to v2 implementations to ensure consistent case insensitivity.
 * <p>
 * Methods that return keys in this map, like {@link #entrySet()} and {@link #keySet()}, return
 * keys converted to lower case.
 */
public class CaseInsensitiveStringMap implements Map<String, String> {

  public static CaseInsensitiveStringMap empty() {
    return new CaseInsensitiveStringMap();
  }

  private final Map<String, String> delegate;

  private CaseInsensitiveStringMap() {
    this.delegate = new HashMap<>();
  }

  @Override
  public int size() {
    return delegate.size();
  }

  @Override
  public boolean isEmpty() {
    return delegate.isEmpty();
  }

  @Override
  public boolean containsKey(Object key) {
    return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT));
  }

  @Override
  public boolean containsValue(Object value) {
    return delegate.containsValue(value);
  }

  @Override
  public String get(Object key) {
    return delegate.get(key.toString().toLowerCase(Locale.ROOT));
  }

  @Override
  public String put(String key, String value) {
    return delegate.put(key.toLowerCase(Locale.ROOT), value);
  }

  @Override
  public String remove(Object key) {
    return delegate.remove(key.toString().toLowerCase(Locale.ROOT));
  }

  @Override
  public void putAll(Map<? extends String, ? extends String> m) {
    for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) {
      delegate.put(entry.getKey().toLowerCase(Locale.ROOT), entry.getValue());
    }
  }

  @Override
  public void clear() {
    delegate.clear();
  }

  @Override
  public Set<String> keySet() {
    return delegate.keySet();
  }

  @Override
  public Collection<String> values() {
    return delegate.values();
  }

  @Override
  public Set<Map.Entry<String, String>> entrySet() {
    return delegate.entrySet();
  }
}
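
A quick sketch of the case handling this map guarantees; this example class is not part of the diff:

import org.apache.spark.sql.catalog.v2.CaseInsensitiveStringMap;

public class CaseInsensitiveStringMapExample {
  public static void main(String[] args) {
    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
    options.put("Warehouse", "/tmp/warehouse");

    // Keys are lower-cased on write, so lookups succeed regardless of the
    // case the caller uses.
    System.out.println(options.get("warehouse"));  // /tmp/warehouse
    System.out.println(options.get("WAREHOUSE"));  // /tmp/warehouse
    System.out.println(options.keySet());          // [warehouse]
  }
}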
CatalogProvider.java (new file)
@@ -0,0 +1,50 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.sql.internal.SQLConf;

/**
 * A marker interface to provide a catalog implementation for Spark.
 * <p>
 * Implementations can provide catalog functions by implementing additional interfaces, like
 * {@link TableCatalog} to expose table operations.
 * <p>
 * Catalog implementations must implement this marker interface to be loaded by
 * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the
 * required public no-arg constructor. After creating an instance, it will be configured by calling
 * {@link #initialize(CaseInsensitiveStringMap)}.
 * <p>
 * Catalog implementations are registered to a name by adding a configuration option to Spark:
 * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration
 * properties in the Spark configuration that share the catalog name prefix,
 * {@code spark.sql.catalog.catalog-name.(key)=(value)}, will be passed to the catalog in the
 * case-insensitive string map of options during initialization, with the prefix removed. An
 * additional property, {@code name}, is also added to the options and will contain the
 * catalog's name; in this case, "catalog-name".
 */
public interface CatalogProvider {
Contributor:
As we discussed, will these APIs now live in the sql-api package? Also at what point are we going to introduce this new Maven module and package?

Contributor (Author):
@cloud-fan, do you want me to create the sql-api package in this PR, or do you want to add a separate PR to move the current v2 API?

  /**
   * Called to initialize configuration.
   * <p>
   * This method is called once, just after the provider is instantiated.
   *
   * @param options a case-insensitive string map of configuration
   */
  void initialize(CaseInsensitiveStringMap options);
Member:
Can we reuse CaseInsensitiveMap[String]?

Contributor (Author):
That's a Scala map and the v2 APIs are intended to be used with both Java and Scala. My intent is to reuse this map in place of DataSourceOptions, so at least we will reduce some duplication.

Member:
Got it. Thanks!
}
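
To make the registration contract above concrete, here is a minimal provider sketch; the class name, package, and the warehouse option are hypothetical, not part of this PR:

package com.example;

import org.apache.spark.sql.catalog.v2.CaseInsensitiveStringMap;
import org.apache.spark.sql.catalog.v2.CatalogProvider;

// Hypothetical catalog registered with:
//   spark.sql.catalog.example=com.example.ExampleCatalog
//   spark.sql.catalog.example.warehouse=/tmp/warehouse
// initialize() then receives {"warehouse" -> "/tmp/warehouse", "name" -> "example"}.
public class ExampleCatalog implements CatalogProvider {
  private String name;
  private String warehouse;

  // Catalogs.load requires a public no-arg constructor.
  public ExampleCatalog() {
  }

  @Override
  public void initialize(CaseInsensitiveStringMap options) {
    this.name = options.get("name");
    this.warehouse = options.get("warehouse");
  }
}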
Catalogs.java (new file)
@@ -0,0 +1,109 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.SparkException;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.util.Utils;

import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static scala.collection.JavaConverters.mapAsJavaMapConverter;

public class Catalogs {
  private Catalogs() {
  }

  /**
   * Load and configure a catalog by name.
   * <p>
   * This loads, instantiates, and initializes the catalog provider for each call; it does not
   * cache or reuse instances.
   *
   * @param name a String catalog name
   * @param conf a SQLConf
   * @return an initialized CatalogProvider
   * @throws SparkException if the provider class cannot be found or instantiated
   */
  public static CatalogProvider load(String name, SQLConf conf) throws SparkException {
    String providerClassName = conf.getConfString("spark.sql.catalog." + name, null);
    if (providerClassName == null) {
      throw new SparkException(String.format(
          "Catalog '%s' provider not found: spark.sql.catalog.%s is not defined", name, name));
    }

    ClassLoader loader = Utils.getContextOrSparkClassLoader();

    try {
      Class<?> providerClass = loader.loadClass(providerClassName);

      if (!CatalogProvider.class.isAssignableFrom(providerClass)) {
        throw new SparkException(String.format(
            "Provider class for catalog '%s' does not implement CatalogProvider: %s",
            name, providerClassName));
      }

      CatalogProvider provider = CatalogProvider.class.cast(providerClass.newInstance());

      provider.initialize(catalogOptions(name, conf));

      return provider;

    } catch (ClassNotFoundException e) {
      throw new SparkException(String.format(
          "Cannot find catalog provider class for catalog '%s': %s", name, providerClassName));

    } catch (IllegalAccessException e) {
      throw new SparkException(String.format(
          "Failed to call public no-arg constructor for catalog '%s': %s", name, providerClassName),
          e);

    } catch (InstantiationException e) {
      throw new SparkException(String.format(
          "Failed while instantiating provider for catalog '%s': %s", name, providerClassName),
          e.getCause());
    }
  }

  /**
   * Extracts a named catalog's configuration from a SQLConf.
   *
   * @param name a catalog name
   * @param conf a SQLConf
   * @return a case-insensitive string map of options starting with spark.sql.catalog.(name).
   */
  private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) {
    Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava();
    Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)");

    CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
    for (Map.Entry<String, String> entry : allConfs.entrySet()) {
      Matcher matcher = prefix.matcher(entry.getKey());
      if (matcher.matches() && matcher.groupCount() > 0) {
        options.put(matcher.group(1), entry.getValue());
      }
    }

    // add name last to ensure it overwrites any conflicting options
    options.put("name", name);

    return options;
  }
}
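
A sketch of the expected call path, assuming the hypothetical registration from the earlier example and access to the active SQLConf:

import org.apache.spark.SparkException;
import org.apache.spark.sql.catalog.v2.CatalogProvider;
import org.apache.spark.sql.catalog.v2.Catalogs;
import org.apache.spark.sql.internal.SQLConf;

public class CatalogLoadExample {
  // Assumes spark.sql.catalog.example=com.example.ExampleCatalog is set in conf.
  public static CatalogProvider loadExample(SQLConf conf) throws SparkException {
    // load() instantiates a new provider and calls initialize() with the
    // spark.sql.catalog.example.* options (prefix stripped) plus "name" -> "example".
    return Catalogs.load("example", conf);
  }
}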
PartitionTransform.java (new file)
@@ -0,0 +1,49 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2;

/**
 * A logical transformation function.
 * <p>
 * This does not support applying transformations; it only communicates the type of transformation
 * and its input column references.
 * <p>
 * This interface is used to pass partitioning transformations to v2 catalog implementations. For
 * example, a table may partition data by the date of a timestamp column, ts, using
 * <code>date(ts)</code>. This is similar to org.apache.spark.sql.sources.Filter, which is used to
 * pass boolean filter expressions to data source implementations.
 * <p>
 * To use data values directly as partition values, use the "identity" transform:
 * <code>identity(col)</code>. Identity partition transforms are the only transforms used by Hive.
 * For Hive tables, SQL statements produce data columns that are used without modification to
 * partition the remaining data columns.
 * <p>
 * Table formats other than Hive can use partition transforms to automatically derive partition
 * values from rows and to transform data predicates to partition predicates.
 */
public interface PartitionTransform {
  /**
   * The name of this transform.
   */
  String name();

  /**
   * The data columns that are referenced by this transform.
   */
  String[] references();
}
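
For illustration, a minimal identity transform might look like the sketch below; this concrete class is hypothetical, since the PR defines only the interface here:

package com.example;

import org.apache.spark.sql.catalog.v2.PartitionTransform;

// Hypothetical identity transform: partition values are the referenced
// column's values, unmodified, mirroring the Hive-style partitioning
// described in the interface javadoc.
public class IdentityTransform implements PartitionTransform {
  private final String column;

  public IdentityTransform(String column) {
    this.column = column;
  }

  @Override
  public String name() {
    return "identity";
  }

  @Override
  public String[] references() {
    return new String[] { column };
  }
}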