Skip to content

Commit

Permalink
Use Java enum for SaveMode.
Browse files (browse the repository at this point in the history)
  • Loading branch information
yhuai committed Feb 9, 2015
1 parent 4679665 commit 99950a2
Show file tree
Hide file tree
Showing 12 changed files with 36 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,23 @@
*/
package org.apache.spark.sql.sources;

public class SaveModes {

/**
* SaveMode is used to specify the expected behavior of saving a DataFrame to a data source.
*/
public enum SaveMode {
/**
* Gets the Append object.
* Append mode means that when saving a DataFrame to a data source, if data already exists,
* contents of the DataFrame are expected to be appended to existing data.
*/
public static final SaveMode Append = Append$.MODULE$;
Append,
/**
* Gets the Overwrite object.
* Overwrite mode means that when saving a DataFrame to a data source, if data already exists,
* existing data is expected to be overwritten by the contents of the DataFrame.
*/
public static final SaveMode Overwrite = Overwrite$.MODULE$;

Overwrite,
/**
* Gets the ErrorIfExists object.
* ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists,
* an exception is expected to be thrown.
*/
public static final SaveMode ErrorIfExists = ErrorIfExists$.MODULE$;
ErrorIfExists
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ private[sql] class DataFrameImpl protected[sql](

override def saveAsParquetFile(path: String): Unit = {
if (sqlContext.conf.parquetUseDataSourceApi) {
save("org.apache.spark.sql.parquet", SaveModes.ErrorIfExists, Map("path" -> path))
save("org.apache.spark.sql.parquet", SaveMode.ErrorIfExists, Map("path" -> path))
} else {
sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
}
Expand Down Expand Up @@ -396,7 +396,7 @@ private[sql] class DataFrameImpl protected[sql](
}

override def save(path: String): Unit = {
save(path, SaveModes.ErrorIfExists)
save(path, SaveMode.ErrorIfExists)
}

override def save(path: String, mode: SaveMode): Unit = {
Expand All @@ -405,7 +405,7 @@ private[sql] class DataFrameImpl protected[sql](
}

override def save(path: String, dataSourceName: String): Unit = {
save(dataSourceName, SaveModes.ErrorIfExists, Map("path" -> path))
save(dataSourceName, SaveMode.ErrorIfExists, Map("path" -> path))
}

override def save(path: String, dataSourceName: String, mode: SaveMode): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.sql

import org.apache.spark.sql.sources.SaveMode

import scala.reflect.ClassTag

import org.apache.spark.api.java.JavaRDD
Expand All @@ -27,9 +25,9 @@ import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedSt
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.sources.SaveMode
import org.apache.spark.sql.types.StructType


private[sql] class IncomputableColumn(protected[sql] val expr: Expression) extends Column {

def this(name: String) = this(name match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,13 @@ private[sql] class DefaultSource
val filesystemPath = new Path(path)
val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
mode match {
case Append =>
case SaveMode.Append =>
sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}")
case Overwrite =>
case SaveMode.Overwrite =>
if (fs.exists(filesystemPath)) {
fs.delete(filesystemPath, true)
}
case ErrorIfExists =>
case SaveMode.ErrorIfExists =>
if (fs.exists(filesystemPath)) {
sys.error(s"path $path already exists.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,13 +87,13 @@ class DefaultSource
val filesystemPath = new Path(path)
val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
mode match {
case Append =>
case SaveMode.Append =>
sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}")
case Overwrite =>
case SaveMode.Overwrite =>
if (fs.exists(filesystemPath)) {
fs.delete(filesystemPath, true)
}
case ErrorIfExists =>
case SaveMode.ErrorIfExists =>
if (fs.exists(filesystemPath)) {
sys.error(s"path $path already exists.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ private [sql] case class CreateTempTableUsingAsSelect(

def run(sqlContext: SQLContext) = {
val df = DataFrame(sqlContext, query)
val resolved = ResolvedDataSource(sqlContext, provider, SaveModes.ErrorIfExists, options, df)
val resolved = ResolvedDataSource(sqlContext, provider, SaveMode.ErrorIfExists, options, df)
sqlContext.registerRDDAsTable(
DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,6 @@ trait CreatableRelationProvider {
* ErrorIfExists mode means that when saving a DataFrame to a data source,
* if data already exists, an exception is expected to be thrown.
*
* For Java users, mode can be inspected through equality check (e.g. mode == SaveModes.Append).
*
* @param sqlContext
* @param mode
* @param parameters
Expand Down
48 changes: 0 additions & 48 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/modes.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -70,29 +70,11 @@ public void setUp() throws IOException {
df.registerTempTable("jsonTable");
}

@Test
public void getSaveMode() {
SaveMode append = SaveModes.Append;
Assert.assertTrue(append == SaveModes.Append);
Assert.assertTrue(append != SaveModes.Overwrite);
Assert.assertTrue(append != SaveModes.ErrorIfExists);

SaveMode overwrite = SaveModes.Overwrite;
Assert.assertTrue(overwrite == SaveModes.Overwrite);
Assert.assertTrue(overwrite != SaveModes.Append);
Assert.assertTrue(overwrite != SaveModes.ErrorIfExists);

SaveMode errorIfExists = SaveModes.ErrorIfExists;
Assert.assertTrue(errorIfExists == SaveModes.ErrorIfExists);
Assert.assertTrue(errorIfExists != SaveModes.Append);
Assert.assertTrue(errorIfExists != SaveModes.Overwrite);
}

@Test
public void saveAndLoad() {
Map<String, String> options = new HashMap<String, String>();
options.put("path", path.toString());
df.save("org.apache.spark.sql.json", SaveModes.ErrorIfExists, options);
df.save("org.apache.spark.sql.json", SaveMode.ErrorIfExists, options);

DataFrame loadedDF = sqlContext.load("org.apache.spark.sql.json", options);

Expand All @@ -103,7 +85,7 @@ public void saveAndLoad() {
public void saveAndLoadWithSchema() {
Map<String, String> options = new HashMap<String, String>();
options.put("path", path.toString());
df.save("org.apache.spark.sql.json", SaveModes.ErrorIfExists, options);
df.save("org.apache.spark.sql.json", SaveMode.ErrorIfExists, options);

List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("b", DataTypes.StringType, true));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {

test("save with data source and options, and load") {
conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
df.save("org.apache.spark.sql.json", SaveModes.ErrorIfExists, Map("path" -> path.toString))
df.save("org.apache.spark.sql.json", SaveMode.ErrorIfExists, Map("path" -> path.toString))
checkLoad
}

Expand All @@ -103,16 +103,15 @@ class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {
df.save(path.toString, "org.apache.spark.sql.json")
checkLoad

df.save("org.apache.spark.sql.json", SaveModes.Overwrite, Map("path" -> path.toString))
df.save("org.apache.spark.sql.json", SaveMode.Overwrite, Map("path" -> path.toString))
checkLoad

message = intercept[RuntimeException] {
df.save("org.apache.spark.sql.json", SaveModes.Append, Map("path" -> path.toString))
df.save("org.apache.spark.sql.json", SaveMode.Append, Map("path" -> path.toString))
}.getMessage

assert(
message.contains("Append mode is not supported"),
"We should complain that 'Append mode is not supported' for JSON source.")

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.hive.execution

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.sources.{SaveModes, ResolvedDataSource}
import org.apache.spark.sql.sources.{SaveMode, ResolvedDataSource}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
Expand Down Expand Up @@ -168,7 +168,7 @@ case class CreateMetastoreDataSourceAsSelect(
}

// Create the relation based on the data of df.
ResolvedDataSource(sqlContext, provider, SaveModes.ErrorIfExists, optionsWithPath, df)
ResolvedDataSource(sqlContext, provider, SaveMode.ErrorIfExists, optionsWithPath, df)

hiveContext.catalog.createDataSourceTable(
tableName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public class JavaMetastoreDataSourcesSuite {

String originalDefaultSource;
File path;
File hiveManagedPath;
DataFrame df;

private void checkAnswer(DataFrame actual, List<Row> expected) {
Expand All @@ -64,6 +65,10 @@ public void setUp() throws IOException {
if (path.exists()) {
path.delete();
}
hiveManagedPath = new File(sqlContext.catalog().hiveDefaultTableFilePath("javaSavedTable"));
if (hiveManagedPath.exists()) {
hiveManagedPath.delete();
}

List<String> jsonObjects = new ArrayList<String>(10);
for (int i = 0; i < 10; i++) {
Expand Down

0 comments on commit 99950a2

Please sign in to comment.