diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 92e04ce17c2e7..15b6344948961 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -532,8 +532,9 @@ trait DataFrame extends RDDApi[Row] {
 
   /**
    * :: Experimental ::
-   * Creates a table from the the contents of this DataFrame. This will fail if the table already
-   * exists.
+   * Creates a table from the contents of this DataFrame with a set of options.
+   * It will use the default data source
+   * configured in spark.sql.source.default. This will fail if the table already exists.
    *
    * Note that this currently only works with DataFrames that are created from a HiveContext as
    * there is no notion of a persisted catalog in a standard SQL context. Instead you can write
@@ -541,12 +542,13 @@ trait DataFrame extends RDDApi[Row] {
    * be the target of an `insertInto`.
    */
   @Experimental
-  def saveAsTable(tableName: String): Unit
+  def saveAsTable(tableName: String, options: (String, String)*): Unit
 
   /**
    * :: Experimental ::
-   * Creates a table from the the contents of this DataFrame based on a given data source and
-   * a set of options. This will fail if the table already exists.
+   * Creates a table from the contents of this DataFrame based on a given data source
+   * and a set of options.
+   * This will fail if the table already exists.
    *
    * Note that this currently only works with DataFrames that are created from a HiveContext as
    * there is no notion of a persisted catalog in a standard SQL context. Instead you can write
@@ -557,9 +559,24 @@ trait DataFrame extends RDDApi[Row] {
   def saveAsTable(
       tableName: String,
       dataSourceName: String,
-      option: (String, String),
       options: (String, String)*): Unit
 
+  /**
+   * :: Experimental ::
+   * Creates a table from the contents of this DataFrame based on a set of options.
+   * It will use the default data source configured in spark.sql.source.default.
+   * This will fail if the table already exists.
+   *
+   * Note that this currently only works with DataFrames that are created from a HiveContext as
+   * there is no notion of a persisted catalog in a standard SQL context. Instead you can write
+   * an RDD out to a parquet file, and then register that file as a table. This "table" can then
+   * be the target of an `insertInto`.
+   */
+  @Experimental
+  def saveAsTable(
+      tableName: String,
+      options: java.util.Map[String, String]): Unit
+
   /**
    * :: Experimental ::
    * Creates a table from the the contents of this DataFrame based on a given data source and
@@ -576,20 +593,66 @@ trait DataFrame extends RDDApi[Row] {
       dataSourceName: String,
       options: java.util.Map[String, String]): Unit
 
+  /**
+   * :: Experimental ::
+   * Creates a table from the contents of this DataFrame based on a given data source
+   * and a set of options. This will fail if the table already exists.
+   *
+   * Note that this currently only works with DataFrames that are created from a HiveContext as
+   * there is no notion of a persisted catalog in a standard SQL context. Instead you can write
+   * an RDD out to a parquet file, and then register that file as a table. This "table" can then
+   * be the target of an `insertInto`.
+   */
+  @Experimental
+  def saveAsTable(
+      tableName: String,
+      dataSourceName: String,
+      options: Map[String, String]): Unit
+
+  /**
+   * :: Experimental ::
+   * Saves the contents of this DataFrame to the given path.
+   * It will use the default data source configured in spark.sql.source.default.
+   */
   @Experimental
   def save(path: String): Unit
 
+  /**
+   * :: Experimental ::
+   * Saves the contents of this DataFrame to the given path based on the given data source
+   * and a set of options.
+   */
+  @Experimental
+  def save(path: String, dataSourceName: String, options: (String, String)*): Unit
+
+  /**
+   * :: Experimental ::
+   * Saves the contents of this DataFrame based on the given data source and a set of options.
+   */
   @Experimental
   def save(
       dataSourceName: String,
       option: (String, String),
       options: (String, String)*): Unit
 
+  /**
+   * :: Experimental ::
+   * Saves the contents of this DataFrame based on the given data source and a set of options.
+   */
   @Experimental
   def save(
       dataSourceName: String,
       options: java.util.Map[String, String]): Unit
 
+  /**
+   * :: Experimental ::
+   * Saves the contents of this DataFrame based on the given data source and a set of options.
+   */
+  @Experimental
+  def save(
+      dataSourceName: String,
+      options: Map[String, String]): Unit
+
   /**
    * :: Experimental ::
    * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index 4911443dd6dde..839f9567d9d1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython}
 import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsLogicalPlan}
+import org.apache.spark.sql.sources.{CaseInsensitiveMap, ResolvedDataSource, CreateTableUsingAsLogicalPlan}
 import org.apache.spark.sql.types.{NumericType, StructType}
 
@@ -304,62 +304,79 @@ private[sql] class DataFrameImpl protected[sql](
     }
   }
 
-  override def saveAsTable(tableName: String): Unit = {
+  override def saveAsTable(tableName: String, options: (String, String)*): Unit = {
     val dataSourceName = sqlContext.conf.defaultDataSourceName
-    val cmd =
-      CreateTableUsingAsLogicalPlan(
-        tableName,
-        dataSourceName,
-        temporary = false,
-        Map.empty,
-        allowExisting = false,
-        logicalPlan)
-
-    sqlContext.executePlan(cmd).toRdd
+    saveAsTable(tableName, dataSourceName, options.toMap)
   }
 
   override def saveAsTable(
       tableName: String,
       dataSourceName: String,
-      option: (String, String),
       options: (String, String)*): Unit = {
+    saveAsTable(tableName, dataSourceName, options.toMap)
+  }
+
+  override def saveAsTable(
+      tableName: String,
+      options: java.util.Map[String, String]): Unit = {
+    val dataSourceName = sqlContext.conf.defaultDataSourceName
+    saveAsTable(tableName, dataSourceName, options)
+  }
+
+  override def saveAsTable(
+      tableName: String,
+      dataSourceName: String,
+      options: java.util.Map[String, String]): Unit = {
+    saveAsTable(tableName, dataSourceName, options.toMap)
+  }
+
+  override def saveAsTable(
+      tableName: String,
+      dataSourceName: String,
+      options: Map[String, String]): Unit = {
     val cmd =
       CreateTableUsingAsLogicalPlan(
         tableName,
         dataSourceName,
         temporary = false,
-        (option +: options).toMap,
+        options,
         allowExisting = false,
         logicalPlan)
 
     sqlContext.executePlan(cmd).toRdd
   }
 
-  override def saveAsTable(
-      tableName: String,
-      dataSourceName: String,
-      options: java.util.Map[String, String]): Unit = {
-    val opts = options.toSeq
-    saveAsTable(tableName, dataSourceName, opts.head, opts.tail:_*)
-  }
-
   override def save(path: String): Unit = {
     val dataSourceName = sqlContext.conf.defaultDataSourceName
-    save(dataSourceName, "path" -> path)
+    save(path, dataSourceName)
+  }
+
+  override def save(path: String, dataSourceName: String, options: (String, String)*): Unit = {
+    val opts = new CaseInsensitiveMap(options.toMap)
+    if (opts.contains("path")) {
+      sys.error(s"path already specified as $path. Please do not add path in options.")
+    }
+
+    save(dataSourceName, "path" -> path, options:_*)
   }
 
   override def save(
       dataSourceName: String,
       option: (String, String),
       options: (String, String)*): Unit = {
-    ResolvedDataSource(sqlContext, dataSourceName, (option +: options).toMap, this)
+    save(dataSourceName, (option +: options).toMap)
   }
 
   override def save(
       dataSourceName: String,
       options: java.util.Map[String, String]): Unit = {
-    val opts = options.toSeq
-    save(dataSourceName, opts.head, opts.tail:_*)
+    save(dataSourceName, options.toMap)
+  }
+
+  override def save(
+      dataSourceName: String,
+      options: Map[String, String]): Unit = {
+    ResolvedDataSource(sqlContext, dataSourceName, options, this)
   }
 
   override def insertInto(tableName: String, overwrite: Boolean): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 180f5e765fb91..db49033fcc55d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -50,7 +50,7 @@ private[spark] object SQLConf {
   val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
 
   // This is used to set the default data source
-  val DEFAULT_DATA_SOURCE_NAME = "spark.sql.default.datasource"
+  val DEFAULT_DATA_SOURCE_NAME = "spark.sql.source.default"
 
   // Whether to perform eager analysis on a DataFrame.
   val DATAFRAME_EAGER_ANALYSIS = "spark.sql.dataframe.eagerAnalysis"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 9c37e0169ff85..b8a2325e2bb43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -360,7 +360,7 @@ private [sql] case class CreateTempTableUsingAsSelect(
 /**
  * Builds a map in which keys are case insensitive
  */
-protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
+protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
   with Serializable {
 
   val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))
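For reference, a minimal usage sketch of the reworked `save`/`saveAsTable` API. It is not part of the patch: `hiveContext`, `df`, the table names, the paths, and the choice of the built-in json source are all placeholders assumed for illustration.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.hive.HiveContext

object SaveApiSketch {
  // Hypothetical helper exercising the new call shapes; persisted tables need a HiveContext.
  def writeExamples(hiveContext: HiveContext, df: DataFrame): Unit = {
    // No data source given: both calls fall back to spark.sql.source.default.
    df.save("/tmp/people_default")
    df.saveAsTable("people_default")

    // Data source named explicitly; extra options are passed as (key, value) pairs.
    df.save("/tmp/people_json", "org.apache.spark.sql.json")
    df.saveAsTable("people_json", "org.apache.spark.sql.json", "path" -> "/tmp/people_json_table")

    // The default itself is only a config entry, so it can be switched at runtime.
    hiveContext.setConf("spark.sql.source.default", "org.apache.spark.sql.json")
    df.save("/tmp/people_now_json")
  }
}
```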
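The new `save(path, dataSourceName, options*)` overload also rejects a second path supplied through `options`; because the check goes through `CaseInsensitiveMap` (hence the `protected[sql]` widening in ddl.scala), the key is caught regardless of casing. A sketch of the observable behaviour, again with placeholder names:

```scala
import scala.util.Try
import org.apache.spark.sql.DataFrame

object DuplicatePathSketch {
  // Illustrative only: the explicit path argument and the "PATH" option name the same
  // setting, so the patched save(...) aborts via sys.error instead of writing anything.
  def run(df: DataFrame): Unit = {
    val attempt =
      Try(df.save("/tmp/people_a", "org.apache.spark.sql.parquet", "PATH" -> "/tmp/people_b"))
    // Expected: Failure(java.lang.RuntimeException(
    //   "path already specified as /tmp/people_a. Please do not add path in options."))
    println(attempt)
  }
}
```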