diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 9189d176111d5..5c6ef2dc90c73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.sources
 
 import java.util.{Date, UUID}
 
+import scala.collection.JavaConversions.asScalaIterator
+
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}
@@ -26,15 +28,14 @@ import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceF
 import org.apache.spark._
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
-import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.CatalystTypeConverters
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.execution.RunnableCommand
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext, SaveMode}
+import org.apache.spark.sql.types.StringType
 import org.apache.spark.util.SerializableConfiguration
 
 private[sql] case class InsertIntoDataSource(
@@ -170,14 +171,14 @@ private[sql] case class InsertIntoHadoopFsRelation(
       try {
         writerContainer.executorSideSetup(taskContext)
 
-        val converter = if (needsConversion) {
+        val converter: InternalRow => Row = if (needsConversion) {
           CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
         } else {
           r: InternalRow => r.asInstanceOf[Row]
         }
         while (iterator.hasNext) {
-          val row = converter(iterator.next())
-          writerContainer.outputWriterForRow(row).write(row)
+          val internalRow = iterator.next()
+          writerContainer.outputWriterForRow(internalRow).write(converter(internalRow))
         }
 
         writerContainer.commitTask()
@@ -239,7 +240,9 @@ private[sql] case class InsertIntoHadoopFsRelation(
       try {
         writerContainer.executorSideSetup(taskContext)
 
-        val partitionProj = newProjection(codegenEnabled, partitionOutput, output)
+        // Projects all partition columns and casts them to strings to build partition directories.
+        val partitionCasts = partitionOutput.map(Cast(_, StringType))
+        val partitionProj = newProjection(codegenEnabled, partitionCasts, output)
         val dataProj = newProjection(codegenEnabled, dataOutput, output)
 
         val dataConverter: InternalRow => Row = if (needsConversion) {
@@ -247,15 +250,11 @@ private[sql] case class InsertIntoHadoopFsRelation(
         } else {
           r: InternalRow => r.asInstanceOf[Row]
         }
-        val partitionSchema = StructType.fromAttributes(partitionOutput)
-        val partConverter: InternalRow => Row =
-          CatalystTypeConverters.createToScalaConverter(partitionSchema)
-            .asInstanceOf[InternalRow => Row]
 
         while (iterator.hasNext) {
-          val row = iterator.next()
-          val partitionPart = partConverter(partitionProj(row))
-          val dataPart = dataConverter(dataProj(row))
+          val internalRow = iterator.next()
+          val partitionPart = partitionProj(internalRow)
+          val dataPart = dataConverter(dataProj(internalRow))
           writerContainer.outputWriterForRow(partitionPart).write(dataPart)
         }
 
@@ -435,7 +434,7 @@ private[sql] abstract class BaseWriterContainer(
   }
 
   // Called on executor side when writing rows
-  def outputWriterForRow(row: Row): OutputWriter
+  def outputWriterForRow(row: InternalRow): OutputWriter
 
   protected def initWriters(): Unit
 
@@ -477,7 +476,7 @@ private[sql] class DefaultWriterContainer(
     writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
   }
 
-  override def outputWriterForRow(row: Row): OutputWriter = writer
+  override def outputWriterForRow(row: InternalRow): OutputWriter = writer
 
   override def commitTask(): Unit = {
     try {
@@ -518,23 +517,36 @@ private[sql] class DynamicPartitionWriterContainer(
     outputWriters = new java.util.HashMap[String, OutputWriter]
   }
 
-  override def outputWriterForRow(row: Row): OutputWriter = {
-    // TODO (SPARK-8888): zip and all the stuff happening here is very inefficient.
-    val partitionPath = partitionColumns.zip(row.toSeq).map { case (col, rawValue) =>
-      val string = if (rawValue == null) null else String.valueOf(rawValue)
-      val valueString = if (string == null || string.isEmpty) {
-        defaultPartitionName
-      } else {
-        PartitioningUtils.escapePathName(string)
+  // The `row` argument is supposed to only contain partition column values which have been casted
+  // to strings.
+  override def outputWriterForRow(row: InternalRow): OutputWriter = {
+    val partitionPath = {
+      val partitionPathBuilder = new StringBuilder
+      var i = 0
+
+      while (i < partitionColumns.length) {
+        val col = partitionColumns(i)
+        val partitionValueString = {
+          val string = row.getString(i)
+          if (string.eq(null)) defaultPartitionName else PartitioningUtils.escapePathName(string)
+        }
+
+        if (i > 0) {
+          partitionPathBuilder.append(Path.SEPARATOR_CHAR)
+        }
+
+        partitionPathBuilder.append(s"$col=$partitionValueString")
+        i += 1
       }
-      s"/$col=$valueString"
-    }.mkString.stripPrefix(Path.SEPARATOR)
+
+      partitionPathBuilder.toString()
+    }
 
     val writer = outputWriters.get(partitionPath)
     if (writer.eq(null)) {
       val path = new Path(getWorkPath, partitionPath)
-      taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path",
-        new Path(outputPath, partitionPath).toString)
+      taskAttemptContext.getConfiguration.set(
+        "spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
       val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
       outputWriters.put(partitionPath, newWriter)
       newWriter
@@ -545,8 +557,7 @@ private[sql] class DynamicPartitionWriterContainer(
 
   private def clearOutputWriters(): Unit = {
     if (!outputWriters.isEmpty) {
-      val iter = scala.collection.JavaConversions.asScalaIterator(outputWriters.values().iterator())
-      iter.foreach(_.close())
+      asScalaIterator(outputWriters.values().iterator()).foreach(_.close())
       outputWriters.clear()
     }
   }
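For reference, here is a minimal standalone sketch of the partition-path construction that the new outputWriterForRow performs, assuming the partition values have already been cast to strings (as done by partitionCasts above). PartitionPathSketch, escapePath, and the "__HIVE_DEFAULT_PARTITION__" constant are simplified stand-ins for illustration only, not the actual Spark helpers (PartitioningUtils.escapePathName, defaultPartitionName).

// Minimal sketch (not Spark code): builds a Hive-style partition directory path,
// e.g. "year=2015/month=7", from partition column names and their string values.
object PartitionPathSketch {
  // Stand-in for the default partition name Spark uses for null partition values.
  private val defaultPartitionName = "__HIVE_DEFAULT_PARTITION__"

  // Very rough stand-in for PartitioningUtils.escapePathName: only escapes '/' here.
  private def escapePath(s: String): String = s.replace("/", "%2F")

  def partitionPath(columns: Seq[String], values: Seq[String]): String = {
    val builder = new StringBuilder
    var i = 0
    while (i < columns.length) {
      val value = values(i)
      // Mirrors the diff above: null falls back to the default partition name,
      // everything else is escaped before being placed in the path.
      val valueString = if (value == null) defaultPartitionName else escapePath(value)
      if (i > 0) builder.append('/')
      builder.append(s"${columns(i)}=$valueString")
      i += 1
    }
    builder.toString()
  }

  def main(args: Array[String]): Unit = {
    println(partitionPath(Seq("year", "month"), Seq("2015", "7")))  // prints year=2015/month=7
  }
}

Running the sketch prints year=2015/month=7, the same Hive-style directory layout that DynamicPartitionWriterContainer uses as the key for its per-partition OutputWriter cache.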