Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions.codegen

import org.apache.spark.sql.catalyst.expressions._

import scala.collection.mutable.ArrayBuffer

// MutableProjection is not accessible in Java
abstract class BaseMutableProjection extends MutableProjection

Expand All @@ -45,10 +47,41 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu
else
${ctx.setColumn("mutableRow", e.dataType, i, evaluationCode.primitive)};
"""
}.mkString("\n")
}
// collect projections into blocks as function has 64kb codesize limit in JVM
val projectionBlocks = new ArrayBuffer[String]()
val blockBuilder = new StringBuilder()
for (projection <- projectionCode) {
if (blockBuilder.length > 16 * 1000) {
projectionBlocks.append(blockBuilder.toString())
blockBuilder.clear()
}
blockBuilder.append(projection)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we insert newlines so that the generated code is slightly more readable?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code itself already has a newline before and after. I looked at the debug output and the generated code looks reasonable. I'm happy to add an extra newline to be safe in case that assumption changes in the future. Just let me know.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can add it later if it's a problem; this seems fine for now, but just wanted to check. Thanks for looking into this.

}
projectionBlocks.append(blockBuilder.toString())

val (projectionFuns, projectionCalls) = {
// inline execution if codesize limit was not broken
if (projectionBlocks.length == 1) {
("", projectionBlocks.head)
} else {
(
projectionBlocks.zipWithIndex.map { case (body, i) =>
s"""
|private void apply$i(InternalRow i) {
| $body
|}
""".stripMargin
}.mkString,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similarly, should this be mkString("\n") or is there implicitly a newline somewhere that I'm overlooking?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the above: the `s"""` and `"""` lines already seem to add newlines for readability.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SGTM.

projectionBlocks.indices.map(i => s"apply$i(i);").mkString("\n")
)
}
}

val mutableStates = ctx.mutableStates.map { case (javaType, variableName, initialValue) =>
s"private $javaType $variableName = $initialValue;"
}.mkString("\n ")

val code = s"""
public Object generate($exprType[] expr) {
return new SpecificProjection(expr);
Expand All @@ -75,9 +108,11 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu
return (InternalRow) mutableRow;
}

$projectionFuns

public Object apply(Object _i) {
InternalRow i = (InternalRow) _i;
$projectionCode
$projectionCalls

return mutableRow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._
/**
* Additional tests for code generation.
*/
class CodeGenerationSuite extends SparkFunSuite {
class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {

test("multithreaded eval") {
import scala.concurrent._
Expand All @@ -42,4 +42,16 @@ class CodeGenerationSuite extends SparkFunSuite {

futures.foreach(Await.result(_, 10.seconds))
}

test("SPARK-8443: split wide projections into blocks due to JVM code size limit") {
  // Use a projection wide enough that the generated code has to be split
  // into several blocks rather than emitted as one method body.
  val numExpressions = 5000
  val expressions = List.fill(numExpressions)(EqualTo(Literal(1), Literal(1)))

  // Generate the projection, instantiate it, and run it against a fresh row
  // that has one slot per expression.
  val projection = GenerateMutableProjection.generate(expressions)()
  val inputRow = new GenericMutableRow(numExpressions)
  val actual = projection(inputRow).toSeq

  // Every expression is `1 = 1`, so every output column is expected to be true.
  val expected = Seq.fill(numExpressions)(true)

  val matches = checkResult(actual, expected)
  if (!matches) {
    fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
  }
}
}