Skip to content

Commit 46f7d78

Browse files
shahidki31 authored and maropu committed
[SPARK-35368][SQL] Update histogram statistics for RANGE operator for stats estimation
### What changes were proposed in this pull request?
Update histogram statistics for RANGE operator stats estimation.

### Why are the changes needed?
If histogram optimization is enabled, these statistics can be used in various cost-based optimizations.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added UTs. Manual test.

Closes #32498 from shahidki31/shahid/histogram.

Lead-authored-by: shahid <shahidki31@gmail.com>
Co-authored-by: Shahid <shahidki31@gmail.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
1 parent a72d05c commit 46f7d78

File tree

2 files changed

+108
-16
lines changed

2 files changed

+108
-16
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -774,13 +774,21 @@ case class Range(
774774
} else {
775775
(start + (numElements - 1) * step, start)
776776
}
777+
778+
val histogram = if (conf.histogramEnabled) {
779+
Some(computeHistogramStatistics())
780+
} else {
781+
None
782+
}
783+
777784
val colStat = ColumnStat(
778785
distinctCount = Some(numElements),
779786
max = Some(maxVal),
780787
min = Some(minVal),
781788
nullCount = Some(0),
782789
avgLen = Some(LongType.defaultSize),
783-
maxLen = Some(LongType.defaultSize))
790+
maxLen = Some(LongType.defaultSize),
791+
histogram = histogram)
784792

785793
Statistics(
786794
sizeInBytes = LongType.defaultSize * numElements,
@@ -789,6 +797,39 @@ case class Range(
789797
}
790798
}
791799

800+
/**
 * Builds a histogram over the values produced by this range.
 *
 * The range is split into `conf.histogramNumBins` bins, each covering
 * `height = numElements / numBins` elements. Bin boundaries are located by their
 * position in the ascending sequence of range values, and each position is mapped
 * back to a concrete value with [[getRangeValue]].
 *
 * Positions are tracked as `Long` rather than `Int`: `Double.toInt` saturates at
 * `Int.MaxValue`, which would silently corrupt bin bounds and NDVs for ranges with
 * more than 2^31 elements.
 *
 * NOTE(review): presumably only invoked when `numElements > 0` (the caller is not
 * fully visible here) -- confirm before relying on it for empty ranges.
 *
 * @return a histogram with `numBins` bins of height `numElements / numBins`
 */
private def computeHistogramStatistics(): Histogram = {
  val numBins = conf.histogramNumBins
  val height = numElements.toDouble / numBins
  // Largest valid zero-based position. Used to clamp boundaries, because
  // `numBins * height` may round to slightly more than `numElements`.
  val lastIndex: Long = (numElements - 1).min(BigInt(Long.MaxValue)).toLong

  // Zero-based position of the last element at or below the given bin boundary.
  // Boundary 0 deliberately yields -1 so that the first bin's NDV counts element 0.
  def positionAt(boundary: Int): Long =
    math.min(math.ceil(boundary * height).toLong - 1, lastIndex)

  val positions = (0 to numBins).map(positionAt)
  val bins = positions.zip(positions.tail).map { case (lowerPos, upperPos) =>
    HistogramBin(
      getRangeValue(math.max(lowerPos, 0L)).toDouble,
      getRangeValue(math.max(upperPos, 0L)).toDouble,
      // Every bin reports at least one distinct value, even when it is narrower
      // than a single element (numElements < numBins).
      math.max(upperPos - lowerPos, 1L))
  }
  Histogram(height, bins.toArray)
}

/**
 * Returns the range value at the given zero-based position of the ascending
 * sequence of range values.
 *
 * @param index zero-based position; must be non-negative
 * @return the value at that position, counted from the smallest range value
 */
private def getRangeValue(index: Long): Long = {
  assert(index >= 0, "index must be greater than or equal to 0")
  if (step < 0) {
    // With a negative step the range is generated in descending order, so walk it
    // backwards to obtain ascending values for the histogram.
    start + (numElements.toLong - index - 1) * step
  } else {
    start + index * step
  }
}
}
832+
792833
override def outputOrdering: Seq[SortOrder] = {
793834
val order = if (step > 0) {
794835
Ascending

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala

Lines changed: 66 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
4343

4444
test("range with positive step") {
4545
val range = Range(1, 5, 1, None)
46+
val histogramBins = Array(
47+
HistogramBin(1.0, 2.0, 2),
48+
HistogramBin(2.0, 3.0, 1),
49+
HistogramBin(3.0, 4.0, 1))
50+
val histogram = Some(Histogram(4.toDouble / 3, histogramBins))
51+
// Number of range elements should be same as number of distinct values
52+
assert(range.numElements === 4)
53+
4654
val rangeStats = Statistics(
4755
sizeInBytes = 4 * 8,
4856
rowCount = Some(4),
@@ -57,12 +65,23 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
5765
max = Some(4),
5866
nullCount = Some(0),
5967
maxLen = Some(LongType.defaultSize),
60-
avgLen = Some(LongType.defaultSize))))))
61-
checkStats(range, expectedStatsCboOn = rangeStats, expectedStatsCboOff = rangeStats)
68+
avgLen = Some(LongType.defaultSize),
69+
histogram = histogram)))))
70+
val extraConfig = Map(SQLConf.HISTOGRAM_ENABLED.key -> "true",
71+
SQLConf.HISTOGRAM_NUM_BINS.key -> "3")
72+
checkStats(range, expectedStatsCboOn = rangeStats,
73+
expectedStatsCboOff = rangeStats, extraConfig)
6274
}
6375

6476
test("range with positive step where end minus start not divisible by step") {
6577
val range = Range(-4, 5, 2, None)
78+
val histogramBins = Array(
79+
HistogramBin(-4.0, -2.0, 2),
80+
HistogramBin(-2.0, 2.0, 2),
81+
HistogramBin(2.0, 4.0, 1))
82+
val histogram = Some(Histogram(5.toDouble / 3, histogramBins))
83+
// Number of range elements should be same as number of distinct values
84+
assert(range.numElements === 5)
6685
val rangeStats = Statistics(
6786
sizeInBytes = 5 * 8,
6887
rowCount = Some(5),
@@ -77,12 +96,23 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
7796
max = Some(4),
7897
nullCount = Some(0),
7998
maxLen = Some(LongType.defaultSize),
80-
avgLen = Some(LongType.defaultSize))))))
81-
checkStats(range, expectedStatsCboOn = rangeStats, expectedStatsCboOff = rangeStats)
99+
avgLen = Some(LongType.defaultSize),
100+
histogram = histogram)))))
101+
val extraConfig = Map(SQLConf.HISTOGRAM_ENABLED.key -> "true",
102+
SQLConf.HISTOGRAM_NUM_BINS.key -> "3")
103+
checkStats(range, expectedStatsCboOn = rangeStats,
104+
expectedStatsCboOff = rangeStats, extraConfig)
82105
}
83106

84107
test("range with negative step") {
85108
val range = Range(-10, -20, -2, None)
109+
val histogramBins = Array(
110+
HistogramBin(-18.0, -16.0, 2),
111+
HistogramBin(-16.0, -12.0, 2),
112+
HistogramBin(-12.0, -10.0, 1))
113+
val histogram = Some(Histogram(5.toDouble / 3, histogramBins))
114+
// Number of range elements should be same as number of distinct values
115+
assert(range.numElements === 5)
86116
val rangeStats = Statistics(
87117
sizeInBytes = 5 * 8,
88118
rowCount = Some(5),
@@ -97,12 +127,24 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
97127
max = Some(-10),
98128
nullCount = Some(0),
99129
maxLen = Some(LongType.defaultSize),
100-
avgLen = Some(LongType.defaultSize))))))
101-
checkStats(range, expectedStatsCboOn = rangeStats, expectedStatsCboOff = rangeStats)
130+
avgLen = Some(LongType.defaultSize),
131+
histogram = histogram)))))
132+
val extraConfig = Map(SQLConf.HISTOGRAM_ENABLED.key -> "true",
133+
SQLConf.HISTOGRAM_NUM_BINS.key -> "3")
134+
checkStats(range, expectedStatsCboOn = rangeStats,
135+
expectedStatsCboOff = rangeStats, extraConfig)
102136
}
103137

104138
test("range with negative step where end minus start not divisible by step") {
105139
val range = Range(-10, -20, -3, None)
140+
val histogramBins = Array(
141+
HistogramBin(-19.0, -16.0, 2),
142+
HistogramBin(-16.0, -13.0, 1),
143+
HistogramBin(-13.0, -10.0, 1))
144+
val histogram = Some(Histogram(4.toDouble / 3, histogramBins))
145+
// Number of range elements should be same as number of distinct values
146+
assert(range.numElements === 4)
147+
106148
val rangeStats = Statistics(
107149
sizeInBytes = 4 * 8,
108150
rowCount = Some(4),
@@ -117,14 +159,21 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
117159
max = Some(-10),
118160
nullCount = Some(0),
119161
maxLen = Some(LongType.defaultSize),
120-
avgLen = Some(LongType.defaultSize))))))
121-
checkStats(range, expectedStatsCboOn = rangeStats, expectedStatsCboOff = rangeStats)
162+
avgLen = Some(LongType.defaultSize),
163+
histogram = histogram)))))
164+
val extraConfig = Map(SQLConf.HISTOGRAM_ENABLED.key -> "true",
165+
SQLConf.HISTOGRAM_NUM_BINS.key -> "3")
166+
checkStats(range, expectedStatsCboOn = rangeStats,
167+
expectedStatsCboOff = rangeStats, extraConfig)
122168
}
123169

124170
test("range with empty output") {
125-
val range = Range(-10, -10, -1, None)
126-
val rangeStats = Statistics(sizeInBytes = 0, rowCount = Some(0))
127-
checkStats(range, expectedStatsCboOn = rangeStats, expectedStatsCboOff = rangeStats)
171+
val range = Range(-10, -10, -1, None)
172+
val rangeStats = Statistics(sizeInBytes = 0, rowCount = Some(0))
173+
val extraConfig = Map(SQLConf.HISTOGRAM_ENABLED.key -> "true",
174+
SQLConf.HISTOGRAM_NUM_BINS.key -> "3")
175+
checkStats(range, expectedStatsCboOn = rangeStats,
176+
expectedStatsCboOff = rangeStats, extraConfig)
128177
}
129178

130179
test("windows") {
@@ -283,14 +332,16 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
283332
private def checkStats(
284333
plan: LogicalPlan,
285334
expectedStatsCboOn: Statistics,
286-
expectedStatsCboOff: Statistics): Unit = {
287-
withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
335+
expectedStatsCboOff: Statistics,
336+
extraConfigs: Map[String, String] = Map.empty): Unit = {
337+
val cboEnabledConfig = Seq(SQLConf.CBO_ENABLED.key -> "true") ++ extraConfigs.toSeq
338+
withSQLConf(cboEnabledConfig: _*) {
288339
// Invalidate statistics
289340
plan.invalidateStatsCache()
290341
assert(plan.stats == expectedStatsCboOn)
291342
}
292-
293-
withSQLConf(SQLConf.CBO_ENABLED.key -> "false") {
343+
val cboDisabledConfig = Seq(SQLConf.CBO_ENABLED.key -> "false") ++ extraConfigs.toSeq
344+
withSQLConf(cboDisabledConfig: _*) {
294345
plan.invalidateStatsCache()
295346
assert(plan.stats == expectedStatsCboOff)
296347
}

0 commit comments

Comments
 (0)