Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ case class BroadcastHashJoinExec(
*/
private def getJoinCondition(
ctx: CodegenContext,
input: Seq[ExprCode]): (String, String, Seq[ExprCode]) = {
input: Seq[ExprCode],
anti: Boolean = false): (String, String, Seq[ExprCode]) = {
val matched = ctx.freshName("matched")
val buildVars = genBuildSideVars(ctx, matched)
val checkCondition = if (condition.isDefined) {
Expand All @@ -172,11 +173,18 @@ case class BroadcastHashJoinExec(
ctx.currentVars = input ++ buildVars
val ev =
BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).genCode(ctx)
val skipRow = if (!anti) {
s"${ev.isNull} || !${ev.value}"
} else {
s"!${ev.isNull} && ${ev.value}"
}
s"""
|$eval
|${ev.code}
|if (${ev.isNull} || !${ev.value}) continue;
|if ($skipRow) continue;
""".stripMargin
} else if (anti) {
"continue;"
} else {
""
}
Expand Down Expand Up @@ -351,11 +359,12 @@ case class BroadcastHashJoinExec(
*/
private def codegenAnti(ctx: CodegenContext, input: Seq[ExprCode]): String = {
val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
val uniqueKeyCodePath = broadcastRelation.value.keyIsUnique
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
val (matched, checkCondition, _) = getJoinCondition(ctx, input)
val (matched, checkCondition, _) = getJoinCondition(ctx, input, uniqueKeyCodePath)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkCondition is also used by the non-unique-key code path

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's correct, never mind.

val numOutput = metricTerm(ctx, "numOutputRows")

if (broadcastRelation.value.keyIsUnique) {
if (uniqueKeyCodePath) {
s"""
|// generate join key for stream side
|${keyEv.code}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,12 +53,23 @@ class ExistenceJoinSuite extends SparkPlanTest with SharedSQLContext {
Row(6, null)
)), new StructType().add("c", IntegerType).add("d", DoubleType))

private lazy val condition = {
// Right-side test relation with schema (c: Int, d: Double) in which every value of the
// join column "c" occurs exactly once (2, 3, 4, null, 6).
private lazy val rightUniqueKey = sqlContext.createDataFrame(
sparkContext.parallelize(Seq(
Row(2, 3.0),
Row(3, 2.0),
Row(4, 1.0),
Row(null, 5.0),
Row(6, null)
)), new StructType().add("c", IntegerType).add("d", DoubleType))

// Join condition made of a single equality: left.a === right.c.
private lazy val singleConditionEQ = (left.col("a") === right.col("c")).expr

// Join condition combining the equality left.a === right.c with the inequality
// left.b < right.d.
private lazy val composedConditionEQ = {
And((left.col("a") === right.col("c")).expr,
LessThan(left.col("b").expr, right.col("d").expr))
}

private lazy val conditionNEQ = {
private lazy val composedConditionNEQ = {
And((left.col("a") < right.col("c")).expr,
LessThan(left.col("b").expr, right.col("d").expr))
}
Expand Down Expand Up @@ -138,34 +149,67 @@ class ExistenceJoinSuite extends SparkPlanTest with SharedSQLContext {
}

testExistenceJoin(
"basic test for left semi join",
"test single condition (equal) for left semi join",
LeftSemi,
left,
right,
singleConditionEQ,
Seq(Row(2, 1.0), Row(2, 1.0), Row(3, 3.0), Row(6, null)))

testExistenceJoin(
"test composed condition (equal & non-equal) for left semi join",
LeftSemi,
left,
right,
condition,
composedConditionEQ,
Seq(Row(2, 1.0), Row(2, 1.0)))

testExistenceJoin(
"basic test for left semi non equal join",
"test composed condition (both non-equal) for left semi join",
LeftSemi,
left,
right,
conditionNEQ,
composedConditionNEQ,
Seq(Row(1, 2.0), Row(1, 2.0), Row(2, 1.0), Row(2, 1.0)))

testExistenceJoin(
"basic test for anti join",
"test single condition (equal) for left Anti join",
LeftAnti,
left,
right,
condition,
singleConditionEQ,
Seq(Row(1, 2.0), Row(1, 2.0), Row(null, null), Row(null, 5.0)))


// Left anti join on the single equality condition, with the right side reduced to the
// distinct values of column "c".
testExistenceJoin(
"test single unique condition (equal) for left Anti join",
LeftAnti,
left,
right.select(right.col("c")).distinct(), /* Trigger BHJs unique key code path! */
singleConditionEQ,
Seq(Row(1, 2.0), Row(1, 2.0), Row(null, null), Row(null, 5.0)))

// Left anti join on the composed condition (equality on a/c combined with b < d).
testExistenceJoin(
"test composed condition (equal & non-equal) test for anti join",
LeftAnti,
left,
right,
composedConditionEQ,
Seq(Row(1, 2.0), Row(1, 2.0), Row(3, 3.0), Row(6, null), Row(null, 5.0), Row(null, null)))

testExistenceJoin(
"basic test for anti non equal join",
"test composed condition (both non-equal) for anti join",
LeftAnti,
left,
right,
conditionNEQ,
composedConditionNEQ,
Seq(Row(3, 3.0), Row(6, null), Row(null, 5.0), Row(null, null)))

// Left anti join against rightUniqueKey using an equality on a/c combined with a
// less-than on b/d. The test name says "equal & non-equal" because the condition below
// contains one equality and one inequality, not two inequalities.
testExistenceJoin(
"test composed unique condition (equal & non-equal) for anti join",
LeftAnti,
left,
rightUniqueKey,
(left.col("a") === rightUniqueKey.col("c") && left.col("b") < rightUniqueKey.col("d")).expr,
Seq(Row(1, 2.0), Row(1, 2.0), Row(3, 3.0), Row(null, null), Row(null, 5.0), Row(6, null)))
}