Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -356,10 +356,10 @@ package object dsl {

def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan)

def except(otherPlan: LogicalPlan, isAll: Boolean = false): LogicalPlan =
def except(otherPlan: LogicalPlan, isAll: Boolean): LogicalPlan =
Except(logicalPlan, otherPlan, isAll)

def intersect(otherPlan: LogicalPlan, isAll: Boolean = false): LogicalPlan =
def intersect(otherPlan: LogicalPlan, isAll: Boolean): LogicalPlan =
Intersect(logicalPlan, otherPlan, isAll)

def union(otherPlan: LogicalPlan): LogicalPlan = Union(logicalPlan, otherPlan)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,15 +534,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging
case SqlBaseParser.INTERSECT if all =>
Intersect(left, right, isAll = true)
case SqlBaseParser.INTERSECT =>
Intersect(left, right)
Intersect(left, right, isAll = false)
case SqlBaseParser.EXCEPT if all =>
Except(left, right, isAll = true)
case SqlBaseParser.EXCEPT =>
Except(left, right)
Except(left, right, isAll = false)
case SqlBaseParser.SETMINUS if all =>
Except(left, right, isAll = true)
case SqlBaseParser.SETMINUS =>
Except(left, right)
Except(left, right, isAll = false)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ object SetOperation {
case class Intersect(
left: LogicalPlan,
right: LogicalPlan,
isAll: Boolean = false) extends SetOperation(left, right) {
isAll: Boolean) extends SetOperation(left, right) {

override def nodeName: String = getClass.getSimpleName + ( if ( isAll ) "All" else "" )

Expand All @@ -191,7 +191,7 @@ case class Intersect(
case class Except(
left: LogicalPlan,
right: LogicalPlan,
isAll: Boolean = false) extends SetOperation(left, right) {
isAll: Boolean) extends SetOperation(left, right) {
override def nodeName: String = getClass.getSimpleName + ( if ( isAll ) "All" else "" )
/** We don't use right.output because those rows get excluded from the set. */
override def output: Seq[Attribute] = left.output
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,13 +277,13 @@ class AnalysisErrorSuite extends AnalysisTest {

errorTest(
"intersect with unequal number of columns",
testRelation.intersect(testRelation2),
testRelation.intersect(testRelation2, isAll = false),
"intersect" :: "number of columns" :: testRelation2.output.length.toString ::
testRelation.output.length.toString :: Nil)

errorTest(
"except with unequal number of columns",
testRelation.except(testRelation2),
testRelation.except(testRelation2, isAll = false),
"except" :: "number of columns" :: testRelation2.output.length.toString ::
testRelation.output.length.toString :: Nil)

Expand All @@ -299,22 +299,22 @@ class AnalysisErrorSuite extends AnalysisTest {

errorTest(
"intersect with incompatible column types",
testRelation.intersect(nestedRelation),
testRelation.intersect(nestedRelation, isAll = false),
"intersect" :: "the compatible column types" :: Nil)

errorTest(
"intersect with a incompatible column type and compatible column types",
testRelation3.intersect(testRelation4),
testRelation3.intersect(testRelation4, isAll = false),
"intersect" :: "the compatible column types" :: "map" :: "decimal" :: Nil)

errorTest(
"except with incompatible column types",
testRelation.except(nestedRelation),
testRelation.except(nestedRelation, isAll = false),
"except" :: "the compatible column types" :: Nil)

errorTest(
"except with a incompatible column type and compatible column types",
testRelation3.except(testRelation4),
testRelation3.except(testRelation4, isAll = false),
"except" :: "the compatible column types" :: "map" :: "decimal" :: Nil)

errorTest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ class AnalysisSuite extends AnalysisTest with Matchers {
}

test("self intersect should resolve duplicate expression IDs") {
val plan = testRelation.intersect(testRelation)
val plan = testRelation.intersect(testRelation, isAll = false)
assertAnalysisSuccess(plan)
}

Expand Down Expand Up @@ -437,8 +437,8 @@ class AnalysisSuite extends AnalysisTest with Matchers {
val unionPlan = Union(firstTable, secondTable)
assertAnalysisSuccess(unionPlan)

val r1 = Except(firstTable, secondTable)
val r2 = Intersect(firstTable, secondTable)
val r1 = Except(firstTable, secondTable, isAll = false)
val r2 = Intersect(firstTable, secondTable, isAll = false)

assertAnalysisSuccess(r1)
assertAnalysisSuccess(r2)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1223,8 +1223,10 @@ class TypeCoercionSuite extends AnalysisTest {

val expectedTypes = Seq(StringType, DecimalType.SYSTEM_DEFAULT, FloatType, DoubleType)

val r1 = widenSetOperationTypes(Except(firstTable, secondTable)).asInstanceOf[Except]
val r2 = widenSetOperationTypes(Intersect(firstTable, secondTable)).asInstanceOf[Intersect]
val r1 = widenSetOperationTypes(
Except(firstTable, secondTable, isAll = false)).asInstanceOf[Except]
val r2 = widenSetOperationTypes(
Intersect(firstTable, secondTable, isAll = false)).asInstanceOf[Intersect]
checkOutput(r1.left, expectedTypes)
checkOutput(r1.right, expectedTypes)
checkOutput(r2.left, expectedTypes)
Expand Down Expand Up @@ -1289,8 +1291,10 @@ class TypeCoercionSuite extends AnalysisTest {
val expectedType1 = Seq(DecimalType(10, 8))

val r1 = widenSetOperationTypes(Union(left1, right1)).asInstanceOf[Union]
val r2 = widenSetOperationTypes(Except(left1, right1)).asInstanceOf[Except]
val r3 = widenSetOperationTypes(Intersect(left1, right1)).asInstanceOf[Intersect]
val r2 = widenSetOperationTypes(
Except(left1, right1, isAll = false)).asInstanceOf[Except]
val r3 = widenSetOperationTypes(
Intersect(left1, right1, isAll = false)).asInstanceOf[Intersect]

checkOutput(r1.children.head, expectedType1)
checkOutput(r1.children.last, expectedType1)
Expand All @@ -1310,16 +1314,20 @@ class TypeCoercionSuite extends AnalysisTest {
AttributeReference("r", rType)())

val r1 = widenSetOperationTypes(Union(plan1, plan2)).asInstanceOf[Union]
val r2 = widenSetOperationTypes(Except(plan1, plan2)).asInstanceOf[Except]
val r3 = widenSetOperationTypes(Intersect(plan1, plan2)).asInstanceOf[Intersect]
val r2 = widenSetOperationTypes(
Except(plan1, plan2, isAll = false)).asInstanceOf[Except]
val r3 = widenSetOperationTypes(
Intersect(plan1, plan2, isAll = false)).asInstanceOf[Intersect]

checkOutput(r1.children.last, Seq(expectedType))
checkOutput(r2.right, Seq(expectedType))
checkOutput(r3.right, Seq(expectedType))

val r4 = widenSetOperationTypes(Union(plan2, plan1)).asInstanceOf[Union]
val r5 = widenSetOperationTypes(Except(plan2, plan1)).asInstanceOf[Except]
val r6 = widenSetOperationTypes(Intersect(plan2, plan1)).asInstanceOf[Intersect]
val r5 = widenSetOperationTypes(
Except(plan2, plan1, isAll = false)).asInstanceOf[Except]
val r6 = widenSetOperationTypes(
Intersect(plan2, plan1, isAll = false)).asInstanceOf[Intersect]

checkOutput(r4.children.last, Seq(expectedType))
checkOutput(r5.left, Seq(expectedType))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -575,14 +575,14 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
// Except: *-stream not supported
testBinaryOperationInStreamingPlan(
"except",
_.except(_),
_.except(_, isAll = false),
streamStreamSupported = false,
batchStreamSupported = false)

// Intersect: stream-stream not supported
testBinaryOperationInStreamingPlan(
"intersect",
_.intersect(_),
_.intersect(_, isAll = false),
streamStreamSupported = false)

// Sort: supported only on batch subplans and after aggregation on streaming plan + complete mode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,10 @@ class ColumnPruningSuite extends PlanTest {

test("Column pruning on except/intersect/distinct") {
val input = LocalRelation('a.int, 'b.string, 'c.double)
val query = Project('a :: Nil, Except(input, input)).analyze
val query = Project('a :: Nil, Except(input, input, isAll = false)).analyze
comparePlans(Optimize.execute(query), query)

val query2 = Project('a :: Nil, Intersect(input, input)).analyze
val query2 = Project('a :: Nil, Intersect(input, input, isAll = false)).analyze
comparePlans(Optimize.execute(query2), query2)
val query3 = Project('a :: Nil, Distinct(input)).analyze
comparePlans(Optimize.execute(query3), query3)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class ReplaceOperatorSuite extends PlanTest {
val table1 = LocalRelation('a.int, 'b.int)
val table2 = LocalRelation('c.int, 'd.int)

val query = Intersect(table1, table2)
val query = Intersect(table1, table2, isAll = false)
val optimized = Optimize.execute(query.analyze)

val correctAnswer =
Expand All @@ -60,7 +60,7 @@ class ReplaceOperatorSuite extends PlanTest {
val table2 = Filter(attributeB === 2, Filter(attributeA === 1, table1))
val table3 = Filter(attributeB < 1, Filter(attributeA >= 2, table1))

val query = Except(table2, table3)
val query = Except(table2, table3, isAll = false)
val optimized = Optimize.execute(query.analyze)

val correctAnswer =
Expand All @@ -79,7 +79,7 @@ class ReplaceOperatorSuite extends PlanTest {
val table1 = LocalRelation.fromExternalRows(Seq(attributeA, attributeB), data = Seq(Row(1, 2)))
val table2 = Filter(attributeB < 1, Filter(attributeA >= 2, table1))

val query = Except(table1, table2)
val query = Except(table1, table2, isAll = false)
val optimized = Optimize.execute(query.analyze)

val correctAnswer =
Expand All @@ -99,7 +99,7 @@ class ReplaceOperatorSuite extends PlanTest {
val table3 = Project(Seq(attributeA, attributeB),
Filter(attributeB < 1, Filter(attributeA >= 2, table1)))

val query = Except(table2, table3)
val query = Except(table2, table3, isAll = false)
val optimized = Optimize.execute(query.analyze)

val correctAnswer =
Expand All @@ -120,7 +120,7 @@ class ReplaceOperatorSuite extends PlanTest {
val table3 = Project(Seq(attributeA, attributeB),
Filter(attributeB < 1, Filter(attributeA >= 2, table1)))

val query = Except(table2, table3)
val query = Except(table2, table3, isAll = false)
val optimized = Optimize.execute(query.analyze)

val correctAnswer =
Expand All @@ -141,7 +141,7 @@ class ReplaceOperatorSuite extends PlanTest {
Filter(attributeB < 1, Filter(attributeA >= 2, table1)))
val table3 = Filter(attributeB === 2, Filter(attributeA === 1, table1))

val query = Except(table2, table3)
val query = Except(table2, table3, isAll = false)
val optimized = Optimize.execute(query.analyze)

val correctAnswer =
Expand All @@ -158,7 +158,7 @@ class ReplaceOperatorSuite extends PlanTest {
val table1 = LocalRelation('a.int, 'b.int)
val table2 = LocalRelation('c.int, 'd.int)

val query = Except(table1, table2)
val query = Except(table1, table2, isAll = false)
val optimized = Optimize.execute(query.analyze)

val correctAnswer =
Expand All @@ -173,7 +173,7 @@ class ReplaceOperatorSuite extends PlanTest {
val left = table.where('b < 1).select('a).as("left")
val right = table.where('b < 3).select('a).as("right")

val query = Except(left, right)
val query = Except(left, right, isAll = false)
val optimized = Optimize.execute(query.analyze)

val correctAnswer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,15 @@ class PlanParserSuite extends AnalysisTest {
assertEqual("select * from a union select * from b", Distinct(a.union(b)))
assertEqual("select * from a union distinct select * from b", Distinct(a.union(b)))
assertEqual("select * from a union all select * from b", a.union(b))
assertEqual("select * from a except select * from b", a.except(b))
assertEqual("select * from a except distinct select * from b", a.except(b))
assertEqual("select * from a except select * from b", a.except(b, isAll = false))
assertEqual("select * from a except distinct select * from b", a.except(b, isAll = false))
assertEqual("select * from a except all select * from b", a.except(b, isAll = true))
assertEqual("select * from a minus select * from b", a.except(b))
assertEqual("select * from a minus select * from b", a.except(b, isAll = false))
assertEqual("select * from a minus all select * from b", a.except(b, isAll = true))
assertEqual("select * from a minus distinct select * from b", a.except(b))
assertEqual("select * from a intersect select * from b", a.intersect(b))
assertEqual("select * from a intersect distinct select * from b", a.intersect(b))
assertEqual("select * from a minus distinct select * from b", a.except(b, isAll = false))
assertEqual("select * from a " +
"intersect select * from b", a.intersect(b, isAll = false))
assertEqual("select * from a intersect distinct select * from b", a.intersect(b, isAll = false))
assertEqual("select * from a intersect all select * from b", a.intersect(b, isAll = true))
}

Expand Down Expand Up @@ -735,18 +736,20 @@ class PlanParserSuite extends AnalysisTest {
|SELECT * FROM d
""".stripMargin

assertEqual(query1, Distinct(a.union(b)).except(c.intersect(d)))
assertEqual(query1, Distinct(a.union(b)).except(c.intersect(d, isAll = false), isAll = false))
assertEqual(query2, Distinct(a.union(b)).except(c.intersect(d, isAll = true), isAll = true))

// Now disable precedence enforcement to verify the old behaviour.
withSQLConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED.key -> "true") {
assertEqual(query1, Distinct(a.union(b)).except(c).intersect(d))
assertEqual(query1,
Distinct(a.union(b)).except(c, isAll = false).intersect(d, isAll = false))
assertEqual(query2, Distinct(a.union(b)).except(c, isAll = true).intersect(d, isAll = true))
}

// Explicitly enable the precedence enforcement
withSQLConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED.key -> "false") {
assertEqual(query1, Distinct(a.union(b)).except(c.intersect(d)))
assertEqual(query1,
Distinct(a.union(b)).except(c.intersect(d, isAll = false), isAll = false))
assertEqual(query2, Distinct(a.union(b)).except(c.intersect(d, isAll = true), isAll = true))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class ConstraintPropagationSuite extends SparkFunSuite with PlanTest {

verifyConstraints(tr1
.where('a.attr > 10)
.intersect(tr2.where('b.attr < 100))
.intersect(tr2.where('b.attr < 100), isAll = false)
.analyze.constraints,
ExpressionSet(Seq(resolveColumn(tr1, "a") > 10,
resolveColumn(tr1, "b") < 100,
Expand All @@ -200,7 +200,7 @@ class ConstraintPropagationSuite extends SparkFunSuite with PlanTest {
val tr2 = LocalRelation('a.int, 'b.int, 'c.int)
verifyConstraints(tr1
.where('a.attr > 10)
.except(tr2.where('b.attr < 100))
.except(tr2.where('b.attr < 100), isAll = false)
.analyze.constraints,
ExpressionSet(Seq(resolveColumn(tr1, "a") > 10,
IsNotNull(resolveColumn(tr1, "a")))))
Expand Down
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1927,7 +1927,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
def intersect(other: Dataset[T]): Dataset[T] = withSetOperator {
Intersect(logicalPlan, other.logicalPlan)
Intersect(logicalPlan, other.logicalPlan, isAll = false)
}

/**
Expand Down Expand Up @@ -1958,7 +1958,7 @@ class Dataset[T] private[sql](
* @since 2.0.0
*/
def except(other: Dataset[T]): Dataset[T] = withSetOperator {
Except(logicalPlan, other.logicalPlan)
Except(logicalPlan, other.logicalPlan, isAll = false)
}

/**
Expand Down