-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-24252][SQL] Add catalog registration and table catalog APIs. #21306
Changes from all commits
21acda2
4697406
dca4bf8
6b45a11
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalog.v2; | ||
|
||
import java.util.Collection; | ||
import java.util.HashMap; | ||
import java.util.Locale; | ||
import java.util.Map; | ||
import java.util.Set; | ||
|
||
/** | ||
* Case-insensitive map of string keys to string values. | ||
* <p> | ||
* This is used to pass options to v2 implementations to ensure consistent case insensitivity. | ||
* <p> | ||
* Methods that return keys in this map, like {@link #entrySet()} and {@link #keySet()}, return | ||
* keys converted to lower case. | ||
*/ | ||
public class CaseInsensitiveStringMap implements Map<String, String> { | ||
|
||
public static CaseInsensitiveStringMap empty() { | ||
return new CaseInsensitiveStringMap(); | ||
} | ||
|
||
private final Map<String, String> delegate; | ||
|
||
private CaseInsensitiveStringMap() { | ||
this.delegate = new HashMap<>(); | ||
} | ||
|
||
@Override | ||
public int size() { | ||
return delegate.size(); | ||
} | ||
|
||
@Override | ||
public boolean isEmpty() { | ||
return delegate.isEmpty(); | ||
} | ||
|
||
@Override | ||
public boolean containsKey(Object key) { | ||
return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT)); | ||
} | ||
|
||
@Override | ||
public boolean containsValue(Object value) { | ||
return delegate.containsValue(value); | ||
} | ||
|
||
@Override | ||
public String get(Object key) { | ||
return delegate.get(key.toString().toLowerCase(Locale.ROOT)); | ||
} | ||
|
||
@Override | ||
public String put(String key, String value) { | ||
return delegate.put(key.toLowerCase(Locale.ROOT), value); | ||
} | ||
|
||
@Override | ||
public String remove(Object key) { | ||
return delegate.remove(key.toString().toLowerCase(Locale.ROOT)); | ||
} | ||
|
||
@Override | ||
public void putAll(Map<? extends String, ? extends String> m) { | ||
for (Map.Entry<? extends String, ? extends String> entry : m.entrySet()) { | ||
delegate.put(entry.getKey().toLowerCase(Locale.ROOT), entry.getValue()); | ||
} | ||
} | ||
|
||
@Override | ||
public void clear() { | ||
delegate.clear(); | ||
} | ||
|
||
@Override | ||
public Set<String> keySet() { | ||
return delegate.keySet(); | ||
} | ||
|
||
@Override | ||
public Collection<String> values() { | ||
return delegate.values(); | ||
} | ||
|
||
@Override | ||
public Set<Map.Entry<String, String>> entrySet() { | ||
return delegate.entrySet(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalog.v2; | ||
|
||
import org.apache.spark.sql.internal.SQLConf; | ||
|
||
/** | ||
* A marker interface to provide a catalog implementation for Spark. | ||
* <p> | ||
* Implementations can provide catalog functions by implementing additional interfaces, like | ||
* {@link TableCatalog} to expose table operations. | ||
* <p> | ||
* Catalog implementations must implement this marker interface to be loaded by | ||
* {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the | ||
* required public no-arg constructor. After creating an instance, it will be configured by calling | ||
* {@link #initialize(CaseInsensitiveStringMap)}. | ||
* <p> | ||
* Catalog implementations are registered to a name by adding a configuration option to Spark: | ||
* {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties | ||
* in the Spark configuration that share the catalog name prefix, | ||
* {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in the case insensitive | ||
* string map of options in initialization with the prefix removed. An additional property, | ||
* {@code name}, is also added to the options and will contain the catalog's name; in this case, | ||
* "catalog-name". | ||
*/ | ||
public interface CatalogProvider { | ||
/** | ||
* Called to initialize configuration. | ||
* <p> | ||
* This method is called once, just after the provider is instantiated. | ||
* | ||
* @param options a case-insensitive string map of configuration | ||
*/ | ||
void initialize(CaseInsensitiveStringMap options); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we reuse There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's a Scala map and the v2 APIs are intended to be used with both Java and Scala. My intent is to reuse this map in place of DataSourceOptions, so at least we will reduce some duplication. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. Thanks! |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalog.v2; | ||
|
||
import org.apache.spark.SparkException; | ||
import org.apache.spark.sql.internal.SQLConf; | ||
import org.apache.spark.util.Utils; | ||
|
||
import java.util.Map; | ||
import java.util.regex.Matcher; | ||
import java.util.regex.Pattern; | ||
|
||
import static scala.collection.JavaConverters.mapAsJavaMapConverter; | ||
|
||
public class Catalogs { | ||
private Catalogs() { | ||
} | ||
|
||
/** | ||
* Load and configure a catalog by name. | ||
* <p> | ||
* This loads, instantiates, and initializes the catalog provider for each call; it does not | ||
* cache or reuse instances. | ||
* | ||
* @param name a String catalog name | ||
* @param conf a SQLConf | ||
* @return an initialized CatalogProvider | ||
* @throws SparkException If the provider class cannot be found or instantiated | ||
*/ | ||
public static CatalogProvider load(String name, SQLConf conf) throws SparkException { | ||
String providerClassName = conf.getConfString("spark.sql.catalog." + name, null); | ||
if (providerClassName == null) { | ||
throw new SparkException(String.format( | ||
"Catalog '%s' provider not found: spark.sql.catalog.%s is not defined", name, name)); | ||
} | ||
|
||
ClassLoader loader = Utils.getContextOrSparkClassLoader(); | ||
|
||
try { | ||
Class<?> providerClass = loader.loadClass(providerClassName); | ||
|
||
if (!CatalogProvider.class.isAssignableFrom(providerClass)) { | ||
throw new SparkException(String.format( | ||
"Provider class for catalog '%s' does not implement CatalogProvider: %s", | ||
name, providerClassName)); | ||
} | ||
|
||
CatalogProvider provider = CatalogProvider.class.cast(providerClass.newInstance()); | ||
|
||
provider.initialize(catalogOptions(name, conf)); | ||
|
||
return provider; | ||
|
||
} catch (ClassNotFoundException e) { | ||
throw new SparkException(String.format( | ||
"Cannot find catalog provider class for catalog '%s': %s", name, providerClassName)); | ||
|
||
} catch (IllegalAccessException e) { | ||
throw new SparkException(String.format( | ||
"Failed to call public no-arg constructor for catalog '%s': %s", name, providerClassName), | ||
e); | ||
|
||
} catch (InstantiationException e) { | ||
throw new SparkException(String.format( | ||
"Failed while instantiating provider for catalog '%s': %s", name, providerClassName), | ||
e.getCause()); | ||
} | ||
} | ||
|
||
/** | ||
* Extracts a named catalog's configuration from a SQLConf. | ||
* | ||
* @param name a catalog name | ||
* @param conf a SQLConf | ||
* @return a case insensitive string map of options starting with spark.sql.catalog.(name). | ||
*/ | ||
private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) { | ||
Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava(); | ||
Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)"); | ||
|
||
CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty(); | ||
for (Map.Entry<String, String> entry : allConfs.entrySet()) { | ||
Matcher matcher = prefix.matcher(entry.getKey()); | ||
if (matcher.matches() && matcher.groupCount() > 0) { | ||
options.put(matcher.group(1), entry.getValue()); | ||
} | ||
} | ||
|
||
// add name last to ensure it overwrites any conflicting options | ||
options.put("name", name); | ||
|
||
return options; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,49 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.spark.sql.catalog.v2; | ||
|
||
/**
 * A logical transformation function.
 * <p>
 * This does not support applying transformations; it only communicates the type of transformation
 * and its input column references.
 * <p>
 * This interface is used to pass partitioning transformations to v2 catalog implementations. For
 * example, a table may partition data by the date of a timestamp column, ts, using
 * <code>date(ts)</code>. This is similar to org.apache.spark.sql.sources.Filter, which is used to
 * pass boolean filter expressions to data source implementations.
 * <p>
 * To use data values directly as partition values, use the "identity" transform:
 * <code>identity(col)</code>. Identity partition transforms are the only transforms used by Hive.
 * For Hive tables, SQL statements produce data columns that are used without modification to
 * partition the remaining data columns.
 * <p>
 * Table formats other than Hive can use partition transforms to automatically derive partition
 * values from rows and to transform data predicates to partition predicates.
 */
public interface PartitionTransform {
  /**
   * The name of this transform.
   *
   * @return the transform name, for example "date" or "identity"
   */
  String name();

  /**
   * The data columns that are referenced by this transform.
   *
   * @return the names of the referenced data columns
   */
  String[] references();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As we discussed, will these APIs now live in the `sql-api` package? Also, at what point are we going to introduce this new Maven module and package?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@cloud-fan, do you want me to create the `sql-api` package in this PR, or do you want to add a separate PR to move the current v2 API?