This file was deleted.

13 changes: 0 additions & 13 deletions core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala
@@ -73,16 +73,6 @@ private[spark] abstract class ZippedPartitionsBaseRDD[V: ClassTag](
     super.clearDependencies()
     rdds = null
   }
-
-  /**
-   * Call the prepare method of every parent that has one.
-   * This is needed for reserving execution memory in advance.
-   */
-  protected def tryPrepareParents(): Unit = {
-    rdds.collect {
-      case rdd: MapPartitionsWithPreparationRDD[_, _, _] => rdd.prepare()
-    }
-  }
 }
 
 private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag](
@@ -94,7 +84,6 @@ private[spark] class ZippedPartitionsRDD2[A: ClassTag, B: ClassTag, V: ClassTag]
   extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2), preservesPartitioning) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
-    tryPrepareParents()
     val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
     f(rdd1.iterator(partitions(0), context), rdd2.iterator(partitions(1), context))
   }
@@ -118,7 +107,6 @@ private[spark] class ZippedPartitionsRDD3
   extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3), preservesPartitioning) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
-    tryPrepareParents()
     val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
     f(rdd1.iterator(partitions(0), context),
       rdd2.iterator(partitions(1), context),
@@ -146,7 +134,6 @@ private[spark] class ZippedPartitionsRDD4
   extends ZippedPartitionsBaseRDD[V](sc, List(rdd1, rdd2, rdd3, rdd4), preservesPartitioning) {
 
   override def compute(s: Partition, context: TaskContext): Iterator[V] = {
-    tryPrepareParents()
     val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
     f(rdd1.iterator(partitions(0), context),
       rdd2.iterator(partitions(1), context),
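
For context, a minimal sketch (not taken from this PR) of how these zipped-partition RDDs are normally reached through the public RDD.zipPartitions API. With tryPrepareParents() gone, compute() simply applies the user function to the parent partition iterators. The function name and RDD arguments below are assumptions for illustration only.

import org.apache.spark.rdd.RDD

// Hypothetical example: element-wise sum of two RDDs that have the same number
// of partitions. This is exactly what ZippedPartitionsRDD2.compute() now does:
// apply `f` to the two parent iterators, with no per-partition prepare step.
def zipSums(a: RDD[Int], b: RDD[Int]): RDD[Int] = {
  a.zipPartitions(b, preservesPartitioning = false) { (leftIter, rightIter) =>
    leftIter.zip(rightIter).map { case (x, y) => x + y }
  }
}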

This file was deleted.

6 changes: 5 additions & 1 deletion project/MimaExcludes.scala
@@ -107,7 +107,11 @@ object MimaExcludes {
             "org.apache.spark.sql.SQLContext.createSession")
         ) ++ Seq(
           ProblemFilters.exclude[MissingMethodProblem](
-            "org.apache.spark.SparkContext.preferredNodeLocationData_=")
+            "org.apache.spark.SparkContext.preferredNodeLocationData_="),
+          ProblemFilters.exclude[MissingClassProblem](
+            "org.apache.spark.rdd.MapPartitionsWithPreparationRDD"),
+          ProblemFilters.exclude[MissingClassProblem](
+            "org.apache.spark.rdd.MapPartitionsWithPreparationRDD$")
         )
       case v if v.startsWith("1.5") =>
         Seq(
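
A note on why the deletion needs two excludes: a Scala companion object compiles to a separate JVM class whose binary name ends in $, so MiMa has to be told about both names when a file is removed. The sketch below follows the same sbt-mima-plugin pattern as the diff; org.example.RemovedClass is a placeholder, not something in this PR.

import com.typesafe.tools.mima.core._

// Hypothetical exclude pair for a removed class that also had a companion
// object: `class Foo` compiles to Foo, `object Foo` compiles to Foo$.
val removedClassExcludes = Seq(
  ProblemFilters.exclude[MissingClassProblem]("org.example.RemovedClass"),
  ProblemFilters.exclude[MissingClassProblem]("org.example.RemovedClass$")
)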
sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -19,9 +19,8 @@
 
 import java.io.IOException;
 
-import com.google.common.annotations.VisibleForTesting;
-
 import org.apache.spark.SparkEnv;
+import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeProjection;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
@@ -31,7 +30,6 @@
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.map.BytesToBytesMap;
 import org.apache.spark.unsafe.memory.MemoryLocation;
-import org.apache.spark.memory.TaskMemoryManager;
 
 /**
  * Unsafe-based HashMap for performing aggregations where the aggregated values are fixed-width.
@@ -218,11 +216,6 @@ public long getPeakMemoryUsedBytes() {
     return map.getPeakMemoryUsedBytes();
   }
 
-  @VisibleForTesting
-  public int getNumDataPages() {
-    return map.getNumDataPages();
-  }
-
   /**
    * Free the memory associated with this map. This is idempotent and can be called multiple times.
    */
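
The free() contract mentioned in the Javadoc above ("idempotent and can be called multiple times") is usually achieved by guarding on the underlying resource. A general-purpose sketch, not the actual UnsafeFixedWidthAggregationMap implementation:

// Illustration only: the resource is released on the first call and later
// calls become no-ops, which is what makes free() safe to call repeatedly.
class ReleasableBuffer(initialSize: Int) {
  private var buffer: Array[Byte] = new Array[Byte](initialSize)

  def free(): Unit = {
    if (buffer != null) {
      buffer = null // release exactly once; the GC reclaims the memory
    }
  }
}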
sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala
@@ -17,15 +17,14 @@
 
 package org.apache.spark.sql.execution.aggregate
 
-import org.apache.spark.TaskContext
-import org.apache.spark.rdd.{MapPartitionsWithPreparationRDD, RDD}
+import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.errors._
-import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression2
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.{UnsafeFixedWidthAggregationMap, UnaryNode, SparkPlan}
 import org.apache.spark.sql.execution.metric.SQLMetrics
+import org.apache.spark.sql.execution.{SparkPlan, UnaryNode, UnsafeFixedWidthAggregationMap}
 import org.apache.spark.sql.types.StructType
 
 case class TungstenAggregate(
@@ -84,59 +83,39 @@ case class TungstenAggregate(
     val dataSize = longMetric("dataSize")
     val spillSize = longMetric("spillSize")
 
-    /**
-     * Set up the underlying unsafe data structures used before computing the parent partition.
-     * This makes sure our iterator is not starved by other operators in the same task.
-     */
-    def preparePartition(): TungstenAggregationIterator = {
-      new TungstenAggregationIterator(
-        groupingExpressions,
-        nonCompleteAggregateExpressions,
-        nonCompleteAggregateAttributes,
-        completeAggregateExpressions,
-        completeAggregateAttributes,
-        initialInputBufferOffset,
-        resultExpressions,
-        newMutableProjection,
-        child.output,
-        testFallbackStartsAt,
-        numInputRows,
-        numOutputRows,
-        dataSize,
-        spillSize)
-    }
-
-    /** Compute a partition using the iterator already set up previously. */
-    def executePartition(
-        context: TaskContext,
-        partitionIndex: Int,
-        aggregationIterator: TungstenAggregationIterator,
-        parentIterator: Iterator[InternalRow]): Iterator[UnsafeRow] = {
-      val hasInput = parentIterator.hasNext
-      if (!hasInput) {
-        // We're not using the underlying map, so we just can free it here
-        aggregationIterator.free()
-        if (groupingExpressions.isEmpty) {
-          numOutputRows += 1
-          Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
-        } else {
-          // This is a grouped aggregate and the input iterator is empty,
-          // so return an empty iterator.
-          Iterator.empty
-        }
-      } else {
-        aggregationIterator.start(parentIterator)
-        aggregationIterator
-      }
-    }
-
-    // Note: we need to set up the iterator in each partition before computing the
-    // parent partition, so we cannot simply use `mapPartitions` here (SPARK-9747).
-    val resultRdd = {
-      new MapPartitionsWithPreparationRDD[UnsafeRow, InternalRow, TungstenAggregationIterator](
-        child.execute(), preparePartition, executePartition, preservesPartitioning = true)
-    }
-    resultRdd.asInstanceOf[RDD[InternalRow]]
-  }
+    child.execute().mapPartitions { iter =>
+
+      val hasInput = iter.hasNext
+      if (!hasInput && groupingExpressions.nonEmpty) {
+        // This is a grouped aggregate and the input iterator is empty,
+        // so return an empty iterator.
+        Iterator.empty
+      } else {
+        val aggregationIterator =
+          new TungstenAggregationIterator(
+            groupingExpressions,
+            nonCompleteAggregateExpressions,
+            nonCompleteAggregateAttributes,
+            completeAggregateExpressions,
+            completeAggregateAttributes,
+            initialInputBufferOffset,
+            resultExpressions,
+            newMutableProjection,
+            child.output,
+            iter,
+            testFallbackStartsAt,
+            numInputRows,
+            numOutputRows,
+            dataSize,
+            spillSize)
+        if (!hasInput && groupingExpressions.isEmpty) {
+          numOutputRows += 1
+          Iterator.single[UnsafeRow](aggregationIterator.outputForEmptyGroupingKeyWithoutInput())
+        } else {
+          aggregationIterator
+        }
+      }
+    }
+  }
 
   override def simpleString: String = {
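
To make the new control flow easier to follow, here is a simplified, hypothetical sketch of the per-partition branching that the mapPartitions closure above performs. AggLike, makeAggIter, and emptyGroupingOutput are placeholders standing in for TungstenAggregationIterator and its methods; they are not Spark APIs.

// Stand-in for the aggregation iterator: it yields result rows and can
// produce the single output row of a global aggregate over empty input.
trait AggLike[Row] extends Iterator[Row] {
  def emptyGroupingOutput(): Row
}

def aggregatePartition[Row](
    iter: Iterator[Row],
    hasGrouping: Boolean,
    makeAggIter: Iterator[Row] => AggLike[Row]): Iterator[Row] = {
  val hasInput = iter.hasNext
  if (!hasInput && hasGrouping) {
    // Grouped aggregate over an empty partition: no groups, so emit nothing.
    Iterator.empty
  } else {
    // The aggregation iterator wraps and consumes the input rows.
    val aggIter = makeAggIter(iter)
    if (!hasInput && !hasGrouping) {
      // A global aggregate (no GROUP BY) still emits one row, e.g. COUNT(*) = 0.
      Iterator.single(aggIter.emptyGroupingOutput())
    } else {
      aggIter
    }
  }
}

In the operator, this shape is applied once per partition via child.execute().mapPartitions { iter => ... }, as the diff above shows.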