[SPARK-27813][SQL] DataSourceV2: Add DropTable logical operation
jzhuge committed May 26, 2019
1 parent a79cb84 commit db4a417
Showing 17 changed files with 441 additions and 80 deletions.

SqlBase.g4 (SQL parser grammar)

@@ -139,8 +139,8 @@ statement
         DROP (IF EXISTS)? partitionSpec (',' partitionSpec)*        #dropTablePartitions
     | ALTER TABLE tableIdentifier partitionSpec? SET locationSpec   #setTableLocation
     | ALTER TABLE tableIdentifier RECOVER PARTITIONS                #recoverPartitions
-    | DROP TABLE (IF EXISTS)? tableIdentifier PURGE?                #dropTable
-    | DROP VIEW (IF EXISTS)? tableIdentifier                        #dropTable
+    | DROP TABLE (IF EXISTS)? multipartIdentifier PURGE?            #dropTable
+    | DROP VIEW (IF EXISTS)? multipartIdentifier                    #dropTable
     | CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
         VIEW (IF NOT EXISTS)? tableIdentifier
         identifierCommentList?
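With multipartIdentifier in the rule, DROP TABLE and DROP VIEW now accept multi-part names that can reach into a specific catalog and namespace. A minimal Scala sketch (the testcat catalog is illustrative, and spark is an existing SparkSession):

// Both statements now parse; resolving the name to a catalog happens later, in analysis.
spark.sql("DROP TABLE IF EXISTS testcat.ns1.ns2.tbl")
spark.sql("DROP VIEW IF EXISTS db.v1")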

CatalogManager.java (new file)

@@ -0,0 +1,88 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.SparkException;
import org.apache.spark.annotation.Private;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

import static org.apache.spark.sql.catalog.v2.Catalogs.classKey;
import static org.apache.spark.sql.catalog.v2.Catalogs.isOptionKey;
import static org.apache.spark.sql.catalog.v2.Catalogs.optionKeyPrefix;
import static scala.collection.JavaConverters.mapAsJavaMapConverter;

@Private
public class CatalogManager {

  private final SQLConf conf;

  public CatalogManager(SQLConf conf) {
    this.conf = conf;
  }

  /**
   * Load a catalog.
   *
   * @param name a catalog name
   * @return a catalog plugin
   */
  public CatalogPlugin load(String name) throws SparkException {
    return Catalogs.load(name, conf);
  }

  /**
   * Add a catalog.
   *
   * @param name a catalog name
   * @param pluginClassName a catalog plugin class name
   * @param options catalog options
   */
  public void add(
      String name,
      String pluginClassName,
      CaseInsensitiveStringMap options) {
    options.entrySet().stream()
        .forEach(e -> conf.setConfString(optionKeyPrefix(name) + e.getKey(), e.getValue()));
    conf.setConfString(classKey(name), pluginClassName);
  }

  /**
   * Add a catalog without options.
   *
   * @param name a catalog name
   * @param pluginClassName a catalog plugin class name
   */
  public void add(
      String name,
      String pluginClassName) {
    add(name, pluginClassName, CaseInsensitiveStringMap.empty());
  }

  /**
   * Remove a catalog.
   *
   * @param name a catalog name
   */
  public void remove(String name) {
    conf.unsetConf(classKey(name));
    mapAsJavaMapConverter(conf.getAllConfs()).asJava().keySet().stream()
        .filter(key -> isOptionKey(name, key))
        .forEach(conf::unsetConf);
  }
}
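A rough usage sketch of the new CatalogManager, using only the methods defined above (the catalog name and plugin class are placeholders; load only succeeds if the class names a real CatalogPlugin on the classpath):

import org.apache.spark.sql.catalog.v2.CatalogManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val conf = new SQLConf()
val manager = new CatalogManager(conf)

// add() writes spark.sql.catalog.mycat, plus one conf key per option.
manager.add("mycat", "com.example.MyCatalogPlugin", CaseInsensitiveStringMap.empty())

// load() reads the class key back and instantiates the plugin via Catalogs.load.
val plugin = manager.load("mycat")

// remove() unsets the class key and every spark.sql.catalog.mycat.* option key.
manager.remove("mycat")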

Catalogs.java

@@ -23,10 +23,8 @@
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.apache.spark.util.Utils;

-import java.util.HashMap;
 import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import java.util.stream.Collectors;

 import static scala.collection.JavaConverters.mapAsJavaMapConverter;

@@ -35,6 +33,23 @@ public class Catalogs {
   private Catalogs() {
   }

+  public static String classKey(String name) {
+    return "spark.sql.catalog." + name;
+  }
+
+  public static String optionKeyPrefix(String name) {
+    return "spark.sql.catalog." + name + ".";
+  }
+
+  public static boolean isOptionKey(String name, String keyName) {
+    return keyName.startsWith(optionKeyPrefix(name));
+  }
+
+  public static String optionName(String name, String keyName) {
+    assert(isOptionKey(name, keyName));
+    return keyName.substring(optionKeyPrefix(name).length());
+  }
+
   /**
    * Load and configure a catalog by name.
    * <p>
Expand All @@ -49,10 +64,10 @@ private Catalogs() {
*/
public static CatalogPlugin load(String name, SQLConf conf)
throws CatalogNotFoundException, SparkException {
String pluginClassName = conf.getConfString("spark.sql.catalog." + name, null);
String pluginClassName = conf.getConfString(classKey(name), null);
if (pluginClassName == null) {
throw new CatalogNotFoundException(String.format(
"Catalog '%s' plugin class not found: spark.sql.catalog.%s is not defined", name, name));
"Catalog '%s' plugin class not found: %s is not defined", name, classKey(name)));
}

ClassLoader loader = Utils.getContextOrSparkClassLoader();
@@ -96,17 +111,12 @@ public static CatalogPlugin load(String name, SQLConf conf)
    * @return a case insensitive string map of options starting with spark.sql.catalog.(name).
    */
   private static CaseInsensitiveStringMap catalogOptions(String name, SQLConf conf) {
-    Map<String, String> allConfs = mapAsJavaMapConverter(conf.getAllConfs()).asJava();
-    Pattern prefix = Pattern.compile("^spark\\.sql\\.catalog\\." + name + "\\.(.+)");
-
-    HashMap<String, String> options = new HashMap<>();
-    for (Map.Entry<String, String> entry : allConfs.entrySet()) {
-      Matcher matcher = prefix.matcher(entry.getKey());
-      if (matcher.matches() && matcher.groupCount() > 0) {
-        options.put(matcher.group(1), entry.getValue());
-      }
-    }
-
+    Map<String, String> options =
+        mapAsJavaMapConverter(conf.getAllConfs()).asJava().entrySet().stream()
+            .filter(e -> isOptionKey(name, e.getKey()))
+            .collect(Collectors.toMap(
+                e -> optionName(name, e.getKey()),
+                e -> e.getValue()));
     return new CaseInsensitiveStringMap(options);
   }
 }
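The new key helpers centralize the spark.sql.catalog.* naming scheme; their behavior follows directly from the definitions above:

import org.apache.spark.sql.catalog.v2.Catalogs

Catalogs.classKey("testcat")          // "spark.sql.catalog.testcat"
Catalogs.optionKeyPrefix("testcat")   // "spark.sql.catalog.testcat."
Catalogs.isOptionKey("testcat", "spark.sql.catalog.testcat.url")  // true
Catalogs.optionName("testcat", "spark.sql.catalog.testcat.url")   // "url"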

IdentifierImpl.java

@@ -22,6 +22,8 @@

 import java.util.Arrays;
 import java.util.Objects;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;

 /**
  * An {@link Identifier} implementation.
@@ -49,6 +51,21 @@ public String name() {
     return name;
   }

+  private String quote(String part) {
+    if (part.contains("`")) {
+      return part.replace("`", "``");
+    } else {
+      return part;
+    }
+  }
+
+  @Override
+  public String toString() {
+    return Stream.concat(Stream.of(namespace), Stream.of(name))
+        .map(part -> '`' + quote(part) + '`')
+        .collect(Collectors.joining("."));
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
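The new toString renders an identifier in re-parseable form: each part is wrapped in backticks, embedded backticks are doubled, and parts are joined with dots. A sketch, assuming the Identifier.of(namespace, name) factory that pairs with this implementation:

import org.apache.spark.sql.catalog.v2.Identifier

val ident = Identifier.of(Array("ns1", "odd`part"), "tbl")
ident.toString  // `ns1`.`odd``part`.`tbl`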

CaseInsensitiveStringMap.java

@@ -26,6 +26,7 @@
 import java.util.HashMap;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;

 /**
@@ -178,4 +179,17 @@ public double getDouble(String key, double defaultValue) {
   public Map<String, String> asCaseSensitiveMap() {
     return Collections.unmodifiableMap(original);
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+    CaseInsensitiveStringMap that = (CaseInsensitiveStringMap) o;
+    return delegate.equals(that.delegate);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(delegate);
+  }
 }
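Since equals and hashCode compare the case-insensitive delegate map, two instances that differ only in key casing are equal; the CatalogManagerSuite below leans on this when it asserts captured options with is(options). A small sketch:

import java.util.Collections
import org.apache.spark.sql.util.CaseInsensitiveStringMap

val a = new CaseInsensitiveStringMap(Collections.singletonMap("Option1", "v"))
val b = new CaseInsensitiveStringMap(Collections.singletonMap("option1", "v"))
a == b  // true: keys are normalized case-insensitively in the delegate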

basicLogicalOperators.scala (catalyst logical plans)

@@ -499,6 +499,14 @@ object OverwritePartitionsDynamic {
   }
 }

+/**
+ * Drop a table.
+ */
+case class DropTable(
+    catalog: TableCatalog,
+    ident: Identifier,
+    ifExists: Boolean) extends Command
+

 /**
  * Insert some data into a table. Note that this plan is unresolved and has to be replaced by the
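DropTable is a fully resolved Command: it holds the TableCatalog plugin and the table Identifier rather than raw name parts. Its parser-side counterpart, DropTableStatement (next file), carries only strings, so analysis has to convert one into the other. An illustrative sketch of that conversion, not this PR's actual resolver rule:

import org.apache.spark.sql.catalog.v2.{Identifier, TableCatalog}
import org.apache.spark.sql.catalyst.plans.logical.DropTable
import org.apache.spark.sql.catalyst.plans.logical.sql.DropTableStatement

// Once the statement's multi-part name is resolved to a catalog and an
// identifier, the v2 command can be built; isView and purge would presumably
// fall back to the existing v1 DROP TABLE path.
def toV2DropTable(
    catalog: TableCatalog,
    ident: Identifier,
    stmt: DropTableStatement): DropTable =
  DropTable(catalog, ident, ifExists = stmt.ifExists)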

DropTableStatement.scala (new file)

@@ -0,0 +1,35 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.plans.logical.sql

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
 * A DROP TABLE statement, as parsed from SQL.
 */
case class DropTableStatement(
    tableName: Seq[String],
    ifExists: Boolean,
    isView: Boolean,
    purge: Boolean) extends ParsedStatement {

  override def output: Seq[Attribute] = Seq.empty

  override def children: Seq[LogicalPlan] = Seq.empty
}
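Assuming the parser's AstBuilder maps the #dropTable grammar rule above to this class (the usual pattern for ParsedStatement subclasses), a parse behaves roughly like:

import org.apache.spark.sql.catalyst.parser.CatalystSqlParser

// The name arrives as raw parts; nothing is resolved against a catalog yet.
CatalystSqlParser.parsePlan("DROP TABLE IF EXISTS testcat.ns.tbl")
// => DropTableStatement(Seq("testcat", "ns", "tbl"),
//      ifExists = true, isView = false, purge = false)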

CatalogManagerSuite.java (new test)

@@ -0,0 +1,71 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalog.v2;

import org.apache.spark.SparkException;
import org.apache.spark.sql.internal.SQLConf;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.HashMap;

import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.junit.Assert.assertThat;

public class CatalogManagerSuite {

  @Rule
  public final ExpectedException exception = ExpectedException.none();

  CatalogManager catalogManager = new CatalogManager(new SQLConf());

  @Test
  public void testAdd() throws SparkException {
    CaseInsensitiveStringMap options = new CaseInsensitiveStringMap(
        new HashMap<String, String>() {{
          put("option1", "value1");
          put("option2", "value2");
        }});
    catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName(), options);
    CatalogPlugin catalogPlugin = catalogManager.load("testcat");
    assertThat(catalogPlugin.name(), is("testcat"));
    assertThat(catalogPlugin, instanceOf(TestCatalogPlugin.class));
    assertThat(((TestCatalogPlugin) catalogPlugin).options, is(options));
  }

  @Test
  public void testAddWithoutOption() throws SparkException {
    catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName());
    CatalogPlugin catalogPlugin = catalogManager.load("testcat");
    assertThat(catalogPlugin.name(), is("testcat"));
    assertThat(catalogPlugin, instanceOf(TestCatalogPlugin.class));
    assertThat(((TestCatalogPlugin) catalogPlugin).options, is(CaseInsensitiveStringMap.empty()));
  }

  @Test
  public void testRemove() throws SparkException {
    catalogManager.add("testcat", TestCatalogPlugin.class.getCanonicalName());
    catalogManager.load("testcat");
    catalogManager.remove("testcat");
    exception.expect(CatalogNotFoundException.class);
    catalogManager.load("testcat");
  }
}
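TestCatalogPlugin itself is not shown in this excerpt. A minimal stand-in, assuming the CatalogPlugin contract is just an initialize(name, options) hook plus a name() accessor, could look like this (the real helper is presumably Java, sketched here in Scala):

import org.apache.spark.sql.catalog.v2.CatalogPlugin
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Records what it was initialized with, which is all the assertions above inspect.
class TestCatalogPlugin extends CatalogPlugin {
  private var pluginName: String = _
  var options: CaseInsensitiveStringMap = _

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    pluginName = name
    this.options = options
  }

  override def name(): String = pluginName
}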
(9 more changed files not shown.)
