Show aggregation attributes
Eric5553 committed Feb 8, 2020
commit 70cb2df407d788c8dd730d5aa669039948d71192
@@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.aggregate

-import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.execution.{ExplainUtils, UnaryExecNode}

@@ -27,24 +27,21 @@ import org.apache.spark.sql.execution.{ExplainUtils, UnaryExecNode}
abstract class BaseAggregateExec extends UnaryExecNode {
Member

Shall we make it a trait?

Contributor Author

I see. Changed it to a trait for consistency with other operators, e.g. HashJoin and BaseLimitExec.

val groupingExpressions: Seq[NamedExpression]
val aggregateExpressions: Seq[AggregateExpression]
+val aggregateAttributes: Seq[Attribute]
val resultExpressions: Seq[NamedExpression]
Contributor

These can be def; then we don't need to add override val in the aggregate classes.

Member

+1

Contributor Author

@cloud-fan @HyukjinKwon Thanks for the review; updated to def in dd0988a.
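
To illustrate the def suggestion, here is a minimal sketch with hypothetical, simplified stand-ins (BaseAggregateLike and HashAggLike are not Spark classes). It shows that abstract members declared as def in a trait can be satisfied directly by case-class constructor parameters, while shared logic lives once in the trait.

```scala
// Hypothetical, simplified stand-ins; these are not Spark's real classes.
trait BaseAggregateLike {
  // Abstract members declared as def: an implementer may supply
  // a val, a lazy val, or a def.
  def groupingKeys: Seq[String]
  def functions: Seq[String]

  // Shared logic is written once against the abstract members.
  def summary: String = {
    val keyString = groupingKeys.mkString("[", ", ", "]")
    val functionString = functions.mkString("[", ", ", "]")
    s"Keys: $keyString Functions: $functionString"
  }
}

// Case-class constructor parameters are vals, so they implement
// the trait's abstract defs directly.
final case class HashAggLike(groupingKeys: Seq[String], functions: Seq[String])
  extends BaseAggregateLike

object DefMembersDemo {
  def main(args: Array[String]): Unit = {
    val agg = HashAggLike(Seq("key"), Seq("max(val)"))
    println(agg.summary)
  }
}
```

Declaring the members as def keeps the trait agnostic about how each operator stores them, which is presumably the flexibility the reviewers were after.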


-protected val aggregateBufferAttributes = {
-aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
-}

override def verboseStringWithOperatorId(): String = {
val inputString = child.output.mkString("[", ", ", "]")
val keyString = groupingExpressions.mkString("[", ", ", "]")
val functionString = aggregateExpressions.mkString("[", ", ", "]")
-val funcBufferAttrString = aggregateBufferAttributes.mkString("[", ", ", "]")
+val aggregateAttributeString = aggregateAttributes.mkString("[", ", ", "]")
val resultString = resultExpressions.mkString("[", ", ", "]")
s"""
|(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)}
|Input: $inputString
|Keys: $keyString
|Functions: $functionString
-|FuncBufferAttrs: $funcBufferAttrString
+|Aggregate Attributes: $aggregateAttributeString
|Results: $resultString
""".stripMargin
}
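
The formatting technique in this method (mkString with brackets, plus a stripMargin template) can be tried outside Spark. The sketch below is an assumption-laden simplification: ExplainBlockDemo and render are hypothetical names, and the operator id and node name are passed in as plain arguments instead of coming from ExplainUtils.

```scala
object ExplainBlockDemo {
  // Render a sequence the way the diff does: bracketed and comma-separated.
  private def bracketed(items: Seq[String]): String =
    items.mkString("[", ", ", "]")

  // Hypothetical helper: opId and nodeName are plain arguments here,
  // whereas the real operator derives them from ExplainUtils and nodeName.
  def render(
      opId: Int,
      nodeName: String,
      keys: Seq[String],
      functions: Seq[String],
      aggregateAttributes: Seq[String],
      results: Seq[String]): String =
    s"""
       |($opId) $nodeName
       |Keys: ${bracketed(keys)}
       |Functions: ${bracketed(functions)}
       |Aggregate Attributes: ${bracketed(aggregateAttributes)}
       |Results: ${bracketed(results)}
     """.stripMargin

  def main(args: Array[String]): Unit =
    println(render(5, "HashAggregate", Seq("key#x"),
      Seq("partial_max(val#x)"), Seq("max#x"), Seq("key#x", "max#x")).trim)
}
```

stripMargin removes everything up to and including the leading | on each line, so the template can be indented with the surrounding code without affecting the output.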
@@ -55,6 +55,10 @@ case class HashAggregateExec(
child: SparkPlan)
extends BaseAggregateExec with BlockingOperatorWithCodegen with AliasAwareOutputPartitioning {

+private[this] val aggregateBufferAttributes = {
+aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
+}

require(HashAggregateExec.supportsAggregate(aggregateBufferAttributes))

override lazy val allAttributes: AttributeSeq =
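
As a rough illustration of what the aggregateBufferAttributes expression computes, the sketch below uses hypothetical stand-in types (AggFunction and AggExpression are not Spark's classes, and attributes are modeled as plain strings). The buffer names follow the explain output in this PR, where max keeps one buffer attribute and avg keeps sum and count.

```scala
object BufferAttrsDemo {
  // Hypothetical stand-ins: an aggregate function and its buffer attributes.
  final case class AggFunction(name: String, aggBufferAttributes: Seq[String])
  final case class AggExpression(aggregateFunction: AggFunction)

  // Same shape as the expression in the diff: flatMap yields one flat
  // sequence of buffer attributes across all aggregate expressions.
  def bufferAttributes(exprs: Seq[AggExpression]): Seq[String] =
    exprs.flatMap(_.aggregateFunction.aggBufferAttributes)

  def main(args: Array[String]): Unit = {
    val exprs = Seq(
      AggExpression(AggFunction("max", Seq("max"))),
      AggExpression(AggFunction("avg", Seq("sum", "count"))))
    println(bufferAttributes(exprs).mkString("[", ", ", "]"))
  }
}
```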
@@ -69,6 +69,10 @@ case class ObjectHashAggregateExec(
child: SparkPlan)
extends BaseAggregateExec with AliasAwareOutputPartitioning {

+private[this] val aggregateBufferAttributes = {
+aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
+}

override lazy val allAttributes: AttributeSeq =
child.output ++ aggregateBufferAttributes ++ aggregateAttributes ++
aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
@@ -40,6 +40,10 @@ case class SortAggregateExec(
child: SparkPlan)
extends BaseAggregateExec with AliasAwareOutputPartitioning {

+private[this] val aggregateBufferAttributes = {
+aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
+}

override def producedAttributes: AttributeSet =
AttributeSet(aggregateAttributes) ++
AttributeSet(resultExpressions.diff(groupingExpressions).map(_.toAttribute)) ++
50 changes: 25 additions & 25 deletions sql/core/src/test/resources/sql-tests/results/explain.sql.out
@@ -86,7 +86,7 @@ Input : [key#x, val#x]
Input: [key#x, val#x]
Keys: [key#x]
Functions: [partial_max(val#x)]
-FuncBufferAttrs: [max#x]
+Aggregate Attributes: [max#x]
Results: [key#x, max#x]

(6) Exchange
@@ -96,7 +96,7 @@ Input: [key#x, max#x]
Input: [key#x, max#x]
Keys: [key#x]
Functions: [max(val#x)]
-FuncBufferAttrs: [max#x]
+Aggregate Attributes: [max(val#x)#x]
Results: [key#x, max(val#x)#x AS max(val)#x]

(8) Exchange
@@ -150,7 +150,7 @@ Input : [key#x, val#x]
Input: [key#x, val#x]
Keys: [key#x]
Functions: [partial_max(val#x)]
-FuncBufferAttrs: [max#x]
+Aggregate Attributes: [max#x]
Results: [key#x, max#x]

(6) Exchange
@@ -160,7 +160,7 @@ Input: [key#x, max#x]
Input: [key#x, max#x]
Keys: [key#x]
Functions: [max(val#x)]
-FuncBufferAttrs: [max#x]
+Aggregate Attributes: [max(val#x)#x]
Results: [key#x, max(val#x)#x AS max(val)#x, max(val#x)#x AS max(val#x)#x]

(8) Filter [codegen id : 2]
@@ -237,7 +237,7 @@ Input : [key#x, val#x]
Input: [key#x, val#x]
Keys: [key#x, val#x]
Functions: []
-FuncBufferAttrs: []
+Aggregate Attributes: []
Results: [key#x, val#x]

(11) Exchange
@@ -247,7 +247,7 @@ Input: [key#x, val#x]
Input: [key#x, val#x]
Keys: [key#x, val#x]
Functions: []
-FuncBufferAttrs: []
+Aggregate Attributes: []
Results: [key#x, val#x]


@@ -447,7 +447,7 @@ Input : [key#x, val#x]
Input: [key#x]
Keys: []
Functions: [partial_max(key#x)]
-FuncBufferAttrs: [max#x]
+Aggregate Attributes: [max#x]
Results: [max#x]

(10) Exchange
@@ -457,7 +457,7 @@ Input: [max#x]
Input: [max#x]
Keys: []
Functions: [max(key#x)]
-FuncBufferAttrs: [max#x]
+Aggregate Attributes: [max(key#x)#x]
Results: [max(key#x)#x AS max(key)#x]

Subquery:2 Hosting operator id = 7 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
@@ -492,7 +492,7 @@ Input : [key#x, val#x]
Input: [key#x]
Keys: []
Functions: [partial_max(key#x)]
-FuncBufferAttrs: [max#x]
+Aggregate Attributes: [max#x]
Results: [max#x]

(17) Exchange
@@ -502,7 +502,7 @@ Input: [max#x]
Input: [max#x]
Keys: []
Functions: [max(key#x)]
-FuncBufferAttrs: [max#x]
+Aggregate Attributes: [max(key#x)#x]
Results: [max(key#x)#x AS max(key)#x]


@@ -573,7 +573,7 @@ Input : [key#x, val#x]
Input: [key#x]
Keys: []
Functions: [partial_max(key#x)]
-FuncBufferAttrs: [max#x]
+Aggregate Attributes: [max#x]
Results: [max#x]

(9) Exchange
@@ -583,7 +583,7 @@ Input: [max#x]
Input: [max#x]
Keys: []
Functions: [max(key#x)]
-FuncBufferAttrs: [max#x]
+Aggregate Attributes: [max(key#x)#x]
Results: [max(key#x)#x AS max(key)#x]

Subquery:2 Hosting operator id = 3 Hosting Expression = Subquery scalar-subquery#x, [id=#x]
@@ -618,7 +618,7 @@ Input : [key#x, val#x]
Input: [key#x]
Keys: []
Functions: [partial_avg(cast(key#x as bigint))]
-FuncBufferAttrs: [sum#x, count#xL]
+Aggregate Attributes: [sum#x, count#xL]
Results: [sum#x, count#xL]

(16) Exchange
@@ -628,7 +628,7 @@ Input: [sum#x, count#xL]
Input: [sum#x, count#xL]
Keys: []
Functions: [avg(cast(key#x as bigint))]
-FuncBufferAttrs: [sum#x, count#xL]
+Aggregate Attributes: [avg(cast(key#x as bigint))#x]
Results: [avg(cast(key#x as bigint))#x AS avg(key)#x]
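
The partial_avg and avg operators above hand off a [sum, count] buffer between the two stages, while the final stage exposes a single avg attribute. The following is only a sketch of that two-phase shape under simplifying assumptions: AvgBuffer, partialAvg and finalAvg are hypothetical names, values are plain Longs, and an empty input is modeled as None.

```scala
object TwoPhaseAvgDemo {
  // Partial-stage buffer, mirroring the [sum, count] shape in the plan above.
  final case class AvgBuffer(sum: Double, count: Long)

  // Partial step: fold the values of one partition into a buffer.
  def partialAvg(values: Seq[Long]): AvgBuffer =
    AvgBuffer(values.map(_.toDouble).sum, values.size.toLong)

  // Final step: merge all partial buffers, then evaluate the average.
  // Assumption of this sketch: no input rows yields None.
  def finalAvg(buffers: Seq[AvgBuffer]): Option[Double] = {
    val merged = buffers.foldLeft(AvgBuffer(0.0, 0L)) { (a, b) =>
      AvgBuffer(a.sum + b.sum, a.count + b.count)
    }
    if (merged.count == 0L) None else Some(merged.sum / merged.count)
  }

  def main(args: Array[String]): Unit = {
    val partials = Seq(partialAvg(Seq(1L, 2L, 3L)), partialAvg(Seq(4L, 5L)))
    println(finalAvg(partials))
  }
}
```

This is why the partial operator lists buffer attributes under Aggregate Attributes, while the final operator lists the result attribute of the function.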


@@ -681,7 +681,7 @@ Input: [key#x]
Input: [key#x]
Keys: []
Functions: [partial_avg(cast(key#x as bigint))]
-FuncBufferAttrs: [sum#x, count#xL]
+Aggregate Attributes: [sum#x, count#xL]
Results: [sum#x, count#xL]

(7) Exchange
@@ -691,7 +691,7 @@ Input: [sum#x, count#xL]
Input: [sum#x, count#xL]
Keys: []
Functions: [avg(cast(key#x as bigint))]
-FuncBufferAttrs: [sum#x, count#xL]
+Aggregate Attributes: [avg(cast(key#x as bigint))#x]
Results: [avg(cast(key#x as bigint))#x AS avg(key)#x]

Subquery:2 Hosting operator id = 3 Hosting Expression = ReusedSubquery Subquery scalar-subquery#x, [id=#x]
@@ -814,7 +814,7 @@ Input : [key#x, val#x]
Input: [key#x, val#x]
Keys: [key#x]
Functions: [partial_max(val#x)]
-FuncBufferAttrs: [max#x]
+Aggregate Attributes: [max#x]
Results: [key#x, max#x]

(6) Exchange
@@ -824,7 +824,7 @@ Input: [key#x, max#x]
Input: [key#x, max#x]
Keys: [key#x]
Functions: [max(val#x)]
-FuncBufferAttrs: [max#x]
+Aggregate Attributes: [max(val#x)#x]
Results: [key#x, max(val#x)#x AS max(val)#x]

(8) ReusedExchange [Reuses operator id: 6]
@@ -834,7 +834,7 @@ Output : ArrayBuffer(key#x, max#x)
Input: [key#x, max#x]
Keys: [key#x]
Functions: [max(val#x)]
-FuncBufferAttrs: [max#x]
+Aggregate Attributes: [max(val#x)#x]
Results: [key#x, max(val#x)#x AS max(val)#x]

(10) BroadcastExchange
@@ -900,7 +900,7 @@ Input: [key#x, val#x]
Input: [key#x, val#x]
Keys: []
Functions: [partial_count(val#x), partial_sum(cast(key#x as bigint)), partial_count(key#x) FILTER (WHERE (val#x > 1))]
-FuncBufferAttrs: [count#xL, sum#xL, count#xL]
+Aggregate Attributes: [count#xL, sum#xL, count#xL]
Results: [count#xL, sum#xL, count#xL]

(4) Exchange
@@ -910,7 +910,7 @@ Input: [count#xL, sum#xL, count#xL]
Input: [count#xL, sum#xL, count#xL]
Keys: []
Functions: [count(val#x), sum(cast(key#x as bigint)), count(key#x)]
-FuncBufferAttrs: [count#xL, sum#xL, count#xL]
+Aggregate Attributes: [count(val#x)#xL, sum(cast(key#x as bigint))#xL, count(key#x)#xL]
Results: [(count(val#x)#xL + sum(cast(key#x as bigint))#xL) AS TOTAL#xL, count(key#x)#xL AS count(key) FILTER (WHERE (val > 1))#xL]


@@ -943,7 +943,7 @@ Input: [key#x, val#x]
Input: [key#x, val#x]
Keys: [key#x]
Functions: [partial_collect_set(val#x, 0, 0)]
-FuncBufferAttrs: [buf#x]
+Aggregate Attributes: [buf#x]
Results: [key#x, buf#x]

(4) Exchange
@@ -953,7 +953,7 @@ Input: [key#x, buf#x]
Input: [key#x, buf#x]
Keys: [key#x]
Functions: [collect_set(val#x, 0, 0)]
-FuncBufferAttrs: [buf#x]
+Aggregate Attributes: [collect_set(val#x, 0, 0)#x]
Results: [key#x, sort_array(collect_set(val#x, 0, 0)#x, true)[0] AS sort_array(collect_set(val), true)[0]#x]
Member

Since the attribute names are automatically generated, it is hard to tell whether an entry is a name or an expression. A few observations:

  • Using a comma as the separator is unclear, especially since commas also appear inside the expressions.
  • Show the column counts first? For example, Results [4]: ...

Member

This can be a separate PR if this is a general issue for all the other operators. We should make all of them consistent.

Contributor Author

Thanks for the review! I'll follow up with another PR for these observations.

Member

@maropu, Feb 8, 2020

Ah, the idea to show column counts first looks nice.

Contributor Author

PR #27509 has been opened to address these potential readability improvements.
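
For reference, the "column counts first" idea from this thread could look roughly like the sketch below. This is only an illustration of the suggestion, not what PR #27509 implements; CountedFieldDemo and field are hypothetical names.

```scala
object CountedFieldDemo {
  // Hypothetical formatter: puts the number of items before the list,
  // following the "Results [4]: ..." example from the review comment.
  def field(name: String, items: Seq[String]): String = {
    val rendered = items.mkString("[", ", ", "]")
    s"$name [${items.size}]: $rendered"
  }

  def main(args: Array[String]): Unit =
    println(field("Results", Seq("key#x", "max(val#x)#x AS max(val)#x")))
}
```

With the count shown up front, a reader can check how many columns to expect even when the expressions themselves contain commas.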



@@ -991,7 +991,7 @@ Input: [key#x, val#x]
Input: [key#x, val#x]
Keys: [key#x]
Functions: [partial_min(val#x)]
-FuncBufferAttrs: [min#x]
+Aggregate Attributes: [min#x]
Results: [key#x, min#x]

(5) Exchange
@@ -1004,7 +1004,7 @@ Input: [key#x, min#x]
Input: [key#x, min#x]
Keys: [key#x]
Functions: [min(val#x)]
-FuncBufferAttrs: [min#x]
+Aggregate Attributes: [min(val#x)#x]
Results: [key#x, min(val#x)#x AS min(val)#x]

