Skip to content
Prev Previous commit
Next Next commit
Fix the corner case for SMJ inner join
  • Loading branch information
Victsm committed Sep 28, 2019
commit c20b3b961693688deb5ef412bfa884fcccdd0741
Original file line number Diff line number Diff line change
Expand Up @@ -654,8 +654,10 @@ case class SortMergeJoinExec(
(evaluateVariables(leftVars), "")
}

// The last two line of code generate in processNext here will handle properly
// releasing the resources if the input iterators are not fully consumed
// The last two lines of code generated in processNext here will attempt to handle
// releasing the resources if the input iterators are not fully consumed. It only
// attempts to release the resources of an iterator if the associated child operator
// is codegened
s"""
|while (findNextInnerJoinRows($leftInput, $rightInput)) {
| ${leftVarDecl.mkString("\n")}
Expand All @@ -669,10 +671,16 @@ case class SortMergeJoinExec(
| }
| if (shouldStop()) return;
|}
|((org.apache.spark.sql.execution.ScalaIteratorWithBufferedIterator)$leftInput)
| .getBufferedRowIterator().close();
|((org.apache.spark.sql.execution.ScalaIteratorWithBufferedIterator)$rightInput)
| .getBufferedRowIterator().close();
|if ($leftInput instanceof
| org.apache.spark.sql.execution.ScalaIteratorWithBufferedIterator) {
| ((org.apache.spark.sql.execution.ScalaIteratorWithBufferedIterator)$leftInput)
| .getBufferedRowIterator().close();
|}
|if ($rightInput instanceof
| org.apache.spark.sql.execution.ScalaIteratorWithBufferedIterator) {
| ((org.apache.spark.sql.execution.ScalaIteratorWithBufferedIterator)$rightInput)
| .getBufferedRowIterator().close();
|}
""".stripMargin
}
}
Expand Down