Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
01e4cdf
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 13, 2015
6835704
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
9180687
Merge remote-tracking branch 'upstream/master'
gatorsmile Nov 14, 2015
b38a21e
SPARK-11633
gatorsmile Nov 17, 2015
d2b84af
Merge remote-tracking branch 'upstream/master' into joinMakeCopy
gatorsmile Nov 17, 2015
fda8025
Merge remote-tracking branch 'upstream/master'
gatorspark Nov 17, 2015
ac0dccd
Merge branch 'master' of https://github.com/gatorsmile/spark
gatorspark Nov 17, 2015
6e0018b
Merge remote-tracking branch 'upstream/master'
Nov 20, 2015
0546772
converge
gatorsmile Nov 20, 2015
b37a64f
converge
gatorsmile Nov 20, 2015
c2a872c
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
ab6dbd7
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
4276356
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 6, 2016
0bd1771
replace Intersect with Left-semi Join
gatorsmile Jan 7, 2016
7bd102b
Merge remote-tracking branch 'upstream/master' into IntersectBySemiJoin
gatorsmile Jan 7, 2016
bfa99c5
address comments.
gatorsmile Jan 7, 2016
cd23b03
Merge remote-tracking branch 'upstream/master' into IntersectBySemiJoin
gatorsmile Jan 7, 2016
100174a
clean code.
gatorsmile Jan 7, 2016
9aad1cf
clean code.
gatorsmile Jan 7, 2016
6742984
address comments.
gatorsmile Jan 7, 2016
e4c34f0
added one more case for duplicate values
gatorsmile Jan 7, 2016
2dab708
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 7, 2016
9864b3f
added an exception in conversion from logical to physical operators.
gatorsmile Jan 8, 2016
24cea7d
Add DISTINCT and test cases.
gatorsmile Jan 8, 2016
27192be
test case updates.
gatorsmile Jan 8, 2016
a932cdb
Merge remote-tracking branch 'upstream/master' into IntersectBySemiJoin
gatorsmile Jan 8, 2016
04a26bd
resolve the ambiguous attributes
gatorsmile Jan 8, 2016
0458770
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 8, 2016
6a52e2b
Merge branch 'IntersectBySemiJoin' into IntersectBySemiJoinMerged
gatorsmile Jan 8, 2016
f820c61
resolve comments.
gatorsmile Jan 9, 2016
4372170
style change.
gatorsmile Jan 9, 2016
1debdfa
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 9, 2016
763706d
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 14, 2016
4de6ec1
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 18, 2016
9422a4f
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 19, 2016
52bdf48
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 20, 2016
1e95df3
Merge remote-tracking branch 'upstream/master'
gatorsmile Jan 23, 2016
d59b37b
Merge branch 'IntersectBySemiJoinMerged' into IntersectBySemiJoinMerg…
gatorsmile Jan 23, 2016
6a7979d
fix failed test cases
gatorsmile Jan 23, 2016
fd87585
Merge remote-tracking branch 'upstream/master' into IntersectBySemiJo…
gatorsmile Jan 27, 2016
e566d79
address comments.
gatorsmile Jan 27, 2016
3be78c4
address comments.
gatorsmile Jan 28, 2016
e51de8f
fixed the failed cases.
gatorsmile Jan 28, 2016
b600089
addressed comments.
gatorsmile Jan 29, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixed the failed cases.
  • Loading branch information
gatorsmile committed Jan 28, 2016
commit e51de8f98b25281b338ca57f0f13645b626c7673
Original file line number Diff line number Diff line change
Expand Up @@ -445,14 +445,11 @@ class Analyzer(
.map(_.asInstanceOf[NamedExpression])
a.copy(aggregateExpressions = expanded)

// To resolve duplicate expression IDs for all the BinaryNode
case b: BinaryNode if !b.duplicateResolved => b match {
case j @ Join(left, right, _, _) =>
j.copy(right = dedupRight(left, right))
case i @ Intersect(left, right) =>
i.copy(right = dedupRight(left, right))
case other => other
}
// To resolve duplicate expression IDs for Join and Intersect
case j @ Join(left, right, _, _) if !j.duplicateResolved =>
j.copy(right = dedupRight(left, right))
case i @ Intersect(left, right) if !i.duplicateResolved =>
i.copy(right = dedupRight(left, right))

// When resolve `SortOrder`s in Sort based on child, don't report errors as
// we still have chance to resolve it based on grandchild
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ trait CheckAnalysis {
s"""Only a single table generating function is allowed in a SELECT clause, found:
| ${exprs.map(_.prettyString).mkString(",")}""".stripMargin)

case j: BinaryNode if !j.duplicateResolved =>
case j: Join if !j.duplicateResolved =>
val conflictingAttributes = j.left.outputSet.intersect(j.right.outputSet)
failAnalysis(
s"""
Expand All @@ -224,6 +224,16 @@ trait CheckAnalysis {
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
|""".stripMargin)

case i: Intersect if !i.duplicateResolved =>
val conflictingAttributes = i.left.outputSet.intersect(i.right.outputSet)
failAnalysis(
s"""
|Failure when resolving conflicting references
|in operator ${operator.simpleString}:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, we could say Intersect directly

|$plan
|Conflicting attributes: ${conflictingAttributes.mkString(",")}
|""".stripMargin)

case o if !o.resolved =>
failAnalysis(
s"unresolved operator ${operator.simpleString}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,10 @@ object ReplaceDistinctWithAggregate extends Rule[LogicalPlan] {
* ==> SELECT DISTINCT a1, a2 FROM Tab1 LEFT SEMI JOIN Tab2 ON a1<=>b1 AND a2<=>b2
* }}}
*
* This rule is only applicable to INTERSECT DISTINCT. Do not use it for INTERSECT ALL.
* Note:
* 1. This rule is only applicable to INTERSECT DISTINCT. Do not use it for INTERSECT ALL.
* 2. This rule has to be done after de-duplicating the attributes; otherwise, the generated
* join conditions will be incorrect.
*/
object ReplaceIntersectWithSemiJoin extends Rule[LogicalPlan] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need to add a comment at here to mention that this rewrite is just for INTERSECT (i.e. INTERSECT DISTINCT) and is not applicable to INTERSECT ALL.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, will do it. Actually, I will also implement INTERSECT ALL after this one. Thanks!

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use transformUp?

cc @yhuai

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually nvm.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,6 @@ abstract class BinaryNode extends LogicalPlan {

override def children: Seq[LogicalPlan] = Seq(left, right)

def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty

override lazy val resolved: Boolean =
expressions.forall(_.resolved) && childrenResolved && duplicateResolved
expressions.forall(_.resolved) && childrenResolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why override this?

}
Original file line number Diff line number Diff line change
Expand Up @@ -91,30 +91,38 @@ case class Filter(condition: Expression, child: LogicalPlan) extends UnaryNode {
override def output: Seq[Attribute] = child.output
}

abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode {

final override lazy val resolved: Boolean =
childrenResolved &&
left.output.length == right.output.length &&
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
duplicateResolved
}
abstract class SetOperation(left: LogicalPlan, right: LogicalPlan) extends BinaryNode

private[sql] object SetOperation {
def unapply(p: SetOperation): Option[(LogicalPlan, LogicalPlan)] = Some((p.left, p.right))
}

case class Intersect(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {

def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty

override def output: Seq[Attribute] =
left.output.zip(right.output).map { case (leftAttr, rightAttr) =>
leftAttr.withNullability(leftAttr.nullable && rightAttr.nullable)
}

// Intersect are only resolved if they don't introduce ambiguous expression ids,
// since the Optimizer will convert Intersect to Join.
override lazy val resolved: Boolean =
childrenResolved &&
left.output.length == right.output.length &&
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType } &&
duplicateResolved
}

case class Except(left: LogicalPlan, right: LogicalPlan) extends SetOperation(left, right) {
/** We don't use right.output because those rows get excluded from the set. */
override def output: Seq[Attribute] = left.output

override lazy val resolved: Boolean =
childrenResolved &&
left.output.length == right.output.length &&
left.output.zip(right.output).forall { case (l, r) => l.dataType == r.dataType }
}

/** Factory for constructing new `Union` nodes. */
Expand Down Expand Up @@ -172,6 +180,8 @@ case class Join(
}
}

def duplicateResolved: Boolean = left.outputSet.intersect(right.outputSet).isEmpty

// Joins are only resolved if they don't introduce ambiguous expression ids.
override lazy val resolved: Boolean = {
childrenResolved &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,16 +421,13 @@ class HiveTypeCoercionSuite extends PlanTest {
assert(r2.left.isInstanceOf[Project])
assert(r2.right.isInstanceOf[Project])

// Even if we are doing self Except, we still add Project. The node Except will not be marked
// as analyzed unless their exprId are de-duplicated. Thus, the func resolveOperators called in
// WidenSetOperationTypes does not skip and return the node before applying the rule.
val r3 = wt(Except(firstTable, firstTable)).asInstanceOf[Except]
checkOutput(r3.left, Seq(IntegerType, DecimalType.SYSTEM_DEFAULT, ByteType, DoubleType))
checkOutput(r3.right, Seq(IntegerType, DecimalType.SYSTEM_DEFAULT, ByteType, DoubleType))

// Check if a Project is added
assert(r3.left.isInstanceOf[Project])
assert(r3.right.isInstanceOf[Project])
// Check if no Project is added
assert(r3.left.isInstanceOf[LocalRelation])
assert(r3.right.isInstanceOf[LocalRelation])
}

test("WidenSetOperationTypes for union") {
Expand Down