From 3201f0a4a6ee9421dbe50e6e3195a73d35d3ef20 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Wed, 16 Aug 2017 11:49:17 +0900 Subject: [PATCH 1/4] Keep a deterministic output order in Attribute.toSeq --- .../catalyst/expressions/AttributeSet.scala | 7 ++++- .../expressions/AttributeSetSuite.scala | 30 +++++++++++++++++++ .../sql/hive/execution/PruningSuite.scala | 3 +- 3 files changed, 38 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala index b77f93373e78..f643120b65fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala @@ -121,7 +121,12 @@ class AttributeSet private (val baseSet: Set[AttributeEquals]) // We must force toSeq to not be strict otherwise we end up with a [[Stream]] that captures all // sorts of things in its closure. - override def toSeq: Seq[Attribute] = baseSet.map(_.a).toArray.toSeq + override def toSeq: Seq[Attribute] = { + // We need to keep a deterministic output order for `baseSet` because this affects a variable + // order in generated code (e.g., `GenerateColumnAccessor`). + // See SPARK-18394 for details. + baseSet.map(_.a).toArray.sortBy { a => (a.name, a.exprId.id) } + } override def toString: String = "{" + baseSet.map(_.a).mkString(", ") + "}" diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala index 273f95f91ee5..578bb817de98 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala @@ -78,4 +78,34 @@ class AttributeSetSuite extends SparkFunSuite { assert(aSet == aSet) assert(aSet == AttributeSet(aUpper :: Nil)) } + + test("SPARK-18394 keep a deterministic output order along with attribute names") { + val attrSeqA = { + val attr1 = AttributeReference("c1", IntegerType)(exprId = ExprId(1098)) + val attr2 = AttributeReference("c2", IntegerType)(exprId = ExprId(107)) + val attr3 = AttributeReference("c3", IntegerType)(exprId = ExprId(838)) + val attrSetA = AttributeSet(attr1 :: attr2 :: attr3 :: Nil) + + val attr4 = AttributeReference("c4", IntegerType)(exprId = ExprId(389)) + val attr5 = AttributeReference("c5", IntegerType)(exprId = ExprId(89329)) + + val attrSetB = AttributeSet(attr4 :: attr5 :: Nil) + (attrSetA ++ attrSetB).toSeq.map(_.name) + } + + val attrSeqB = { + val attr1 = AttributeReference("c1", IntegerType)(exprId = ExprId(392)) + val attr2 = AttributeReference("c2", IntegerType)(exprId = ExprId(92)) + val attr3 = AttributeReference("c3", IntegerType)(exprId = ExprId(87)) + val attrSetA = AttributeSet(attr1 :: attr2 :: attr3 :: Nil) + + val attr4 = AttributeReference("c4", IntegerType)(exprId = ExprId(9023920)) + val attr5 = AttributeReference("c5", IntegerType)(exprId = ExprId(522)) + val attrSetB = AttributeSet(attr4 :: attr5 :: Nil) + + (attrSetA ++ attrSetB).toSeq.map(_.name) + } + + assert(attrSeqA === attrSeqB) + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala index d535bef4cc78..517b97b9df28 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala @@ -162,7 +162,8 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter { }.head assert(actualOutputColumns === expectedOutputColumns, "Output columns mismatch") - assert(actualScannedColumns === expectedScannedColumns, "Scanned columns mismatch") + assert(actualScannedColumns.sorted === expectedScannedColumns.sorted, + "Scanned columns mismatch") val actualPartitions = actualPartValues.map(_.asScala.mkString(",")).sorted val expectedPartitions = expectedPartValues.map(_.mkString(",")).sorted From eba844e5fa5ee1f4faa59770112a61e8f5493830 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Wed, 16 Aug 2017 22:25:33 +0900 Subject: [PATCH 2/4] toArray to toSeq --- .../apache/spark/sql/catalyst/expressions/AttributeSet.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala index f643120b65fd..7420b6b57d8e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala @@ -125,7 +125,7 @@ class AttributeSet private (val baseSet: Set[AttributeEquals]) // We need to keep a deterministic output order for `baseSet` because this affects a variable // order in generated code (e.g., `GenerateColumnAccessor`). // See SPARK-18394 for details. - baseSet.map(_.a).toArray.sortBy { a => (a.name, a.exprId.id) } + baseSet.map(_.a).toSeq.sortBy { a => (a.name, a.exprId.id) } } override def toString: String = "{" + baseSet.map(_.a).mkString(", ") + "}" From b33fde86ecd6a0be5f4a55c408ab10e0ac44101a Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Thu, 17 Aug 2017 12:20:31 +0900 Subject: [PATCH 3/4] Apply review --- .../sql/catalyst/expressions/AttributeSetSuite.scala | 12 +++++++++++- .../spark/sql/hive/execution/PruningSuite.scala | 5 +++++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala index 578bb817de98..b6e8b667a240 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/AttributeSetSuite.scala @@ -79,7 +79,8 @@ class AttributeSetSuite extends SparkFunSuite { assert(aSet == AttributeSet(aUpper :: Nil)) } - test("SPARK-18394 keep a deterministic output order along with attribute names") { + test("SPARK-18394 keep a deterministic output order along with attribute names and exprIds") { + // Checks a simple case val attrSeqA = { val attr1 = AttributeReference("c1", IntegerType)(exprId = ExprId(1098)) val attr2 = AttributeReference("c2", IntegerType)(exprId = ExprId(107)) @@ -107,5 +108,14 @@ class AttributeSetSuite extends SparkFunSuite { } assert(attrSeqA === attrSeqB) + + // Checks the same column names having different exprIds + val attr1 = AttributeReference("c", IntegerType)(exprId = ExprId(1098)) + val attr2 = AttributeReference("c", IntegerType)(exprId = ExprId(107)) + val attrSetA = AttributeSet(attr1 :: attr2 :: Nil) + val attr3 = AttributeReference("c", IntegerType)(exprId = ExprId(389)) + val attrSetB = AttributeSet(attr3 :: Nil) + + assert((attrSetA ++ attrSetB).toSeq === attr2 :: attr3 :: attr1 :: Nil) } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala index 517b97b9df28..edd3722ab5b0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala @@ -162,6 +162,11 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter { }.head assert(actualOutputColumns === expectedOutputColumns, "Output columns mismatch") + + // Scanned columns in `HiveTableScanExec` are generated by the `pruneFilterProject` method + // in `SparkPlanner` that internally uses `AttributeSet.toSeq`. + // Since we change an output order of `AttributeSet.toSeq` in SPARK-18394, + // we need to sort column names for a test below. assert(actualScannedColumns.sorted === expectedScannedColumns.sorted, "Scanned columns mismatch") From 973402bd822c05f8895405fbcaf918edbaad9d23 Mon Sep 17 00:00:00 2001 From: Takeshi Yamamuro Date: Thu, 17 Aug 2017 15:09:18 +0900 Subject: [PATCH 4/4] Update comments --- .../org/apache/spark/sql/hive/execution/PruningSuite.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala index edd3722ab5b0..cc592cf6ca62 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruningSuite.scala @@ -164,9 +164,8 @@ class PruningSuite extends HiveComparisonTest with BeforeAndAfter { assert(actualOutputColumns === expectedOutputColumns, "Output columns mismatch") // Scanned columns in `HiveTableScanExec` are generated by the `pruneFilterProject` method - // in `SparkPlanner` that internally uses `AttributeSet.toSeq`. - // Since we change an output order of `AttributeSet.toSeq` in SPARK-18394, - // we need to sort column names for a test below. + // in `SparkPlanner`. This method internally uses `AttributeSet.toSeq`, in which + // the returned output columns are sorted by the names and expression ids. assert(actualScannedColumns.sorted === expectedScannedColumns.sorted, "Scanned columns mismatch")