This repository was archived by the owner on Jan 9, 2020. It is now read-only.
Closed
Changes from 1 commit
Commits (37)
999ec13 [SPARK-22570][SQL] Avoid to create a lot of global variables by using… (kiszk, Nov 30, 2017)
6ac57fd [SPARK-21417][SQL] Infer join conditions using propagated constraints (Nov 30, 2017)
bcceab6 [SPARK-22489][SQL] Shouldn't change broadcast join buildSide if user … (wangyum, Nov 30, 2017)
f5f8e84 [SPARK-22614] Dataset API: repartitionByRange(...) (adrian-ionescu, Nov 30, 2017)
7e5f669 [SPARK-22428][DOC] Add spark application garbage collector configurat… (gaborgsomogyi, Dec 1, 2017)
7da1f57 [SPARK-22373] Bump Janino dependency version to fix thread safety issue… (Victsm, Dec 1, 2017)
dc36542 [SPARK-22653] executorAddress registered in CoarseGrainedSchedulerBac… (tgravescs, Dec 1, 2017)
16adaf6 [SPARK-22601][SQL] Data load is getting displayed successful on provi… (sujith71955, Dec 1, 2017)
9d06a9e [SPARK-22393][SPARK-SHELL] spark-shell can't find imported types in c… (mpetruska, Dec 1, 2017)
ee10ca7 [SPARK-22638][SS] Use a separate queue for StreamingQueryListenerBus (zsxwing, Dec 1, 2017)
aa4cf2b [SPARK-22651][PYTHON][ML] Prevent initiating multiple Hive clients fo… (HyukjinKwon, Dec 2, 2017)
d2cf95a [SPARK-22634][BUILD] Update Bouncy Castle to 1.58 (srowen, Dec 2, 2017)
f23dddf [SPARK-20682][SPARK-15474][SPARK-21791] Add new ORCFileFormat based o… (dongjoon-hyun, Dec 3, 2017)
2c16267 [SPARK-22669][SQL] Avoid unnecessary function calls in code generation (mgaido91, Dec 3, 2017)
dff440f [SPARK-22626][SQL] deals with wrong Hive's statistics (zero rowCount) (wangyum, Dec 3, 2017)
4131ad0 [SPARK-22489][DOC][FOLLOWUP] Update broadcast behavior changes in mig… (wangyum, Dec 4, 2017)
3927bb9 [SPARK-22473][FOLLOWUP][TEST] Remove deprecated Date functions (mgaido91, Dec 4, 2017)
f81401e [SPARK-22162] Executors and the driver should use consistent JobIDs i… (Dec 4, 2017)
e1dd03e [SPARK-22372][CORE, YARN] Make cluster submission use SparkApplication. (Dec 4, 2017)
dcaac45 Spark on Kubernetes - basic submission client (liyinan926, Nov 10, 2017)
27c67ff Addressed first round of review comments (liyinan926, Nov 27, 2017)
6d597d0 Made Client implement the SparkApplication trait (liyinan926, Nov 28, 2017)
5b9fa39 Addressed the second round of comments (liyinan926, Nov 28, 2017)
5ccadb5 Added missing step for supporting local:// dependencies and addressed… (liyinan926, Nov 30, 2017)
12f2797 Fixed Scala style check errors (liyinan926, Nov 30, 2017)
c35fe48 Addressed another round of comments (liyinan926, Dec 4, 2017)
faa2849 Rebased on master and added a constant val for the Client class (liyinan926, Dec 4, 2017)
347ed69 Addressed another major round of comments (liyinan926, Dec 5, 2017)
0e8ca01 Addressed one more round of comments (liyinan926, Dec 5, 2017)
3a0b8e3 Removed mentioning of kubernetes-namespace (liyinan926, Dec 6, 2017)
83d0b9c Fixed a couple of bugs found during manual tests (liyinan926, Dec 7, 2017)
44c40b1 Guard against client mode in SparkContext (liyinan926, Dec 8, 2017)
67bc847 Added libc6-compat into the base docker image (liyinan926, Dec 8, 2017)
7d2b303 Addressed latest comments (liyinan926, Dec 8, 2017)
caf2206 Addressed docs comments (liyinan926, Dec 9, 2017)
2e7810b Fixed a comment (liyinan926, Dec 11, 2017)
cbcd30e Addressed latest comments (liyinan926, Dec 11, 2017)
[SPARK-22570][SQL] Avoid to create a lot of global variables by using a local variable with allocation of an object in generated code

## What changes were proposed in this pull request?

This PR reduces the number of global variables in generated code by replacing each global variable with a local variable that allocates an object on every invocation. When many global variables are generated, the generated code can hit the 64K constant pool limit.
The number of generated global variables is reduced in the following three operations:
* `Cast` with String to primitive byte/short/int/long
* `RegExpReplace`
* `CreateArray`

I intentionally leave [this part](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala#L595-L603) untouched, because that variable holds an instance of a dynamically generated class; in other words, it is not possible to reuse a single class there.
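
To illustrate, here is a rough before/after sketch of the generated Java for a string-to-byte cast. The surrounding variable names (`in`, `value`, `isNull`) are illustrative stand-ins, not the code generator's actual output:

```java
// Before: each cast registered a global mutable field on the generated class,
// adding entries to its constant pool.
private UTF8String.IntWrapper wrapper;  // initialized once, lives as long as the class

if (in.toByte(wrapper)) {
  value = (byte) wrapper.value;
} else {
  isNull = true;
}

// After: a short-lived local is allocated per evaluation, so no global field is needed.
UTF8String.IntWrapper wrapper = new UTF8String.IntWrapper();
if (in.toByte(wrapper)) {
  value = (byte) wrapper.value;
} else {
  isNull = true;
}
wrapper = null;  // drop the reference so the wrapper can be collected early
```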

## How was this patch tested?

Added test cases for `Cast`, `RegExpReplace`, and `CreateArray`.
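
All three suites follow the same pattern (a condensed sketch mirroring the tests in this diff): generate code for the expression against a fresh `CodegenContext`, then assert on `ctx.mutableStates`:

```scala
import org.apache.spark.sql.catalyst.expressions.{CreateArray, Literal}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext

val ctx = new CodegenContext
CreateArray(Seq(Literal(1))).genCode(ctx)
// With this change, CreateArray registers no per-expression global variables.
assert(ctx.mutableStates.isEmpty)
```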

Author: Kazuaki Ishizaki <[email protected]>

Closes apache#19797 from kiszk/SPARK-22570.
kiszk authored and cloud-fan committed Nov 30, 2017
commit 999ec137a97844abbbd483dd98c7ded2f8ff356c
@@ -799,16 +799,16 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
 
   private[this] def castToByteCode(from: DataType, ctx: CodegenContext): CastFunction = from match {
     case StringType =>
-      val wrapper = ctx.freshName("wrapper")
-      ctx.addMutableState("UTF8String.IntWrapper", wrapper,
-        s"$wrapper = new UTF8String.IntWrapper();")
+      val wrapper = ctx.freshName("intWrapper")
       (c, evPrim, evNull) =>
         s"""
+          UTF8String.IntWrapper $wrapper = new UTF8String.IntWrapper();
           if ($c.toByte($wrapper)) {
             $evPrim = (byte) $wrapper.value;
           } else {
             $evNull = true;
           }
+          $wrapper = null;
         """
     case BooleanType =>
       (c, evPrim, evNull) => s"$evPrim = $c ? (byte) 1 : (byte) 0;"
@@ -826,16 +826,16 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
       from: DataType,
       ctx: CodegenContext): CastFunction = from match {
     case StringType =>
-      val wrapper = ctx.freshName("wrapper")
-      ctx.addMutableState("UTF8String.IntWrapper", wrapper,
-        s"$wrapper = new UTF8String.IntWrapper();")
+      val wrapper = ctx.freshName("intWrapper")
       (c, evPrim, evNull) =>
         s"""
+          UTF8String.IntWrapper $wrapper = new UTF8String.IntWrapper();
           if ($c.toShort($wrapper)) {
             $evPrim = (short) $wrapper.value;
           } else {
             $evNull = true;
           }
+          $wrapper = null;
         """
     case BooleanType =>
       (c, evPrim, evNull) => s"$evPrim = $c ? (short) 1 : (short) 0;"
@@ -851,16 +851,16 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
 
   private[this] def castToIntCode(from: DataType, ctx: CodegenContext): CastFunction = from match {
     case StringType =>
-      val wrapper = ctx.freshName("wrapper")
-      ctx.addMutableState("UTF8String.IntWrapper", wrapper,
-        s"$wrapper = new UTF8String.IntWrapper();")
+      val wrapper = ctx.freshName("intWrapper")
       (c, evPrim, evNull) =>
         s"""
+          UTF8String.IntWrapper $wrapper = new UTF8String.IntWrapper();
           if ($c.toInt($wrapper)) {
             $evPrim = $wrapper.value;
           } else {
             $evNull = true;
           }
+          $wrapper = null;
         """
     case BooleanType =>
       (c, evPrim, evNull) => s"$evPrim = $c ? 1 : 0;"
@@ -876,17 +876,17 @@ case class Cast(child: Expression, dataType: DataType, timeZoneId: Option[String
 
   private[this] def castToLongCode(from: DataType, ctx: CodegenContext): CastFunction = from match {
     case StringType =>
-      val wrapper = ctx.freshName("wrapper")
-      ctx.addMutableState("UTF8String.LongWrapper", wrapper,
-        s"$wrapper = new UTF8String.LongWrapper();")
+      val wrapper = ctx.freshName("longWrapper")
 
       (c, evPrim, evNull) =>
         s"""
+          UTF8String.LongWrapper $wrapper = new UTF8String.LongWrapper();
           if ($c.toLong($wrapper)) {
             $evPrim = $wrapper.value;
           } else {
             $evNull = true;
           }
+          $wrapper = null;
         """
     case BooleanType =>
       (c, evPrim, evNull) => s"$evPrim = $c ? 1L : 0L;"
@@ -63,7 +63,7 @@ case class CreateArray(children: Seq[Expression]) extends Expression {
     val (preprocess, assigns, postprocess, arrayData) =
       GenArrayData.genCodeToCreateArrayData(ctx, et, evals, false)
     ev.copy(
-      code = preprocess + ctx.splitExpressions(assigns) + postprocess,
+      code = preprocess + assigns + postprocess,
       value = arrayData,
       isNull = "false")
   }
@@ -77,24 +77,22 @@ private [sql] object GenArrayData {
    *
    * @param ctx a [[CodegenContext]]
    * @param elementType data type of underlying array elements
-   * @param elementsCode a set of [[ExprCode]] for each element of an underlying array
+   * @param elementsCode concatenated set of [[ExprCode]] for each element of an underlying array
    * @param isMapKey if true, throw an exception when the element is null
-   * @return (code pre-assignments, assignments to each array elements, code post-assignments,
-   *         arrayData name)
+   * @return (code pre-assignments, concatenated assignments to each array elements,
+   *         code post-assignments, arrayData name)
    */
   def genCodeToCreateArrayData(
       ctx: CodegenContext,
       elementType: DataType,
       elementsCode: Seq[ExprCode],
-      isMapKey: Boolean): (String, Seq[String], String, String) = {
-    val arrayName = ctx.freshName("array")
+      isMapKey: Boolean): (String, String, String, String) = {
     val arrayDataName = ctx.freshName("arrayData")
     val numElements = elementsCode.length
 
     if (!ctx.isPrimitiveType(elementType)) {
+      val arrayName = ctx.freshName("arrayObject")
       val genericArrayClass = classOf[GenericArrayData].getName
-      ctx.addMutableState("Object[]", arrayName,
-        s"$arrayName = new Object[$numElements];")
 
       val assignments = elementsCode.zipWithIndex.map { case (eval, i) =>
         val isNullAssignment = if (!isMapKey) {
@@ -110,17 +108,21 @@ private [sql] object GenArrayData {
           }
         """
       }
+      val assignmentString = ctx.splitExpressions(
+        expressions = assignments,
+        funcName = "apply",
+        extraArguments = ("Object[]", arrayDataName) :: Nil)
 
-      ("",
-       assignments,
+      (s"Object[] $arrayName = new Object[$numElements];",
+       assignmentString,
        s"final ArrayData $arrayDataName = new $genericArrayClass($arrayName);",
        arrayDataName)
     } else {
+      val arrayName = ctx.freshName("array")
       val unsafeArraySizeInBytes =
         UnsafeArrayData.calculateHeaderPortionInBytes(numElements) +
         ByteArrayMethods.roundNumberOfBytesToNearestWord(elementType.defaultSize * numElements)
       val baseOffset = Platform.BYTE_ARRAY_OFFSET
-      ctx.addMutableState("UnsafeArrayData", arrayDataName)
 
       val primitiveValueTypeName = ctx.primitiveTypeName(elementType)
       val assignments = elementsCode.zipWithIndex.map { case (eval, i) =>
@@ -137,14 +139,18 @@ private [sql] object GenArrayData {
           }
         """
       }
+      val assignmentString = ctx.splitExpressions(
+        expressions = assignments,
+        funcName = "apply",
+        extraArguments = ("UnsafeArrayData", arrayDataName) :: Nil)
 
       (s"""
         byte[] $arrayName = new byte[$unsafeArraySizeInBytes];
-        $arrayDataName = new UnsafeArrayData();
+        UnsafeArrayData $arrayDataName = new UnsafeArrayData();
         Platform.putLong($arrayName, $baseOffset, $numElements);
         $arrayDataName.pointTo($arrayName, $baseOffset, $unsafeArraySizeInBytes);
       """,
-       assignments,
+       assignmentString,
        "",
        arrayDataName)
     }
@@ -216,10 +222,10 @@ case class CreateMap(children: Seq[Expression]) extends Expression {
       s"""
         final boolean ${ev.isNull} = false;
         $preprocessKeyData
-        ${ctx.splitExpressions(assignKeys)}
+        $assignKeys
         $postprocessKeyData
         $preprocessValueData
-        ${ctx.splitExpressions(assignValues)}
+        $assignValues
         $postprocessValueData
         final MapData ${ev.value} = new $mapClass($keyArrayData, $valueArrayData);
       """
@@ -321,8 +321,7 @@ case class RegExpReplace(subject: Expression, regexp: Expression, rep: Expressio
 
     val termLastReplacement = ctx.freshName("lastReplacement")
     val termLastReplacementInUTF8 = ctx.freshName("lastReplacementInUTF8")
-
-    val termResult = ctx.freshName("result")
+    val termResult = ctx.freshName("termResult")
 
     val classNamePattern = classOf[Pattern].getCanonicalName
     val classNameStringBuffer = classOf[java.lang.StringBuffer].getCanonicalName
@@ -334,8 +333,6 @@ case class RegExpReplace(subject: Expression, regexp: Expression, rep: Expressio
     ctx.addMutableState("String", termLastReplacement, s"${termLastReplacement} = null;")
     ctx.addMutableState("UTF8String",
       termLastReplacementInUTF8, s"${termLastReplacementInUTF8} = null;")
-    ctx.addMutableState(classNameStringBuffer,
-      termResult, s"${termResult} = new $classNameStringBuffer();")
 
     val setEvNotNull = if (nullable) {
       s"${ev.isNull} = false;"
@@ -355,14 +352,15 @@ case class RegExpReplace(subject: Expression, regexp: Expression, rep: Expressio
         ${termLastReplacementInUTF8} = $rep.clone();
         ${termLastReplacement} = ${termLastReplacementInUTF8}.toString();
       }
-      ${termResult}.delete(0, ${termResult}.length());
+      $classNameStringBuffer ${termResult} = new $classNameStringBuffer();
       java.util.regex.Matcher ${matcher} = ${termPattern}.matcher($subject.toString());
 
       while (${matcher}.find()) {
         ${matcher}.appendReplacement(${termResult}, ${termLastReplacement});
       }
       ${matcher}.appendTail(${termResult});
       ${ev.value} = UTF8String.fromString(${termResult}.toString());
+      ${termResult} = null;
       $setEvNotNull
     """
   })
@@ -23,6 +23,7 @@ import java.util.{Calendar, Locale, TimeZone}
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.catalyst.util.DateTimeTestUtils._
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.TimeZoneGMT
@@ -845,4 +846,11 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
     val outputOuter = Row.fromSeq((1 to N).map(_ => outputInner))
     checkEvaluation(cast(Literal.create(inputOuter, fromOuter), toOuter), outputOuter)
   }
+
+  test("SPARK-22570: Cast should not create a lot of global variables") {
+    val ctx = new CodegenContext
+    cast("1", IntegerType).genCode(ctx)
+    cast("2", LongType).genCode(ctx)
+    assert(ctx.mutableStates.length == 0)
+  }
 }
@@ -20,7 +20,8 @@ package org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.dsl.expressions._
-import org.apache.spark.sql.types.{IntegerType, StringType}
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
+import org.apache.spark.sql.types.StringType
 
 /**
  * Unit tests for regular expression (regexp) related SQL expressions.
@@ -178,6 +179,14 @@ class RegexpExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
     checkEvaluation(nonNullExpr, "num-num", row1)
   }
 
+  test("SPARK-22570: RegExpReplace should not create a lot of global variables") {
+    val ctx = new CodegenContext
+    RegExpReplace(Literal("100"), Literal("(\\d+)"), Literal("num")).genCode(ctx)
+    // four global variables (lastRegex, pattern, lastReplacement, and lastReplacementInUTF8)
+    // are always required
+    assert(ctx.mutableStates.length == 4)
+  }
+
   test("RegexExtract") {
     val row1 = create_row("100-200", "(\\d+)-(\\d+)", 1)
     val row2 = create_row("100-200", "(\\d+)-(\\d+)", 2)
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
@@ -164,6 +165,12 @@ class ComplexTypesSuite extends PlanTest{
     comparePlans(Optimizer execute query, expected)
   }
 
+  test("SPARK-22570: CreateArray should not create a lot of global variables") {
+    val ctx = new CodegenContext
+    CreateArray(Seq(Literal(1))).genCode(ctx)
+    assert(ctx.mutableStates.length == 0)
+  }
+
   test("simplify map ops") {
     val rel = relation
       .select(