Merged
Changes from 1 commit
Commits
58 commits
3d567a3
[MINOR][SQL] Avoid unnecessary invocation on checkAndGlobPathIfNecessary
Ngone51 Oct 22, 2019
484f93e
[SPARK-29530][SQL] Make SQLConf in SQL parse process thread safe
AngersZhuuuu Oct 22, 2019
467c3f6
[SPARK-29529][DOCS] Remove unnecessary orc version and hive version i…
denglingang Oct 22, 2019
811d563
[SPARK-29536][PYTHON] Upgrade cloudpickle to 1.1.1 to support Python 3.8
HyukjinKwon Oct 22, 2019
868d851
[SPARK-29232][ML] Update the parameter maps of the DecisionTreeRegres…
huaxingao Oct 22, 2019
3163b6b
[SPARK-29516][SQL][TEST] Test ThriftServerQueryTestSuite asynchronously
wangyum Oct 22, 2019
bb49c80
[SPARK-21492][SQL] Fix memory leak in SortMergeJoin
xuanyuanking Oct 22, 2019
b4844ee
[SPARK-29517][SQL] TRUNCATE TABLE should look up catalog/table like v…
viirya Oct 22, 2019
8779938
[SPARK-28787][DOC][SQL] Document LOAD DATA statement in SQL Reference
huaxingao Oct 22, 2019
c1c6485
[SPARK-28793][DOC][SQL] Document CREATE FUNCTION in SQL Reference
dilipbiswal Oct 22, 2019
2036a8c
[SPARK-29488][WEBUI] In Web UI, stage page has js error when sort table
jennyinspur Oct 22, 2019
8009468
[SPARK-29556][CORE] Avoid putting request path in error response in E…
srowen Oct 22, 2019
3bf5355
[SPARK-29539][SQL] SHOW PARTITIONS should look up catalog/table like …
huaxingao Oct 22, 2019
f23c5d7
[SPARK-29560][BUILD] Add typesafe bintray repo for sbt-mima-plugin
dongjoon-hyun Oct 22, 2019
e674909
[SPARK-29107][SQL][TESTS] Port window.sql (Part 1)
DylanGuedes Oct 23, 2019
c128ac5
[SPARK-29511][SQL] DataSourceV2: Support CREATE NAMESPACE
imback82 Oct 23, 2019
8c34690
[SPARK-29546][TESTS] Recover jersey-guava test dependency in docker-i…
dongjoon-hyun Oct 23, 2019
cbe6ead
[SPARK-29352][SQL][SS] Track active streaming queries in the SparkSes…
brkyvz Oct 23, 2019
70dd9c0
[SPARK-29542][SQL][DOC] Make the descriptions of spark.sql.files.* be…
turboFei Oct 23, 2019
0a70951
[SPARK-29499][CORE][PYSPARK] Add mapPartitionsWithIndex for RDDBarrier
ConeyLiu Oct 23, 2019
df00b5c
[SPARK-29569][BUILD][DOCS] Copy and paste minified jquery instead whe…
HyukjinKwon Oct 23, 2019
53a5f17
[SPARK-29513][SQL] REFRESH TABLE should look up catalog/table like v2…
imback82 Oct 23, 2019
bfbf282
[SPARK-29503][SQL] Remove conversion CreateNamedStruct to CreateNamed…
HeartSaVioR Oct 23, 2019
7e8e4c0
[SPARK-29552][SQL] Execute the "OptimizeLocalShuffleReader" rule when…
JkSelf Oct 23, 2019
5867707
[SPARK-29557][BUILD] Update dropwizard/codahale metrics library to 3.2.6
LucaCanali Oct 23, 2019
b91356e
[SPARK-29533][SQL][TESTS][FOLLOWUP] Regenerate the result on EC2
dongjoon-hyun Oct 23, 2019
7ecf968
[SPARK-29567][TESTS] Update JDBC Integration Test Docker Images
dongjoon-hyun Oct 23, 2019
fd899d6
[SPARK-29576][CORE] Use Spark's CompressionCodec for Ser/Deser of Map…
dbtsai Oct 24, 2019
55ced9c
[SPARK-29571][SQL][TESTS][FOLLOWUP] Fix UT in AllExecutionsPageSuite
07ARB Oct 24, 2019
177bf67
[SPARK-29522][SQL] CACHE TABLE should look up catalog/table like v2 c…
viirya Oct 24, 2019
9e77d48
[SPARK-21492][SQL][FOLLOW UP] Reimplement UnsafeExternalRowSorter in …
xuanyuanking Oct 24, 2019
1296bbb
[SPARK-29504][WEBUI] Toggle full job description on click
PavithraRamachandran Oct 24, 2019
67cf043
[SPARK-29145][SQL] Support sub-queries in join conditions
AngersZhuuuu Oct 24, 2019
1ec1b2b
[SPARK-28791][DOC] Documentation for Alter table Command
PavithraRamachandran Oct 24, 2019
76d4beb
[SPARK-29559][WEBUI] Support pagination for JDBC/ODBC Server page
shahidki31 Oct 24, 2019
a35fb4f
[SPARK-29578][TESTS] Add "8634" as another skipped day for Kwajalein …
srowen Oct 24, 2019
cdea520
[SPARK-29532][SQL] Simplify interval string parsing
cloud-fan Oct 24, 2019
dcf5eaf
[SPARK-29444][FOLLOWUP] add doc and python parameter for ignoreNullFi…
Oct 24, 2019
92b2529
[SPARK-21287][SQL] Remove requirement of fetch_size>=0 from JDBCOptions
fuwhu Oct 24, 2019
dec99d8
[SPARK-29526][SQL] UNCACHE TABLE should look up catalog/table like v2…
imback82 Oct 24, 2019
40df9d2
[SPARK-29227][SS] Track rule info in optimization phase
wenxuanguan Oct 25, 2019
7417c3e
[SPARK-29597][DOCS] Deprecate old Java 8 versions prior to 8u92
dongjoon-hyun Oct 25, 2019
1474ed0
[SPARK-29562][SQL] Speed up and slim down metric aggregation in SQL l…
Oct 25, 2019
091cbc3
[SPARK-9612][ML] Add instance weight support for GBTs
zhengruifeng Oct 25, 2019
cfbdd9d
[SPARK-29461][SQL] Measure the number of records being updated for JD…
HeartSaVioR Oct 25, 2019
8bd8f49
[SPARK-29500][SQL][SS] Support partition column when writing to Kafka
redsk Oct 25, 2019
0cf4f07
[SPARK-29545][SQL] Add support for bit_xor aggregate function
yaooqinn Oct 25, 2019
68dca9a
[SPARK-29527][SQL] SHOW CREATE TABLE should look up catalog/table lik…
viirya Oct 25, 2019
ae5b60d
[SPARK-29182][CORE][FOLLOWUP] Cache preferred locations of checkpoint…
viirya Oct 25, 2019
2baf7a1
[SPARK-29608][BUILD] Add `hadoop-3.2` profile to release build
dongjoon-hyun Oct 25, 2019
2549391
[SPARK-29580][TESTS] Add kerberos debug messages for Kafka secure tests
gaborgsomogyi Oct 25, 2019
5bdc58b
[SPARK-27653][SQL][FOLLOWUP] Fix `since` version of `min_by/max_by`
dongjoon-hyun Oct 26, 2019
9a46702
[SPARK-29554][SQL] Add `version` SQL function
yaooqinn Oct 26, 2019
2115bf6
[SPARK-29490][SQL] Reset 'WritableColumnVector' in 'RowToColumnarExec'
marin-ma Oct 26, 2019
077fb99
[SPARK-29589][WEBUI] Support pagination for sqlstats session table in…
shahidki31 Oct 26, 2019
74514b4
[SPARK-29614][SQL][TEST] Fix failures of DateTimeUtilsSuite and Times…
MaxGekk Oct 27, 2019
a43b966
[SPARK-29613][BUILD][SS] Upgrade to Kafka 2.3.1
dongjoon-hyun Oct 27, 2019
b19fd48
[SPARK-29093][PYTHON][ML] Remove automatically generated param setter…
huaxingao Oct 28, 2019
[SPARK-21492][SQL] Fix memory leak in SortMergeJoin
### What changes were proposed in this pull request?
This PR adds a mechanism by which downstream operators can notify their parents that the output data stream is no longer needed and its resources can be released. The mechanism is implemented as follows:
- Add a `cleanupResources` function to SparkPlan which, by default, calls `cleanupResources` on all children. An operator that needs its own resource cleanup should override it, perform that cleanup, and also call `super.cleanupResources`, as SortExec does in this PR.
- Add support on the trigger side, which in this PR is SortMergeJoinExec: it calls `cleanupResources` to perform the cleanup for all of its upstream (children) operators (a condensed sketch of the pattern follows this list).
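
As a quick illustration, here is a condensed sketch of the pattern described above. The names (`PlanNode`, `SortNode`, `JoinNode`, `onNoMoreJoinRows`) are simplified placeholders invented for this sketch, not the actual Spark API; the real classes and signatures are in the diff below.

```
// A condensed sketch of the cleanup pattern (simplified names, not the real Spark classes).
abstract class PlanNode {
  def children: Seq[PlanNode]

  // Default behaviour: just propagate the cleanup request to every child.
  protected def cleanupResources(): Unit = children.foreach(_.cleanupResources())
}

// An operator that holds a releasable resource overrides the hook, frees its own
// resource, and still calls super so the rest of the subtree is cleaned up too.
class SortNode(val children: Seq[PlanNode]) extends PlanNode {
  private var sorter: AutoCloseable = _ // created lazily, so it may still be null

  override protected def cleanupResources(): Unit = {
    if (sorter != null) sorter.close()
    super.cleanupResources()
  }
}

// The trigger side: once the join knows no more rows can be produced, it cleans up
// its children eagerly instead of waiting for the task to finish.
class JoinNode(val children: Seq[PlanNode]) extends PlanNode {
  def onNoMoreJoinRows(): Unit = cleanupResources()
}
```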

### Why are the changes needed?
This fixes the SortMergeJoin memory leak and implements a general framework for SparkPlan resource cleanup.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
UT: add a new test suite, JoinWithResourceCleanSuite, covering both the standard and the code-generation scenarios.

Integration test: run with driver/executor memory at the default of 1g, in local mode with 10 threads. The test below (thanks to taosaildrone for providing it [here](apache#23762 (comment))) passes with this PR.

```
from pyspark.sql.functions import rand, col

spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
# spark.conf.set("spark.sql.sortMergeJoinExec.eagerCleanupResources", "true")

r1 = spark.range(1, 1001).select(col("id").alias("timestamp1"))
r1 = r1.withColumn('value', rand())
r2 = spark.range(1000, 1001).select(col("id").alias("timestamp2"))
r2 = r2.withColumn('value2', rand())
joined = r1.join(r2, r1.timestamp1 == r2.timestamp2, "inner")
joined = joined.coalesce(1)
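# coalesce(1) evaluates all upstream join partitions inside a single task; without
# eager cleanup, each partition's sorter memory is held until the task finishes,
# which is the accumulation this PR's cleanupResources mechanism releases early.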
joined.explain()
joined.show()
```

Closes apache#26164 from xuanyuanking/SPARK-21492.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
xuanyuanking authored and cloud-fan committed Oct 22, 2019
commit bb49c80c890452dc047a1975b16dcd876705ad23
UnsafeExternalRowSorter.java
@@ -52,6 +52,12 @@ public final class UnsafeExternalRowSorter {
private final UnsafeExternalRowSorter.PrefixComputer prefixComputer;
private final UnsafeExternalSorter sorter;

// This flag makes sure the cleanupResource() has been called. After the cleanup work,
// iterator.next should always return false. Downstream operator triggers the resource
// cleanup while they found there's no need to keep the iterator any more.
// See more details in SPARK-21492.
private boolean isReleased = false;

public abstract static class PrefixComputer {

public static class Prefix {
@@ -157,7 +163,8 @@ public long getSortTimeNanos() {
return sorter.getSortTimeNanos();
}

private void cleanupResources() {
public void cleanupResources() {
isReleased = true;
sorter.cleanupResources();
}

@@ -176,7 +183,7 @@ public Iterator<UnsafeRow> sort() throws IOException {

@Override
public boolean hasNext() {
return sortedIterator.hasNext();
return !isReleased && sortedIterator.hasNext();
}

@Override
SortExec.scala
@@ -62,6 +62,14 @@ case class SortExec(
"peakMemory" -> SQLMetrics.createSizeMetric(sparkContext, "peak memory"),
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))

private[sql] var rowSorter: UnsafeExternalRowSorter = _

/**
* This method gets invoked only once for each SortExec instance to initialize an
* UnsafeExternalRowSorter, both `plan.execute` and code generation are using it.
* In the code generation code path, we need to call this function outside the class so we
* should make it public.
*/
def createSorter(): UnsafeExternalRowSorter = {
val ordering = newOrdering(sortOrder, output)

@@ -87,13 +95,13 @@
}

val pageSize = SparkEnv.get.memoryManager.pageSizeBytes
val sorter = UnsafeExternalRowSorter.create(
rowSorter = UnsafeExternalRowSorter.create(
schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort)

if (testSpillFrequency > 0) {
sorter.setTestSpillFrequency(testSpillFrequency)
rowSorter.setTestSpillFrequency(testSpillFrequency)
}
sorter
rowSorter
}

protected override def doExecute(): RDD[InternalRow] = {
@@ -181,4 +189,17 @@ case class SortExec(
|$sorterVariable.insertRow((UnsafeRow)${row.value});
""".stripMargin
}

/**
* In SortExec, we overwrites cleanupResources to close UnsafeExternalRowSorter.
*/
override protected[sql] def cleanupResources(): Unit = {
if (rowSorter != null) {
// There's possible for rowSorter is null here, for example, in the scenario of empty
// iterator in the current task, the downstream physical node(like SortMergeJoinExec) will
// trigger cleanupResources before rowSorter initialized in createSorter.
rowSorter.cleanupResources()
}
super.cleanupResources()
}
}
SparkPlan.scala
@@ -507,6 +507,15 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
}
newOrdering(order, Seq.empty)
}

/**
* Cleans up the resources used by the physical operator (if any). In general, all the resources
* should be cleaned up when the task finishes but operators like SortMergeJoinExec and LimitExec
* may want eager cleanup to free up tight resources (e.g., memory).
*/
protected[sql] def cleanupResources(): Unit = {
children.foreach(_.cleanupResources())
}
}

trait LeafExecNode extends SparkPlan {
SortMergeJoinExec.scala
@@ -191,7 +191,8 @@ case class SortMergeJoinExec(
RowIterator.fromScala(leftIter),
RowIterator.fromScala(rightIter),
inMemoryThreshold,
spillThreshold
spillThreshold,
cleanupResources
)
private[this] val joinRow = new JoinedRow

@@ -235,7 +236,8 @@ case class SortMergeJoinExec(
streamedIter = RowIterator.fromScala(leftIter),
bufferedIter = RowIterator.fromScala(rightIter),
inMemoryThreshold,
spillThreshold
spillThreshold,
cleanupResources
)
val rightNullRow = new GenericInternalRow(right.output.length)
new LeftOuterIterator(
@@ -249,7 +251,8 @@
streamedIter = RowIterator.fromScala(rightIter),
bufferedIter = RowIterator.fromScala(leftIter),
inMemoryThreshold,
spillThreshold
spillThreshold,
cleanupResources
)
val leftNullRow = new GenericInternalRow(left.output.length)
new RightOuterIterator(
@@ -283,7 +286,8 @@ case class SortMergeJoinExec(
RowIterator.fromScala(leftIter),
RowIterator.fromScala(rightIter),
inMemoryThreshold,
spillThreshold
spillThreshold,
cleanupResources
)
private[this] val joinRow = new JoinedRow

@@ -318,7 +322,8 @@ case class SortMergeJoinExec(
RowIterator.fromScala(leftIter),
RowIterator.fromScala(rightIter),
inMemoryThreshold,
spillThreshold
spillThreshold,
cleanupResources
)
private[this] val joinRow = new JoinedRow

@@ -360,7 +365,8 @@ case class SortMergeJoinExec(
RowIterator.fromScala(leftIter),
RowIterator.fromScala(rightIter),
inMemoryThreshold,
spillThreshold
spillThreshold,
cleanupResources
)
private[this] val joinRow = new JoinedRow

@@ -640,6 +646,9 @@ case class SortMergeJoinExec(
(evaluateVariables(leftVars), "")
}

val thisPlan = ctx.addReferenceObj("plan", this)
val eagerCleanup = s"$thisPlan.cleanupResources();"

s"""
|while (findNextInnerJoinRows($leftInput, $rightInput)) {
| ${leftVarDecl.mkString("\n")}
@@ -653,6 +662,7 @@
| }
| if (shouldStop()) return;
|}
|$eagerCleanup
""".stripMargin
}
}
@@ -678,6 +688,7 @@
* @param inMemoryThreshold Threshold for number of rows guaranteed to be held in memory by
* internal buffer
* @param spillThreshold Threshold for number of rows to be spilled by internal buffer
* @param eagerCleanupResources the eager cleanup function to be invoked when no join row found
*/
private[joins] class SortMergeJoinScanner(
streamedKeyGenerator: Projection,
@@ -686,7 +697,8 @@ private[joins] class SortMergeJoinScanner(
streamedIter: RowIterator,
bufferedIter: RowIterator,
inMemoryThreshold: Int,
spillThreshold: Int) {
spillThreshold: Int,
eagerCleanupResources: () => Unit) {
private[this] var streamedRow: InternalRow = _
private[this] var streamedRowKey: InternalRow = _
private[this] var bufferedRow: InternalRow = _
@@ -710,7 +722,8 @@ def getBufferedMatches: ExternalAppendOnlyUnsafeRowArray = bufferedMatches
def getBufferedMatches: ExternalAppendOnlyUnsafeRowArray = bufferedMatches

/**
* Advances both input iterators, stopping when we have found rows with matching join keys.
* Advances both input iterators, stopping when we have found rows with matching join keys. If no
* join rows found, try to do the eager resources cleanup.
* @return true if matching rows have been found and false otherwise. If this returns true, then
* [[getStreamedRow]] and [[getBufferedMatches]] can be called to construct the join
* results.
Expand All @@ -720,7 +733,7 @@ private[joins] class SortMergeJoinScanner(
// Advance the streamed side of the join until we find the next row whose join key contains
// no nulls or we hit the end of the streamed iterator.
}
if (streamedRow == null) {
val found = if (streamedRow == null) {
// We have consumed the entire streamed iterator, so there can be no more matches.
matchJoinKey = null
bufferedMatches.clear()
@@ -760,17 +773,19 @@
true
}
}
if (!found) eagerCleanupResources()
found
}

/**
* Advances the streamed input iterator and buffers all rows from the buffered input that
* have matching keys.
* have matching keys. If no join rows found, try to do the eager resources cleanup.
* @return true if the streamed iterator returned a row, false otherwise. If this returns true,
* then [[getStreamedRow]] and [[getBufferedMatches]] can be called to produce the outer
* join results.
*/
final def findNextOuterJoinRows(): Boolean = {
if (!advancedStreamed()) {
val found = if (!advancedStreamed()) {
// We have consumed the entire streamed iterator, so there can be no more matches.
matchJoinKey = null
bufferedMatches.clear()
@@ -800,6 +815,8 @@
// If there is a streamed input then we always return true
true
}
if (!found) eagerCleanupResources()
found
}

// --- Private methods --------------------------------------------------------------------------
33 changes: 32 additions & 1 deletion sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -22,12 +22,14 @@ import java.util.Locale
import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

import org.mockito.Mockito._

import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.Filter
import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec, SortExec}
import org.apache.spark.sql.execution.{BinaryExecNode, FilterExec, SortExec, SparkPlan}
import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.execution.python.BatchEvalPythonExec
import org.apache.spark.sql.internal.SQLConf
@@ -37,6 +39,23 @@ import org.apache.spark.sql.types.StructType
class JoinSuite extends QueryTest with SharedSparkSession {
import testImplicits._

private def attachCleanupResourceChecker(plan: SparkPlan): Unit = {
// SPARK-21492: Check cleanupResources are finally triggered in SortExec node for every
// test case
plan.foreachUp {
case s: SortExec =>
val sortExec = spy(s)
verify(sortExec, atLeastOnce).cleanupResources()
verify(sortExec.rowSorter, atLeastOnce).cleanupResources()
case _ =>
}
}

override protected def checkAnswer(df: => DataFrame, rows: Seq[Row]): Unit = {
attachCleanupResourceChecker(df.queryExecution.sparkPlan)
super.checkAnswer(df, rows)
}

setupTestData()

def statisticSizeInByte(df: DataFrame): BigInt = {
@@ -1039,4 +1058,16 @@ class JoinSuite extends QueryTest with SharedSparkSession {

checkAnswer(df, Row(1, 2, 1, 2) :: Nil)
}

test("SPARK-21492: cleanupResource without code generation") {
withSQLConf(
SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> "false",
SQLConf.SHUFFLE_PARTITIONS.key -> "1",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df1 = spark.range(0, 10, 1, 2)
val df2 = spark.range(10).select($"id".as("b1"), (- $"id").as("b2"))
val res = df1.join(df2, $"id" === $"b1" && $"id" === $"b2").select($"b1", $"b2", $"id")
checkAnswer(res, Row(0, 0, 0))
}
}
}