Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Address all comments
  • Loading branch information
c21 committed May 12, 2021
commit 429edcc5b6ab11f600642fc61d5205da135ce083
Original file line number Diff line number Diff line change
Expand Up @@ -483,7 +483,8 @@ case class SortMergeJoinExec(
// The function has the following step:
// - Step 1: Find the next `streamedRow` with non-null join keys.
// For `streamedRow` with null join keys (`handleStreamedAnyNull`):
// 1. Inner join: skip the row.
// 1. Inner join: skip the row. `matches` will be cleared later when hitting the
// next `streamedRow` with non-null join keys.
// 2. Left/Right Outer join: clear the previous `matches` if needed, keep the row,
// and return false.
//
Expand Down Expand Up @@ -699,21 +700,21 @@ case class SortMergeJoinExec(
""".stripMargin

lazy val outerJoin = {
val foundMatch = ctx.freshName("foundMatch")
val hasOutputRow = ctx.freshName("hasOutputRow")
s"""
|while ($streamedInput.hasNext()) {
| findNextJoinRows($streamedInput, $bufferedInput);
| ${streamedVarDecl.mkString("\n")}
| ${beforeLoop.trim}
| scala.collection.Iterator<UnsafeRow> $iterator = $matches.generateIterator();
| boolean $foundMatch = false;
| boolean $hasOutputRow = false;
|
| // the last iteration of this loop is to emit an empty row if there is no matched rows.
| while ($iterator.hasNext() || !$foundMatch) {
| while ($iterator.hasNext() || !$hasOutputRow) {
| InternalRow $bufferedRow = $iterator.hasNext() ?
| (InternalRow) $iterator.next() : null;
| ${condCheck.trim}
| $foundMatch = true;
| $hasOutputRow = true;
| $numOutput.add(1);
| ${consume(ctx, resultVars)}
| }
Expand Down