Closed
Changes from 4 commits
@@ -82,16 +82,16 @@ object BindReferences extends Logging {

   def bindReference[A <: Expression](
       expression: A,
-      input: Seq[Attribute],
+      input: AttributeSeq,
       allowFailures: Boolean = false): A = {
     expression.transform { case a: AttributeReference =>
       attachTree(a, "Binding attribute") {
-        val ordinal = input.indexWhere(_.exprId == a.exprId)
+        val ordinal = input.getOrdinal(a.exprId)
         if (ordinal == -1) {
           if (allowFailures) {
             a
           } else {
-            sys.error(s"Couldn't find $a in ${input.mkString("[", ",", "]")}")
+            sys.error(s"Couldn't find $a in ${input.attrs.mkString("[", ",", "]")}")
           }
         } else {
           BoundReference(ordinal, a.dataType, input(ordinal).nullable)
@@ -86,11 +86,31 @@ package object expressions {
   /**
    * Helper functions for working with `Seq[Attribute]`.
    */
-  implicit class AttributeSeq(attrs: Seq[Attribute]) {
+  implicit class AttributeSeq(val attrs: Seq[Attribute]) {
     /** Creates a StructType with a schema matching this `Seq[Attribute]`. */
     def toStructType: StructType = {
       StructType(attrs.map(a => StructField(a.name, a.dataType, a.nullable)))
     }
+
+    private lazy val inputArr = attrs.toArray
+
+    private lazy val inputToOrdinal = {
+      val map = new java.util.HashMap[ExprId, Int](inputArr.length * 2)
Member:
Why is `* 2` necessary? I think the map holds at most attrs.size entries, since map.put() is called at most attrs.size times. Is `attrs.size` equal to `inputArr.length`?

Contributor Author:
The goal was to avoid having to rehash the elements of the hash map once the number of inserted keys exceeded the default 0.75 load factor.

Contributor:
+1 on withExpectedSize
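To illustrate the sizing trick being discussed: `java.util.HashMap` rehashes once its size exceeds capacity times the load factor (0.75 by default), so a capacity of `n * 2` guarantees `n` insertions never trigger a resize, at the cost of some over-allocation. A minimal sketch in plain Java; the `withExpectedSize` helper name mirrors the suggestion above (Guava provides the equivalent `Maps.newHashMapWithExpectedSize`) but is hypothetical here:

```java
import java.util.HashMap;
import java.util.Map;

public class PresizedMap {
    // Capacity of at least expected / 0.75 ensures `expected` insertions
    // stay under the default load factor, so no rehash occurs. The patch's
    // `length * 2` over-allocates slightly but satisfies the same bound.
    static <K, V> Map<K, V> withExpectedSize(int expected) {
        return new HashMap<>((int) (expected / 0.75f) + 1);
    }

    public static void main(String[] args) {
        Map<String, Integer> m = withExpectedSize(100);
        for (int i = 0; i < 100; i++) {
            m.put("attr" + i, i); // all 100 puts fit without a resize
        }
        System.out.println(m.size());
    }
}
```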

+      var index = 0
+      attrs.foreach { attr =>
+        if (!map.containsKey(attr.exprId)) {
+          map.put(attr.exprId, index)
+        }
+        index += 1
Member:
Which style is better, this style or one that uses zipWithIndex?

Contributor Author:
This was just a minor perf. optimization since this method was in a hot path.

Contributor Author:
On the other hand, accessing a variable that's outside of the inner closure is likely to be expensive, too. It's probably fastest to just iterate over the elements of inputArr.

Member:
Got it. I did not have a preference. Good to hear the reason for this decision.

+      }
+      map
+    }
+
+    def apply(ordinal: Int): Attribute = inputArr(ordinal)
+
+    def getOrdinal(exprId: ExprId): Int = {
Contributor Author:
I suppose this needs documentation.

Contributor:
yup ...

+      Option(inputToOrdinal.get(exprId)).getOrElse(-1)
+    }
+  }
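The `Option(inputToOrdinal.get(exprId)).getOrElse(-1)` idiom works because `java.util.HashMap.get` returns null for an absent key, and `Option(null)` is None. A sketch of the same lookup semantics in plain Java, with hypothetical String keys standing in for `ExprId`; `getOrDefault` plays the role of `Option(...).getOrElse(-1)`, and `putIfAbsent` mirrors the `containsKey` guard so the first occurrence of a duplicate key wins:

```java
import java.util.HashMap;
import java.util.Map;

public class OrdinalLookup {
    public static void main(String[] args) {
        Map<String, Integer> inputToOrdinal = new HashMap<>();
        inputToOrdinal.putIfAbsent("a", 0);
        inputToOrdinal.putIfAbsent("b", 1);
        inputToOrdinal.putIfAbsent("a", 2); // ignored: "a" already maps to 0

        // Present key yields its first ordinal; absent key yields -1.
        System.out.println(inputToOrdinal.getOrDefault("a", -1)); // 0
        System.out.println(inputToOrdinal.getOrDefault("c", -1)); // -1
    }
}
```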

/**
@@ -296,7 +296,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
   /**
    * All the attributes that are used for this plan.
    */
-  lazy val allAttributes: Seq[Attribute] = children.flatMap(_.output)
+  lazy val allAttributes: AttributeSeq = children.flatMap(_.output)
Contributor Author:
@ericl and I found another layer of polynomial looping: in QueryPlan.cleanArgs we take every expression in the query plan and bind its references against allAttributes, which can be huge. If we turn this into an AttributeSeq once and build the map inside of that wrapper then we amortize that cost and remove this expensive loop.

Contributor Author:
We should probably construct the AttributeSeq outside of the loop in the various projection operators, too, although that doesn't appear to be as serious a bottleneck yet.
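The amortization argument above can be sketched concretely: binding m expressions against n attributes via a linear scan (`indexWhere`) costs O(n·m), while building the ordinal map once and reusing it costs O(n + m). A minimal sketch in plain Java with hypothetical String keys standing in for `ExprId`:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BindSketch {
    // Before: each bound reference scans the attribute list, O(n) per lookup.
    static int ordinalByScan(List<String> attrs, String exprId) {
        return attrs.indexOf(exprId); // -1 when absent, like indexWhere
    }

    // After: build the map once; each subsequent lookup is O(1), so the
    // O(n) build cost is amortized across every expression bound against
    // the same plan.
    static Map<String, Integer> buildOrdinalMap(List<String> attrs) {
        Map<String, Integer> m = new HashMap<>(attrs.size() * 2);
        for (int i = 0; i < attrs.size(); i++) {
            m.putIfAbsent(attrs.get(i), i); // first occurrence wins
        }
        return m;
    }

    public static void main(String[] args) {
        List<String> attrs = List.of("id", "name", "age");
        Map<String, Integer> ordinals = buildOrdinalMap(attrs);
        System.out.println(ordinals.getOrDefault("age", -1)); // 2
        System.out.println(ordinalByScan(attrs, "age"));      // 2
    }
}
```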


private def cleanExpression(e: Expression): Expression = e match {
case a: Alias =>
@@ -49,7 +49,7 @@ case class HashAggregateExec(

require(HashAggregateExec.supportsAggregate(aggregateBufferAttributes))

-  override lazy val allAttributes: Seq[Attribute] =
+  override lazy val allAttributes: AttributeSeq =
child.output ++ aggregateBufferAttributes ++ aggregateAttributes ++
aggregateExpressions.flatMap(_.aggregateFunction.inputAggBufferAttributes)
