Add more save APIs to DataFrame.
yhuai committed Feb 6, 2015
1 parent 6d3b7cb commit 5390743
Showing 4 changed files with 114 additions and 34 deletions.
75 changes: 69 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -532,21 +532,23 @@ trait DataFrame extends RDDApi[Row] {

/**
* :: Experimental ::
* Creates a table from the contents of this DataFrame. This will fail if the table already
* exists.
* Creates a table from the contents of this DataFrame with a set of options.
* It will use the default data source
* configured in spark.sql.source.default. This will fail if the table already exists.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*/
@Experimental
def saveAsTable(tableName: String): Unit
def saveAsTable(tableName: String, options: (String, String)*): Unit

/**
* :: Experimental ::
* Creates a table from the contents of this DataFrame based on a given data source and
* a set of options. This will fail if the table already exists.
* Creates a table from the contents of this DataFrame based on a given data source
* and a set of options.
* This will fail if the table already exists.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
@@ -557,9 +559,24 @@ trait DataFrame extends RDDApi[Row] {
def saveAsTable(
tableName: String,
dataSourceName: String,
option: (String, String),
options: (String, String)*): Unit

/**
* :: Experimental ::
* Creates a table from the contents of this DataFrame based on a set of options.
* It will use the default data source configured in spark.sql.source.default.
* This will fail if the table already exists.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*/
@Experimental
def saveAsTable(
tableName: String,
options: java.util.Map[String, String]): Unit

/**
* :: Experimental ::
* Creates a table from the contents of this DataFrame based on a given data source and
@@ -576,20 +593,66 @@ trait DataFrame extends RDDApi[Row] {
dataSourceName: String,
options: java.util.Map[String, String]): Unit

/**
* :: Experimental ::
* Creates a table from the contents of this DataFrame based on a given data source
* and a set of options. This will fail if the table already exists.
*
* Note that this currently only works with DataFrames that are created from a HiveContext as
* there is no notion of a persisted catalog in a standard SQL context. Instead you can write
* an RDD out to a parquet file, and then register that file as a table. This "table" can then
* be the target of an `insertInto`.
*/
@Experimental
def saveAsTable(
tableName: String,
dataSourceName: String,
options: Map[String, String]): Unit

/**
* :: Experimental ::
* Saves the contents of this DataFrame to the given path.
* It will use the default data source configured in spark.sql.source.default.
*/
@Experimental
def save(path: String): Unit

/**
* :: Experimental ::
* Saves the contents of this DataFrame to the given path based on the given data source
* and a set of options.
*/
@Experimental
def save(path: String, dataSourceName: String, options: (String, String)*): Unit

/**
* :: Experimental ::
* Saves the contents of this DataFrame based on the given data source and a set of options.
*/
@Experimental
def save(
dataSourceName: String,
option: (String, String),
options: (String, String)*): Unit

/**
* :: Experimental ::
* Saves the contents of this DataFrame based on the given data source and a set of options.
*/
@Experimental
def save(
dataSourceName: String,
options: java.util.Map[String, String]): Unit

/**
* :: Experimental ::
* Saves the contents of this DataFrame based on the given data source and a set of options.
*/
@Experimental
def save(
dataSourceName: String,
options: Map[String, String]): Unit

/**
* :: Experimental ::
* Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
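
To make the new trait surface easier to read, here is a short usage sketch (not part of the commit). It assumes a DataFrame named df is already in scope and uses org.apache.spark.sql.parquet as an example data source name; whether short names such as "parquet" also resolve depends on how ResolvedDataSource looks up providers.

    import org.apache.spark.sql.DataFrame

    // Hypothetical calls against the overloads declared above; df is assumed to exist.
    def persistExamples(df: DataFrame): Unit = {
      // Default data source (spark.sql.source.default), options passed as varargs.
      df.saveAsTable("people")
      df.saveAsTable("people_with_opts", "comment" -> "demo table")

      // Explicit data source plus at least one option.
      df.saveAsTable("people_pq", "org.apache.spark.sql.parquet", "path" -> "/tmp/people_pq")

      // Path-based saves: default source, explicit source, or an options map.
      df.save("/tmp/people")
      df.save("/tmp/people_pq", "org.apache.spark.sql.parquet")
      df.save("org.apache.spark.sql.parquet", Map("path" -> "/tmp/people_pq"))
    }
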
69 changes: 43 additions & 26 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython}
import org.apache.spark.sql.json.JsonRDD
import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsLogicalPlan}
import org.apache.spark.sql.sources.{CaseInsensitiveMap, ResolvedDataSource, CreateTableUsingAsLogicalPlan}
import org.apache.spark.sql.types.{NumericType, StructType}


@@ -304,62 +304,79 @@ private[sql] class DataFrameImpl protected[sql](
}
}

override def saveAsTable(tableName: String): Unit = {
override def saveAsTable(tableName: String, options: (String, String)*): Unit = {
val dataSourceName = sqlContext.conf.defaultDataSourceName
val cmd =
CreateTableUsingAsLogicalPlan(
tableName,
dataSourceName,
temporary = false,
Map.empty,
allowExisting = false,
logicalPlan)

sqlContext.executePlan(cmd).toRdd
saveAsTable(tableName, dataSourceName, options.toMap)
}

override def saveAsTable(
tableName: String,
dataSourceName: String,
option: (String, String),
options: (String, String)*): Unit = {
saveAsTable(tableName, dataSourceName, Map.empty[String, String])
}

override def saveAsTable(
tableName: String,
options: java.util.Map[String, String]): Unit = {
val dataSourceName = sqlContext.conf.defaultDataSourceName
saveAsTable(tableName, dataSourceName, options)
}

override def saveAsTable(
tableName: String,
dataSourceName: String,
options: java.util.Map[String, String]): Unit = {
saveAsTable(tableName, dataSourceName, options.toMap)
}

override def saveAsTable(
tableName: String,
dataSourceName: String,
options: Map[String, String]): Unit = {
val cmd =
CreateTableUsingAsLogicalPlan(
tableName,
dataSourceName,
temporary = false,
(option +: options).toMap,
options,
allowExisting = false,
logicalPlan)

sqlContext.executePlan(cmd).toRdd
}

override def saveAsTable(
tableName: String,
dataSourceName: String,
options: java.util.Map[String, String]): Unit = {
val opts = options.toSeq
saveAsTable(tableName, dataSourceName, opts.head, opts.tail:_*)
}

override def save(path: String): Unit = {
val dataSourceName = sqlContext.conf.defaultDataSourceName
save(dataSourceName, "path" -> path)
save(path, dataSourceName)
}

override def save(path: String, dataSourceName: String, options: (String, String)*): Unit = {
val opts = new CaseInsensitiveMap(options.toMap)
if (opts.contains("path")) {
sys.error(s"path already specified as $path. Please do not add path in options.")
}

save(dataSourceName, "path" -> path, options:_*)
}

override def save(
dataSourceName: String,
option: (String, String),
options: (String, String)*): Unit = {
ResolvedDataSource(sqlContext, dataSourceName, (option +: options).toMap, this)
save(dataSourceName, (option +: options).toMap)
}

override def save(
dataSourceName: String,
options: java.util.Map[String, String]): Unit = {
val opts = options.toSeq
save(dataSourceName, opts.head, opts.tail:_*)
save(dataSourceName, options.toMap)
}

override def save(
dataSourceName: String,
options: Map[String, String]): Unit = {
ResolvedDataSource(sqlContext, dataSourceName, options, this)
}

override def insertInto(tableName: String, overwrite: Boolean): Unit = {
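
The implementation above follows a funnel pattern: each convenience overload normalizes its arguments and delegates to the single Map-based method that calls ResolvedDataSource, and the path-taking save rejects a duplicate "path" option. The following standalone sketch (my illustration, not Spark code; all names are made up) shows the same pattern in isolation.

    // Standalone sketch of the delegation-plus-guard pattern; names are illustrative only.
    object SaveFunnelSketch {
      def save(path: String, source: String, options: (String, String)*): Unit = {
        // Mirror the CaseInsensitiveMap check: reject a second "path" arriving via options.
        if (options.exists(_._1.equalsIgnoreCase("path"))) {
          sys.error(s"path already specified as $path. Please do not add path in options.")
        }
        save(source, options.toMap + ("path" -> path))
      }

      def save(source: String, options: Map[String, String]): Unit = {
        // In DataFrameImpl this is where ResolvedDataSource(sqlContext, source, options, df) runs.
        println(s"would write through $source with options $options")
      }
    }
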
2 changes: 1 addition & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -50,7 +50,7 @@ private[spark] object SQLConf {
val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"

// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = "spark.sql.default.datasource"
val DEFAULT_DATA_SOURCE_NAME = "spark.sql.source.default"

// Whether to perform eager analysis on a DataFrame.
val DATAFRAME_EAGER_ANALYSIS = "spark.sql.dataframe.eagerAnalysis"
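
The configuration key that selects the default data source is renamed here from spark.sql.default.datasource to spark.sql.source.default. A minimal sketch of setting it, assuming a SQLContext named sqlContext with the usual setConf(key, value) method and Parquet as an example provider:

    // Point the no-source save/saveAsTable overloads at Parquet by default.
    // The data source name is an example; any registered provider should work.
    sqlContext.setConf("spark.sql.source.default", "org.apache.spark.sql.parquet")

    // Subsequent calls such as df.save("/tmp/out") or df.saveAsTable("t")
    // would then resolve through the configured default source.
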
@@ -360,7 +360,7 @@ private [sql] case class CreateTempTableUsingAsSelect(
/**
* Builds a map in which keys are case insensitive
*/
protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
with Serializable {

val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))
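
The last hunk widens CaseInsensitiveMap's visibility to protected[sql] so DataFrameImpl can use it for the duplicate-path check. A simplified standalone sketch (not the Spark class) of the lowercasing idiom it relies on:

    // Minimal illustration: keys are normalized to lower case once, so lookups ignore case.
    class CaseInsensitiveLookup(map: Map[String, String]) {
      private val baseMap = map.map { case (k, v) => k.toLowerCase -> v }
      def get(key: String): Option[String] = baseMap.get(key.toLowerCase)
      def contains(key: String): Boolean = baseMap.contains(key.toLowerCase)
    }

    // Example: new CaseInsensitiveLookup(Map("Path" -> "/tmp/out")).contains("PATH") returns true.
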
