Commits (37)
2f6c80d  Merge remote-tracking branch 'upstream/master' into outerJoinElimination (gatorsmile, Dec 31, 2015)
90576aa  outer join conversion (gatorsmile, Jan 1, 2016)
5adec63  [SPARK-10359][PROJECT-INFRA] Multiple fixes to dev/test-dependencies.… (JoshRosen, Jan 1, 2016)
192ab19  added test cases. (gatorsmile, Jan 1, 2016)
c9dbfcc  [SPARK-11743][SQL] Move the test for arrayOfUDT (viirya, Jan 1, 2016)
a59a357  [SPARK-3873][MLLIB] Import order fixes. (Jan 1, 2016)
ad5b7cf  [SPARK-12409][SPARK-12387][SPARK-12391][SQL] Refactor filter pushdown… (viirya, Jan 1, 2016)
c04b53b  renaming (gatorsmile, Jan 1, 2016)
01a2986  [SPARK-12592][SQL][TEST] Don't mute Spark loggers in TestHive.reset() (liancheng, Jan 1, 2016)
6c20b3c  Disable test-dependencies.sh. (rxin, Jan 1, 2016)
0da7bd5  [SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][SQL] always outp… (Jan 1, 2016)
44ee920  Revert "[SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][SQL] alw… (rxin, Jan 2, 2016)
970635a  [SPARK-12362][SQL][WIP] Inline Hive Parser (hvanhovell, Jan 2, 2016)
94f7a12  [SPARK-10180][SQL] JDBC datasource are not processing EqualNullSafe f… (HyukjinKwon, Jan 2, 2016)
15bd736  [SPARK-12481][CORE][STREAMING][SQL] Remove usage of Hadoop deprecated… (srowen, Jan 2, 2016)
65f9125  extend the condition to cover more cases in non null predicates. (gatorsmile, Jan 3, 2016)
513e3b0  [SPARK-12599][MLLIB][SQL] Remove the use of callUDF in MLlib (rxin, Jan 3, 2016)
6c5bbd6  Revert "Revert "[SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][… (rxin, Jan 3, 2016)
9398644  added three more expressions: and, or and not (gatorsmile, Jan 3, 2016)
0bb07cb  style fix. (gatorsmile, Jan 3, 2016)
c5ff632  support non-local predicates and bug fix. (gatorsmile, Jan 3, 2016)
c3d5056  [SPARK-12327][SPARKR] fix code for lintr warning for commented code (felixcheung, Jan 3, 2016)
ee29dd2  scala style fix. (gatorsmile, Jan 3, 2016)
c82924d  [SPARK-12533][SQL] hiveContext.table() throws the wrong exception (thomastechs, Jan 3, 2016)
7b92922  Update MimaExcludes now Spark 1.6 is in Maven. (rxin, Jan 4, 2016)
b8410ff  [SPARK-12537][SQL] Add option to accept quoting of all character back… (Cazen, Jan 4, 2016)
13dab9c  [SPARK-12611][SQL][PYSPARK][TESTS] Fix test_infer_schema_to_local (holdenk, Jan 4, 2016)
7b7ea90  outer join conversion (gatorsmile, Jan 1, 2016)
7558e70  added test cases. (gatorsmile, Jan 1, 2016)
d3cbf46  renaming (gatorsmile, Jan 1, 2016)
2535cb1  extend the condition to cover more cases in non null predicates. (gatorsmile, Jan 3, 2016)
6c3f4b0  added three more expressions: and, or and not (gatorsmile, Jan 3, 2016)
fcd757c  style fix. (gatorsmile, Jan 3, 2016)
5bc7f52  support non-local predicates and bug fix. (gatorsmile, Jan 3, 2016)
34a0056  scala style fix. (gatorsmile, Jan 3, 2016)
ee7db1a  code refactoring and code merge (gatorsmile, Jan 4, 2016)
63d5d62  Merge remote-tracking branch 'origin/outerJoinConversion' into outerJ… (gatorsmile, Jan 4, 2016)
Revert "[SPARK-12286][SPARK-12290][SPARK-12294][SPARK-12284][SQL] alw…
…ays output UnsafeRow"

This reverts commit 0da7bd5.
rxin committed Jan 2, 2016
commit 44ee920fd49d35b421ae562ea99bcc8f2b98ced6
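
This revert restores the dual row-format model: each physical operator declares whether it emits UnsafeRows and which formats it can consume, and a planner batch (the "Add row converters" / EnsureRowFormats batch re-added in the first hunk below) wraps children in converter operators wherever parent and child disagree. The body of that rule is not part of this diff, so what follows is only a minimal, self-contained sketch of the idea; Plan, ToUnsafe, ToSafe, and ensureRowFormats are toy names for illustration, not Spark's real SparkPlan and Rule classes.

// Toy model of the restored row-format planning: operators declare which row
// formats they emit and accept, and a bottom-up pass inserts converters.
sealed trait Plan {
  def children: Seq[Plan]
  def outputsUnsafeRows: Boolean      // which format this operator emits
  def canProcessUnsafeRows: Boolean   // which formats it accepts from children
  def canProcessSafeRows: Boolean
  def withChildren(newChildren: Seq[Plan]): Plan
}

// Toy stand-ins for the planner-inserted converter operators.
case class ToUnsafe(child: Plan) extends Plan {
  val children = Seq(child)
  val outputsUnsafeRows = true
  val canProcessUnsafeRows = false
  val canProcessSafeRows = true
  def withChildren(c: Seq[Plan]): Plan = copy(child = c.head)
}

case class ToSafe(child: Plan) extends Plan {
  val children = Seq(child)
  val outputsUnsafeRows = false
  val canProcessUnsafeRows = true
  val canProcessSafeRows = false
  def withChildren(c: Seq[Plan]): Plan = copy(child = c.head)
}

object RowFormats {
  // One bottom-up pass: fix the children first, then insert a converter
  // between this operator and any child whose output format it cannot consume.
  def ensureRowFormats(plan: Plan): Plan = {
    val fixed = plan.children.map(ensureRowFormats).map { child =>
      if (child.outputsUnsafeRows && !plan.canProcessUnsafeRows) ToSafe(child)
      else if (!child.outputsUnsafeRows && !plan.canProcessSafeRows) ToUnsafe(child)
      else child
    }
    plan.withChildren(fixed)
  }
}
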
@@ -904,7 +904,8 @@ class SQLContext private[sql](
@transient
protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
val batches = Seq(
Batch("Add exchange", Once, EnsureRequirements(self))
Batch("Add exchange", Once, EnsureRequirements(self)),
Batch("Add row converters", Once, EnsureRowFormats)
)
}

@@ -28,6 +28,7 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.errors.attachTree
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.util.MutablePair
@@ -49,14 +50,26 @@ case class Exchange(
case None => ""
}

val simpleNodeName = "Exchange"
val simpleNodeName = if (tungstenMode) "TungstenExchange" else "Exchange"
s"$simpleNodeName$extraInfo"
}

/**
* Returns true iff we can support the data type, and we are not doing range partitioning.
*/
private lazy val tungstenMode: Boolean = !newPartitioning.isInstanceOf[RangePartitioning]

override def outputPartitioning: Partitioning = newPartitioning

override def output: Seq[Attribute] = child.output

// This setting is somewhat counterintuitive:
// If the schema works with UnsafeRow, then we tell the planner that we don't support safe row,
// so the planner inserts a converter to convert data into UnsafeRow if needed.
override def outputsUnsafeRows: Boolean = tungstenMode
override def canProcessSafeRows: Boolean = !tungstenMode
override def canProcessUnsafeRows: Boolean = tungstenMode

/**
* Determines whether records must be defensively copied before being sent to the shuffle.
* Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
@@ -117,7 +130,15 @@ case class Exchange(
}
}

private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
@transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf

private val serializer: Serializer = {
if (tungstenMode) {
new UnsafeRowSerializer(child.output.size)
} else {
new SparkSqlSerializer(sparkConf)
}
}

override protected def doPrepare(): Unit = {
// If an ExchangeCoordinator is needed, we register this Exchange operator
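
The flag settings restored in this hunk are what the in-code comment calls counterintuitive: in Tungsten mode the Exchange reports that it cannot process safe rows, which is exactly what makes the converter rule wrap a safe-row child in an unsafe converter before the shuffle, so that UnsafeRowSerializer only ever receives UnsafeRows. A small continuation of the earlier toy sketch; ToyScan and ToyExchange are hypothetical operators mirroring the flags in this hunk.

// A scan that emits safe (Java-object) rows, feeding an exchange.
case object ToyScan extends Plan {
  val children = Seq.empty[Plan]
  val outputsUnsafeRows = false
  val canProcessUnsafeRows = false
  val canProcessSafeRows = true
  def withChildren(c: Seq[Plan]): Plan = this
}

// Mirrors Exchange above: all three flags follow tungstenMode.
case class ToyExchange(child: Plan, tungstenMode: Boolean) extends Plan {
  val children = Seq(child)
  val outputsUnsafeRows = tungstenMode
  val canProcessUnsafeRows = tungstenMode
  val canProcessSafeRows = !tungstenMode   // the "counterintuitive" setting
  def withChildren(c: Seq[Plan]): Plan = copy(child = c.head)
}

// RowFormats.ensureRowFormats(ToyExchange(ToyScan, tungstenMode = true))
//   ==> ToyExchange(ToUnsafe(ToyScan), true)   // converter inserted before the shuffle
// RowFormats.ensureRowFormats(ToyExchange(ToyScan, tungstenMode = false))
//   ==> ToyExchange(ToyScan, false)            // safe rows accepted as-is
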
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, Attribute, AttributeSet, GenericMutableRow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, GenericMutableRow}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Statistics}
import org.apache.spark.sql.sources.{BaseRelation, HadoopFsRelation}
import org.apache.spark.sql.types.DataType
@@ -99,19 +99,10 @@ private[sql] case class PhysicalRDD(
rdd: RDD[InternalRow],
override val nodeName: String,
override val metadata: Map[String, String] = Map.empty,
isUnsafeRow: Boolean = false)
override val outputsUnsafeRows: Boolean = false)
extends LeafNode {

protected override def doExecute(): RDD[InternalRow] = {
if (isUnsafeRow) {
rdd
} else {
rdd.mapPartitionsInternal { iter =>
val proj = UnsafeProjection.create(schema)
iter.map(proj)
}
}
}
protected override def doExecute(): RDD[InternalRow] = rdd

override def simpleString: String = {
val metadataEntries = for ((key, value) <- metadata.toSeq.sorted) yield s"$key: $value"
@@ -41,11 +41,20 @@ case class Expand(
// as UNKNOWN partitioning
override def outputPartitioning: Partitioning = UnknownPartitioning(0)

override def outputsUnsafeRows: Boolean = child.outputsUnsafeRows
override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = true

override def references: AttributeSet =
AttributeSet(projections.flatten.flatMap(_.references))

private[this] val projection =
(exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
private[this] val projection = {
if (outputsUnsafeRows) {
(exprs: Seq[Expression]) => UnsafeProjection.create(exprs, child.output)
} else {
(exprs: Seq[Expression]) => newMutableProjection(exprs, child.output)()
}
}

protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
child.execute().mapPartitions { iter =>
@@ -64,7 +64,6 @@ case class Generate(
child.execute().mapPartitionsInternal { iter =>
val generatorNullRow = InternalRow.fromSeq(Seq.fill[Any](generator.elementTypes.size)(null))
val joinedRow = new JoinedRow
val proj = UnsafeProjection.create(output, output)

iter.flatMap { row =>
// we should always set the left (child output)
@@ -78,14 +77,13 @@ } ++ LazyIterator(() => boundGenerator.terminate()).map { row =>
} ++ LazyIterator(() => boundGenerator.terminate()).map { row =>
// we leave the left side as the last element of its child output
// keep it the same as Hive does
proj(joinedRow.withRight(row))
joinedRow.withRight(row)
}
}
} else {
child.execute().mapPartitionsInternal { iter =>
val proj = UnsafeProjection.create(output, output)
(iter.flatMap(row => boundGenerator.eval(row)) ++
LazyIterator(() => boundGenerator.terminate())).map(proj)
iter.flatMap(row => boundGenerator.eval(row)) ++
LazyIterator(() => boundGenerator.terminate())
}
}
}
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
import org.apache.spark.sql.catalyst.expressions.Attribute


/**
@@ -29,20 +29,15 @@ private[sql] case class LocalTableScan(
output: Seq[Attribute],
rows: Seq[InternalRow]) extends LeafNode {

private val unsafeRows: Array[InternalRow] = {
val proj = UnsafeProjection.create(output, output)
rows.map(r => proj(r).copy()).toArray
}

private lazy val rdd = sqlContext.sparkContext.parallelize(unsafeRows)
private lazy val rdd = sqlContext.sparkContext.parallelize(rows)

protected override def doExecute(): RDD[InternalRow] = rdd

override def executeCollect(): Array[InternalRow] = {
unsafeRows
rows.toArray
}

override def executeTake(limit: Int): Array[InternalRow] = {
unsafeRows.take(limit)
rows.take(limit).toArray
}
}
@@ -39,6 +39,10 @@ case class Sort(
testSpillFrequency: Int = 0)
extends UnaryNode {

override def outputsUnsafeRows: Boolean = true
override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = false

override def output: Seq[Attribute] = child.output

override def outputOrdering: Seq[SortOrder] = sortOrder
@@ -97,13 +97,36 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
/** Specifies sort order for each partition requirements on the input data for this operator. */
def requiredChildOrdering: Seq[Seq[SortOrder]] = Seq.fill(children.size)(Nil)

/** Specifies whether this operator outputs UnsafeRows */
def outputsUnsafeRows: Boolean = false

/** Specifies whether this operator is capable of processing UnsafeRows */
def canProcessUnsafeRows: Boolean = false

/**
* Specifies whether this operator is capable of processing Java-object-based Rows (i.e. rows
* that are not UnsafeRows).
*/
def canProcessSafeRows: Boolean = true

/**
* Returns the result of this query as an RDD[InternalRow] by delegating to doExecute
* after adding query plan information to created RDDs for visualization.
* Concrete implementations of SparkPlan should override doExecute instead.
*/
final def execute(): RDD[InternalRow] = {
if (children.nonEmpty) {
val hasUnsafeInputs = children.exists(_.outputsUnsafeRows)
val hasSafeInputs = children.exists(!_.outputsUnsafeRows)
assert(!(hasSafeInputs && hasUnsafeInputs),
"Child operators should output rows in the same format")
assert(canProcessSafeRows || canProcessUnsafeRows,
"Operator must be able to process at least one row format")
assert(!hasSafeInputs || canProcessSafeRows,
"Operator will receive safe rows as input but cannot process safe rows")
assert(!hasUnsafeInputs || canProcessUnsafeRows,
"Operator will receive unsafe rows as input but cannot process unsafe rows")
}
RDDOperationScope.withScope(sparkContext, nodeName, false, true) {
prepare()
doExecute()
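
The assertions restored in execute() encode the contract the planner rule establishes: all children of an operator must emit one agreed row format, and the operator must be able to consume it. The aggregate hunks later in this diff show why the per-operator flags matter; SortBasedAggregate, for instance, consumes only safe rows while emitting UnsafeRows. One last hypothetical addition to the toy sketch, with ToySortAggregate mirroring those flags.

// Mirrors SortBasedAggregate below: emits UnsafeRow, consumes only safe rows.
case class ToySortAggregate(child: Plan) extends Plan {
  val children = Seq(child)
  val outputsUnsafeRows = true
  val canProcessUnsafeRows = false
  val canProcessSafeRows = true
  def withChildren(c: Seq[Plan]): Plan = copy(child = c.head)
}

// With an unsafe-producing child, the format pass inserts a safe-row
// converter, after which the assertions in execute() above are satisfied:
// RowFormats.ensureRowFormats(ToySortAggregate(ToyExchange(ToyScan, tungstenMode = true)))
//   ==> ToySortAggregate(ToSafe(ToyExchange(ToUnsafe(ToyScan), true)))
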
@@ -100,6 +100,8 @@ case class Window(

override def outputOrdering: Seq[SortOrder] = child.outputOrdering

override def canProcessUnsafeRows: Boolean = true

/**
* Create a bound ordering object for a given frame type and offset. A bound ordering object is
* used to determine which input row lies within the frame boundaries of an output row.
@@ -257,16 +259,16 @@ case class Window(
* @return the final resulting projection.
*/
private[this] def createResultProjection(
expressions: Seq[Expression]): UnsafeProjection = {
expressions: Seq[Expression]): MutableProjection = {
val references = expressions.zipWithIndex.map{ case (e, i) =>
// Results of window expressions will be on the right side of child's output
BoundReference(child.output.size + i, e.dataType, e.nullable)
}
val unboundToRefMap = expressions.zip(references).toMap
val patchedWindowExpression = windowExpression.map(_.transform(unboundToRefMap))
UnsafeProjection.create(
newMutableProjection(
projectList ++ patchedWindowExpression,
child.output)
child.output)()
}

protected override def doExecute(): RDD[InternalRow] = {
@@ -49,6 +49,10 @@ case class SortBasedAggregate(
"numInputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of input rows"),
"numOutputRows" -> SQLMetrics.createLongMetric(sparkContext, "number of output rows"))

override def outputsUnsafeRows: Boolean = true
override def canProcessUnsafeRows: Boolean = false
override def canProcessSafeRows: Boolean = true

override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)

override def requiredChildDistribution: List[Distribution] = {
@@ -87,10 +87,6 @@ class SortBasedAggregationIterator(
// The aggregation buffer used by the sort-based aggregation.
private[this] val sortBasedAggregationBuffer: MutableRow = newBuffer

// An SafeProjection to turn UnsafeRow into GenericInternalRow, because UnsafeRow can't be
// compared to MutableRow (aggregation buffer) directly.
private[this] val safeProj: Projection = FromUnsafeProjection(valueAttributes.map(_.dataType))

protected def initialize(): Unit = {
if (inputIterator.hasNext) {
initializeBuffer(sortBasedAggregationBuffer)
@@ -114,7 +110,7 @@
// We create a variable to track if we see the next group.
var findNextPartition = false
// firstRowInNextGroup is the first row of this group. We first process it.
processRow(sortBasedAggregationBuffer, safeProj(firstRowInNextGroup))
processRow(sortBasedAggregationBuffer, firstRowInNextGroup)

// The search will stop when we see the next group or there is no
// input row left in the iter.
@@ -126,7 +122,7 @@

// Check if the current row belongs the current input row.
if (currentGroupingKey == groupingKey) {
processRow(sortBasedAggregationBuffer, safeProj(currentRow))
processRow(sortBasedAggregationBuffer, currentRow)
} else {
// We find a new group.
findNextPartition = true
@@ -49,6 +49,10 @@ case class TungstenAggregate(
"dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size"),
"spillSize" -> SQLMetrics.createSizeMetric(sparkContext, "spill size"))

override def outputsUnsafeRows: Boolean = true
override def canProcessUnsafeRows: Boolean = true
override def canProcessSafeRows: Boolean = true

override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)

override def producedAttributes: AttributeSet =