Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@ case class ParquetTableScan(
assert(normalOutput.size + partOutput.size == attributes.size,
s"$normalOutput + $partOutput != $attributes, ${relation.output}")

// Pair every attribute with its position in `attributes`; the index lookups
// below use these positions to fill output rows in the correct order.
val attributesWithIndex = attributes.zipWithIndex

// Positions within `attributes` of the non-partition columns, in `normalOutput` order.
// Columns are matched by `exprId`; a column with no match contributes no index.
val normalOutputIndexes =
  normalOutput.flatMap { column =>
    attributesWithIndex.collectFirst {
      case (candidate, position) if candidate.exprId == column.exprId => position
    }
  }

// Positions within `attributes` of the partition columns, in `partOutput` order.
// Columns are matched by `exprId`; a column with no match contributes no index.
val partOutputIndexes =
  partOutput.flatMap { column =>
    attributesWithIndex.collectFirst {
      case (candidate, position) if candidate.exprId == column.exprId => position
    }
  }

override def execute(): RDD[Row] = {
import parquet.filter2.compat.FilterCompat.FilterPredicateCompat

Expand Down Expand Up @@ -142,11 +157,23 @@ case class ParquetTableScan(
partOutput.map(a => Cast(Literal(partValues(a.name)), a.dataType).eval(EmptyRow))

new Iterator[Row] {
private[this] val joinedRow = new JoinedRow5(Row(partitionRowValues:_*), null)
private[this] val outputRow = new Array[Any](attributes.length)

// Fill outputRow with partitionRowValues at the correct indexes using partOutputIndexes
partitionRowValues
.zipWithIndex
.foreach(pI => outputRow(partOutputIndexes(pI._2)) = pI._1)

def hasNext = iter.hasNext

def next() = joinedRow.withRight(iter.next()._2)
def next() = {
// Fill outputRow with iter.next()._2 at the correct indexes using normalOutputIndexes
iter.next()._2
.zipWithIndex
.foreach(nI => outputRow(normalOutputIndexes(nI._2)) = nI._1)

new GenericRow(outputRow)
}
}
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@ private[hive] trait HiveStrategies {
}

implicit class PhysicalPlanHacks(originalPlan: SparkPlan) {
def fakeOutput(newOutput: Seq[Attribute]) =
def fakeOutput(newOutput: Seq[Attribute]) = {
originalPlan.output.foreach(a =>
newOutput.find(a.name.toLowerCase == _.name.toLowerCase)
.getOrElse(
sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}")))
OutputFaker(
originalPlan.output.map(a =>
newOutput.find(a.name.toLowerCase == _.name.toLowerCase)
.getOrElse(
sys.error(s"Can't find attribute $a to fake in set ${newOutput.mkString(",")}"))),
newOutput,
originalPlan)
}
}

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
Expand Down