From f0b653e21616a9f599404738fd81a70f1ae8c9b8 Mon Sep 17 00:00:00 2001
From: Wenchen Fan
Date: Wed, 16 Mar 2016 14:45:29 +0800
Subject: [PATCH] update

---
 .../org/apache/spark/sql/DataFrameReader.scala   |  4 ++--
 .../scala/org/apache/spark/sql/Dataset.scala     |  6 +++++-
 .../scala/org/apache/spark/sql/SQLContext.scala  | 17 +++++++----------
 3 files changed, 14 insertions(+), 13 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 76b8d71ac9359..62ab9395c218a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -128,7 +128,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
       userSpecifiedSchema = userSpecifiedSchema,
       className = source,
       options = extraOptions.toMap)
-    Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation()))
+    Dataset.newNamedDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation()))
   }
 
   /**
@@ -345,7 +345,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging {
       InferSchema.infer(jsonRDD, sqlContext.conf.columnNameOfCorruptRecord, parsedOptions)
     }
 
-    Dataset.newDataFrame(
+    Dataset.newNamedDataFrame(
       sqlContext,
       LogicalRDD(
         schema.toAttributes,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 5533e2d05f73a..a6e37fd23251f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -60,9 +60,13 @@ private[sql] object Dataset {
     new Dataset[Row](sqlContext, qe, RowEncoder(qe.analyzed.schema))
   }
 
+  def newNamedDataFrame(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
+    newDataFrame(sqlContext, SubqueryAlias(newDataFrameName, logicalPlan))
+  }
+
   private[this] val nextDataFrameId = new AtomicLong(0)
 
-  def newDataFrameName: String = s"dataframe_${nextDataFrameId.getAndIncrement()}"
+  private def newDataFrameName: String = s"dataframe_${nextDataFrameId.getAndIncrement()}"
 }
 
 /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 413c757f9a840..17596911f643b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.encoders.encoderFor
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.ShowTablesCommand
@@ -367,8 +367,7 @@ class SQLContext private[sql](
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributeSeq = schema.toAttributes
     val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType))
-    val relation = LogicalRDD(attributeSeq, rowRDD)(self)
-    Dataset.newDataFrame(self, SubqueryAlias(Dataset.newDataFrameName, relation))
+    Dataset.newNamedDataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self))
   }
 
   /**
@@ -383,8 +382,7 @@ class SQLContext private[sql](
     SQLContext.setActive(self)
     val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType]
     val attributeSeq = schema.toAttributes
-    val relation = LocalRelation.fromProduct(attributeSeq, data)
-    Dataset.newDataFrame(self, SubqueryAlias(Dataset.newDataFrameName, relation))
+    Dataset.newNamedDataFrame(self, LocalRelation.fromProduct(attributeSeq, data))
   }
 
   /**
@@ -394,7 +392,7 @@ class SQLContext private[sql](
    * @since 1.3.0
    */
   def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = {
-    Dataset.newDataFrame(this, LogicalRelation(baseRelation))
+    Dataset.newNamedDataFrame(this, LogicalRelation(baseRelation))
   }
 
   /**
@@ -448,8 +446,8 @@ class SQLContext private[sql](
     } else {
       rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)}
     }
-    val relation = LogicalRDD(schema.toAttributes, catalystRows)(self)
-    Dataset.newDataFrame(this, SubqueryAlias(Dataset.newDataFrameName, relation))
+    val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self)
+    Dataset.newNamedDataFrame(this, logicalPlan)
   }
 
 
@@ -887,8 +885,7 @@ class SQLContext private[sql](
       rdd: RDD[Array[Any]],
       schema: StructType): DataFrame = {
     val rowRdd = rdd.map(r => python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow])
-    val relation = LogicalRDD(schema.toAttributes, rowRdd)(self)
-    Dataset.newDataFrame(self, SubqueryAlias(Dataset.newDataFrameName, relation))
+    Dataset.newNamedDataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self))
   }
 
   /**
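
Note: the pattern this patch centralizes is a process-wide counter that mints a fresh
alias (dataframe_0, dataframe_1, ...) and wraps every incoming logical plan in a
SubqueryAlias with that name, instead of each call site assembling
SubqueryAlias(Dataset.newDataFrameName, plan) by hand. Below is a minimal standalone
sketch of just that naming scheme; LogicalPlan, LocalRelation, SubqueryAlias,
PlanNaming, newNamedPlan, and Demo here are simplified stand-ins for illustration,
not the actual Catalyst classes or Spark API.

    import java.util.concurrent.atomic.AtomicLong

    // Simplified stand-ins for Catalyst's plan nodes (hypothetical, for the sketch).
    sealed trait LogicalPlan
    case class LocalRelation(rows: Seq[Seq[Any]]) extends LogicalPlan
    case class SubqueryAlias(alias: String, child: LogicalPlan) extends LogicalPlan

    object PlanNaming {
      // Process-wide counter; getAndIncrement is atomic, so concurrent
      // callers can never be handed the same id.
      private val nextDataFrameId = new AtomicLong(0)

      // Kept private, mirroring the Dataset.scala hunk: only the helper
      // below is allowed to mint names.
      private def newDataFrameName: String =
        s"dataframe_${nextDataFrameId.getAndIncrement()}"

      // Mirrors Dataset.newNamedDataFrame: every plan entering the system
      // is wrapped in a SubqueryAlias carrying a fresh generated name.
      def newNamedPlan(plan: LogicalPlan): LogicalPlan =
        SubqueryAlias(newDataFrameName, plan)
    }

    object Demo extends App {
      val a = PlanNaming.newNamedPlan(LocalRelation(Seq(Seq(1, "a"))))
      val b = PlanNaming.newNamedPlan(LocalRelation(Seq(Seq(2, "b"))))
      println(a) // SubqueryAlias(dataframe_0, LocalRelation(...))
      println(b) // SubqueryAlias(dataframe_1, LocalRelation(...))
    }

Funneling the wrapping through one helper is also what lets the Dataset.scala hunk
make newDataFrameName private: once no call site builds the SubqueryAlias itself,
the name generator no longer needs to be visible outside the Dataset object.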