Skip to content

Commit d3a4fa9

Browse files
committed
Removed Ordering[T] in ColumnStats for better performance
1 parent 5034453 commit d3a4fa9

File tree

3 files changed

+153
-40
lines changed

3 files changed

+153
-40
lines changed

sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala

Lines changed: 148 additions & 37 deletions
Original file line number | Diff line number | Diff line change
@@ -69,36 +69,13 @@ private[sql] sealed abstract class NativeColumnStats[T <: NativeType]
6969

7070
protected var (_lower, _upper) = initialBounds
7171

72-
val ordering: Ordering[JvmType]
73-
7472
def initialBounds: (JvmType, JvmType)
7573

7674
protected def columnType: NativeColumnType[T]
7775

78-
override def lowerBound = _lower
79-
80-
override def upperBound = _upper
76+
override def lowerBound: T#JvmType = _lower
8177

82-
override def gatherStats(row: Row, ordinal: Int) {
83-
val field = columnType.getField(row, ordinal)
84-
if (upperBound == null || ordering.gt(field, upperBound)) _upper = field
85-
if (lowerBound == null || ordering.lt(field, lowerBound)) _lower = field
86-
}
87-
88-
override def contains(row: Row, ordinal: Int) = {
89-
val field = columnType.getField(row, ordinal)
90-
ordering.lteq(lowerBound, field) && ordering.lteq(field, upperBound)
91-
}
92-
93-
override def isAbove(row: Row, ordinal: Int) = {
94-
val field = columnType.getField(row, ordinal)
95-
ordering.lt(field, upperBound)
96-
}
97-
98-
override def isBelow(row: Row, ordinal: Int) = {
99-
val field = columnType.getField(row, ordinal)
100-
ordering.lt(lowerBound, field)
101-
}
78+
override def upperBound: T#JvmType = _upper
10279

10380
override def isAtOrAbove(row: Row, ordinal: Int) = {
10481
contains(row, ordinal) || isAbove(row, ordinal)
@@ -132,33 +109,141 @@ private[sql] abstract class BasicColumnStats[T <: NativeType](
132109
extends NativeColumnStats[T]
133110

134111
private[sql] class BooleanColumnStats extends BasicColumnStats(BOOLEAN) {
135-
override val ordering = implicitly[Ordering[JvmType]]
136112
override def initialBounds = (true, false)
113+
114+
override def isBelow(row: Row, ordinal: Int) = {
115+
lowerBound < columnType.getField(row, ordinal)
116+
}
117+
118+
override def isAbove(row: Row, ordinal: Int) = {
119+
columnType.getField(row, ordinal) < upperBound
120+
}
121+
122+
override def contains(row: Row, ordinal: Int) = {
123+
val field = columnType.getField(row, ordinal)
124+
lowerBound <= field && field <= upperBound
125+
}
126+
127+
override def gatherStats(row: Row, ordinal: Int) {
128+
val field = columnType.getField(row, ordinal)
129+
if (field > upperBound) _upper = field
130+
if (field < lowerBound) _lower = field
131+
}
137132
}
138133

139134
private[sql] class ByteColumnStats extends BasicColumnStats(BYTE) {
140-
override val ordering = implicitly[Ordering[JvmType]]
141135
override def initialBounds = (Byte.MaxValue, Byte.MinValue)
136+
137+
override def isBelow(row: Row, ordinal: Int) = {
138+
lowerBound < columnType.getField(row, ordinal)
139+
}
140+
141+
override def isAbove(row: Row, ordinal: Int) = {
142+
columnType.getField(row, ordinal) < upperBound
143+
}
144+
145+
override def contains(row: Row, ordinal: Int) = {
146+
val field = columnType.getField(row, ordinal)
147+
lowerBound <= field && field <= upperBound
148+
}
149+
150+
override def gatherStats(row: Row, ordinal: Int) {
151+
val field = columnType.getField(row, ordinal)
152+
if (field > upperBound) _upper = field
153+
if (field < lowerBound) _lower = field
154+
}
142155
}
143156

144157
private[sql] class ShortColumnStats extends BasicColumnStats(SHORT) {
145-
override val ordering = implicitly[Ordering[JvmType]]
146158
override def initialBounds = (Short.MaxValue, Short.MinValue)
159+
160+
override def isBelow(row: Row, ordinal: Int) = {
161+
lowerBound < columnType.getField(row, ordinal)
162+
}
163+
164+
override def isAbove(row: Row, ordinal: Int) = {
165+
columnType.getField(row, ordinal) < upperBound
166+
}
167+
168+
override def contains(row: Row, ordinal: Int) = {
169+
val field = columnType.getField(row, ordinal)
170+
lowerBound <= field && field <= upperBound
171+
}
172+
173+
override def gatherStats(row: Row, ordinal: Int) {
174+
val field = columnType.getField(row, ordinal)
175+
if (field > upperBound) _upper = field
176+
if (field < lowerBound) _lower = field
177+
}
147178
}
148179

149180
private[sql] class LongColumnStats extends BasicColumnStats(LONG) {
150-
override val ordering = implicitly[Ordering[JvmType]]
151181
override def initialBounds = (Long.MaxValue, Long.MinValue)
182+
183+
override def isBelow(row: Row, ordinal: Int) = {
184+
lowerBound < columnType.getField(row, ordinal)
185+
}
186+
187+
override def isAbove(row: Row, ordinal: Int) = {
188+
columnType.getField(row, ordinal) < upperBound
189+
}
190+
191+
override def contains(row: Row, ordinal: Int) = {
192+
val field = columnType.getField(row, ordinal)
193+
lowerBound <= field && field <= upperBound
194+
}
195+
196+
override def gatherStats(row: Row, ordinal: Int) {
197+
val field = columnType.getField(row, ordinal)
198+
if (field > upperBound) _upper = field
199+
if (field < lowerBound) _lower = field
200+
}
152201
}
153202

154203
private[sql] class DoubleColumnStats extends BasicColumnStats(DOUBLE) {
155-
override val ordering = implicitly[Ordering[JvmType]]
156204
override def initialBounds = (Double.MaxValue, Double.MinValue)
205+
206+
override def isBelow(row: Row, ordinal: Int) = {
207+
lowerBound < columnType.getField(row, ordinal)
208+
}
209+
210+
override def isAbove(row: Row, ordinal: Int) = {
211+
columnType.getField(row, ordinal) < upperBound
212+
}
213+
214+
override def contains(row: Row, ordinal: Int) = {
215+
val field = columnType.getField(row, ordinal)
216+
lowerBound <= field && field <= upperBound
217+
}
218+
219+
override def gatherStats(row: Row, ordinal: Int) {
220+
val field = columnType.getField(row, ordinal)
221+
if (field > upperBound) _upper = field
222+
if (field < lowerBound) _lower = field
223+
}
157224
}
158225

159226
private[sql] class FloatColumnStats extends BasicColumnStats(FLOAT) {
160-
override val ordering = implicitly[Ordering[JvmType]]
161227
override def initialBounds = (Float.MaxValue, Float.MinValue)
228+
229+
override def isBelow(row: Row, ordinal: Int) = {
230+
lowerBound < columnType.getField(row, ordinal)
231+
}
232+
233+
override def isAbove(row: Row, ordinal: Int) = {
234+
columnType.getField(row, ordinal) < upperBound
235+
}
236+
237+
override def contains(row: Row, ordinal: Int) = {
238+
val field = columnType.getField(row, ordinal)
239+
lowerBound <= field && field <= upperBound
240+
}
241+
242+
override def gatherStats(row: Row, ordinal: Int) {
243+
val field = columnType.getField(row, ordinal)
244+
if (field > upperBound) _upper = field
245+
if (field < lowerBound) _lower = field
246+
}
162247
}
163248

164249
object IntColumnStats {
@@ -181,10 +266,22 @@ private[sql] class IntColumnStats extends BasicColumnStats(INT) {
181266
def isOrdered = isAscending || isDescending
182267
def maxDelta = _maxDelta
183268

184-
override val ordering = implicitly[Ordering[JvmType]]
185269
override def initialBounds = (Int.MaxValue, Int.MinValue)
186270

187-
override def gatherStats(row: Row, ordinal: Int) = {
271+
override def isBelow(row: Row, ordinal: Int) = {
272+
lowerBound < columnType.getField(row, ordinal)
273+
}
274+
275+
override def isAbove(row: Row, ordinal: Int) = {
276+
columnType.getField(row, ordinal) < upperBound
277+
}
278+
279+
override def contains(row: Row, ordinal: Int) = {
280+
val field = columnType.getField(row, ordinal)
281+
lowerBound <= field && field <= upperBound
282+
}
283+
284+
override def gatherStats(row: Row, ordinal: Int) {
188285
val field = columnType.getField(row, ordinal)
189286

190287
if (field > upperBound) _upper = field
@@ -221,18 +318,32 @@ private[sql] class IntColumnStats extends BasicColumnStats(INT) {
221318
}
222319

223320
private[sql] class StringColumnStats extends BasicColumnStats(STRING) {
224-
override val ordering = implicitly[Ordering[JvmType]]
225321
override def initialBounds = (null, null)
226322

323+
override def gatherStats(row: Row, ordinal: Int) {
324+
val field = columnType.getField(row, ordinal)
325+
if ((upperBound eq null) || field.compareTo(upperBound) > 0) _upper = field
326+
if ((lowerBound eq null) || field.compareTo(lowerBound) < 0) _lower = field
327+
}
328+
227329
override def contains(row: Row, ordinal: Int) = {
228-
!(upperBound eq null) && super.contains(row, ordinal)
330+
!(upperBound eq null) && {
331+
val field = columnType.getField(row, ordinal)
332+
lowerBound.compareTo(field) <= 0 && field.compareTo(upperBound) <= 0
333+
}
229334
}
230335

231336
override def isAbove(row: Row, ordinal: Int) = {
232-
!(upperBound eq null) && super.isAbove(row, ordinal)
337+
!(upperBound eq null) && {
338+
val field = columnType.getField(row, ordinal)
339+
field.compareTo(upperBound) < 0
340+
}
233341
}
234342

235343
override def isBelow(row: Row, ordinal: Int) = {
236-
!(lowerBound eq null) && super.isBelow(row, ordinal)
344+
!(lowerBound eq null) && {
345+
val field = columnType.getField(row, ordinal)
346+
lowerBound.compareTo(field) < 0
347+
}
237348
}
238349
}

sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/compressionSchemes.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -77,7 +77,7 @@ private[sql] object RunLengthEncoding extends CompressionScheme {
7777
case _ => false
7878
}
7979

80-
class Encoder extends compression.Encoder{
80+
class Encoder extends compression.Encoder {
8181
private var _uncompressedSize = 0
8282
private var _compressedSize = 0
8383

sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -50,8 +50,10 @@ class ColumnStatsSuite extends FunSuite {
5050
rows.foreach(columnStats.gatherStats(_, 0))
5151

5252
val values = rows.map(_.head.asInstanceOf[T#JvmType])
53-
assert(columnStats.lowerBound === values.min(columnStats.ordering))
54-
assert(columnStats.upperBound === values.max(columnStats.ordering))
53+
val ordering = columnType.dataType.ordering.asInstanceOf[Ordering[T#JvmType]]
54+
55+
assert(columnStats.lowerBound === values.min(ordering))
56+
assert(columnStats.upperBound === values.max(ordering))
5557
}
5658
}
5759
}

0 commit comments

Comments
 (0)