From 21acda237744d4299e5bb449dce1ec8a1735f6de Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 4 May 2018 18:13:01 -0700 Subject: [PATCH 1/4] SPARK-24252: Add v2 data source mix-in for catalog support. --- .../catalog/v2/CaseInsensitiveStringMap.java | 107 ++++++++++ .../spark/sql/catalog/v2/CatalogProvider.java | 50 +++++ .../apache/spark/sql/catalog/v2/Catalogs.java | 109 +++++++++++ .../apache/spark/sql/catalog/v2/Table.java | 47 +++++ .../spark/sql/catalog/v2/TableCatalog.java | 137 +++++++++++++ .../spark/sql/catalog/v2/TableChange.java | 182 +++++++++++++++++ .../v2/CaseInsensitiveStringMapSuite.java | 48 +++++ .../sql/catalog/v2/CatalogLoadingSuite.java | 184 ++++++++++++++++++ .../org/apache/spark/sql/SparkSession.scala | 8 + 9 files changed, 872 insertions(+) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMap.java create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogProvider.java create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java create mode 100644 sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMapSuite.java create mode 100644 sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMap.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMap.java new file mode 100644 index 0000000000000..a4ad1f6994f93 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMap.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +/** + * Case-insensitive map of string keys to string values. + *

+ * This is used to pass options to v2 implementations to ensure consistent case insensitivity. + *
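+ * For example (an illustrative sketch, not part of this change), lookups succeed regardless of
+ * the case used by the caller:
+ *
+ *   CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty();
+ *   options.put("paTH", "/tmp/data");
+ *   options.get("path");          // returns "/tmp/data"
+ *   options.containsKey("PATH");  // returns true
+ *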

+ * Methods that return keys in this map, like {@link #entrySet()} and {@link #keySet()}, return + * keys converted to lower case. + */ +public class CaseInsensitiveStringMap implements Map { + + public static CaseInsensitiveStringMap empty() { + return new CaseInsensitiveStringMap(); + } + + private final Map delegate; + + private CaseInsensitiveStringMap() { + this.delegate = new HashMap<>(); + } + + @Override + public int size() { + return delegate.size(); + } + + @Override + public boolean isEmpty() { + return delegate.isEmpty(); + } + + @Override + public boolean containsKey(Object key) { + return delegate.containsKey(key.toString().toLowerCase(Locale.ROOT)); + } + + @Override + public boolean containsValue(Object value) { + return delegate.containsValue(value); + } + + @Override + public String get(Object key) { + return delegate.get(key.toString().toLowerCase(Locale.ROOT)); + } + + @Override + public String put(String key, String value) { + return delegate.put(key.toLowerCase(Locale.ROOT), value); + } + + @Override + public String remove(Object key) { + return delegate.remove(key.toString().toLowerCase(Locale.ROOT)); + } + + @Override + public void putAll(Map m) { + for (Map.Entry entry : m.entrySet()) { + delegate.put(entry.getKey().toLowerCase(Locale.ROOT), entry.getValue()); + } + } + + @Override + public void clear() { + delegate.clear(); + } + + @Override + public Set keySet() { + return delegate.keySet(); + } + + @Override + public Collection values() { + return delegate.values(); + } + + @Override + public Set> entrySet() { + return delegate.entrySet(); + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogProvider.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogProvider.java new file mode 100644 index 0000000000000..03831b7aa9155 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/CatalogProvider.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.internal.SQLConf; + +/** + * A marker interface to provide a catalog implementation for Spark. + *

+ * Implementations can provide catalog functions by implementing additional interfaces, like + * {@link TableCatalog} to expose table operations. + *

+ * Catalog implementations must implement this marker interface to be loaded by + * {@link Catalogs#load(String, SQLConf)}. The loader will instantiate catalog classes using the + * required public no-arg constructor. After creating an instance, it will be configured by calling + * {@link #initialize(CaseInsensitiveStringMap)}. + *
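+ * A minimal provider sketch (the class name is illustrative, not part of this change):
+ *
+ *   public class ExampleCatalog implements CatalogProvider {
+ *     private CaseInsensitiveStringMap options;
+ *
+ *     public ExampleCatalog() { }  // public no-arg constructor used by the loader
+ *
+ *     public void initialize(CaseInsensitiveStringMap options) {
+ *       this.options = options;
+ *     }
+ *   }
+ *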

+ * Catalog implementations are registered to a name by adding a configuration option to Spark: + * {@code spark.sql.catalog.catalog-name=com.example.YourCatalogClass}. All configuration properties + * in the Spark configuration that share the catalog name prefix, + * {@code spark.sql.catalog.catalog-name.(key)=(value)} will be passed in the case insensitive + * string map of options in initialization with the prefix removed. An additional property, + * {@code name}, is also added to the options and will contain the catalog's name; in this case, + * "catalog-name". + */ +public interface CatalogProvider { + /** + * Called to initialize configuration. + *

+ * This method is called once, just after the provider is instantiated. + * + * @param options a case-insensitive string map of configuration + */ + void initialize(CaseInsensitiveStringMap options); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java new file mode 100644 index 0000000000000..71ab9f528dbe7 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Catalogs.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.SparkException; +import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.util.Utils; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import static scala.collection.JavaConverters.mapAsJavaMapConverter; + +public class Catalogs { + private Catalogs() { + } + + /** + * Load and configure a catalog by name. + *
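+ * For example (with illustrative names), given this Spark configuration:
+ *
+ *   spark.sql.catalog.prod = com.example.ProdCatalog
+ *   spark.sql.catalog.prod.url = jdbc:example://metastore
+ *
+ * {@code load("prod", conf)} instantiates {@code com.example.ProdCatalog} and initializes it
+ * with the options {@code url -> jdbc:example://metastore} and {@code name -> prod}.
+ *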

+ * This loads, instantiates, and initializes the catalog provider for each call; it does not + * cache or reuse instances. + * + * @param name a String catalog name + * @param conf a SQLConf + * @return an initialized CatalogProvider + * @throws SparkException If the provider class cannot be found or instantiated + */ + public static CatalogProvider load(String name, SQLConf conf) throws SparkException { + String providerClassName = conf.getConfString("spark.sql.catalog." + name, null); + if (providerClassName == null) { + throw new SparkException(String.format( + "Catalog '%s' provider not found: spark.sql.catalog.%s is not defined", name, name)); + } + + ClassLoader loader = Utils.getContextOrSparkClassLoader(); + + try { + Class providerClass = loader.loadClass(providerClassName); + + if (!CatalogProvider.class.isAssignableFrom(providerClass)) { + throw new SparkException(String.format( + "Provider class for catalog '%s' does not implement CatalogProvider: %s", + name, providerClassName)); + } + + CatalogProvider provider = CatalogProvider.class.cast(providerClass.newInstance()); + + provider.initialize(catalogOptions(name, conf)); + + return provider; + + } catch (ClassNotFoundException e) { + throw new SparkException(String.format( + "Cannot find catalog provider class for catalog '%s': %s", name, providerClassName)); + + } catch (IllegalAccessException e) { + throw new SparkException(String.format( + "Failed to call public no-arg constructor for catalog '%s': %s", name, providerClassName), + e); + + } catch (InstantiationException e) { + throw new SparkException(String.format( + "Failed while instantiating provider for catalog '%s': %s", name, providerClassName), + e.getCause()); + } + } + + /** + * Extracts a named catalog's configuration from a SQLConf. + * + * @param name a catalog name + * @param conf a SQLConf + * @return a case insensitive string map of options starting with spark.sql.catalog.(name). + */ + private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) { + Map allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava(); + Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)"); + + CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty(); + for (Map.Entry entry : allConfs.entrySet()) { + Matcher matcher = prefix.matcher(entry.getKey()); + if (matcher.matches() && matcher.groupCount() > 0) { + options.put(matcher.group(1), entry.getValue()); + } + } + + // add name last to ensure it overwrites any conflicting options + options.put("name", name); + + return options; + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java new file mode 100644 index 0000000000000..30a20f27b8c65 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.types.StructType; + +import java.util.List; +import java.util.Map; + +/** + * Represents table metadata from a {@link TableCatalog} or other table sources. + */ +public interface Table { + /** + * Return the table properties. + * @return this table's map of string properties + */ + Map properties(); + + /** + * Return the table schema. + * @return this table's schema as a struct type + */ + StructType schema(); + + /** + * Return the table partitioning expressions. + * @return this table's partitioning expressions + */ + List partitionExpressions(); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java new file mode 100644 index 0000000000000..539beb0c39c56 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.catalyst.TableIdentifier; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; +import org.apache.spark.sql.catalyst.expressions.Expression; +import org.apache.spark.sql.types.StructType; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public interface TableCatalog extends CatalogProvider { + /** + * Load table metadata by {@link TableIdentifier identifier} from the catalog. + * + * @param ident a table identifier + * @return the table's metadata + * @throws NoSuchTableException If the table doesn't exist. + */ + Table loadTable(TableIdentifier ident) throws NoSuchTableException; + + /** + * Test whether a table exists using an {@link TableIdentifier identifier} from the catalog. + * + * @param ident a table identifier + * @return true if the table exists, false otherwise + */ + default boolean tableExists(TableIdentifier ident) { + try { + return loadTable(ident) != null; + } catch (NoSuchTableException e) { + return false; + } + } + + /** + * Create a table in the catalog. 
+ * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table already exists for the identifier + */ + default Table createTable(TableIdentifier ident, + StructType schema) throws TableAlreadyExistsException { + return createTable(ident, schema, Collections.emptyList(), Collections.emptyMap()); + } + + /** + * Create a table in the catalog. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @param properties a string map of table properties + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table already exists for the identifier + */ + default Table createTable(TableIdentifier ident, + StructType schema, + Map properties) throws TableAlreadyExistsException { + return createTable(ident, schema, Collections.emptyList(), properties); + } + + /** + * Create a table in the catalog. + * + * @param ident a table identifier + * @param schema the schema of the new table, as a struct type + * @param partitions a list of expressions to use for partitioning data in the table + * @param properties a string map of table properties + * @return metadata for the new table + * @throws TableAlreadyExistsException If a table already exists for the identifier + */ + Table createTable(TableIdentifier ident, + StructType schema, + List partitions, + Map properties) throws TableAlreadyExistsException; + + /** + * Apply a list of {@link TableChange changes} to a table in the catalog. + *

+ * Implementations may reject the requested changes. If any change is rejected, none of the + * changes should be applied to the table. + * + * @param ident a table identifier + * @param changes a list of changes to apply to the table + * @return updated metadata for the table + * @throws NoSuchTableException If the table doesn't exist. + * @throws IllegalArgumentException If any change is rejected by the implementation. + */ + Table alterTable(TableIdentifier ident, + List changes) throws NoSuchTableException; + + /** + * Apply {@link TableChange changes} to a table in the catalog. + *
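+ * For example (an illustrative call; {@code ident} and the column names are assumed to exist),
+ * several changes can be requested together:
+ *
+ *   catalog.alterTable(ident,
+ *       TableChange.renameColumn("a", "b"),
+ *       TableChange.updateColumn("c", DataTypes.LongType));
+ *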

+ * Implementations may reject the requested changes. If any change is rejected, none of the + * changes should be applied to the table. + * + * @param ident a table identifier + * @param changes a list of changes to apply to the table + * @return updated metadata for the table + * @throws NoSuchTableException If the table doesn't exist. + * @throws IllegalArgumentException If any change is rejected by the implementation. + */ + default Table alterTable(TableIdentifier ident, + TableChange... changes) throws NoSuchTableException { + return alterTable(ident, Arrays.asList(changes)); + } + + /** + * Drop a table in the catalog. + * + * @param ident a table identifier + * @return true if a table was deleted, false if no table exists for the identifier + */ + boolean dropTable(TableIdentifier ident); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java new file mode 100644 index 0000000000000..3a8ba5e00b397 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableChange.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.sql.types.DataType; + +/** + * TableChange subclasses represent requested changes to a table. These are passed to + * {@link TableCatalog#alterTable}. For example, + *

+ *   import TableChange._
+ *   val catalog = Catalogs.load("catalog-name", conf).asInstanceOf[TableCatalog]
+ *   catalog.alterTable(ident,
+ *       addColumn("x", IntegerType),
+ *       renameColumn("a", "b"),
+ *       deleteColumn("c")
+ *     )
+ * 
+ */ +public interface TableChange { + + /** + * Create a TableChange for adding a top-level column to a table. + *

+ * Because "." may be interpreted as a field path separator or may be used in field names, it is + * not allowed in names passed to this method. To add to nested types or to add fields with + * names that contain ".", use {@link #addColumn(String, String, DataType)}. + * + * @param name the new top-level column name + * @param dataType the new column's data type + * @return a TableChange for the addition + */ + static TableChange addColumn(String name, DataType dataType) { + return new AddColumn(null, name, dataType); + } + + /** + * Create a TableChange for adding a nested column to a table. + *

+ * The parent name is used to find the parent struct type where the nested field will be added. + * If the parent name is null, the new column will be added to the root as a top-level column. + * If parent identifies a struct, a new column is added to that struct. If it identifies a list, + * the column is added to the list element struct, and if it identifies a map, the new column is + * added to the map's value struct. + *
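+ * For example (illustrative), if the table has a column {@code point} of type
+ * {@code struct<x: int, y: int>}, then {@code addColumn("point", "z", DataTypes.IntegerType)}
+ * requests a new nested field {@code point.z}.
+ *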

+ * The given name is used to name the new column and names containing "." are not handled + * differently. + * + * @param parent the new field's parent + * @param name the new field name + * @param dataType the new field's data type + * @return a TableChange for the addition + */ + static TableChange addColumn(String parent, String name, DataType dataType) { + return new AddColumn(parent, name, dataType); + } + + /** + * Create a TableChange for renaming a field. + *

+ * The name is used to find the field to rename. The new name will replace the name of the type. + * For example, renameColumn("a.b.c", "x") should produce column a.b.x. + * + * @param name the current field name + * @param newName the new name + * @return a TableChange for the rename + */ + static TableChange renameColumn(String name, String newName) { + return new RenameColumn(name, newName); + } + + /** + * Create a TableChange for updating the type of a field. + *

+ * The name is used to find the field to update. + * + * @param name the field name + * @param newDataType the new data type + * @return a TableChange for the update + */ + static TableChange updateColumn(String name, DataType newDataType) { + return new UpdateColumn(name, newDataType); + } + + /** + * Create a TableChange for deleting a field from a table. + * + * @param name the name of the field to delete + * @return a TableChange for the delete + */ + static TableChange deleteColumn(String name) { + return new DeleteColumn(name); + } + + final class AddColumn implements TableChange { + private final String parent; + private final String name; + private final DataType dataType; + + private AddColumn(String parent, String name, DataType dataType) { + this.parent = parent; + this.name = name; + this.dataType = dataType; + } + + public String parent() { + return parent; + } + + public String name() { + return name; + } + + public DataType type() { + return dataType; + } + } + + final class RenameColumn implements TableChange { + private final String name; + private final String newName; + + private RenameColumn(String name, String newName) { + this.name = name; + this.newName = newName; + } + + public String name() { + return name; + } + + public String newName() { + return newName; + } + } + + final class UpdateColumn implements TableChange { + private final String name; + private final DataType newDataType; + + private UpdateColumn(String name, DataType newDataType) { + this.name = name; + this.newDataType = newDataType; + } + + public String name() { + return name; + } + + public DataType newDataType() { + return newDataType; + } + } + + final class DeleteColumn implements TableChange { + private final String name; + + private DeleteColumn(String name) { + this.name = name; + } + + public String name() { + return name; + } + } + +} diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMapSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMapSuite.java new file mode 100644 index 0000000000000..0d869108fa7d3 --- /dev/null +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CaseInsensitiveStringMapSuite.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalog.v2; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +public class CaseInsensitiveStringMapSuite { + @Test + public void testPutAndGet() { + CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty(); + options.put("kEy", "valUE"); + + Assert.assertEquals("Should return correct value for lower-case key", + "valUE", options.get("key")); + Assert.assertEquals("Should return correct value for upper-case key", + "valUE", options.get("KEY")); + } + + @Test + public void testKeySet() { + CaseInsensitiveStringMap options = CaseInsensitiveStringMap.empty(); + options.put("kEy", "valUE"); + + Set expectedKeySet = new HashSet<>(); + expectedKeySet.add("key"); + + Assert.assertEquals("Should return lower-case key set", expectedKeySet, options.keySet()); + } +} diff --git a/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java new file mode 100644 index 0000000000000..62e26af7f0c60 --- /dev/null +++ b/sql/catalyst/src/test/java/org/apache/spark/sql/catalog/v2/CatalogLoadingSuite.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalog.v2; + +import org.apache.spark.SparkException; +import org.apache.spark.sql.internal.SQLConf; +import org.junit.Assert; +import org.junit.Test; + +import java.util.concurrent.Callable; + +public class CatalogLoadingSuite { + @Test + public void testLoad() throws SparkException { + SQLConf conf = new SQLConf(); + conf.setConfString("spark.sql.catalog.test-name", TestCatalogProvider.class.getCanonicalName()); + + CatalogProvider provider = Catalogs.load("test-name", conf); + Assert.assertNotNull("Should instantiate a non-null provider", provider); + Assert.assertEquals("Provider should have correct implementation", + TestCatalogProvider.class, provider.getClass()); + + TestCatalogProvider testProvider = (TestCatalogProvider) provider; + Assert.assertEquals("Options should contain only one key", 1, testProvider.options.size()); + Assert.assertEquals("Options should contain correct catalog name", + "test-name", testProvider.options.get("name")); + } + + @Test + public void testInitializationOptions() throws SparkException { + SQLConf conf = new SQLConf(); + conf.setConfString("spark.sql.catalog.test-name", TestCatalogProvider.class.getCanonicalName()); + conf.setConfString("spark.sql.catalog.test-name.name", "overwritten"); + conf.setConfString("spark.sql.catalog.test-name.kEy", "valUE"); + + CatalogProvider provider = Catalogs.load("test-name", conf); + Assert.assertNotNull("Should instantiate a non-null provider", provider); + Assert.assertEquals("Provider should have correct implementation", + TestCatalogProvider.class, provider.getClass()); + + TestCatalogProvider testProvider = (TestCatalogProvider) provider; + + Assert.assertEquals("Options should contain only two keys", 2, testProvider.options.size()); + Assert.assertEquals("Options should contain correct catalog name", + "test-name", testProvider.options.get("name")); + Assert.assertEquals("Options should contain correct value for key", + "valUE", testProvider.options.get("key")); + } + + @Test + public void testLoadWithoutConfig() { + SQLConf conf = new SQLConf(); + + SparkException exc = intercept(SparkException.class, () -> Catalogs.load("missing", conf)); + + Assert.assertTrue("Should complain that implementation is not configured", + exc.getMessage().contains("provider not found: spark.sql.catalog.missing is not defined")); + Assert.assertTrue("Should identify the catalog by name", exc.getMessage().contains("missing")); + } + + @Test + public void testLoadMissingClass() { + SQLConf conf = new SQLConf(); + conf.setConfString("spark.sql.catalog.missing", "com.example.NoSuchCatalogProvider"); + + SparkException exc = intercept(SparkException.class, () -> Catalogs.load("missing", conf)); + + Assert.assertTrue("Should complain that the class is not found", + exc.getMessage().contains("Cannot find catalog provider class")); + Assert.assertTrue("Should identify the catalog by name", exc.getMessage().contains("missing")); + Assert.assertTrue("Should identify the missing class", + exc.getMessage().contains("com.example.NoSuchCatalogProvider")); + } + + @Test + public void testLoadNonCatalogProvider() { + SQLConf conf = new SQLConf(); + String invalidClassName = InvalidCatalogProvider.class.getCanonicalName(); + conf.setConfString("spark.sql.catalog.invalid", invalidClassName); + + SparkException exc = intercept(SparkException.class, () -> Catalogs.load("invalid", conf)); + + Assert.assertTrue("Should complain that class does not implement CatalogProvider", + exc.getMessage().contains("does not 
implement CatalogProvider")); + Assert.assertTrue("Should identify the catalog by name", exc.getMessage().contains("invalid")); + Assert.assertTrue("Should identify the class", exc.getMessage().contains(invalidClassName)); + } + + @Test + public void testLoadConstructorFailureCatalogProvider() { + SQLConf conf = new SQLConf(); + String invalidClassName = ConstructorFailureCatalogProvider.class.getCanonicalName(); + conf.setConfString("spark.sql.catalog.invalid", invalidClassName); + + RuntimeException exc = intercept(RuntimeException.class, () -> Catalogs.load("invalid", conf)); + + Assert.assertTrue("Should have expected error message", + exc.getMessage().contains("Expected failure")); + } + + @Test + public void testLoadAccessErrorCatalogProvider() { + SQLConf conf = new SQLConf(); + String invalidClassName = AccessErrorCatalogProvider.class.getCanonicalName(); + conf.setConfString("spark.sql.catalog.invalid", invalidClassName); + + SparkException exc = intercept(SparkException.class, () -> Catalogs.load("invalid", conf)); + + Assert.assertTrue("Should complain that no public constructor is provided", + exc.getMessage().contains("Failed to call public no-arg constructor for catalog")); + Assert.assertTrue("Should identify the catalog by name", exc.getMessage().contains("invalid")); + Assert.assertTrue("Should identify the class", exc.getMessage().contains(invalidClassName)); + } + + @SuppressWarnings("unchecked") + public static E intercept(Class expected, Callable callable) { + try { + callable.call(); + Assert.fail("No exception was thrown, expected: " + + expected.getName()); + } catch (Exception actual) { + try { + Assert.assertEquals(expected, actual.getClass()); + return (E) actual; + } catch (AssertionError e) { + e.addSuppressed(actual); + throw e; + } + } + // Compiler doesn't catch that Assert.fail will always throw an exception. 
+ throw new UnsupportedOperationException("[BUG] Should not reach this statement"); + } +} + +class TestCatalogProvider implements CatalogProvider { + CaseInsensitiveStringMap options = null; + + TestCatalogProvider() { + } + + @Override + public void initialize(CaseInsensitiveStringMap options) { + this.options = options; + } +} + +class ConstructorFailureCatalogProvider implements CatalogProvider { // fails in its constructor + ConstructorFailureCatalogProvider() { + throw new RuntimeException("Expected failure."); + } + + @Override + public void initialize(CaseInsensitiveStringMap options) { + } +} + +class AccessErrorCatalogProvider implements CatalogProvider { // no public constructor + private AccessErrorCatalogProvider() { + } + + @Override + public void initialize(CaseInsensitiveStringMap options) { + } +} + +class InvalidCatalogProvider { // doesn't implement CatalogProvider + public void initialize(CaseInsensitiveStringMap options) { + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index d9278d8cd23d6..a4c8de6afceb3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -21,6 +21,7 @@ import java.io.Closeable import java.util.concurrent.atomic.AtomicReference import scala.collection.JavaConverters._ +import scala.collection.mutable import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal @@ -31,6 +32,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.catalog.Catalog +import org.apache.spark.sql.catalog.v2.{CatalogProvider, Catalogs} import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders._ @@ -610,6 +612,12 @@ class SparkSession private( */ @transient lazy val catalog: Catalog = new CatalogImpl(self) + @transient private lazy val catalogs = new mutable.HashMap[String, CatalogProvider]() + + private[sql] def catalog(name: String): CatalogProvider = synchronized { + catalogs.getOrElseUpdate(name, Catalogs.load(name, sessionState.conf)) + } + /** * Returns the specified table/view as a `DataFrame`. * From 4697406ef56352adea40cc2794bc14be99ee97b5 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Wed, 15 Aug 2018 14:03:13 -0700 Subject: [PATCH 2/4] SPARK-24252: Add PartitionTransform to replace Expression. Expression is internal and should not be used in public APIs. To avoid using Expression in the TableCatalog API, this commit adds a small set of transformations that are used to communicate partitioning to catalog implementations. This also adds an apply transformation that passes the name of a transform instead of a Transform class. This can be used to pass transforms that are unknown to Spark to the underlying catalog implementation. 
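
As an illustration (a sketch only, not part of the patch), a transform name that Spark does not
recognize falls through to the generic Apply transform, so it still reaches the catalog:

    // "zorder" is not a transform Spark knows, so apply() returns an Apply transform
    PartitionTransform custom = PartitionTransforms.apply("zorder", "col1");
    custom.name();        // "zorder"
    custom.references();  // ["col1"]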
--- .../sql/catalog/v2/PartitionTransform.java | 49 ++++ .../sql/catalog/v2/PartitionTransforms.java | 229 ++++++++++++++++++ .../apache/spark/sql/catalog/v2/Table.java | 7 +- .../spark/sql/catalog/v2/TableCatalog.java | 4 +- 4 files changed, 283 insertions(+), 6 deletions(-) create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransform.java create mode 100644 sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransforms.java diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransform.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransform.java new file mode 100644 index 0000000000000..117c99a42eb8c --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransform.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +/** + * A logical transformation function. + *

+ * This does not support applying transformations; it only communicates the type of transformation + * and its input column references. + *

+ * This interface is used to pass partitioning transformations to v2 catalog implementations. For + * example a table may partition data by the date of a timestamp column, ts, using + * date(ts). This is similar to org.apache.spark.sql.sources.Filter, which is used to + * pass boolean filter expressions to data source implementations. + *
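+ * For example (an illustrative sketch):
+ *
+ *   PartitionTransform byDay = PartitionTransforms.date("ts");
+ *   byDay.name();        // "date"
+ *   byDay.references();  // ["ts"]
+ *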

+ * To use data values directly as partition values, use the "identity" transform: + * identity(col). Identity partition transforms are the only transforms used by Hive. + * For Hive tables, SQL statements produce data columns that are used without modification to + * partition the remaining data columns. + *

+ * Table formats other than Hive can use partition transforms to automatically derive partition + * values from rows and to transform data predicates to partition predicates. + */ +public interface PartitionTransform { + /** + * The name of this transform. + */ + String name(); + + /** + * The data columns that are referenced by this transform. + */ + String[] references(); +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransforms.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransforms.java new file mode 100644 index 0000000000000..179f0505cc265 --- /dev/null +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransforms.java @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2; + +/** + * A standard set of transformations that are passed to data sources during table creation. + * + * @see PartitionTransform + */ +public class PartitionTransforms { + private PartitionTransforms() { + } + + /** + * Create a transform for a column with the given name. + *

+ * This transform is used to pass named transforms that are not known to Spark. + * + * @param transform a name of the transform to apply to the column + * @param colName a column name + * @return an Apply transform for the column + */ + public static PartitionTransform apply(String transform, String colName) { + if ("identity".equals(transform)) { + return identity(colName); + } else if ("year".equals(transform)) { + return year(colName); + } else if ("month".equals(transform)) { + return month(colName); + } else if ("date".equals(transform)) { + return date(colName); + } else if ("hour".equals(transform)) { + return hour(colName); + } + + // unknown transform names are passed to sources with Apply + return new Apply(transform, colName); + } + + /** + * Create an identity transform for a column name. + * + * @param colName a column name + * @return an Identity transform for the column + */ + public static Identity identity(String colName) { + return new Identity(colName); + } + + /** + * Create a bucket transform with the given number of buckets for the named columns. + * + * @param colNames a column name + * @param numBuckets the number of buckets + * @return a BucketBy transform for the column + */ + public static Bucket bucket(int numBuckets, String... colNames) { + return new Bucket(numBuckets, colNames); + } + + /** + * Create a year transform for a column name. + *

+ * The corresponding column should be a timestamp or date column. + * + * @param colName a column name + * @return a Year transform for the column + */ + public static Year year(String colName) { + return new Year(colName); + } + + /** + * Create a month transform for a column name. + *

+ * The corresponding column should be a timestamp or date column. + * + * @param colName a column name + * @return a Month transform for the column + */ + public static Month month(String colName) { + return new Month(colName); + } + + /** + * Create a date transform for a column name. + *

+ * The corresponding column should be a timestamp or date column. + * + * @param colName a column name + * @return a Date transform for the column + */ + public static Date date(String colName) { + return new Date(colName); + } + + /** + * Create a date and hour transform for a column name. + *

+ * The corresponding column should be a timestamp column. + * + * @param colName a column name + * @return a DateAndHour transform for the column + */ + public static DateAndHour hour(String colName) { + return new DateAndHour(colName); + } + + private abstract static class SingleColumnTransform implements PartitionTransform { + private final String[] colNames; + + private SingleColumnTransform(String colName) { + this.colNames = new String[] { colName }; + } + + @Override + public String[] references() { + return colNames; + } + } + + public static final class Identity extends SingleColumnTransform { + private Identity(String colName) { + super(colName); + } + + @Override + public String name() { + return "identity"; + } + } + + public static final class Bucket implements PartitionTransform { + private final int numBuckets; + private final String[] colNames; + + private Bucket(int numBuckets, String[] colNames) { + this.numBuckets = numBuckets; + this.colNames = colNames; + } + + @Override + public String name() { + return "bucket"; + } + + public int numBuckets() { + return numBuckets; + } + + @Override + public String[] references() { + return colNames; + } + } + + public static final class Year extends SingleColumnTransform { + private Year(String colName) { + super(colName); + } + + @Override + public String name() { + return "year"; + } + } + + public static final class Month extends SingleColumnTransform { + private Month(String colName) { + super(colName); + } + + @Override + public String name() { + return "month"; + } + } + + public static final class Date extends SingleColumnTransform { + private Date(String colName) { + super(colName); + } + + @Override + public String name() { + return "date"; + } + } + + public static final class DateAndHour extends SingleColumnTransform { + private DateAndHour(String colName) { + super(colName); + } + + @Override + public String name() { + return "hour"; + } + } + + public static final class Apply extends SingleColumnTransform { + private final String transformName; + + private Apply(String transformName, String colName) { + super(colName); + this.transformName = transformName; + } + + @Override + public String name() { + return transformName; + } + } +} diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java index 30a20f27b8c65..644f7d474fc58 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/Table.java @@ -17,7 +17,6 @@ package org.apache.spark.sql.catalog.v2; -import org.apache.spark.sql.catalyst.expressions.Expression; import org.apache.spark.sql.types.StructType; import java.util.List; @@ -40,8 +39,8 @@ public interface Table { StructType schema(); /** - * Return the table partitioning expressions. - * @return this table's partitioning expressions + * Return the table partitioning transforms. 
+ * @return this table's partitioning transforms */ - List partitionExpressions(); + List partitioning(); } diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java index 539beb0c39c56..8b9c89b509dce 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java @@ -20,7 +20,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException; -import org.apache.spark.sql.catalyst.expressions.Expression; import org.apache.spark.sql.types.StructType; import java.util.Arrays; @@ -89,10 +88,11 @@ default Table createTable(TableIdentifier ident, * @param properties a string map of table properties * @return metadata for the new table * @throws TableAlreadyExistsException If a table already exists for the identifier + * @throws UnsupportedOperationException If a requested partition transform is not supported */ Table createTable(TableIdentifier ident, StructType schema, - List partitions, + List partitions, Map properties) throws TableAlreadyExistsException; /** From dca4bf8176eaa92de295de54488c3398256e0f7a Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Fri, 17 Aug 2018 17:07:23 -0700 Subject: [PATCH 3/4] SPARK-24252: Add TableCatalog impl backed by SessionCatalog. --- .../org/apache/spark/sql/SparkSession.scala | 13 +- .../sql/catalog/v2/V1MetadataTable.scala | 117 ++++++++++++++ .../spark/sql/catalog/v2/V1TableCatalog.scala | 146 ++++++++++++++++++ 3 files changed, 273 insertions(+), 3 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/V1MetadataTable.scala create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/V1TableCatalog.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index a4c8de6afceb3..49e18354616b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -32,7 +32,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.catalog.Catalog -import org.apache.spark.sql.catalog.v2.{CatalogProvider, Catalogs} +import org.apache.spark.sql.catalog.v2.{CatalogProvider, Catalogs, V1TableCatalog} import org.apache.spark.sql.catalyst._ import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation import org.apache.spark.sql.catalyst.encoders._ @@ -614,8 +614,15 @@ class SparkSession private( @transient private lazy val catalogs = new mutable.HashMap[String, CatalogProvider]() - private[sql] def catalog(name: String): CatalogProvider = synchronized { - catalogs.getOrElseUpdate(name, Catalogs.load(name, sessionState.conf)) + @transient private lazy val v1CatalogAsV2 = new V1TableCatalog(sessionState) + + private[sql] def catalog(name: Option[String]): CatalogProvider = synchronized { + name match { + case Some(catalogName) => + catalogs.getOrElseUpdate(catalogName, Catalogs.load(catalogName, sessionState.conf)) + case _ => + v1CatalogAsV2 + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/V1MetadataTable.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/V1MetadataTable.scala new file mode 100644 index 0000000000000..60a2492f34a80 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/V1MetadataTable.scala @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2 + +import java.util +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.SaveMode +import org.apache.spark.sql.catalog.v2.PartitionTransforms.{bucket, identity} +import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogUtils} +import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport, WriteSupport} +import org.apache.spark.sql.sources.v2.reader.DataSourceReader +import org.apache.spark.sql.sources.v2.writer.DataSourceWriter +import org.apache.spark.sql.types.StructType + +/** + * An implementation of catalog v2 [[Table]] to expose v1 table metadata. + */ +private[sql] class V1MetadataTable( + v1Table: CatalogTable, + v2Source: Option[DataSourceV2]) extends Table { + + def readDelegate: ReadSupport = v2Source match { + case r: ReadSupport => r + case _ => throw new UnsupportedOperationException(s"Source does not support reads: $v2Source") + } + + def writeDelegate: WriteSupport = v2Source match { + case w: WriteSupport => w + case _ => throw new UnsupportedOperationException(s"Source does not support writes: $v2Source") + } + + lazy val options: Map[String, String] = { + v1Table.storage.locationUri match { + case Some(uri) => + v1Table.storage.properties + ("path" -> CatalogUtils.URIToString(uri)) + case _ => + v1Table.storage.properties + } + } + + override lazy val properties: util.Map[String, String] = { + val allProperties = new util.HashMap[String, String]() + + allProperties.putAll(v1Table.properties.asJava) + allProperties.putAll(options.asJava) + + allProperties + } + + override lazy val schema: StructType = { + v1Table.schema + } + + override lazy val partitioning: util.List[PartitionTransform] = { + val partitions = new util.ArrayList[PartitionTransform]() + + v1Table.partitionColumnNames.foreach(col => partitions.add(identity(col))) + v1Table.bucketSpec.map(spec => + partitions.add(bucket(spec.numBuckets, spec.bucketColumnNames: _*))) + + partitions + } + + def mergeOptions(readOptions: DataSourceOptions): DataSourceOptions = { + val newMap = new util.HashMap[String, String]() + newMap.putAll(options.asJava) + newMap.putAll(readOptions.asMap) + new DataSourceOptions(newMap) + } +} + +private[sql] trait DelegateReadSupport extends ReadSupport { + def readDelegate: ReadSupport + + def mergeOptions(options: DataSourceOptions): DataSourceOptions + + override def createReader(options: DataSourceOptions): DataSourceReader = { + 
readDelegate.createReader(mergeOptions(options)) + } + + override def createReader( + schema: StructType, + options: DataSourceOptions): DataSourceReader = { + readDelegate.createReader(schema, mergeOptions(options)) + } +} + +private[sql] trait DelegateWriteSupport extends WriteSupport { + def writeDelegate: WriteSupport + + def mergeOptions(options: DataSourceOptions): DataSourceOptions + + override def createWriter( + writeUUID: String, + schema: StructType, + mode: SaveMode, + options: DataSourceOptions): util.Optional[DataSourceWriter] = { + assert(mode == SaveMode.Append, "Only append mode is supported") + writeDelegate.createWriter(writeUUID, schema, mode, mergeOptions(options)) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/V1TableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/V1TableCatalog.scala new file mode 100644 index 0000000000000..8f7e381a704a4 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/V1TableCatalog.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalog.v2 + +import java.util + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.catalog.v2.PartitionTransforms.{Bucket, Identity} +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException +import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType, SessionCatalog} +import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.internal.SessionState +import org.apache.spark.sql.sources.v2.{ReadSupport, WriteSupport} +import org.apache.spark.sql.types.StructType + +/** + * A [[TableCatalog]] that translates calls to a v1 SessionCatalog. 
+ */ +class V1TableCatalog(sessionState: SessionState) extends TableCatalog { + + private lazy val catalog: SessionCatalog = sessionState.catalog + + override def loadTable( + ident: TableIdentifier): Table = { + val catalogTable = catalog.getTableMetadata(ident) + + catalogTable.provider match { + case Some(provider) => + DataSource.lookupDataSource(provider, sessionState.conf).newInstance() match { + case v2Source: ReadSupport if v2Source.isInstanceOf[WriteSupport] => + new V1MetadataTable(catalogTable, Some(v2Source)) + with DelegateReadSupport with DelegateWriteSupport + + case v2Source: ReadSupport => + new V1MetadataTable(catalogTable, Some(v2Source)) with DelegateReadSupport + + case v2Source: WriteSupport => + new V1MetadataTable(catalogTable, Some(v2Source)) with DelegateWriteSupport + + case _ => + new V1MetadataTable(catalogTable, None) + } + + case _ => + new V1MetadataTable(catalogTable, None) + } + } + + override def createTable(ident: TableIdentifier, + schema: StructType, + partitions: util.List[PartitionTransform], + properties: util.Map[String, String]): Table = { + val (partitionColumns, maybeBucketSpec) = convertTransforms(partitions.asScala) + val source = properties.getOrDefault("provider", sessionState.conf.defaultDataSourceName) + val tableProperties = properties.asScala + val storage = DataSource.buildStorageFormatFromOptions(tableProperties.toMap) + + val tableDesc = CatalogTable( + identifier = ident.copy( + database = Some(ident.database.getOrElse(sessionState.catalog.getCurrentDatabase))), + tableType = CatalogTableType.MANAGED, + storage = storage, + schema = schema, + provider = Some(source), + partitionColumnNames = partitionColumns, + bucketSpec = maybeBucketSpec, + properties = tableProperties.toMap, + tracksPartitionsInCatalog = sessionState.conf.manageFilesourcePartitions) + + catalog.createTable(tableDesc, ignoreIfExists = false, validateLocation = false) + + loadTable(ident) + } + + override def alterTable(ident: TableIdentifier, + changes: util.List[TableChange]): Table = { + throw new UnsupportedOperationException("Alter table is not supported for this source") + } + + /** + * Drop a table in the catalog. + * + * @param ident a table identifier + * @return true if a table was deleted, false if no table exists for the identifier + */ + override def dropTable(ident: TableIdentifier): Boolean = { + try { + if (loadTable(ident) != null) { + catalog.dropTable(ident, ignoreIfNotExists = true, purge = true /* skip HDFS trash */) + true + } else { + false + } + } catch { + case _: NoSuchTableException => + false + } + } + + override def initialize(options: CaseInsensitiveStringMap): Unit = { + // do nothing. 
+ } + + private def convertTransforms( + partitions: Seq[PartitionTransform]): (Seq[String], Option[BucketSpec]) = { + val (identityTransforms, bucketTransforms) = partitions.partition(_.isInstanceOf[Identity]) + + val nonBucketTransforms = bucketTransforms.filterNot(_.isInstanceOf[Bucket]) + if (nonBucketTransforms.nonEmpty) { + throw new UnsupportedOperationException("SessionCatalog does not support partition " + + s"transforms: ${nonBucketTransforms.mkString(", ")}") + } + + val bucketSpec = bucketTransforms.size match { + case 0 => + None + case 1 => + val bucket = bucketTransforms.head.asInstanceOf[Bucket] + Some(BucketSpec(bucket.numBuckets, bucket.references, Nil)) + case _ => + throw new UnsupportedOperationException("SessionCatalog does not support multiple " + + s"clusterings: ${bucketTransforms.mkString(", ")}") + } + + val identityCols = identityTransforms.map(_.references.head) + + (identityCols, bucketSpec) + } +} From 6b45a119df8e6382fa2503f854b4a85aed3e3785 Mon Sep 17 00:00:00 2001 From: Ryan Blue Date: Mon, 20 Aug 2018 15:26:45 -0700 Subject: [PATCH 4/4] SPARK-24252: Fix build problems. --- .../org/apache/spark/sql/catalog/v2/PartitionTransforms.java | 2 +- .../org/apache/spark/sql/catalog/v2/V1MetadataTable.scala | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransforms.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransforms.java index 179f0505cc265..2de3f9e133a78 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransforms.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/PartitionTransforms.java @@ -121,7 +121,7 @@ public static DateAndHour hour(String colName) { return new DateAndHour(colName); } - private abstract static class SingleColumnTransform implements PartitionTransform { + abstract static class SingleColumnTransform implements PartitionTransform { private final String[] colNames; private SingleColumnTransform(String colName) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/V1MetadataTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/V1MetadataTable.scala index 60a2492f34a80..37f1502724741 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/V1MetadataTable.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/v2/V1MetadataTable.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalog.v2 import java.util + import scala.collection.JavaConverters._ import org.apache.spark.sql.SaveMode @@ -70,7 +71,7 @@ private[sql] class V1MetadataTable( override lazy val partitioning: util.List[PartitionTransform] = { val partitions = new util.ArrayList[PartitionTransform]() - v1Table.partitionColumnNames.foreach(col => partitions.add(identity(col))) + v1Table.partitionColumnNames.foreach(col => partitions.add(identity(col): PartitionTransform)) v1Table.bucketSpec.map(spec => partitions.add(bucket(spec.numBuckets, spec.bucketColumnNames: _*)))