Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
e14b545
[SPARK-7977] [BUILD] Disallowing println
jonalter Jul 10, 2015
11e22b7
[SPARK-7944] [SPARK-8013] Remove most of the Spark REPL fork for Scal…
dragos Jul 10, 2015
5dd45bd
[SPARK-8958] Dynamic allocation: change cached timeout to infinity
Jul 10, 2015
db6d57f
[CORE] [MINOR] change the log level to info
chenghao-intel Jul 10, 2015
c185f3a
[SPARK-8675] Executors created by LocalBackend won't get the same cla…
coderplay Jul 10, 2015
05ac023
[HOTFIX] fix flaky test in PySpark SQL
Jul 10, 2015
0772026
[SPARK-8923] [DOCUMENTATION, MLLIB] Add @since tags to mllib.fpm
rahulpalamuttam Jul 10, 2015
fb8807c
[SPARK-7078] [SPARK-7079] Binary processing sort for Spark SQL
JoshRosen Jul 10, 2015
857e325
[SPARK-8990] [SQL] SPARK-8990 DataFrameReader.parquet() should respec…
liancheng Jul 10, 2015
b6fc0ad
add inline comment for python tests
davies Jul 11, 2015
3363088
[SPARK-8961] [SQL] Makes BaseWriterContainer.outputWriterForRow accep…
liancheng Jul 11, 2015
6e1c7e2
[SPARK-7735] [PYSPARK] Raise Exception on non-zero exit from pipe com…
megatron-me-uk Jul 11, 2015
9c50757
[SPARK-8598] [MLLIB] Implementation of 1-sample, two-sided, Kolmogoro…
Jul 11, 2015
7f6be1f
[SPARK-6487] [MLLIB] Add sequential pattern mining algorithm PrefixSp…
zhangjiajin Jul 11, 2015
0c5207c
[SPARK-8994] [ML] tiny cleanups to Params, Pipeline
jkbradley Jul 11, 2015
c472eb1
[SPARK-8970][SQL] remove unnecessary abstraction for ExtractValue
cloud-fan Jul 11, 2015
3009088
[SPARK-8880] Fix confusing Stage.attemptId member variable
kayousterhout Jul 13, 2015
20b4743
[SPARK-9006] [PYSPARK] fix microsecond loss in Python 3
Jul 13, 2015
92540d2
[SPARK-8203] [SPARK-8204] [SQL] conditional function: least/greatest
adrian-wang Jul 13, 2015
6b89943
[SPARK-8944][SQL] Support casting between IntervalType and StringType
cloud-fan Jul 13, 2015
a5bc803
[SPARK-8596] Add module for rstudio link to spark
koaning Jul 13, 2015
7f487c8
[SPARK-6797] [SPARKR] Add support for YARN cluster mode.
Jul 13, 2015
9b62e93
[SPARK-8706] [PYSPARK] [PROJECT INFRA] Add pylint checks to PySpark
MechCoder Jul 13, 2015
5ca26fb
[SPARK-8950] [WEBUI] Correct the calculation of SchedulerDelay in Sta…
carsonwang Jul 13, 2015
79c3582
Revert "[SPARK-8706] [PYSPARK] [PROJECT INFRA] Add pylint checks to P…
davies Jul 13, 2015
5c41691
[SPARK-8954] [BUILD] Remove unneeded deb repository from Dockerfile t…
yongtang Jul 13, 2015
714fc55
[SPARK-8991] [ML] Update SharedParamsCodeGen's Generated Documentation
Jul 13, 2015
4c797f2
[SPARK-8636] [SQL] Fix equalNullSafe comparison
Jul 13, 2015
0aed38e
[SPARK-8533] [STREAMING] Upgrade Flume to 1.6.0
harishreedharan Jul 13, 2015
b7bcbe2
[SPARK-8743] [STREAMING] Deregister Codahale metrics for streaming wh…
Jul 13, 2015
408b384
[SPARK-6910] [SQL] Support for pushing predicates down to metastore f…
Jul 14, 2015
20c1434
[SPARK-9001] Fixing errors in javadocs that lead to failed build/sbt doc
jegonzal Jul 14, 2015
c1feebd
[SPARK-9010] [DOCUMENTATION] Improve the Spark Configuration document…
stanzhai Jul 14, 2015
257236c
[SPARK-6851] [SQL] function least/greatest follow up
adrian-wang Jul 14, 2015
59d820a
[SPARK-9029] [SQL] shortcut CaseKeyWhen if key is null
cloud-fan Jul 14, 2015
37f2d96
[SPARK-9027] [SQL] Generalize metastore predicate pushdown
marmbrus Jul 14, 2015
c4e98ff
[SPARK-8933] [BUILD] Provide a --force flag to build/mvn that always …
Jul 14, 2015
8fb3a65
[SPARK-8911] Fix local mode endless heartbeats
Jul 14, 2015
d267c28
[SPARK-9031] Merge BlockObjectWriter and DiskBlockObject writer to re…
JoshRosen Jul 14, 2015
0a4071e
[SPARK-8718] [GRAPHX] Improve EdgePartition2D for non perfect square …
aray Jul 14, 2015
fb1d06f
[SPARK-4072] [CORE] Display Streaming blocks in Streaming UI
zsxwing Jul 14, 2015
4b5cfc9
[SPARK-8800] [SQL] Fix inaccurate precision/scale of Decimal division…
viirya Jul 14, 2015
740b034
[SPARK-4362] [MLLIB] Make prediction probability available in NaiveBa…
srowen Jul 14, 2015
11e5c37
[SPARK-8962] Add Scalastyle rule to ban direct use of Class.forName; …
JoshRosen Jul 14, 2015
e965a79
[SPARK-9045] Fix Scala 2.11 build break in UnsafeExternalRowSorter
JoshRosen Jul 15, 2015
cc57d70
[SPARK-9050] [SQL] Remove unused newOrdering argument from Exchange (…
JoshRosen Jul 15, 2015
f957796
[SPARK-8820] [STREAMING] Add a configuration to set checkpoint dir.
SaintBacchus Jul 15, 2015
bb870e7
[SPARK-5523] [CORE] [STREAMING] Add a cache for hostname in TaskMetri…
jerryshao Jul 15, 2015
5572fd0
[HOTFIX] Adding new names to known contributors
pwendell Jul 15, 2015
f650a00
[SPARK-8808] [SPARKR] Fix assignments in SparkR.
Jul 15, 2015
f23a721
[SPARK-8993][SQL] More comprehensive type checking in expressions.
rxin Jul 15, 2015
c6b1a9e
Revert SPARK-6910 and SPARK-9027
marmbrus Jul 15, 2015
4692769
[SPARK-6259] [MLLIB] Python API for LDA
yu-iskw Jul 15, 2015
3f6296f
[SPARK-8018] [MLLIB] KMeans should accept initial cluster centers as …
FlytxtRnD Jul 15, 2015
f0e1297
[SPARK-8279][SQL]Add math function round
yjshen Jul 15, 2015
1bb8acc
[SPARK-8997] [MLLIB] Performance improvements in LocalPrefixSpan
Jul 15, 2015
14935d8
[HOTFIX][SQL] Unit test breaking.
rxin Jul 15, 2015
adb33d3
[SPARK-9012] [WEBUI] Escape Accumulators in the task table
zsxwing Jul 15, 2015
20bb10f
[SPARK-8706] [PYSPARK] [PROJECT INFRA] Add pylint checks to PySpark
MechCoder Jul 15, 2015
6f69025
[SPARK-8840] [SPARKR] Add float coercion on SparkR
viirya Jul 15, 2015
fa4ec36
[SPARK-9020][SQL] Support mutable state in code gen expressions
cloud-fan Jul 15, 2015
a938527
[SPARK-8221][SQL]Add pmod function
zhichao-li Jul 15, 2015
9716a72
[Minor][SQL] Allow spaces in the beginning and ending of string for I…
viirya Jul 15, 2015
303c120
[SPARK-7555] [DOCS] Add doc for elastic net in ml-guide and mllib-guide
coderxiang Jul 15, 2015
ec9b621
SPARK-9070 JavaDataFrameSuite teardown NPEs if setup failed
steveloughran Jul 15, 2015
536533c
[SPARK-9005] [MLLIB] Fix RegressionMetrics computation of explainedVa…
Jul 15, 2015
b9a922e
[SPARK-6602][Core]Replace Akka Serialization with Spark Serializer
zsxwing Jul 15, 2015
674eb2a
[SPARK-8974] Catch exceptions in allocation schedule task.
Jul 15, 2015
affbe32
[SPARK-9071][SQL] MonotonicallyIncreasingID and SparkPartitionID shou…
rxin Jul 15, 2015
5599cc4
Predicate pushdown to hive metastore
Jul 15, 2015
b3cb5af
Synchronize getPartitionsByFilter
Jul 17, 2015
acf96d1
Synchronize on hive client
Jul 17, 2015
f897087
Synchronize on this
Jul 17, 2015
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[SPARK-7078] [SPARK-7079] Binary processing sort for Spark SQL
This patch adds a cache-friendly external sorter which operates on serialized bytes and uses this sorter to implement a new sort operator for Spark SQL and DataFrames.

### Overview of the new sorter

The new sorter design is inspired by [Alphasort](http://research.microsoft.com/pubs/68249/alphasort.doc) and implements a key-prefix optimization in order to improve the cache friendliness of the sort.  In naive sort implementations, the sorting algorithm operates on an array of record pointers.  To compare two records for ordering, the sorter must dereference these pointers, which likely involves random memory access, then compare the objects themselves.

![image](https://cloud.githubusercontent.com/assets/50748/8611390/3b1402ae-2675-11e5-8308-1a10bf347e6e.png)

In a key-prefix sort, the sort operates on an array which stores the record pointer alongside a prefix of the record's key. When comparing two records for ordering, the sorter first compares the the stored key prefixes. If the ordering can be determined from the key prefixes (i.e. the prefixes are unequal), then the sort can avoid directly comparing the records, avoiding random memory accesses and full record comparisons. For example, if we're sorting a list of strings then we can store the first 8 bytes of the UTF-8 encoded string as the key-prefix and can perform unsigned byte-at-a-time comparisons to determine the ordering of strings based on their prefixes, only resorting to full comparisons for strings that share a common prefix.  In cases where the sort key can fit entirely in the space allotted for the key prefix (e.g. the sorting key is an integer), we completely avoid direct record comparison.

In this patch's implementation of key-prefix sorting, our sorter's internal array stores a 64-bit long and 64-bit pointer for each record being sorted. The key prefixes are generated by the user when inserting records into the sorter, which uses a user-defined comparison function for comparing them.  The `PrefixComparators` object implements a set of comparators for many common types, including primitive numeric types and UTF-8 strings.

The actual sorting is implemented by `UnsafeInMemorySorter`.  Most consumers will not use this directly, but instead will use `UnsafeExternalSorter`, a class which implements a sort that can spill to disk in response to memory pressure.  Internally, `UnsafeExternalSorter` creates `UnsafeInMemorySorters` to perform sorting and uses `UnsafeSortSpillReader/Writer` to spill and read back runs of sorted records and `UnsafeSortSpillMerger` to merge multiple sorted spills into a single sorted iterator.  This external sorter integrates with Spark's existing ShuffleMemoryManager for controlling spilling.

Many parts of this sorter's design are based on / copied from the more specialized external sort implementation that I designed for the new UnsafeShuffleManager write path; see apache#5868 for more details on that patch.

### Sorting rows in Spark SQL

For now, `UnsafeExternalSorter` is only used by Spark SQL, which uses it to implement a new sort operator, `UnsafeExternalSort`.  This sort operator uses a SQL-specific class called `UnsafeExternalRowSorter` that configures an `UnsafeExternalSorter` to use prefix generators and comparators that operate on rows encoded in the UnsafeRow format that was designed for Project Tungsten.

I used some interesting unit-testing techniques to test this patch's SQL-specific components.  `UnsafeExternalSortSuite` uses the SQL random data generators introduced in apache#7176 to test the UnsafeSort operator with all atomic types both with and without nullability and in both ascending and descending sort orders.  `PrefixComparatorsSuite` contains a cool use of ScalaCheck + ScalaTest's `GeneratorDrivenPropertyChecks` in order to test UTF8String prefix comparison.

### Misc. additional improvements made in this patch

This patch made several miscellaneous improvements to related code in Spark SQL:

- The logic for selecting physical sort operator implementations, which was partially duplicated in both `Exchange` and `SparkStrategies, has now been consolidated into a `getSortOperator()` helper function in `SparkStrategies`.
- The `SparkPlanTest` unit testing helper trait has been extended with new methods for comparing the output produced by two different physical plans. This makes it easy to write tests which assert that two physical operator implementations should produce the same output.  I also added a method for disabling the implicit sorting of outputs prior to comparing them, a change which is necessary in order to be able to write proper SparkPlan tests for sort operators.

### Tasks deferred to followup patches

While most of this patch's features are reasonably well-tested and complete, there are a number of tasks that are intentionally being deferred to followup patches:

- Add tests which mock the ShuffleMemoryManager to check that memory pressure properly triggers spilling (there are examples of this type of test in apache#5868).
- Add tests to ensure that spill files are properly cleaned up after errors.  I'd like to do this in the context of a patch which introduces more general metrics for ensuring proper cleanup of tasks' temporary files; see https://issues.apache.org/jira/browse/SPARK-8966 for more details.
- Metrics integration: there are some open questions regarding how to track / report spill metrics for non-shuffle operations, so I've deferred most of the IO / shuffle metrics integration for now.
- Performance profiling.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png" height=40 alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/6444)
<!-- Reviewable:end -->

Author: Josh Rosen <[email protected]>

Closes apache#6444 from JoshRosen/sql-external-sort and squashes the following commits:

6beb467 [Josh Rosen] Remove a bunch of overloaded methods to avoid default args. issue
2bbac9c [Josh Rosen] Merge remote-tracking branch 'origin/master' into sql-external-sort
35dad9f [Josh Rosen] Make sortAnswers = false the default in SparkPlanTest
5135200 [Josh Rosen] Fix spill reading for large rows; add test
2f48777 [Josh Rosen] Add test and fix bug for sorting empty arrays
d1e28bc [Josh Rosen] Merge remote-tracking branch 'origin/master' into sql-external-sort
cd05866 [Josh Rosen] Fix scalastyle
3947fc1 [Josh Rosen] Merge remote-tracking branch 'origin/master' into sql-external-sort
d13ac55 [Josh Rosen] Hacky approach to copying of UnsafeRows for sort followed by limit.
845bea3 [Josh Rosen] Remove unnecessary zeroing of row conversion buffer
c56ec18 [Josh Rosen] Clean up final row copying code.
d31f180 [Josh Rosen] Re-enable NullType sorting test now that SPARK-8868 is fixed
844f4ca [Josh Rosen] Merge remote-tracking branch 'origin/master' into sql-external-sort
293f109 [Josh Rosen] Add missing license header.
f99a612 [Josh Rosen] Fix bugs in string prefix comparison.
9d00afc [Josh Rosen] Clean up prefix comparators for integral types
88aff18 [Josh Rosen] NULL_PREFIX has to be negative infinity for floating point types
613e16f [Josh Rosen] Test with larger data.
1d7ffaa [Josh Rosen] Somewhat hacky fix for descending sorts
08701e7 [Josh Rosen] Fix prefix comparison of null primitives.
b86e684 [Josh Rosen] Set global = true in UnsafeExternalSortSuite.
1c7bad8 [Josh Rosen] Make sorting of answers explicit in SparkPlanTest.checkAnswer().
b81a920 [Josh Rosen] Temporarily enable only the passing sort tests
5d6109d [Josh Rosen] Fix inconsistent handling / encoding of record lengths.
87b6ed9 [Josh Rosen] Fix critical issues in test which led to false negatives.
8d7fbe7 [Josh Rosen] Fixes to multiple spilling-related bugs.
82e21c1 [Josh Rosen] Force spilling in UnsafeExternalSortSuite.
88b72db [Josh Rosen] Test ascending and descending sort orders.
f27be09 [Josh Rosen] Fix tests by binding attributes.
0a79d39 [Josh Rosen] Revert "Undo part of a SparkPlanTest change in apache#7162 that broke my test."
7c3c864 [Josh Rosen] Undo part of a SparkPlanTest change in apache#7162 that broke my test.
9969c14 [Josh Rosen] Merge remote-tracking branch 'origin/master' into sql-external-sort
5822e6f [Josh Rosen] Fix test compilation issue
939f824 [Josh Rosen] Remove code gen experiment.
0dfe919 [Josh Rosen] Implement prefix sort for strings (albeit inefficiently).
66a813e [Josh Rosen] Prefix comparators for float and double
b310c88 [Josh Rosen] Integrate prefix comparators for Int and Long (others coming soon)
95058d9 [Josh Rosen] Add missing SortPrefixUtils file
4c37ba6 [Josh Rosen] Add tests for sorting on all primitive types.
6890863 [Josh Rosen] Fix memory leak on empty inputs.
d246e29 [Josh Rosen] Fix consideration of column types when choosing sort implementation.
6b156fb [Josh Rosen] Some WIP work on prefix comparison.
7f875f9 [Josh Rosen] Commit failing test demonstrating bug in handling objects in spills
41b8881 [Josh Rosen] Get UnsafeInMemorySorterSuite to pass (WIP)
90c2b6a [Josh Rosen] Update test name
6d6a1e6 [Josh Rosen] Centralize logic for picking sort operator implementations
9869ec2 [Josh Rosen] Clean up Exchange code a bit
82bb0ec [Josh Rosen] Fix IntelliJ complaint due to negated if condition
1db845a [Josh Rosen] Many more changes to harmonize with shuffle sorter
ebf9eea [Josh Rosen] Harmonization with shuffle's unsafe sorter
206bfa2 [Josh Rosen] Add some missing newlines at the ends of files
26c8931 [Josh Rosen] Back out some Hive changes that aren't needed anymore
62f0bb8 [Josh Rosen] Update to reflect SparkPlanTest changes
21d7d93 [Josh Rosen] Back out of BlockObjectWriter change
7eafecf [Josh Rosen] Port test to SparkPlanTest
d468a88 [Josh Rosen] Update for InternalRow refactoring
269cf86 [Josh Rosen] Back out SMJ operator change; isolate changes to selection of sort op.
1b841ca [Josh Rosen] WIP towards copying
b420a71 [Josh Rosen] Move most of the existing SMJ code into Java.
dfdb93f [Josh Rosen] SparkFunSuite change
73cc761 [Josh Rosen] Fix whitespace
9cc98f5 [Josh Rosen] Move more code to Java; fix bugs in UnsafeRowConverter length type.
c8792de [Josh Rosen] Remove some debug logging
dda6752 [Josh Rosen] Commit some missing code from an old git stash.
58f36d0 [Josh Rosen] Merge in a sketch of a unit test for the new sorter (now failing).
2bd8c9a [Josh Rosen] Import my original tests and get them to pass.
d5d3106 [Josh Rosen] WIP towards external sorter for Spark SQL.
  • Loading branch information
JoshRosen authored and rxin committed Jul 10, 2015
commit fb8807c9b04f27467b36fc9d0177ef92dd012670
20 changes: 10 additions & 10 deletions core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -343,28 +343,28 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<groupId>org.scalacheck</groupId>
<artifactId>scalacheck_${scala.binary.version}</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.shuffle.unsafe;
package org.apache.spark.serializer;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -24,9 +24,7 @@

import scala.reflect.ClassTag;

import org.apache.spark.serializer.DeserializationStream;
import org.apache.spark.serializer.SerializationStream;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.annotation.Private;
import org.apache.spark.unsafe.PlatformDependent;

/**
Expand All @@ -35,7 +33,8 @@
* `write() OutputStream methods), but DiskBlockObjectWriter still calls some methods on it. To work
* around this, we pass a dummy no-op serializer.
*/
final class DummySerializerInstance extends SerializerInstance {
@Private
public final class DummySerializerInstance extends SerializerInstance {

public static final DummySerializerInstance INSTANCE = new DummySerializerInstance();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.serializer.DummySerializerInstance;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.storage.*;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.collection.unsafe.sort;

import org.apache.spark.annotation.Private;

/**
* Compares 8-byte key prefixes in prefix sort. Subclasses may implement type-specific
* comparisons, such as lexicographic comparison for strings.
*/
@Private
public abstract class PrefixComparator {
public abstract int compare(long prefix1, long prefix2);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.collection.unsafe.sort;

import com.google.common.base.Charsets;
import com.google.common.primitives.Longs;
import com.google.common.primitives.UnsignedBytes;

import org.apache.spark.annotation.Private;
import org.apache.spark.unsafe.types.UTF8String;

@Private
public class PrefixComparators {
private PrefixComparators() {}

public static final StringPrefixComparator STRING = new StringPrefixComparator();
public static final IntegralPrefixComparator INTEGRAL = new IntegralPrefixComparator();
public static final FloatPrefixComparator FLOAT = new FloatPrefixComparator();
public static final DoublePrefixComparator DOUBLE = new DoublePrefixComparator();

public static final class StringPrefixComparator extends PrefixComparator {
@Override
public int compare(long aPrefix, long bPrefix) {
// TODO: can done more efficiently
byte[] a = Longs.toByteArray(aPrefix);
byte[] b = Longs.toByteArray(bPrefix);
for (int i = 0; i < 8; i++) {
int c = UnsignedBytes.compare(a[i], b[i]);
if (c != 0) return c;
}
return 0;
}

public long computePrefix(byte[] bytes) {
if (bytes == null) {
return 0L;
} else {
byte[] padded = new byte[8];
System.arraycopy(bytes, 0, padded, 0, Math.min(bytes.length, 8));
return Longs.fromByteArray(padded);
}
}

public long computePrefix(String value) {
return value == null ? 0L : computePrefix(value.getBytes(Charsets.UTF_8));
}

public long computePrefix(UTF8String value) {
return value == null ? 0L : computePrefix(value.getBytes());
}
}

/**
* Prefix comparator for all integral types (boolean, byte, short, int, long).
*/
public static final class IntegralPrefixComparator extends PrefixComparator {
@Override
public int compare(long a, long b) {
return (a < b) ? -1 : (a > b) ? 1 : 0;
}

public final long NULL_PREFIX = Long.MIN_VALUE;
}

public static final class FloatPrefixComparator extends PrefixComparator {
@Override
public int compare(long aPrefix, long bPrefix) {
float a = Float.intBitsToFloat((int) aPrefix);
float b = Float.intBitsToFloat((int) bPrefix);
return (a < b) ? -1 : (a > b) ? 1 : 0;
}

public long computePrefix(float value) {
return Float.floatToIntBits(value) & 0xffffffffL;
}

public final long NULL_PREFIX = computePrefix(Float.NEGATIVE_INFINITY);
}

public static final class DoublePrefixComparator extends PrefixComparator {
@Override
public int compare(long aPrefix, long bPrefix) {
double a = Double.longBitsToDouble(aPrefix);
double b = Double.longBitsToDouble(bPrefix);
return (a < b) ? -1 : (a > b) ? 1 : 0;
}

public long computePrefix(double value) {
return Double.doubleToLongBits(value);
}

public final long NULL_PREFIX = computePrefix(Double.NEGATIVE_INFINITY);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.collection.unsafe.sort;

/**
* Compares records for ordering. In cases where the entire sorting key can fit in the 8-byte
* prefix, this may simply return 0.
*/
public abstract class RecordComparator {

/**
* Compare two records for order.
*
* @return a negative integer, zero, or a positive integer as the first record is less than,
* equal to, or greater than the second.
*/
public abstract int compare(
Object leftBaseObject,
long leftBaseOffset,
Object rightBaseObject,
long rightBaseOffset);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.util.collection.unsafe.sort;

final class RecordPointerAndKeyPrefix {
/**
* A pointer to a record; see {@link org.apache.spark.unsafe.memory.TaskMemoryManager} for a
* description of how these addresses are encoded.
*/
public long recordPointer;

/**
* A key prefix, for use in comparisons.
*/
public long keyPrefix;
}
Loading