@@ -86,9 +86,9 @@ import org.apache.spark.storage.{BlockId, BlockObjectWriter}
  * - Users are expected to call stop() at the end to delete all the intermediate files.
  */
 private[spark] abstract class ExternalSorter[K, V, C](
-  partitioner: Option[Partitioner] = None,
-  ordering: Option[Ordering[K]] = None,
-  serializer: Option[Serializer] = None)
+    partitioner: Option[Partitioner] = None,
+    ordering: Option[Ordering[K]] = None,
+    serializer: Option[Serializer] = None)
   extends Logging
   with Spillable[WritablePartitionedPairCollection[K, C]]
   with SortShuffleFileWriter[K, V] {
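
The class doc above fixes a simple lifecycle contract: feed records in, drain the sorted output through `partitionedIterator`, and always call `stop()` so intermediate spill files get deleted. A minimal caller's-eye sketch of that contract (the `PartitionedSorter` trait and `drain` helper are hypothetical names, not Spark API):

```scala
// Hypothetical trait naming only the two members this excerpt guarantees.
trait PartitionedSorter[K, C] {
  def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])]
  def stop(): Unit
}

object SorterLifecycle {
  // Drain every partition, then clean up -- stop() runs even if a downstream
  // consumer throws, matching the "users are expected to call stop()" note.
  def drain[K, C](sorter: PartitionedSorter[K, C]): Unit = {
    try {
      for ((pid, elems) <- sorter.partitionedIterator; kv <- elems) {
        println(s"partition $pid: ${kv._1} -> ${kv._2}")
      }
    } finally {
      sorter.stop()
    }
  }
}
```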
@@ -324,10 +324,10 @@ private[spark] abstract class ExternalSorter[K, V, C](
    * they're not), we still merge them by doing equality tests for all keys that compare as equal.
    */
   protected def mergeWithAggregation(
-    iterators: Seq[Iterator[Product2[K, C]]],
-    mergeCombiners: (C, C) => C,
-    comparator: Comparator[K],
-    totalOrder: Boolean): Iterator[Product2[K, C]] =
+      iterators: Seq[Iterator[Product2[K, C]]],
+      mergeCombiners: (C, C) => C,
+      comparator: Comparator[K],
+      totalOrder: Boolean): Iterator[Product2[K, C]] =
   {
     if (!totalOrder) {
       // We only have a partial ordering, e.g. comparing the keys by hash code, which means that
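
The partial-ordering comment is the subtle part of `mergeWithAggregation`: a hash-based comparator puts truly-equal keys next to each other, but hash collisions land in the same run too, so combiners can only be merged after a real equality test. A minimal sketch of that run-buffering merge over a single pre-sorted stream (`mergeCollidingKeys` is a hypothetical helper, simplified from the multi-iterator merge the real method performs):

```scala
import java.util.Comparator
import scala.collection.mutable.ArrayBuffer

object PartialOrderMerge {
  // `sorted` is already ordered by `comparator` (e.g. by key hash code), so
  // comparator-equal keys are adjacent. Buffer each such run and combine
  // values only for keys that are actually ==, keeping collided keys apart.
  def mergeCollidingKeys[K, C](
      sorted: Iterator[(K, C)],
      mergeCombiners: (C, C) => C,
      comparator: Comparator[K]): Iterator[(K, C)] = {
    val it = sorted.buffered
    new Iterator[Iterator[(K, C)]] {
      override def hasNext: Boolean = it.hasNext
      override def next(): Iterator[(K, C)] = {
        val keys = new ArrayBuffer[K]
        val combiners = new ArrayBuffer[C]
        val (firstKey, firstC) = it.next()
        keys += firstKey
        combiners += firstC
        // Consume the whole run of keys the comparator cannot tell apart.
        while (it.hasNext && comparator.compare(it.head._1, firstKey) == 0) {
          val (k, c) = it.next()
          val i = keys.indexOf(k) // real equality resolves hash collisions
          if (i >= 0) combiners(i) = mergeCombiners(combiners(i), c)
          else { keys += k; combiners += c }
        }
        keys.iterator.zip(combiners.iterator)
      }
    }.flatten
  }
}
```

For example, the strings "Aa" and "BB" share a JVM hashCode, so a hash comparator groups them into one run; the `indexOf` equality scan keeps their combiners separate.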
@@ -558,8 +558,8 @@ private[spark] abstract class ExternalSorter[K, V, C](
   @VisibleForTesting
   def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])]
 
-  protected def partitionedIterator(collection: WritablePartitionedPairCollection[K, C]):
-    Iterator[(Int, Iterator[Product2[K, C]])] = {
+  protected def partitionedIterator(collection: WritablePartitionedPairCollection[K, C])
+      : Iterator[(Int, Iterator[Product2[K, C]])] = {
     if (spills.isEmpty) {
       // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps
       // we don't even need to sort by anything other than partition ID
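
The special case above comes down to which comparator the in-memory collection gets sorted with. A sketch of the two orders involved (hypothetical names, mirroring the `((Int, K), C)` keying that `IteratorForPartition` uses later in this file):

```scala
import java.util.Comparator

object PartitionOrders {
  // Enough for the no-aggregation, no-ordering case: group by partition only.
  def partitionComparator[K]: Comparator[(Int, K)] = new Comparator[(Int, K)] {
    override def compare(a: (Int, K), b: (Int, K)): Int =
      Integer.compare(a._1, b._1)
  }

  // Used when an Ordering[K] was supplied: partition first, then key.
  def partitionKeyComparator[K](ord: Ordering[K]): Comparator[(Int, K)] =
    new Comparator[(Int, K)] {
      override def compare(a: (Int, K), b: (Int, K)): Int = {
        val byPartition = Integer.compare(a._1, b._1)
        if (byPartition != 0) byPartition else ord.compare(a._2, b._2)
      }
    }
}
```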
@@ -592,15 +592,15 @@ private[spark] abstract class ExternalSorter[K, V, C](
    * This interface abstracts away aggregator dependence.
    */
   override def writePartitionedFile(
-    blockId: BlockId,
-    context: TaskContext,
-    outputFile: File): Array[Long]
+      blockId: BlockId,
+      context: TaskContext,
+      outputFile: File): Array[Long]
 
   protected def writePartitionedFile(
-    blockId: BlockId,
-    context: TaskContext,
-    outputFile: File,
-    collection: WritablePartitionedPairCollection[K, C]): Array[Long] = {
+      blockId: BlockId,
+      context: TaskContext,
+      outputFile: File,
+      collection: WritablePartitionedPairCollection[K, C]): Array[Long] = {
 
     // Track location of each range in the output file
     val lengths = new Array[Long](numPartitions)
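
The `lengths` array is the heart of this method: the output is a single file with all partitions laid out back to back, and `lengths(p)` records the byte extent of partition p so readers can seek straight to it. A simplified sketch of that bookkeeping (hypothetical helper using a plain `DataOutputStream` instead of Spark's `BlockObjectWriter`, with String pairs standing in for serialized records):

```scala
import java.io.{DataOutputStream, File, FileOutputStream}

object PartitionedFileWriter {
  def writePartitioned(
      outputFile: File,
      numPartitions: Int,
      partitions: Iterator[(Int, Iterator[(String, String)])]): Array[Long] = {
    // Track location of each range in the output file, as above.
    val lengths = new Array[Long](numPartitions)
    val data = new DataOutputStream(new FileOutputStream(outputFile))
    try {
      for ((pid, elems) <- partitions) {
        val before = data.size().toLong
        elems.foreach { case (k, v) => data.writeUTF(k); data.writeUTF(v) }
        lengths(pid) = data.size().toLong - before
      }
    } finally {
      data.close()
    }
    lengths
  }
}
```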
@@ -664,8 +664,8 @@ private[spark] abstract class ExternalSorter[K, V, C](
    * stream, assuming this partition is the next one to be read. Used to make it easier to return
    * partitioned iterators from our in-memory collection.
    */
-  protected[this] class IteratorForPartition(partitionId: Int, data: BufferedIterator[((Int, K), C)])
-    extends Iterator[Product2[K, C]]
+  protected[this] class IteratorForPartition(partitionId: Int,
+      data: BufferedIterator[((Int, K), C)]) extends Iterator[Product2[K, C]]
   {
     override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionId
 
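
The `hasNext` check visible above is what makes `IteratorForPartition` work: `data` is shared across partitions and already sorted by partition ID, so each iterator simply stops once the head element's partition no longer matches. A self-contained sketch of the same pattern (hypothetical names):

```scala
object PartitionSlices {
  // One slice per partition over a shared, partition-sorted stream. Peeking
  // at data.head stops each slice exactly at its partition boundary.
  class PartitionSlice[K, C](partitionId: Int, data: BufferedIterator[((Int, K), C)])
      extends Iterator[(K, C)] {
    override def hasNext: Boolean = data.hasNext && data.head._1._1 == partitionId
    override def next(): (K, C) = {
      val elem = data.next()
      (elem._1._2, elem._2)
    }
  }

  // Slices share one underlying iterator, so each must be drained in order.
  def byPartition[K, C](
      numPartitions: Int,
      sorted: BufferedIterator[((Int, K), C)]): Iterator[(Int, Iterator[(K, C)])] =
    (0 until numPartitions).iterator.map(p => (p, new PartitionSlice(p, sorted)))
}
```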