-
Notifications
You must be signed in to change notification settings - Fork 28.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-13255][SQL] Update vectorized reader to directly return ColumnarBatch instead of InternalRows. #11435
Changes from 9 commits
59dec91
058556c
2330576
42875ac
cab64e5
f35394c
3450313
f5f1e2b
ed79eee
48102e3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -139,24 +139,79 @@ private[sql] case class PhysicalRDD( | |
// Support codegen so that we can avoid the UnsafeRow conversion in all cases. Codegen | ||
// never requires UnsafeRow as input. | ||
override protected def doProduce(ctx: CodegenContext): String = { | ||
val columnarBatchClz = "org.apache.spark.sql.execution.vectorized.ColumnarBatch" | ||
val input = ctx.freshName("input") | ||
val idx = ctx.freshName("batchIdx") | ||
val batch = ctx.freshName("batch") | ||
// PhysicalRDD always just has one input | ||
ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") | ||
ctx.addMutableState(columnarBatchClz, batch, s"$batch = null;") | ||
ctx.addMutableState("int", idx, s"$idx = 0;") | ||
|
||
val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true)) | ||
val row = ctx.freshName("row") | ||
val numOutputRows = metricTerm(ctx, "numOutputRows") | ||
ctx.INPUT_ROW = row | ||
ctx.currentVars = null | ||
val columns = exprs.map(_.gen(ctx)) | ||
|
||
// The input RDD can either return (all) ColumnarBatches or InternalRows. We determine this | ||
// by looking at the first value of the RDD and then calling the function which will process | ||
// the remaining. It is faster to return batches. | ||
// TODO: The abstractions between this class and SqlNewHadoopRDD makes it difficult to know | ||
// here which path to use. Fix this. | ||
|
||
|
||
val scanBatches = ctx.freshName("processBatches") | ||
ctx.addNewFunction(scanBatches, | ||
s""" | ||
| private void $scanBatches() throws java.io.IOException { | ||
| while (true) { | ||
| int numRows = $batch.numRows(); | ||
| if ($idx == 0) $numOutputRows.add(numRows); | ||
| | ||
| while ($idx < numRows) { | ||
| InternalRow $row = $batch.getRow($idx++); | ||
| ${columns.map(_.code).mkString("\n").trim} | ||
| ${columns.map(_.code).mkString("\n").trim} | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We could generate code to use the ColumnarBatch object, not InternalRow; not sure what the difference would be. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Something to try. Would be easy enough in a follow-up. |
||
| ${consume(ctx, columns).trim} | ||
| if (shouldStop()) return; | ||
| } | ||
| | ||
| if (!$input.hasNext()) { | ||
| $batch = null; | ||
| break; | ||
| } | ||
| $batch = ($columnarBatchClz)$input.next(); | ||
| $idx = 0; | ||
| } | ||
| }""".stripMargin) | ||
|
||
val scanRows = ctx.freshName("processRows") | ||
ctx.addNewFunction(scanRows, | ||
s""" | ||
| private void $scanRows(InternalRow $row) throws java.io.IOException { | ||
| while (true) { | ||
| $numOutputRows.add(1); | ||
| ${columns.map(_.code).mkString("\n").trim} | ||
| ${consume(ctx, columns).trim} | ||
| if (shouldStop()) return; | ||
| if (!$input.hasNext()) break; | ||
| $row = (InternalRow)$input.next(); | ||
| } | ||
| }""".stripMargin) | ||
|
||
s""" | ||
| while ($input.hasNext()) { | ||
| InternalRow $row = (InternalRow) $input.next(); | ||
| $numOutputRows.add(1); | ||
| ${columns.map(_.code).mkString("\n").trim} | ||
| ${consume(ctx, columns).trim} | ||
| if (shouldStop()) { | ||
| return; | ||
| if ($batch != null || $input.hasNext()) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How about this:
|
||
| if ($batch == null) { | ||
| Object value = $input.next(); | ||
| if (value instanceof $columnarBatchClz) { | ||
| $batch = ($columnarBatchClz)value; | ||
| $scanBatches(); | ||
| } else { | ||
| $scanRows((InternalRow)value); | ||
| } | ||
| } else { | ||
| $scanBatches(); | ||
| } | ||
| } | ||
""".stripMargin | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
Once we return ColumnarBatch in SqlNewHadoopRDD, the counter of the number of records in SqlNewHadoopRDD will be wrong. Should we have a BatchedSqlNewHadoopRDD for this purpose?
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
I think we need to clean this up, but let's do it in a follow-up. That counter is too expensive to maintain right now, and it's not clear why we would keep it if we maintain SQL metrics.
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
They are used in different places; we can clean this up later.