[SPARK-8961] [SQL] Makes BaseWriterContainer.outputWriterForRow accept InternalRow instead of Row

This is a follow-up to [SPARK-8888] [1], which also aims to optimize writing dynamic partitions.

Three more changes are made here:

1. Using `InternalRow` instead of `Row` in `BaseWriterContainer.outputWriterForRow`
2. Using `Cast` expressions to convert partition columns to strings, so that we can leverage code generation.
3. Replacing the FP-style `zip` and `map` calls with a faster imperative `while` loop.
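
As a rough illustration of the last point, here is a minimal standalone sketch (hypothetical helper names, not the actual Spark code) contrasting the two styles of building a partition directory path; it assumes the partition values have already been rendered as strings, as in point 2:

```scala
// Hypothetical standalone illustration; `cols` are partition column names and
// `values` their string representations for one row.

// Before: FP-style, allocating a zipped sequence and a mapped sequence per row.
def partitionPathZipMap(cols: Seq[String], values: Seq[String]): String =
  cols.zip(values).map { case (col, v) => s"/$col=$v" }.mkString.stripPrefix("/")

// After: a single imperative pass with one StringBuilder per row.
def partitionPathWhile(cols: Array[String], values: Array[String]): String = {
  val sb = new StringBuilder
  var i = 0
  while (i < cols.length) {
    if (i > 0) sb.append('/')
    sb.append(cols(i)).append('=').append(values(i))
    i += 1
  }
  sb.toString()
}
```

Both return e.g. `year=2015/month=07`; the imperative version avoids allocating intermediate collections for every row, which adds up when writing many rows.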

[1]: https://issues.apache.org/jira/browse/SPARK-8888

Author: Cheng Lian <lian@databricks.com>

Closes apache#7331 from liancheng/spark-8961 and squashes the following commits:

b5ab9ae [Cheng Lian] Casts Java iterator to Scala iterator explicitly
719e63b [Cheng Lian] Makes BaseWriterContainer.outputWriterForRow accepts InternalRow instead of Row
liancheng committed Jul 11, 2015
1 parent b6fc0ad commit 3363088
Showing 1 changed file with 42 additions and 31 deletions.
sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala (73 changes: 42 additions & 31 deletions)
@@ -19,22 +19,23 @@ package org.apache.spark.sql.sources

import java.util.{Date, UUID}

import scala.collection.JavaConversions.asScalaIterator

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}

import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext, SaveMode}
import org.apache.spark.sql.types.StringType
import org.apache.spark.util.SerializableConfiguration

private[sql] case class InsertIntoDataSource(
@@ -170,14 +171,14 @@ private[sql] case class InsertIntoHadoopFsRelation(
try {
writerContainer.executorSideSetup(taskContext)

val converter = if (needsConversion) {
val converter: InternalRow => Row = if (needsConversion) {
CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
} else {
r: InternalRow => r.asInstanceOf[Row]
}
while (iterator.hasNext) {
val row = converter(iterator.next())
writerContainer.outputWriterForRow(row).write(row)
val internalRow = iterator.next()
writerContainer.outputWriterForRow(internalRow).write(converter(internalRow))
}

writerContainer.commitTask()
@@ -239,23 +240,21 @@ private[sql] case class InsertIntoHadoopFsRelation(
try {
writerContainer.executorSideSetup(taskContext)

val partitionProj = newProjection(codegenEnabled, partitionOutput, output)
// Projects all partition columns and casts them to strings to build partition directories.
val partitionCasts = partitionOutput.map(Cast(_, StringType))
val partitionProj = newProjection(codegenEnabled, partitionCasts, output)
val dataProj = newProjection(codegenEnabled, dataOutput, output)

val dataConverter: InternalRow => Row = if (needsConversion) {
CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
} else {
r: InternalRow => r.asInstanceOf[Row]
}
val partitionSchema = StructType.fromAttributes(partitionOutput)
val partConverter: InternalRow => Row =
CatalystTypeConverters.createToScalaConverter(partitionSchema)
.asInstanceOf[InternalRow => Row]

while (iterator.hasNext) {
val row = iterator.next()
val partitionPart = partConverter(partitionProj(row))
val dataPart = dataConverter(dataProj(row))
val internalRow = iterator.next()
val partitionPart = partitionProj(internalRow)
val dataPart = dataConverter(dataProj(internalRow))
writerContainer.outputWriterForRow(partitionPart).write(dataPart)
}

@@ -435,7 +434,7 @@ private[sql] abstract class BaseWriterContainer(
}

// Called on executor side when writing rows
def outputWriterForRow(row: Row): OutputWriter
def outputWriterForRow(row: InternalRow): OutputWriter

protected def initWriters(): Unit

@@ -477,7 +476,7 @@ private[sql] class DefaultWriterContainer(
writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
}

override def outputWriterForRow(row: Row): OutputWriter = writer
override def outputWriterForRow(row: InternalRow): OutputWriter = writer

override def commitTask(): Unit = {
try {
@@ -518,23 +517,36 @@ private[sql] class DynamicPartitionWriterContainer(
outputWriters = new java.util.HashMap[String, OutputWriter]
}

override def outputWriterForRow(row: Row): OutputWriter = {
// TODO (SPARK-8888): zip and all the stuff happening here is very inefficient.
val partitionPath = partitionColumns.zip(row.toSeq).map { case (col, rawValue) =>
val string = if (rawValue == null) null else String.valueOf(rawValue)
val valueString = if (string == null || string.isEmpty) {
defaultPartitionName
} else {
PartitioningUtils.escapePathName(string)
// The `row` argument is supposed to only contain partition column values which have been casted
// to strings.
override def outputWriterForRow(row: InternalRow): OutputWriter = {
val partitionPath = {
val partitionPathBuilder = new StringBuilder
var i = 0

while (i < partitionColumns.length) {
val col = partitionColumns(i)
val partitionValueString = {
val string = row.getString(i)
if (string.eq(null)) defaultPartitionName else PartitioningUtils.escapePathName(string)
}

if (i > 0) {
partitionPathBuilder.append(Path.SEPARATOR_CHAR)
}

partitionPathBuilder.append(s"$col=$partitionValueString")
i += 1
}
s"/$col=$valueString"
}.mkString.stripPrefix(Path.SEPARATOR)

partitionPathBuilder.toString()
}

val writer = outputWriters.get(partitionPath)
if (writer.eq(null)) {
val path = new Path(getWorkPath, partitionPath)
taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path",
new Path(outputPath, partitionPath).toString)
taskAttemptContext.getConfiguration.set(
"spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
outputWriters.put(partitionPath, newWriter)
newWriter
@@ -545,8 +557,7 @@

private def clearOutputWriters(): Unit = {
if (!outputWriters.isEmpty) {
val iter = scala.collection.JavaConversions.asScalaIterator(outputWriters.values().iterator())
iter.foreach(_.close())
asScalaIterator(outputWriters.values().iterator()).foreach(_.close())
outputWriters.clear()
}
}
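
As a side note, a self-contained sketch of the per-partition writer caching that DynamicPartitionWriterContainer relies on (simplified, with hypothetical stand-in types rather than the real Spark classes):

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration only.
trait OutputWriter {
  def write(row: Seq[Any]): Unit
  def close(): Unit
}

class SimplePartitionedWriterCache(newWriter: String => OutputWriter) {
  // One writer per distinct partition path, created lazily on first use.
  private val writers = mutable.HashMap.empty[String, OutputWriter]

  def writerFor(partitionPath: String): OutputWriter =
    writers.getOrElseUpdate(partitionPath, newWriter(partitionPath))

  // Mirrors clearOutputWriters(): close every writer and reset the cache.
  def closeAll(): Unit = {
    writers.values.foreach(_.close())
    writers.clear()
  }
}
```

The real container additionally sets the output path in the Hadoop task configuration before creating each writer, as shown in the diff above.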