From 33630883685eafcc3ee4521ea8363be342f6e6b4 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Fri, 10 Jul 2015 18:15:36 -0700
Subject: [PATCH] [SPARK-8961] [SQL] Makes BaseWriterContainer.outputWriterForRow accepts InternalRow instead of Row

This is a follow-up of [SPARK-8888] [1], which also aims to optimize writing
dynamic partitions. Three more changes can be made here:

1. Using `InternalRow` instead of `Row` in `BaseWriterContainer.outputWriterForRow`
2. Using `Cast` expressions to convert partition columns to strings, so that we
   can leverage code generation.
3. Replacing the FP-style `zip` and `map` calls with a faster imperative `while`
   loop (see the sketch after the diff below).

[1]: https://issues.apache.org/jira/browse/SPARK-8888

Author: Cheng Lian

Closes #7331 from liancheng/spark-8961 and squashes the following commits:

b5ab9ae [Cheng Lian] Casts Java iterator to Scala iterator explicitly
719e63b [Cheng Lian] Makes BaseWriterContainer.outputWriterForRow accepts InternalRow instead of Row
---
 .../apache/spark/sql/sources/commands.scala   | 73 +++++++++++--------
 1 file changed, 42 insertions(+), 31 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 9189d176111d5..5c6ef2dc90c73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.sources
 
 import java.util.{Date, UUID}
 
+import scala.collection.JavaConversions.asScalaIterator
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
@@ -26,15 +28,14 @@ import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceF
 import org.apache.spark._
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext, SaveMode}
+import org.apache.spark.sql.types.StringType
 import org.apache.spark.util.SerializableConfiguration
 
 private[sql] case class InsertIntoDataSource(
@@ -170,14 +171,14 @@ private[sql] case class InsertIntoHadoopFsRelation(
       try {
         writerContainer.executorSideSetup(taskContext)
 
-        val converter = if (needsConversion) {
+        val converter: InternalRow => Row = if (needsConversion) {
           CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
         } else {
           r: InternalRow => r.asInstanceOf[Row]
         }
         while (iterator.hasNext) {
-          val row = converter(iterator.next())
-          writerContainer.outputWriterForRow(row).write(row)
+          val internalRow = iterator.next()
+          writerContainer.outputWriterForRow(internalRow).write(converter(internalRow))
         }
 
         writerContainer.commitTask()
@@ -239,7 +240,9 @@ private[sql] case class InsertIntoHadoopFsRelation(
      try {
         writerContainer.executorSideSetup(taskContext)
 
-        val partitionProj = newProjection(codegenEnabled, partitionOutput, output)
+        // Projects all partition columns and casts them to strings to build partition directories.
+        val partitionCasts = partitionOutput.map(Cast(_, StringType))
+        val partitionProj = newProjection(codegenEnabled, partitionCasts, output)
         val dataProj = newProjection(codegenEnabled, dataOutput, output)
 
         val dataConverter: InternalRow => Row = if (needsConversion) {
@@ -247,15 +250,11 @@ private[sql] case class InsertIntoHadoopFsRelation(
         } else {
           r: InternalRow => r.asInstanceOf[Row]
         }
-        val partitionSchema = StructType.fromAttributes(partitionOutput)
-        val partConverter: InternalRow => Row =
-          CatalystTypeConverters.createToScalaConverter(partitionSchema)
-            .asInstanceOf[InternalRow => Row]
 
         while (iterator.hasNext) {
-          val row = iterator.next()
-          val partitionPart = partConverter(partitionProj(row))
-          val dataPart = dataConverter(dataProj(row))
+          val internalRow = iterator.next()
+          val partitionPart = partitionProj(internalRow)
+          val dataPart = dataConverter(dataProj(internalRow))
           writerContainer.outputWriterForRow(partitionPart).write(dataPart)
         }
 
@@ -435,7 +434,7 @@ private[sql] abstract class BaseWriterContainer(
   }
 
   // Called on executor side when writing rows
-  def outputWriterForRow(row: Row): OutputWriter
+  def outputWriterForRow(row: InternalRow): OutputWriter
 
   protected def initWriters(): Unit
 
@@ -477,7 +476,7 @@ private[sql] class DefaultWriterContainer(
     writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
   }
 
-  override def outputWriterForRow(row: Row): OutputWriter = writer
+  override def outputWriterForRow(row: InternalRow): OutputWriter = writer
 
   override def commitTask(): Unit = {
     try {
@@ -518,23 +517,36 @@ private[sql] class DynamicPartitionWriterContainer(
     outputWriters = new java.util.HashMap[String, OutputWriter]
   }
 
-  override def outputWriterForRow(row: Row): OutputWriter = {
-    // TODO (SPARK-8888): zip and all the stuff happening here is very inefficient.
-    val partitionPath = partitionColumns.zip(row.toSeq).map { case (col, rawValue) =>
-      val string = if (rawValue == null) null else String.valueOf(rawValue)
-      val valueString = if (string == null || string.isEmpty) {
-        defaultPartitionName
-      } else {
-        PartitioningUtils.escapePathName(string)
+  // The `row` argument is supposed to only contain partition column values which have been casted
+  // to strings.
+  override def outputWriterForRow(row: InternalRow): OutputWriter = {
+    val partitionPath = {
+      val partitionPathBuilder = new StringBuilder
+      var i = 0
+
+      while (i < partitionColumns.length) {
+        val col = partitionColumns(i)
+        val partitionValueString = {
+          val string = row.getString(i)
+          if (string.eq(null)) defaultPartitionName else PartitioningUtils.escapePathName(string)
+        }
+
+        if (i > 0) {
+          partitionPathBuilder.append(Path.SEPARATOR_CHAR)
+        }
+
+        partitionPathBuilder.append(s"$col=$partitionValueString")
+        i += 1
       }
-      s"/$col=$valueString"
-    }.mkString.stripPrefix(Path.SEPARATOR)
+
+      partitionPathBuilder.toString()
+    }
 
     val writer = outputWriters.get(partitionPath)
     if (writer.eq(null)) {
       val path = new Path(getWorkPath, partitionPath)
-      taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path",
-        new Path(outputPath, partitionPath).toString)
+      taskAttemptContext.getConfiguration.set(
+        "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
       val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
       outputWriters.put(partitionPath, newWriter)
       newWriter
@@ -545,8 +557,7 @@ private[sql] class DynamicPartitionWriterContainer(
 
   private def clearOutputWriters(): Unit = {
     if (!outputWriters.isEmpty) {
-      val iter = scala.collection.JavaConversions.asScalaIterator(outputWriters.values().iterator())
-      iter.foreach(_.close())
+      asScalaIterator(outputWriters.values().iterator()).foreach(_.close())
      outputWriters.clear()
     }
   }
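
To make change 3 in the commit message concrete, here is a minimal, self-contained Scala sketch of the partition-path construction idea. It is illustrative only, not Spark code: `escapePathName` and `defaultPartitionName` below are simplified stand-ins for `PartitioningUtils.escapePathName` and Spark's default partition directory name, and the partition values are assumed to have already been converted to strings upstream by the `Cast(_, StringType)` projection from change 2.

```scala
// Illustrative sketch only: builds a partition directory path such as
// "year=2015/month=07" from column names and already-stringified values,
// using a single StringBuilder and an imperative while loop instead of
// zip/map/mkString.
object PartitionPathSketch {

  // Simplified stand-in for Spark's default partition name constant.
  val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

  // Simplified stand-in for PartitioningUtils.escapePathName; the real
  // implementation escapes many more characters.
  def escapePathName(s: String): String = s.replace("/", "%2F")

  def partitionPath(columnNames: Array[String], stringValues: Array[String]): String = {
    val builder = new StringBuilder
    var i = 0
    while (i < columnNames.length) {
      val value = stringValues(i)
      // A null value falls back to the default partition directory name.
      val valueString = if (value == null) defaultPartitionName else escapePathName(value)
      if (i > 0) builder.append('/')
      builder.append(columnNames(i)).append('=').append(valueString)
      i += 1
    }
    builder.toString()
  }

  def main(args: Array[String]): Unit = {
    println(partitionPath(Array("year", "month"), Array("2015", "07")))
    println(partitionPath(Array("year", "month"), Array("2015", null)))
  }
}
```

Running the sketch prints `year=2015/month=07` and `year=2015/month=__HIVE_DEFAULT_PARTITION__`. Caching one `OutputWriter` per generated path, as `DynamicPartitionWriterContainer` does with its `outputWriters` map in the diff above, then avoids reopening files for rows that land in the same partition.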