From 940568000e9381712e9a27b050c9016c8c411ef9 Mon Sep 17 00:00:00 2001 From: Forest Fang Date: Thu, 18 Jun 2015 11:56:38 -0400 Subject: [PATCH 1/4] [SPARK-8443][SQL] split projection code by size limit --- .../codegen/GenerateMutableProjection.scala | 31 +++++++++++++++++-- .../expressions/CodeGenerationSuite.scala | 4 +++ 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala index 71e47d4f9b62..49d4e5df5fc5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala @@ -45,7 +45,32 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu else ${ctx.setColumn("mutableRow", e.dataType, i, evaluationCode.primitive)}; """ - }.mkString("\n") + } + val partitionedProjectionCode = projectionCode.foldLeft(List.empty[String]) { + (acc, code) => + acc match { + case Nil => List(code) + case head::tail => + // code size limit is 64kb and each char takes less or equal to 2 bytes + if (head.length < 32 * 1000) { + s"$head\n$code"::tail + } else { + code::acc + } + } + } + .zipWithIndex + .map { + case (body, i) => + s""" + private void apply$i(InternalRow i) { + $body + } + """ + } + val projectionCalls = ((partitionedProjectionCode.length - 1) to 0 by -1) + .map(i => s"apply$i(i);") + .mkString("\n") val mutableStates = ctx.mutableStates.map { case (javaType, variableName, initialValue) => s"private $javaType $variableName = $initialValue;" }.mkString("\n ") @@ -75,9 +100,11 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu return (InternalRow) mutableRow; } + ${partitionedProjectionCode.mkString("\n")} + public Object apply(Object _i) { InternalRow i = (InternalRow) _i; - $projectionCode + $projectionCalls return mutableRow; } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index 481b335d15df..8119b303c1db 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -42,4 +42,8 @@ class CodeGenerationSuite extends SparkFunSuite { futures.foreach(Await.result(_, 10.seconds)) } + + test("SPARK-8443: code size limit") { + GenerateMutableProjection.generate(List.fill(5000)(EqualTo(Literal(1), Literal(1)))) + } } From 1b5aa7e3c4fd9f0ffd2a55e0b26a1fdbda446005 Mon Sep 17 00:00:00 2001 From: Forest Fang Date: Sat, 18 Jul 2015 01:10:01 -0400 Subject: [PATCH 2/4] [SPARK-8443][SQL] inline execution if one block only --- .../codegen/GenerateMutableProjection.scala | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala index 49d4e5df5fc5..364b8d80af04 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala @@ -46,7 +46,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu ${ctx.setColumn("mutableRow", e.dataType, i, evaluationCode.primitive)}; """ } - val partitionedProjectionCode = projectionCode.foldLeft(List.empty[String]) { + val projectionBlocks = projectionCode.foldLeft(List.empty[String]) { (acc, code) => acc match { case Nil => List(code) @@ -58,19 +58,24 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu code::acc } } - } - .zipWithIndex - .map { - case (body, i) => - s""" - private void apply$i(InternalRow i) { - $body - } - """ } - val projectionCalls = ((partitionedProjectionCode.length - 1) to 0 by -1) - .map(i => s"apply$i(i);") - .mkString("\n") + val (projectionFuns, projectionCalls) = { + // inline execution if codesize limit was not broken + if (projectionBlocks.length == 1) { + ("", projectionBlocks.head) + } else { + ( + projectionBlocks.zipWithIndex.map { case (body, i) => + s""" + |private void apply$i(InternalRow i) { + | $body + |} + """.stripMargin + }.mkString, + ((projectionBlocks.length - 1) to 0 by -1).map(i => s"apply$i(i);").mkString("\n") + ) + } + } val mutableStates = ctx.mutableStates.map { case (javaType, variableName, initialValue) => s"private $javaType $variableName = $initialValue;" }.mkString("\n ") @@ -100,7 +105,7 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu return (InternalRow) mutableRow; } - ${partitionedProjectionCode.mkString("\n")} + $projectionFuns public Object apply(Object _i) { InternalRow i = (InternalRow) _i; From adef95a513d6556cb58dd1584f842f75803990e5 Mon Sep 17 00:00:00 2001 From: Forest Fang Date: Sat, 18 Jul 2015 18:50:02 -0400 Subject: [PATCH 3/4] [SPARK-8443][SQL] Use safer factor and rewrite splitting code per @rxin suggestion to improve code readability --- .../codegen/GenerateMutableProjection.scala | 29 ++++++++++--------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala index 364b8d80af04..b82bd6814b48 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions.codegen import org.apache.spark.sql.catalyst.expressions._ +import scala.collection.mutable.ArrayBuffer + // MutableProjection is not accessible in Java abstract class BaseMutableProjection extends MutableProjection @@ -46,19 +48,18 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu ${ctx.setColumn("mutableRow", e.dataType, i, evaluationCode.primitive)}; """ } - val projectionBlocks = projectionCode.foldLeft(List.empty[String]) { - (acc, code) => - acc match { - case Nil => List(code) - case head::tail => - // code size limit is 64kb and each char takes less or equal to 2 bytes - if (head.length < 32 * 1000) { - s"$head\n$code"::tail - } else { - code::acc - } - } + // collect projections into blocks as function has 64kb codesize limit in JVM + val projectionBlocks = new ArrayBuffer[String]() + val blockBuilder = new StringBuilder() + for (projection <- projectionCode) { + if (blockBuilder.length > 16 * 1000) { + projectionBlocks.append(blockBuilder.toString()) + blockBuilder.clear() } + blockBuilder.append(projection) + } + projectionBlocks.append(blockBuilder.toString()) + val (projectionFuns, projectionCalls) = { // inline execution if codesize limit was not broken if (projectionBlocks.length == 1) { @@ -72,13 +73,15 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu |} """.stripMargin }.mkString, - ((projectionBlocks.length - 1) to 0 by -1).map(i => s"apply$i(i);").mkString("\n") + projectionBlocks.indices.map(i => s"apply$i(i);").mkString("\n") ) } } + val mutableStates = ctx.mutableStates.map { case (javaType, variableName, initialValue) => s"private $javaType $variableName = $initialValue;" }.mkString("\n ") + val code = s""" public Object generate($exprType[] expr) { return new SpecificProjection(expr); From b7a7635376f6f4499e639093cf7a53e61a950ce7 Mon Sep 17 00:00:00 2001 From: Forest Fang Date: Sat, 18 Jul 2015 22:18:12 -0400 Subject: [PATCH 4/4] [SPARK-8443][SQL] Execute and verify split projections in test --- .../catalyst/expressions/CodeGenerationSuite.scala | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala index 8119b303c1db..e05218a23aa7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen._ /** * Additional tests for code generation. */ -class CodeGenerationSuite extends SparkFunSuite { +class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper { test("multithreaded eval") { import scala.concurrent._ @@ -43,7 +43,15 @@ class CodeGenerationSuite extends SparkFunSuite { futures.foreach(Await.result(_, 10.seconds)) } - test("SPARK-8443: code size limit") { - GenerateMutableProjection.generate(List.fill(5000)(EqualTo(Literal(1), Literal(1)))) + test("SPARK-8443: split wide projections into blocks due to JVM code size limit") { + val length = 5000 + val expressions = List.fill(length)(EqualTo(Literal(1), Literal(1))) + val plan = GenerateMutableProjection.generate(expressions)() + val actual = plan(new GenericMutableRow(length)).toSeq + val expected = Seq.fill(length)(true) + + if (!checkResult(actual, expected)) { + fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected") + } } }