Skip to content

Commit ac5d5d7

Browse files
kiszkgatorsmile
authored andcommitted
[SPARK-21344][SQL] BinaryType comparison does signed byte array comparison
## What changes were proposed in this pull request? This PR fixes a wrong comparison for `BinaryType`. This PR enables unsigned comparison and unsigned prefix generation for an array for `BinaryType`. Previous implementations uses signed operations. ## How was this patch tested? Added a test suite in `OrderingSuite`. Author: Kazuaki Ishizaki <[email protected]> Closes #18571 from kiszk/SPARK-21344.
1 parent 2d968a0 commit ac5d5d7

File tree

6 files changed

+60
-3
lines changed

6 files changed

+60
-3
lines changed

common/unsafe/src/main/java/org/apache/spark/unsafe/types/ByteArray.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public static long getPrefix(byte[] bytes) {
4444
final int minLen = Math.min(bytes.length, 8);
4545
long p = 0;
4646
for (int i = 0; i < minLen; ++i) {
47-
p |= (128L + Platform.getByte(bytes, Platform.BYTE_ARRAY_OFFSET + i))
47+
p |= ((long) Platform.getByte(bytes, Platform.BYTE_ARRAY_OFFSET + i) & 0xff)
4848
<< (56 - 8 * i);
4949
}
5050
return p;

core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/PrefixComparatorsSuite.scala

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ class PrefixComparatorsSuite extends SparkFunSuite with PropertyChecks {
6363

6464
def compareBinary(x: Array[Byte], y: Array[Byte]): Int = {
6565
for (i <- 0 until x.length; if i < y.length) {
66-
val res = x(i).compare(y(i))
66+
val v1 = x(i) & 0xff
67+
val v2 = y(i) & 0xff
68+
val res = v1 - v2
6769
if (res != 0) return res
6870
}
6971
x.length - y.length
@@ -80,6 +82,19 @@ class PrefixComparatorsSuite extends SparkFunSuite with PropertyChecks {
8082
(prefixComparisonResult > 0 && compareBinary(x, y) > 0))
8183
}
8284

85+
val binaryRegressionTests = Seq(
86+
(Array[Byte](1), Array[Byte](-1)),
87+
(Array[Byte](1, 1, 1, 1, 1), Array[Byte](1, 1, 1, 1, -1)),
88+
(Array[Byte](1, 1, 1, 1, 1, 1, 1, 1, 1), Array[Byte](1, 1, 1, 1, 1, 1, 1, 1, -1)),
89+
(Array[Byte](1), Array[Byte](1, 1, 1, 1)),
90+
(Array[Byte](1, 1, 1, 1, 1), Array[Byte](1, 1, 1, 1, 1, 1, 1, 1, 1)),
91+
(Array[Byte](-1), Array[Byte](-1, -1, -1, -1)),
92+
(Array[Byte](-1, -1, -1, -1, -1), Array[Byte](-1, -1, -1, -1, -1, -1, -1, -1, -1))
93+
)
94+
binaryRegressionTests.foreach { case (b1, b2) =>
95+
testPrefixComparison(b1, b2)
96+
}
97+
8398
// scalastyle:off
8499
val regressionTests = Table(
85100
("s1", "s2"),

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TypeUtils.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,9 @@ object TypeUtils {
7070

7171
def compareBinary(x: Array[Byte], y: Array[Byte]): Int = {
7272
for (i <- 0 until x.length; if i < y.length) {
73-
val res = x(i).compareTo(y(i))
73+
val v1 = x(i) & 0xff
74+
val v2 = y(i) & 0xff
75+
val res = v1 - v2
7476
if (res != 0) return res
7577
}
7678
x.length - y.length

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/OrderingSuite.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,23 @@ class OrderingSuite extends SparkFunSuite with ExpressionEvalHelper {
137137
// verify that we can support up to 5000 ordering comparisons, which should be sufficient
138138
GenerateOrdering.generate(Array.fill(5000)(sortOrder))
139139
}
140+
141+
test("SPARK-21344: BinaryType comparison does signed byte array comparison") {
142+
val data = Seq(
143+
(Array[Byte](1), Array[Byte](-1)),
144+
(Array[Byte](1, 1, 1, 1, 1), Array[Byte](1, 1, 1, 1, -1)),
145+
(Array[Byte](1, 1, 1, 1, 1, 1, 1, 1, 1), Array[Byte](1, 1, 1, 1, 1, 1, 1, 1, -1))
146+
)
147+
data.foreach { case (b1, b2) =>
148+
val rowOrdering = InterpretedOrdering.forSchema(Seq(BinaryType))
149+
val genOrdering = GenerateOrdering.generate(
150+
BoundReference(0, BinaryType, nullable = true).asc :: Nil)
151+
val rowType = StructType(StructField("b", BinaryType, nullable = true) :: Nil)
152+
val toCatalyst = CatalystTypeConverters.createToCatalystConverter(rowType)
153+
val rowB1 = toCatalyst(Row(b1)).asInstanceOf[InternalRow]
154+
val rowB2 = toCatalyst(Row(b2)).asInstanceOf[InternalRow]
155+
assert(rowOrdering.compare(rowB1, rowB2) < 0)
156+
assert(genOrdering.compare(rowB1, rowB2) < 0)
157+
}
158+
}
140159
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
-- binary type
2+
select x'00' < x'0f';
3+
select x'00' < x'ff';
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
-- Automatically generated by SQLQueryTestSuite
2+
-- Number of queries: 2
3+
4+
5+
-- !query 0
6+
select x'00' < x'0f'
7+
-- !query 0 schema
8+
struct<(X'00' < X'0F'):boolean>
9+
-- !query 0 output
10+
true
11+
12+
13+
-- !query 1
14+
select x'00' < x'ff'
15+
-- !query 1 schema
16+
struct<(X'00' < X'FF'):boolean>
17+
-- !query 1 output
18+
true

0 commit comments

Comments
 (0)