Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
output UnsafeRow from Hive
  • Loading branch information
Davies Liu committed Dec 29, 2015
commit cba393448c2d581bd62e31d3181a11e290a2a83d
Original file line number Diff line number Diff line change
Expand Up @@ -257,16 +257,16 @@ case class Window(
* @return the final resulting projection.
*/
private[this] def createResultProjection(
expressions: Seq[Expression]): MutableProjection = {
expressions: Seq[Expression]): UnsafeProjection = {
val references = expressions.zipWithIndex.map{ case (e, i) =>
// Results of window expressions will be on the right side of child's output
BoundReference(child.output.size + i, e.dataType, e.nullable)
}
val unboundToRefMap = expressions.zip(references).toMap
val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
newMutableProjection(
UnsafeProjection.create(
projectList ++ patchedWindowExpression,
child.output)()
child.output)
}

protected override def doExecute(): RDD[InternalRow] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,11 +132,17 @@ case class HiveTableScan(
}
}

protected override def doExecute(): RDD[InternalRow] = if (!relation.hiveQlTable.isPartitioned) {
hadoopReader.makeRDDForTable(relation.hiveQlTable)
} else {
hadoopReader.makeRDDForPartitionedTable(
prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
protected override def doExecute(): RDD[InternalRow] = {
val rdd = if (!relation.hiveQlTable.isPartitioned) {
hadoopReader.makeRDDForTable(relation.hiveQlTable)
} else {
hadoopReader.makeRDDForPartitionedTable(
prunePartitions(relation.getHiveQlPartitions(partitionPruningPred)))
}
rdd.mapPartitionsInternal { iter =>
val proj = UnsafeProjection.create(schema)
iter.map(proj)
}
}

override def output: Seq[Attribute] = attributes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,8 @@ case class ScriptTransformation(

child.execute().mapPartitions { iter =>
if (iter.hasNext) {
processIterator(iter)
val proj = UnsafeProjection.create(schema)
processIterator(iter).map(proj)
} else {
// If the input iterator has no rows then do not launch the external script.
Iterator.empty
Expand Down