Skip to content
Prev Previous commit
Next Next commit
antiJoin fix from Herman
  • Loading branch information
gatorsmile committed Apr 27, 2016
commit 89fae2a6a6a819b8d65938915dd2fff577c8bb22
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,8 @@ case class BroadcastHashJoinExec(
*/
private def getJoinCondition(
ctx: CodegenContext,
input: Seq[ExprCode]): (String, String, Seq[ExprCode]) = {
input: Seq[ExprCode],
anti: Boolean = false): (String, String, Seq[ExprCode]) = {
val matched = ctx.freshName("matched")
val buildVars = genBuildSideVars(ctx, matched)
val checkCondition = if (condition.isDefined) {
Expand All @@ -172,11 +173,18 @@ case class BroadcastHashJoinExec(
ctx.currentVars = input ++ buildVars
val ev =
BindReferences.bindReference(expr, streamedPlan.output ++ buildPlan.output).genCode(ctx)
val skipRow = if (!anti) {
s"${ev.isNull} || !${ev.value}"
} else {
s"!${ev.isNull} && ${ev.value}"
}
s"""
|$eval
|${ev.code}
|if (${ev.isNull} || !${ev.value}) continue;
|if ($skipRow) continue;
""".stripMargin
} else if (anti) {
"continue;"
} else {
""
}
Expand Down Expand Up @@ -351,11 +359,12 @@ case class BroadcastHashJoinExec(
*/
private def codegenAnti(ctx: CodegenContext, input: Seq[ExprCode]): String = {
val (broadcastRelation, relationTerm) = prepareBroadcast(ctx)
val uniqueKeyCodePath = broadcastRelation.value.keyIsUnique
val (keyEv, anyNull) = genStreamSideJoinKey(ctx, input)
val (matched, checkCondition, _) = getJoinCondition(ctx, input)
val (matched, checkCondition, _) = getJoinCondition(ctx, input, uniqueKeyCodePath)
val numOutput = metricTerm(ctx, "numOutputRows")

if (broadcastRelation.value.keyIsUnique) {
if (uniqueKeyCodePath) {
s"""
|// generate join key for stream side
|${keyEv.code}
Expand Down