diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index 7997e79003b12..75387fac64ed8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -356,10 +356,10 @@ package object dsl { def subquery(alias: Symbol): LogicalPlan = SubqueryAlias(alias.name, logicalPlan) - def except(otherPlan: LogicalPlan, isAll: Boolean = false): LogicalPlan = + def except(otherPlan: LogicalPlan, isAll: Boolean): LogicalPlan = Except(logicalPlan, otherPlan, isAll) - def intersect(otherPlan: LogicalPlan, isAll: Boolean = false): LogicalPlan = + def intersect(otherPlan: LogicalPlan, isAll: Boolean): LogicalPlan = Intersect(logicalPlan, otherPlan, isAll) def union(otherPlan: LogicalPlan): LogicalPlan = Union(logicalPlan, otherPlan) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 9906a30b488b8..732d762335f1e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -534,15 +534,15 @@ class AstBuilder(conf: SQLConf) extends SqlBaseBaseVisitor[AnyRef] with Logging case SqlBaseParser.INTERSECT if all => Intersect(left, right, isAll = true) case SqlBaseParser.INTERSECT => - Intersect(left, right) + Intersect(left, right, isAll = false) case SqlBaseParser.EXCEPT if all => Except(left, right, isAll = true) case SqlBaseParser.EXCEPT => - Except(left, right) + Except(left, right, isAll = false) case SqlBaseParser.SETMINUS if all => Except(left, right, isAll = true) case SqlBaseParser.SETMINUS => - Except(left, right) + Except(left, right, isAll = false) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index d7dbdb39a9afb..9d18ce5c7b80f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -167,7 +167,7 @@ object SetOperation { case class Intersect( left: LogicalPlan, right: LogicalPlan, - isAll: Boolean = false) extends SetOperation(left, right) { + isAll: Boolean) extends SetOperation(left, right) { override def nodeName: String = getClass.getSimpleName + ( if ( isAll ) "All" else "" ) @@ -191,7 +191,7 @@ case class Intersect( case class Except( left: LogicalPlan, right: LogicalPlan, - isAll: Boolean = false) extends SetOperation(left, right) { + isAll: Boolean) extends SetOperation(left, right) { override def nodeName: String = getClass.getSimpleName + ( if ( isAll ) "All" else "" ) /** We don't use right.output because those rows get excluded from the set. */ override def output: Seq[Attribute] = left.output diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index ae8d77bbbf9a8..0a5194a287ecc 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -277,13 +277,13 @@ class AnalysisErrorSuite extends AnalysisTest { errorTest( "intersect with unequal number of columns", - testRelation.intersect(testRelation2), + testRelation.intersect(testRelation2, isAll = false), "intersect" :: "number of columns" :: testRelation2.output.length.toString :: testRelation.output.length.toString :: Nil) errorTest( "except with unequal number of columns", - testRelation.except(testRelation2), + testRelation.except(testRelation2, isAll = false), "except" :: "number of columns" :: testRelation2.output.length.toString :: testRelation.output.length.toString :: Nil) @@ -299,22 +299,22 @@ class AnalysisErrorSuite extends AnalysisTest { errorTest( "intersect with incompatible column types", - testRelation.intersect(nestedRelation), + testRelation.intersect(nestedRelation, isAll = false), "intersect" :: "the compatible column types" :: Nil) errorTest( "intersect with a incompatible column type and compatible column types", - testRelation3.intersect(testRelation4), + testRelation3.intersect(testRelation4, isAll = false), "intersect" :: "the compatible column types" :: "map" :: "decimal" :: Nil) errorTest( "except with incompatible column types", - testRelation.except(nestedRelation), + testRelation.except(nestedRelation, isAll = false), "except" :: "the compatible column types" :: Nil) errorTest( "except with a incompatible column type and compatible column types", - testRelation3.except(testRelation4), + testRelation3.except(testRelation4, isAll = false), "except" :: "the compatible column types" :: "map" :: "decimal" :: Nil) errorTest( diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 9fb50a5e565e0..f956e7640898b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -271,7 +271,7 @@ class AnalysisSuite extends AnalysisTest with Matchers { } test("self intersect should resolve duplicate expression IDs") { - val plan = testRelation.intersect(testRelation) + val plan = testRelation.intersect(testRelation, isAll = false) assertAnalysisSuccess(plan) } @@ -437,8 +437,8 @@ class AnalysisSuite extends AnalysisTest with Matchers { val unionPlan = Union(firstTable, secondTable) assertAnalysisSuccess(unionPlan) - val r1 = Except(firstTable, secondTable) - val r2 = Intersect(firstTable, secondTable) + val r1 = Except(firstTable, secondTable, isAll = false) + val r2 = Intersect(firstTable, secondTable, isAll = false) assertAnalysisSuccess(r1) assertAnalysisSuccess(r2) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala index 4161f09c63190..d71bbb3227134 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala @@ -1223,8 +1223,10 @@ class TypeCoercionSuite extends AnalysisTest { val expectedTypes = Seq(StringType, DecimalType.SYSTEM_DEFAULT, FloatType, DoubleType) - val r1 = widenSetOperationTypes(Except(firstTable, secondTable)).asInstanceOf[Except] - val r2 = widenSetOperationTypes(Intersect(firstTable, secondTable)).asInstanceOf[Intersect] + val r1 = widenSetOperationTypes( + Except(firstTable, secondTable, isAll = false)).asInstanceOf[Except] + val r2 = widenSetOperationTypes( + Intersect(firstTable, secondTable, isAll = false)).asInstanceOf[Intersect] checkOutput(r1.left, expectedTypes) checkOutput(r1.right, expectedTypes) checkOutput(r2.left, expectedTypes) @@ -1289,8 +1291,10 @@ class TypeCoercionSuite extends AnalysisTest { val expectedType1 = Seq(DecimalType(10, 8)) val r1 = widenSetOperationTypes(Union(left1, right1)).asInstanceOf[Union] - val r2 = widenSetOperationTypes(Except(left1, right1)).asInstanceOf[Except] - val r3 = widenSetOperationTypes(Intersect(left1, right1)).asInstanceOf[Intersect] + val r2 = widenSetOperationTypes( + Except(left1, right1, isAll = false)).asInstanceOf[Except] + val r3 = widenSetOperationTypes( + Intersect(left1, right1, isAll = false)).asInstanceOf[Intersect] checkOutput(r1.children.head, expectedType1) checkOutput(r1.children.last, expectedType1) @@ -1310,16 +1314,20 @@ class TypeCoercionSuite extends AnalysisTest { AttributeReference("r", rType)()) val r1 = widenSetOperationTypes(Union(plan1, plan2)).asInstanceOf[Union] - val r2 = widenSetOperationTypes(Except(plan1, plan2)).asInstanceOf[Except] - val r3 = widenSetOperationTypes(Intersect(plan1, plan2)).asInstanceOf[Intersect] + val r2 = widenSetOperationTypes( + Except(plan1, plan2, isAll = false)).asInstanceOf[Except] + val r3 = widenSetOperationTypes( + Intersect(plan1, plan2, isAll = false)).asInstanceOf[Intersect] checkOutput(r1.children.last, Seq(expectedType)) checkOutput(r2.right, Seq(expectedType)) checkOutput(r3.right, Seq(expectedType)) val r4 = widenSetOperationTypes(Union(plan2, plan1)).asInstanceOf[Union] - val r5 = widenSetOperationTypes(Except(plan2, plan1)).asInstanceOf[Except] - val r6 = widenSetOperationTypes(Intersect(plan2, plan1)).asInstanceOf[Intersect] + val r5 = widenSetOperationTypes( + Except(plan2, plan1, isAll = false)).asInstanceOf[Except] + val r6 = widenSetOperationTypes( + Intersect(plan2, plan1, isAll = false)).asInstanceOf[Intersect] checkOutput(r4.children.last, Seq(expectedType)) checkOutput(r5.left, Seq(expectedType)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala index cb487c8893541..197d7c7668ef1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala @@ -575,14 +575,14 @@ class UnsupportedOperationsSuite extends SparkFunSuite { // Except: *-stream not supported testBinaryOperationInStreamingPlan( "except", - _.except(_), + _.except(_, isAll = false), streamStreamSupported = false, batchStreamSupported = false) // Intersect: stream-stream not supported testBinaryOperationInStreamingPlan( "intersect", - _.intersect(_), + _.intersect(_, isAll = false), streamStreamSupported = false) // Sort: supported only on batch subplans and after aggregation on streaming plan + complete mode diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala index f6db3c90ad96c..8d7c9bf220bc2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ColumnPruningSuite.scala @@ -180,10 +180,10 @@ class ColumnPruningSuite extends PlanTest { test("Column pruning on except/intersect/distinct") { val input = LocalRelation('a.int, 'b.string, 'c.double) - val query = Project('a :: Nil, Except(input, input)).analyze + val query = Project('a :: Nil, Except(input, input, isAll = false)).analyze comparePlans(Optimize.execute(query), query) - val query2 = Project('a :: Nil, Intersect(input, input)).analyze + val query2 = Project('a :: Nil, Intersect(input, input, isAll = false)).analyze comparePlans(Optimize.execute(query2), query2) val query3 = Project('a :: Nil, Distinct(input)).analyze comparePlans(Optimize.execute(query3), query3) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala index 52dc2e9fb076c..3b1b2d588ef67 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala @@ -42,7 +42,7 @@ class ReplaceOperatorSuite extends PlanTest { val table1 = LocalRelation('a.int, 'b.int) val table2 = LocalRelation('c.int, 'd.int) - val query = Intersect(table1, table2) + val query = Intersect(table1, table2, isAll = false) val optimized = Optimize.execute(query.analyze) val correctAnswer = @@ -60,7 +60,7 @@ class ReplaceOperatorSuite extends PlanTest { val table2 = Filter(attributeB === 2, Filter(attributeA === 1, table1)) val table3 = Filter(attributeB < 1, Filter(attributeA >= 2, table1)) - val query = Except(table2, table3) + val query = Except(table2, table3, isAll = false) val optimized = Optimize.execute(query.analyze) val correctAnswer = @@ -79,7 +79,7 @@ class ReplaceOperatorSuite extends PlanTest { val table1 = LocalRelation.fromExternalRows(Seq(attributeA, attributeB), data = Seq(Row(1, 2))) val table2 = Filter(attributeB < 1, Filter(attributeA >= 2, table1)) - val query = Except(table1, table2) + val query = Except(table1, table2, isAll = false) val optimized = Optimize.execute(query.analyze) val correctAnswer = @@ -99,7 +99,7 @@ class ReplaceOperatorSuite extends PlanTest { val table3 = Project(Seq(attributeA, attributeB), Filter(attributeB < 1, Filter(attributeA >= 2, table1))) - val query = Except(table2, table3) + val query = Except(table2, table3, isAll = false) val optimized = Optimize.execute(query.analyze) val correctAnswer = @@ -120,7 +120,7 @@ class ReplaceOperatorSuite extends PlanTest { val table3 = Project(Seq(attributeA, attributeB), Filter(attributeB < 1, Filter(attributeA >= 2, table1))) - val query = Except(table2, table3) + val query = Except(table2, table3, isAll = false) val optimized = Optimize.execute(query.analyze) val correctAnswer = @@ -141,7 +141,7 @@ class ReplaceOperatorSuite extends PlanTest { Filter(attributeB < 1, Filter(attributeA >= 2, table1))) val table3 = Filter(attributeB === 2, Filter(attributeA === 1, table1)) - val query = Except(table2, table3) + val query = Except(table2, table3, isAll = false) val optimized = Optimize.execute(query.analyze) val correctAnswer = @@ -158,7 +158,7 @@ class ReplaceOperatorSuite extends PlanTest { val table1 = LocalRelation('a.int, 'b.int) val table2 = LocalRelation('c.int, 'd.int) - val query = Except(table1, table2) + val query = Except(table1, table2, isAll = false) val optimized = Optimize.execute(query.analyze) val correctAnswer = @@ -173,7 +173,7 @@ class ReplaceOperatorSuite extends PlanTest { val left = table.where('b < 1).select('a).as("left") val right = table.where('b < 3).select('a).as("right") - val query = Except(left, right) + val query = Except(left, right, isAll = false) val optimized = Optimize.execute(query.analyze) val correctAnswer = diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index d7200d0bff5d6..422bf97e30e7e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -65,14 +65,15 @@ class PlanParserSuite extends AnalysisTest { assertEqual("select * from a union select * from b", Distinct(a.union(b))) assertEqual("select * from a union distinct select * from b", Distinct(a.union(b))) assertEqual("select * from a union all select * from b", a.union(b)) - assertEqual("select * from a except select * from b", a.except(b)) - assertEqual("select * from a except distinct select * from b", a.except(b)) + assertEqual("select * from a except select * from b", a.except(b, isAll = false)) + assertEqual("select * from a except distinct select * from b", a.except(b, isAll = false)) assertEqual("select * from a except all select * from b", a.except(b, isAll = true)) - assertEqual("select * from a minus select * from b", a.except(b)) + assertEqual("select * from a minus select * from b", a.except(b, isAll = false)) assertEqual("select * from a minus all select * from b", a.except(b, isAll = true)) - assertEqual("select * from a minus distinct select * from b", a.except(b)) - assertEqual("select * from a intersect select * from b", a.intersect(b)) - assertEqual("select * from a intersect distinct select * from b", a.intersect(b)) + assertEqual("select * from a minus distinct select * from b", a.except(b, isAll = false)) + assertEqual("select * from a " + + "intersect select * from b", a.intersect(b, isAll = false)) + assertEqual("select * from a intersect distinct select * from b", a.intersect(b, isAll = false)) assertEqual("select * from a intersect all select * from b", a.intersect(b, isAll = true)) } @@ -735,18 +736,20 @@ class PlanParserSuite extends AnalysisTest { |SELECT * FROM d """.stripMargin - assertEqual(query1, Distinct(a.union(b)).except(c.intersect(d))) + assertEqual(query1, Distinct(a.union(b)).except(c.intersect(d, isAll = false), isAll = false)) assertEqual(query2, Distinct(a.union(b)).except(c.intersect(d, isAll = true), isAll = true)) // Now disable precedence enforcement to verify the old behaviour. withSQLConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED.key -> "true") { - assertEqual(query1, Distinct(a.union(b)).except(c).intersect(d)) + assertEqual(query1, + Distinct(a.union(b)).except(c, isAll = false).intersect(d, isAll = false)) assertEqual(query2, Distinct(a.union(b)).except(c, isAll = true).intersect(d, isAll = true)) } // Explicitly enable the precedence enforcement withSQLConf(SQLConf.LEGACY_SETOPS_PRECEDENCE_ENABLED.key -> "false") { - assertEqual(query1, Distinct(a.union(b)).except(c.intersect(d))) + assertEqual(query1, + Distinct(a.union(b)).except(c.intersect(d, isAll = false), isAll = false)) assertEqual(query2, Distinct(a.union(b)).except(c.intersect(d, isAll = true), isAll = true)) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala index a37e06d922642..5ad748b6113d6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/plans/ConstraintPropagationSuite.scala @@ -187,7 +187,7 @@ class ConstraintPropagationSuite extends SparkFunSuite with PlanTest { verifyConstraints(tr1 .where('a.attr > 10) - .intersect(tr2.where('b.attr < 100)) + .intersect(tr2.where('b.attr < 100), isAll = false) .analyze.constraints, ExpressionSet(Seq(resolveColumn(tr1, "a") > 10, resolveColumn(tr1, "b") < 100, @@ -200,7 +200,7 @@ class ConstraintPropagationSuite extends SparkFunSuite with PlanTest { val tr2 = LocalRelation('a.int, 'b.int, 'c.int) verifyConstraints(tr1 .where('a.attr > 10) - .except(tr2.where('b.attr < 100)) + .except(tr2.where('b.attr < 100), isAll = false) .analyze.constraints, ExpressionSet(Seq(resolveColumn(tr1, "a") > 10, IsNotNull(resolveColumn(tr1, "a"))))) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 3b0a6d8840f1e..a4bf990ea9d6c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -1927,7 +1927,7 @@ class Dataset[T] private[sql]( * @since 1.6.0 */ def intersect(other: Dataset[T]): Dataset[T] = withSetOperator { - Intersect(logicalPlan, other.logicalPlan) + Intersect(logicalPlan, other.logicalPlan, isAll = false) } /** @@ -1958,7 +1958,7 @@ class Dataset[T] private[sql]( * @since 2.0.0 */ def except(other: Dataset[T]): Dataset[T] = withSetOperator { - Except(logicalPlan, other.logicalPlan) + Except(logicalPlan, other.logicalPlan, isAll = false) } /**