From 500011092c0765ad20c8806f1ea500198eaf93d4 Mon Sep 17 00:00:00 2001 From: "rahul.aggarwal" Date: Thu, 1 Jan 2015 18:26:49 +0530 Subject: [PATCH 1/2] SPARK-5049: ParquetTableScan always prepends the values of partition columns in output rows irrespective of the order of the partition columns in the original SELECT query - forming a Generic row by inserting column values are correct indexes --- .../sql/parquet/ParquetTableOperations.scala | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala index 96bace1769f71..ea8f0491c4214 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala @@ -77,6 +77,21 @@ case class ParquetTableScan( assert(normalOutput.size + partOutput.size == attributes.size, s"$normalOutput + $partOutput != $attributes, ${relation.output}") + // Keep indexes of the attributes to fill output rows correctly + val attributesWithIndex = attributes.zipWithIndex + + // Indexes of the non-partition columns + val normalOutputIndexes = + normalOutput + .flatMap(a => attributesWithIndex.find(aI => aI._1.exprId == a.exprId)) + .map(_._2) + + // Indexes of the partition columns + val partOutputIndexes = + partOutput + .flatMap(a => attributesWithIndex.find(aI => aI._1.exprId == a.exprId)) + .map(_._2) + override def execute(): RDD[Row] = { import parquet.filter2.compat.FilterCompat.FilterPredicateCompat @@ -142,11 +157,23 @@ case class ParquetTableScan( partOutput.map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow)) new Iterator[Row] { - private[this] val joinedRow = new JoinedRow5(Row(partitionRowValues:_*), null) + private[this] val outputRow = new Array[Any](attributes.length) + + // Fill outputRow with partitionRowValues at the correct indexes using partOutputIndexes + partitionRowValues + .zipWithIndex + .foreach(pI => outputRow(partOutputIndexes(pI._2)) = pI._1) def hasNext = iter.hasNext - def next() = joinedRow.withRight(iter.next()._2) + def next() = { + // Fill outputRow with iter.next()._2 at the correct indexes using normalOutputIndexes + iter.next()._2 + .zipWithIndex + .foreach(nI => outputRow(normalOutputIndexes(nI._2)) = nI._1) + + new GenericRow(outputRow) + } } } } else { From 8253a7cfc4cdddee6001321b22e635eb04805bd0 Mon Sep 17 00:00:00 2001 From: "rahul.aggarwal" Date: Sun, 4 Jan 2015 17:05:30 +0530 Subject: [PATCH 2/2] SPARK-5049: ParquetTableScan always prepends the values of partition columns in output rows irrespective of the order of the partition columns in the original SELECT query - passing newOutput(correct sequence of attributes) in OutputFaker --- .../org/apache/spark/sql/hive/HiveStrategies.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index d3f6381b69a4d..a670ac225fdbd 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -71,13 +71,15 @@ private[hive] trait HiveStrategies { } implicit class PhysicalPlanHacks(originalPlan: SparkPlan) { - def fakeOutput(newOutput: Seq[Attribute]) = + def fakeOutput(newOutput: Seq[Attribute]) = { + originalPlan.output.foreach(a => + newOutput.find(a.name.toLowerCase == _.name.toLowerCase) + .getOrElse( + sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))) OutputFaker( - originalPlan.output.map(a => - newOutput.find(a.name.toLowerCase == _.name.toLowerCase) - .getOrElse( - sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))), + newOutput, originalPlan) + } } def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {