-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-13636][SQL] Directly consume UnsafeRow in wholestage codegen plans #11484
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
9c77d73
6941eb1
00e5090
7428fd4
6400eb2
dea644a
6f0ae35
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -105,6 +105,8 @@ case class Sort( | |
| // Name of sorter variable used in codegen. | ||
| private var sorterVariable: String = _ | ||
|
|
||
| override def consumeUnsafeRow: Boolean = true | ||
|
|
||
| override protected def doProduce(ctx: CodegenContext): String = { | ||
| val needToSort = ctx.freshName("needToSort") | ||
| ctx.addMutableState("boolean", needToSort, s"$needToSort = true;") | ||
|
|
@@ -153,18 +155,22 @@ case class Sort( | |
| """.stripMargin.trim | ||
| } | ||
|
|
||
| override def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { | ||
| val colExprs = child.output.zipWithIndex.map { case (attr, i) => | ||
| BoundReference(i, attr.dataType, attr.nullable) | ||
| } | ||
| override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String = null): String = { | ||
| if (row != null) { | ||
| s"$sorterVariable.insertRow((UnsafeRow)$row.copy());" | ||
|
||
| } else { | ||
| val colExprs = child.output.zipWithIndex.map { case (attr, i) => | ||
| BoundReference(i, attr.dataType, attr.nullable) | ||
| } | ||
|
|
||
| ctx.currentVars = input | ||
| val code = GenerateUnsafeProjection.createCode(ctx, colExprs) | ||
| ctx.currentVars = input | ||
| val code = GenerateUnsafeProjection.createCode(ctx, colExprs) | ||
|
|
||
| s""" | ||
| | // Convert the input attributes to an UnsafeRow and add it to the sorter | ||
| | ${code.code} | ||
| | $sorterVariable.insertRow(${code.value}); | ||
| """.stripMargin.trim | ||
| s""" | ||
| | // Convert the input attributes to an UnsafeRow and add it to the sorter | ||
| | ${code.code} | ||
| | $sorterVariable.insertRow(${code.value}); | ||
| """.stripMargin.trim | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -67,7 +67,12 @@ trait CodegenSupport extends SparkPlan { | |
| /** | ||
| * Which SparkPlan is calling produce() of this one. It's itself for the first SparkPlan. | ||
| */ | ||
| private var parent: CodegenSupport = null | ||
| protected var parent: CodegenSupport = null | ||
|
|
||
| /** | ||
| * Whether this SparkPlan accepts UnsafeRow as input in consumeChild. | ||
|
||
| */ | ||
| def consumeUnsafeRow: Boolean = false | ||
|
||
|
|
||
| /** | ||
| * Returns all the RDDs of InternalRow which generate the input rows. | ||
|
|
@@ -109,7 +114,10 @@ trait CodegenSupport extends SparkPlan { | |
| * Consume the columns generated from current SparkPlan, call its parent. | |
| */ | ||
| final def consume(ctx: CodegenContext, input: Seq[ExprCode], row: String = null): String = { | ||
| if (input != null) { | ||
| // We check that the input expressions have the same length as the output when: | ||
| // 1. parent can't consume UnsafeRow and input is not null. | ||
| // 2. parent consumes UnsafeRow and row is null. | ||
| if ((input != null && !parent.consumeUnsafeRow) || (parent.consumeUnsafeRow && row == null)) { | ||
|
||
| assert(input.length == output.length) | ||
| } | ||
| parent.consumeChild(ctx, this, input, row) | ||
|
|
@@ -125,14 +133,27 @@ trait CodegenSupport extends SparkPlan { | |
| row: String = null): String = { | ||
| ctx.freshNamePrefix = variablePrefix | ||
| if (row != null) { | ||
| ctx.currentVars = null | ||
| ctx.INPUT_ROW = row | ||
| val evals = child.output.zipWithIndex.map { case (attr, i) => | ||
| BoundReference(i, attr.dataType, attr.nullable).gen(ctx) | ||
| val evals: Seq[ExprCode] = if (!consumeUnsafeRow) { | ||
| // If this SparkPlan can't consume UnsafeRow and there is an UnsafeRow, | ||
| // we extract the columns from the row and call doConsume. | ||
| ctx.currentVars = null | ||
| ctx.INPUT_ROW = row | ||
| child.output.zipWithIndex.map { case (attr, i) => | ||
| BoundReference(i, attr.dataType, attr.nullable).gen(ctx) | ||
| } | ||
| } else { | ||
| // If this SparkPlan consumes UnsafeRow and there is an UnsafeRow, | ||
| // we don't need to unpack variables from the row. | ||
| Seq.empty | ||
| } | ||
| val evalCode = if (evals.isEmpty) { | ||
| "" | ||
| } else { | ||
| s"${evals.map(_.code).mkString("\n")}" | ||
| } | ||
| s""" | ||
| | ${evals.map(_.code).mkString("\n")} | ||
| | ${doConsume(ctx, evals)} | ||
| | $evalCode | ||
| | ${doConsume(ctx, evals, row)} | ||
| """.stripMargin | ||
| } else { | ||
| doConsume(ctx, input) | ||
|
||
|
|
@@ -151,7 +172,7 @@ trait CodegenSupport extends SparkPlan { | |
| * # call consume(), which will call parent.doConsume() | ||
| * } | ||
| */ | ||
| protected def doConsume(ctx: CodegenContext, input: Seq[ExprCode]): String = { | ||
| protected def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: String = null): String = { | ||
| throw new UnsupportedOperationException | ||
| } | ||
| } | ||
|
|
@@ -191,17 +212,29 @@ case class InputAdapter(child: SparkPlan) extends LeafNode with CodegenSupport { | |
| val input = ctx.freshName("input") | ||
| // Right now, InputAdapter is only used when there is one upstream. | ||
| ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") | ||
|
|
||
| val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true)) | ||
| val row = ctx.freshName("row") | ||
| ctx.INPUT_ROW = row | ||
| ctx.currentVars = null | ||
| val columns = exprs.map(_.gen(ctx)) | ||
|
|
||
| // If the parent of this InputAdapter can't consume UnsafeRow, | ||
| // we unpack variables from the row. | ||
| val columns: Seq[ExprCode] = if (!this.parent.consumeUnsafeRow) { | ||
| val exprs = output.zipWithIndex.map(x => new BoundReference(x._2, x._1.dataType, true)) | ||
| ctx.INPUT_ROW = row | ||
| ctx.currentVars = null | ||
| exprs.map(_.gen(ctx)) | ||
| } else { | ||
| // If the parent consumes UnsafeRow, we don't need to unpack the row. | ||
| Seq.empty | ||
| } | ||
| val columnsCode = if (columns.isEmpty) { | ||
| "" | ||
| } else { | ||
| s"${columns.map(_.code).mkString("\n").trim}" | ||
| } | ||
| s""" | ||
| | while ($input.hasNext()) { | ||
| | InternalRow $row = (InternalRow) $input.next(); | ||
| | ${columns.map(_.code).mkString("\n").trim} | ||
| | ${consume(ctx, columns).trim} | ||
| | $columnsCode | ||
| | ${consume(ctx, columns, row).trim} | ||
| | if (shouldStop()) { | ||
| | return; | ||
| | } | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can remove the default value for row here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.