[SPARK-7076][SPARK-7077][SPARK-7080][SQL] Use managed memory for aggregations #5725
Changes from 1 commit
The configuration of HEAP vs. UNSAFE allocation is now done at the Spark core level. The translation of encoded 64-bit addresses into (base object, offset) pairs is now handled by MemoryManager, allowing these pointers to be passed safely between operators that exchange data pages.
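The address translation the description refers to can be sketched as follows. This is a hypothetical simplification, not Spark's actual `TaskMemoryManager` code: a 64-bit value packs a page number into the high bits and an offset within that page into the low bits, so operators can exchange compact pointers while the memory manager resolves them back to a base object and offset. The 13/51-bit split and the names below are illustrative assumptions.

```scala
// Hypothetical sketch of encoded-pointer layout; not Spark's actual implementation.
// High 13 bits: page number (an index into the memory manager's page table).
// Low 51 bits: offset within that page.
object EncodedAddress {
  private val OffsetBits = 51
  private val OffsetMask = (1L << OffsetBits) - 1

  def encode(pageNumber: Int, offsetInPage: Long): Long =
    (pageNumber.toLong << OffsetBits) | (offsetInPage & OffsetMask)

  def decodePageNumber(address: Long): Int =
    (address >>> OffsetBits).toInt

  def decodeOffset(address: Long): Long =
    address & OffsetMask
}
```

With an on-heap allocator the page number would map to a base object (e.g. a `long[]`) and the offset would be relative to it; with an off-heap allocator the base object is null and the decoded offset can serve as an absolute address.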
```diff
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution
 
+import org.apache.spark.SparkEnv
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.trees._
@@ -43,16 +44,14 @@ case class AggregateEvaluation(
  * @param aggregateExpressions expressions that are computed for each group.
  * @param child the input data source.
  * @param unsafeEnabled whether to allow Unsafe-based aggregation buffers to be used.
- * @param useOffHeap whether to use off-heap allocation (only takes effect if unsafeEnabled=true)
  */
 @DeveloperApi
 case class GeneratedAggregate(
```
Contributor: We should consider breaking this into two operators at some point so the explain is more readable.

Author: Agreed. I'd like to avoid duplication of the code generation logic, though.
```diff
     partial: Boolean,
     groupingExpressions: Seq[Expression],
     aggregateExpressions: Seq[NamedExpression],
     child: SparkPlan,
-    unsafeEnabled: Boolean,
-    useOffHeap: Boolean)
+    unsafeEnabled: Boolean)
```
Contributor: nit: typically place children last (although we aren't great about following this convention)...
```diff
   extends UnaryNode {
 
   override def requiredChildDistribution: Seq[Distribution] =
@@ -291,7 +290,7 @@ case class GeneratedAggregate(
       newAggregationBuffer(EmptyRow),
       aggregationBufferSchema,
       groupKeySchema,
-      if (useOffHeap) MemoryAllocator.UNSAFE else MemoryAllocator.HEAP,
+      SparkEnv.get.unsafeMemoryManager,
       1024 * 16,
```
Contributor: use named argument for this and the next line

Author: AFAIK we can't use the Scala named argument syntax because ...

Contributor: ah ok nvm. then let's add a comment like `1024 * 16, // initial capacity`
```diff
       false
     )
```
```diff
@@ -141,10 +141,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           groupingExpressions,
           partialComputation,
           planLater(child),
-          unsafeEnabled,
-          unsafeUseOffHeap),
-        unsafeEnabled,
-        unsafeUseOffHeap) :: Nil
+          unsafeEnabled),
+        unsafeEnabled) :: Nil
```
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. indent off here |
||
```diff
 
       // Cases where some aggregate can not be codegened
       case PartialAggregation(
```
Contributor: unless we think there are going to be other options under unsafe, we might consider getting rid of `enabled` to match `codegen`

Author: There was an option for choosing heap vs. off-heap, but that option got moved into Spark core. I think we use the `enabled` convention a bit more in core. I don't have strong feelings on this one.
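For context, the removed `useOffHeap` parameter corresponds to a decision like the one sketched below, now made once in Spark core from configuration rather than threaded through every operator. The types and names here are stand-ins for illustration, not Spark's actual `MemoryAllocator` API.

```scala
// Stand-in allocator types for illustration only; Spark's real MemoryAllocator
// lives in the unsafe module and is selected by core based on configuration.
object AllocatorChoice {
  sealed trait Allocator
  case object HeapAllocator extends Allocator    // backed by arrays on the JVM heap
  case object UnsafeAllocator extends Allocator  // off-heap memory outside the GC

  // Core reads the off-heap setting once; every operator then shares the same
  // memory manager instead of re-deciding via its own useOffHeap flag.
  def choose(offHeapEnabled: Boolean): Allocator =
    if (offHeapEnabled) UnsafeAllocator else HeapAllocator
}
```

Centralizing the choice is what lets the `GeneratedAggregate` constructor above drop its `useOffHeap` parameter and pull `SparkEnv.get.unsafeMemoryManager` instead.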