
Commit

CR and add partition column benchmark.
nongli committed Mar 4, 2016
1 parent cab64e5 commit f35394c
Showing 4 changed files with 64 additions and 18 deletions.
@@ -64,8 +64,9 @@ public static void populate(ColumnVector col, InternalRow row, int fieldIdx) {
col.putDoubles(0, capacity, row.getDouble(fieldIdx));
} else if (t == DataTypes.StringType) {
UTF8String v = row.getUTF8String(fieldIdx);
byte[] bytes = v.getBytes();
for (int i = 0; i < capacity; i++) {
col.putByteArray(i, v.getBytes());
col.putByteArray(i, bytes);
}
} else if (t instanceof DecimalType) {
DecimalType dt = (DecimalType)t;
@@ -162,18 +162,21 @@ private[sql] case class PhysicalRDD(
ctx.addNewFunction(scanBatches,
s"""
| private void $scanBatches($columnarBatchClz batch) throws java.io.IOException {
| int batchIdx = 0;
| while (true) {
| int numRows = batch.numRows();
| $numOutputRows.add(numRows);
| for (int i = 0; i < numRows; i++) {
| InternalRow $row = batch.getRow(i);
| if (batchIdx == 0) $numOutputRows.add(numRows);
|
| while (batchIdx < numRows) {
| InternalRow $row = batch.getRow(batchIdx++);
| ${columns.map(_.code).mkString("\n").trim}
| ${consume(ctx, columns).trim}
| if (shouldStop()) return;
| }
|
| if (shouldStop()) return;
| if (!$input.hasNext()) break;
| batch = ($columnarBatchClz)$input.next();
| batchIdx = 0;
| }
| }""".stripMargin)

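The generated scanBatches loop above now walks each batch with an explicit batchIdx cursor: shouldStop() is checked after every row, $numOutputRows is added only when batchIdx == 0, and the cursor is reset when the next batch is fetched. A minimal plain-Scala sketch of the same cursor-based pattern, with the cursor held as a field so an early return can resume mid-batch (Batch, emit and shouldStop are hypothetical stand-ins, not the actual generated Java):

// Sketch only: hypothetical stand-ins, assuming a non-empty batch iterator.
final case class Batch(rows: Array[Long]) { def numRows: Int = rows.length }

class BatchScanner(input: Iterator[Batch], emit: Long => Unit, shouldStop: () => Boolean) {
  private var batch: Batch = input.next()   // assumes at least one batch
  private var batchIdx = 0                  // survives early returns, so the scan resumes mid-batch
  private var numOutputRows = 0L

  def scanBatches(): Unit = {
    while (true) {
      val numRows = batch.numRows
      if (batchIdx == 0) numOutputRows += numRows   // count each batch's rows only once

      while (batchIdx < numRows) {
        emit(batch.rows(batchIdx))
        batchIdx += 1
        if (shouldStop()) return                    // a later call picks up at batchIdx
      }

      if (!input.hasNext) return
      batch = input.next()
      batchIdx = 0                                  // fresh batch, start at its first row
    }
  }
}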
@@ -222,12 +222,14 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
sparkPlan
}

// Creates a ColumnarBatch that contains the values for `requiredColumns`. These columns can
// either come from `input` (columns scanned from the data source) or from the partitioning
// values (data from `partitionValues`). This is done *once* per physical partition. When
// the column is from `input`, it just references the same underlying column. When using
// partition columns, the column is populated once.
// TODO: there's probably a cleaner way to do this.
/**
* Creates a ColumnarBatch that contains the values for `requiredColumns`. These columns can
* either come from `input` (columns scanned from the data source) or from the partitioning
* values (data from `partitionValues`). This is done *once* per physical partition. When
* the column is from `input`, it just references the same underlying column. When using
* partition columns, the column is populated once.
* TODO: there's probably a cleaner way to do this.
*/
private def projectedColumnBatch(
input: ColumnarBatch,
requiredColumns: Seq[Attribute],
@@ -288,6 +290,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
// If we are returning batches directly, we need to augment them with the partitioning
// columns. We want to do this without a row by row operation.
var columnBatch: ColumnarBatch = null
var firstBatch: ColumnarBatch = null

iterator.map { input => {
if (input.isInstanceOf[InternalRow]) {
@@ -297,9 +300,11 @@
require(input.isInstanceOf[ColumnarBatch])
val inputBatch = input.asInstanceOf[ColumnarBatch]
if (columnBatch == null) {
firstBatch = inputBatch
columnBatch = projectedColumnBatch(inputBatch, requiredColumns,
dataColumns, partitionColumnSchema, partitionValues)
}
require(firstBatch == inputBatch, "Reader must return the same batch object.")
columnBatch.setNumRows(inputBatch.numRows())
columnBatch
}
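The new scaladoc and the firstBatch require above describe the projection contract: data columns in the projected batch just reference the reader's column vectors, partition columns are populated with a constant once per physical partition, and per incoming batch only the row count is refreshed, which is why the reader must keep returning the same batch object. A minimal sketch under simplified, assumed types (hypothetical Col and Batch classes, not Spark's ColumnVector/ColumnarBatch API):

object ProjectedBatchSketch {
  // Hypothetical, simplified stand-ins for ColumnVector and ColumnarBatch.
  final class Col(capacity: Int) {
    val data = new Array[Long](capacity)
    def fill(v: Long): Unit = java.util.Arrays.fill(data, v)   // populate a constant column once
  }
  final class Batch(val cols: Array[Col], var numRows: Int)

  // Built once per physical partition: data columns alias the input batch's columns,
  // partition columns are allocated and filled with their constant value here.
  def projectedBatch(input: Batch, dataIdx: Seq[Int], partitionValues: Seq[Long], capacity: Int): Batch = {
    val dataCols = dataIdx.map(input.cols(_))                                   // reference, no copy
    val partCols = partitionValues.map { v => val c = new Col(capacity); c.fill(v); c }
    new Batch((dataCols ++ partCols).toArray, input.numRows)
  }

  // Per incoming batch only the row count changes; the aliased columns already point at the
  // reader's vectors, hence the check that the reader keeps handing back the same batch object.
  def onNextBatch(projected: Batch, input: Batch): Batch = {
    projected.numRows = input.numRows
    projected
  }
}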
@@ -95,7 +95,7 @@ object ParquetReadBenchmark {

val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
// Driving the parquet reader directly without Spark.
parquetReaderBenchmark.addCase("ParquetReader") { num =>
parquetReaderBenchmark.addCase("ParquetReader Non-Vectorized") { num =>
var sum = 0L
files.map(_.asInstanceOf[String]).foreach { p =>
val reader = new UnsafeRowParquetRecordReader
@@ -110,7 +110,7 @@
}

// Driving the parquet reader in batch mode directly.
parquetReaderBenchmark.addCase("ParquetReader(Batched)") { num =>
parquetReaderBenchmark.addCase("ParquetReader Vectorized") { num =>
var sum = 0L
files.map(_.asInstanceOf[String]).foreach { p =>
val reader = new UnsafeRowParquetRecordReader
@@ -133,7 +133,7 @@
}

// Decoding in vectorized mode, but having the reader return rows.
parquetReaderBenchmark.addCase("ParquetReader(Batch -> Row)") { num =>
parquetReaderBenchmark.addCase("ParquetReader Vectorized -> Row") { num =>
var sum = 0L
files.map(_.asInstanceOf[String]).foreach { p =>
val reader = new UnsafeRowParquetRecordReader
@@ -167,7 +167,7 @@
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
Parquet Reader Single Int Column Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
ParquetReader 565 / 609 27.8 35.9 1.0X
ParquetReader (Non-vectorized) 565 / 609 27.8 35.9 1.0X
ParquetReader(Batched) 165 / 174 95.3 10.5 3.4X
ParquetReader(Batch -> Row) 158 / 188 99.3 10.1 3.6X
*/
@@ -264,9 +264,46 @@
}
}

def partitionTableScanBenchmark(values: Int): Unit = {
withTempPath { dir =>
withTempTable("t1", "tempTable") {
sqlContext.range(values).registerTempTable("t1")
sqlContext.sql("select id % 2 as p, cast(id as INT) as id from t1")
.write.partitionBy("p").parquet(dir.getCanonicalPath)
sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")

val benchmark = new Benchmark("Partitioned Table", values)

benchmark.addCase("Read data column") { iter =>
sqlContext.sql("select sum(id) from tempTable").collect
}

benchmark.addCase("Read partition column") { iter =>
sqlContext.sql("select sum(p) from tempTable").collect
}

benchmark.addCase("Read both columns") { iter =>
sqlContext.sql("select sum(p), sum(id) from tempTable").collect
}

/*
Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
Partitioned Table: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative
-------------------------------------------------------------------------------------------
Read data column 751 / 805 20.9 47.8 1.0X
Read partition column 713 / 761 22.1 45.3 1.1X
Read both columns 1004 / 1109 15.7 63.8 0.7X
*/
benchmark.run()
}
}
}

def main(args: Array[String]): Unit = {
intScanBenchmark(1024 * 1024 * 15)
intStringScanBenchmark(1024 * 1024 * 10)
stringDictionaryScanBenchmark(1024 * 1024 * 10)
//intScanBenchmark(1024 * 1024 * 15)
//intStringScanBenchmark(1024 * 1024 * 10)
//stringDictionaryScanBenchmark(1024 * 1024 * 10)
partitionTableScanBenchmark(1024 * 1024 * 15)

}
}
