Merged
Changes from 1 commit
Commits (75)
0092abb
Some minor cleanup after SPARK-4550.
sryza May 6, 2015
1fd31ba
[SPARK-6231][SQL/DF] Automatically resolve join condition ambiguity f…
rxin May 6, 2015
51b3d41
Revert "[SPARK-3454] separate json endpoints for data in the UI"
rxin May 6, 2015
a466944
[SPARK-6841] [SPARKR] add support for mean, median, stdev etc.
hqzizania May 6, 2015
ba2b566
[SPARK-7358][SQL] Move DataFrame mathfunctions into functions
brkyvz May 6, 2015
7b14578
[SPARK-6267] [MLLIB] Python API for IsotonicRegression
yanboliang May 6, 2015
9f019c7
[SPARK-7384][Core][Tests] Fix flaky tests for distributed mode in Bro…
zsxwing May 6, 2015
32cdc81
[SPARK-6940] [MLLIB] Add CrossValidator to Python ML pipeline API
mengxr May 6, 2015
322e7e7
[SQL] JavaDoc update for various DataFrame functions.
rxin May 6, 2015
150f671
[SPARK-5456] [SQL] fix decimal compare for jdbc rdd
adrian-wang May 6, 2015
c3eb441
[SPARK-6201] [SQL] promote string and do widen types for IN
adrian-wang May 6, 2015
f2c4708
[SPARK-1442] [SQL] Window Function Support for Spark SQL
yhuai May 6, 2015
002c123
[SPARK-7311] Introduce internal Serializer API for determining if ser…
JoshRosen May 6, 2015
845d1d4
Add `Private` annotation.
JoshRosen May 6, 2015
7740996
[HOT-FIX] Move HiveWindowFunctionQuerySuite.scala to hive compatibili…
yhuai May 6, 2015
1ad04da
[SPARK-5995] [ML] Make Prediction dev API public
jkbradley May 6, 2015
fbf1f34
[HOT FIX] [SPARK-7418] Ignore flaky SparkSubmitUtilsSuite test
May 7, 2015
4e93042
[SPARK-6799] [SPARKR] Remove SparkR RDD examples, add dataframe examples
shivaram May 7, 2015
316a5c0
[SPARK-7396] [STREAMING] [EXAMPLE] Update KafkaWordCountProducer to u…
jerryshao May 7, 2015
8fa6829
[SPARK-7371] [SPARK-7377] [SPARK-7408] DAG visualization addendum (#5…
May 7, 2015
71a452b
[HOT FIX] For DAG visualization #5954
May 7, 2015
14502d5
[SPARK-7405] [STREAMING] Fix the bug that ReceiverInputDStream doesn'…
zsxwing May 7, 2015
773aa25
[SPARK-7432] [MLLIB] disable cv doctest
mengxr May 7, 2015
9cfa9a5
[SPARK-6812] [SPARKR] filter() on DataFrame does not work as expected.
May 7, 2015
2d6612c
[SPARK-5938] [SPARK-5443] [SQL] Improve JsonRDD performance
May 7, 2015
cfdadcb
[SPARK-7430] [STREAMING] [TEST] General improvements to streaming tes…
tdas May 7, 2015
01187f5
[SPARK-7217] [STREAMING] Add configuration to control the default beh…
tdas May 7, 2015
fa8fddf
[SPARK-7295][SQL] bitwise operations for DataFrame DSL
Shiti May 7, 2015
fae4e2d
[SPARK-7035] Encourage __getitem__ over __getattr__ on column access …
ksonj May 7, 2015
8b6b46e
[SPARK-7421] [MLLIB] OnlineLDA cleanups
jkbradley May 7, 2015
4f87e95
[SPARK-7429] [ML] Params cleanups
jkbradley May 7, 2015
ed9be06
[SPARK-7330] [SQL] avoid NPE at jdbc rdd
adrian-wang May 7, 2015
9e2ffb1
[SPARK-7388] [SPARK-7383] wrapper for VectorAssembler in Python
brkyvz May 7, 2015
068c315
[SPARK-7118] [Python] Add the coalesce Spark SQL function available i…
May 7, 2015
1712a7c
[SPARK-6093] [MLLIB] Add RegressionMetrics in PySpark/MLlib
yanboliang May 7, 2015
5784c8d
[SPARK-1442] [SQL] [FOLLOW-UP] Address minor comments in Window Funct…
yhuai May 7, 2015
dec8f53
[SPARK-7116] [SQL] [PYSPARK] Remove cache() causing memory leak
ksonj May 7, 2015
074d75d
[SPARK-5213] [SQL] Remove the duplicated SparkSQLParser
chenghao-intel May 7, 2015
0c33bf8
[SPARK-7399] [SPARK CORE] Fixed compilation error in scala 2.11
May 7, 2015
4eecf55
[SPARK-7373] [MESOS] Add docker support for launching drivers in meso…
tnachen May 7, 2015
f121651
[SPARK-7391] DAG visualization: auto expand if linked from another viz
May 7, 2015
88717ee
[SPARK-7347] DAG visualization: add tooltips to RDDs
May 7, 2015
347a329
[SPARK-7328] [MLLIB] [PYSPARK] Pyspark.mllib.linalg.Vectors: Missing …
MechCoder May 7, 2015
658a478
[SPARK-5726] [MLLIB] Elementwise (Hadamard) Vector Product Transformer
ogeagla May 7, 2015
e43803b
[SPARK-6948] [MLLIB] compress vectors in VectorAssembler
mengxr May 7, 2015
97d1182
[SQL] [MINOR] make star and multialias extend NamedExpression
scwf May 7, 2015
ea3077f
[SPARK-7277] [SQL] Throw exception if the property mapred.reduce.task…
viirya May 7, 2015
937ba79
[SPARK-5281] [SQL] Registering table on RDD is giving MissingRequirem…
dragos May 7, 2015
35f0173
[SPARK-2155] [SQL] [WHEN D THEN E] [ELSE F] add CaseKeyWhen for "CASE…
cloud-fan May 7, 2015
88063c6
[SPARK-7450] Use UNSAFE.getLong() to speed up BitSetMethods#anySet()
tedyu May 7, 2015
22ab70e
[SPARK-7305] [STREAMING] [WEBUI] Make BatchPage show friendly informa…
zsxwing May 8, 2015
cd1d411
[SPARK-6908] [SQL] Use isolated Hive client
marmbrus May 8, 2015
92f8f80
[SPARK-7452] [MLLIB] fix bug in topBykey and update test
coderxiang May 8, 2015
3af423c
[SPARK-6986] [SQL] Use Serializer2 in more cases.
yhuai May 8, 2015
714db2e
[SPARK-7470] [SQL] Spark shell SQLContext crashes without hive
May 8, 2015
f496bf3
[SPARK-7232] [SQL] Add a Substitution batch for spark sql analyzer
scwf May 8, 2015
c2f0821
[SPARK-7392] [CORE] bugfix: Kryo buffer size cannot be larger than 2M
liyezhang556520 May 8, 2015
ebff732
[SPARK-6869] [PYSPARK] Add pyspark archives path to PYTHONPATH
lianhuiwang May 8, 2015
c796be7
[SPARK-3454] separate json endpoints for data in the UI
squito May 8, 2015
f5ff4a8
[SPARK-7383] [ML] Feature Parity in PySpark for ml.features
brkyvz May 8, 2015
65afd3c
[SPARK-7474] [MLLIB] update ParamGridBuilder doctest
mengxr May 8, 2015
008a60d
[SPARK-6824] Fill the docs for DataFrame API in SparkR
hqzizania May 8, 2015
35d6a99
[SPARK-7436] Fixed instantiation of custom recovery mode factory and …
jacek-lewandowski May 8, 2015
a1ec08f
[SPARK-7298] Harmonize style of new visualizations
mateiz May 8, 2015
2d05f32
[SPARK-7133] [SQL] Implement struct, array, and map field accessor
cloud-fan May 8, 2015
4b3bb0e
[SPARK-6627] Finished rename to ShuffleBlockResolver
kayousterhout May 8, 2015
25889d8
[SPARK-7490] [CORE] [Minor] MapOutputTracker.deserializeMapStatuses: …
May 8, 2015
dc71e47
[MINOR] Ignore python/lib/pyspark.zip
zsxwing May 8, 2015
c45c09b
[WEBUI] Remove debug feature for vis.js
sarutak May 8, 2015
4e7360e
[SPARK-7489] [SPARK SHELL] Spark shell crashes when compiled with sca…
vinodkc May 8, 2015
31da40d
[MINOR] Defeat early garbage collection of test suite variable
tellison May 8, 2015
3b0c5e7
[SPARK-7466] DAG visualization: fix orphan nodes
May 8, 2015
9042f8f
[MINOR] [CORE] Allow History Server to read kerberos opts from config…
May 8, 2015
5467c34
[SPARK-7378] [CORE] Handle deep links to unloaded apps.
May 8, 2015
90527f5
[SPARK-7390] [SQL] Only merge other CovarianceCounter when its count …
viirya May 8, 2015
[SPARK-6627] Finished rename to ShuffleBlockResolver
The previous cleanup-commit for SPARK-6627 renamed ShuffleBlockManager
to ShuffleBlockResolver, but didn't rename the associated subclasses and
variables; this commit does that.

I'm unsure whether it's ok to rename ExternalShuffleBlockManager, since that's technically a public class?

cc pwendell

Author: Kay Ousterhout <[email protected]>

Closes apache#5764 from kayousterhout/SPARK-6627 and squashes the following commits:

43add1e [Kay Ousterhout] Spacing fix
96080bf [Kay Ousterhout] Test fixes
d8a5d36 [Kay Ousterhout] [SPARK-6627] Finished rename to ShuffleBlockResolver
kayousterhout authored and JoshRosen committed May 8, 2015
commit 4b3bb0e43ca7e1a27308516608419487b6a844e6
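
For context before the per-file changes below: the two Scala classes renamed here (FileShuffleBlockResolver and IndexShuffleBlockResolver) both extend Spark's ShuffleBlockResolver trait, as visible in their declarations in this diff. The sketch that follows is inferred only from the calls that appear in the changed code (getBlockData and stop); it is illustrative, not a copy of the upstream definition, which may declare additional members.

import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.storage.ShuffleBlockId

// Minimal sketch of the resolver contract, reconstructed from its usages in
// this diff; illustrative only, not the exact upstream source.
private[spark] trait ShuffleBlockResolver {
  /** Return the contents of the given shuffle block as a ManagedBuffer. */
  def getBlockData(blockId: ShuffleBlockId): ManagedBuffer

  /** Release any resources held by this resolver. */
  def stop(): Unit
}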

@@ -18,7 +18,6 @@
package org.apache.spark.shuffle

import java.io.File
import java.nio.ByteBuffer
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicInteger

@@ -29,7 +28,7 @@ import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.FileShuffleBlockManager.ShuffleFileGroup
import org.apache.spark.shuffle.FileShuffleBlockResolver.ShuffleFileGroup
import org.apache.spark.storage._
import org.apache.spark.util.{MetadataCleaner, MetadataCleanerType, TimeStampedHashMap}
import org.apache.spark.util.collection.{PrimitiveKeyOpenHashMap, PrimitiveVector}
@@ -64,9 +63,8 @@ private[spark] trait ShuffleWriterGroup {
* files within a ShuffleFileGroups associated with the block's reducer.
*/
// Note: Changes to the format in this file should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getHashBasedShuffleBlockData().
private[spark]
class FileShuffleBlockManager(conf: SparkConf)
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getHashBasedShuffleBlockData().
private[spark] class FileShuffleBlockResolver(conf: SparkConf)
extends ShuffleBlockResolver with Logging {

private val transportConf = SparkTransportConf.fromSparkConf(conf)
@@ -242,8 +240,7 @@ class FileShuffleBlockManager(conf: SparkConf)
}
}

private[spark]
object FileShuffleBlockManager {
private[spark] object FileShuffleBlockResolver {
/**
* A group of shuffle files, one per reducer.
* A particular mapper will be assigned a single ShuffleFileGroup to write its output to.

@@ -18,7 +18,6 @@
package org.apache.spark.shuffle

import java.io._
import java.nio.ByteBuffer

import com.google.common.io.ByteStreams

@@ -28,7 +27,7 @@ import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.storage._
import org.apache.spark.util.Utils

import IndexShuffleBlockManager.NOOP_REDUCE_ID
import IndexShuffleBlockResolver.NOOP_REDUCE_ID

/**
* Create and maintain the shuffle blocks' mapping between logic block and physical file location.
@@ -40,9 +39,8 @@ import IndexShuffleBlockManager.NOOP_REDUCE_ID
*
*/
// Note: Changes to the format in this file should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getSortBasedShuffleBlockData().
private[spark]
class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getSortBasedShuffleBlockData().
private[spark] class IndexShuffleBlockResolver(conf: SparkConf) extends ShuffleBlockResolver {

private lazy val blockManager = SparkEnv.get.blockManager

@@ -115,7 +113,7 @@ class IndexShuffleBlockManager(conf: SparkConf) extends ShuffleBlockResolver {
override def stop(): Unit = {}
}

private[spark] object IndexShuffleBlockManager {
private[spark] object IndexShuffleBlockResolver {
// No-op reduce ID used in interactions with disk store and BlockObjectWriter.
// The disk store currently expects puts to relate to a (map, reduce) pair, but in the sort
// shuffle outputs for several reduces are glommed into a single file.

@@ -26,7 +26,7 @@ import org.apache.spark.shuffle._
*/
private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager {

private val fileShuffleBlockManager = new FileShuffleBlockManager(conf)
private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)

/* Register a shuffle with the manager and obtain a handle for it to pass to tasks. */
override def registerShuffle[K, V, C](
@@ -61,8 +61,8 @@ private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager
shuffleBlockResolver.removeShuffle(shuffleId)
}

override def shuffleBlockResolver: FileShuffleBlockManager = {
fileShuffleBlockManager
override def shuffleBlockResolver: FileShuffleBlockResolver = {
fileShuffleBlockResolver
}

/** Shut down this ShuffleManager. */

@@ -25,7 +25,7 @@ import org.apache.spark.shuffle._
import org.apache.spark.storage.BlockObjectWriter

private[spark] class HashShuffleWriter[K, V](
shuffleBlockManager: FileShuffleBlockManager,
shuffleBlockResolver: FileShuffleBlockResolver,
handle: BaseShuffleHandle[K, V, _],
mapId: Int,
context: TaskContext)
@@ -45,7 +45,7 @@ private[spark] class HashShuffleWriter[K, V](

private val blockManager = SparkEnv.get.blockManager
private val ser = Serializer.getSerializer(dep.serializer.getOrElse(null))
private val shuffle = shuffleBlockManager.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
private val shuffle = shuffleBlockResolver.forMapTask(dep.shuffleId, mapId, numOutputSplits, ser,
writeMetrics)

/** Write a bunch of records to this task's output */

@@ -25,7 +25,7 @@ import org.apache.spark.shuffle.hash.HashShuffleReader

private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {

private val indexShuffleBlockManager = new IndexShuffleBlockManager(conf)
private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf)
private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()

/**
@@ -72,8 +72,8 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
true
}

override def shuffleBlockResolver: IndexShuffleBlockManager = {
indexShuffleBlockManager
override def shuffleBlockResolver: IndexShuffleBlockResolver = {
indexShuffleBlockResolver
}

/** Shut down this ShuffleManager. */

@@ -20,12 +20,12 @@ package org.apache.spark.shuffle.sort
import org.apache.spark.{MapOutputTracker, SparkEnv, Logging, TaskContext}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.scheduler.MapStatus
import org.apache.spark.shuffle.{IndexShuffleBlockManager, ShuffleWriter, BaseShuffleHandle}
import org.apache.spark.shuffle.{IndexShuffleBlockResolver, ShuffleWriter, BaseShuffleHandle}
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.util.collection.ExternalSorter

private[spark] class SortShuffleWriter[K, V, C](
shuffleBlockManager: IndexShuffleBlockManager,
shuffleBlockResolver: IndexShuffleBlockResolver,
handle: BaseShuffleHandle[K, V, C],
mapId: Int,
context: TaskContext)
@@ -65,10 +65,10 @@ private[spark] class SortShuffleWriter[K, V, C](
// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
val outputFile = shuffleBlockManager.getDataFile(dep.shuffleId, mapId)
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockManager.NOOP_REDUCE_ID)
val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, context, outputFile)
shuffleBlockManager.writeIndexFile(dep.shuffleId, mapId, partitionLengths)
shuffleBlockResolver.writeIndexFile(dep.shuffleId, mapId, partitionLengths)

mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
}
@@ -84,7 +84,7 @@ private[spark] class SortShuffleWriter[K, V, C](
return Option(mapStatus)
} else {
// The map task failed, so delete our output data.
shuffleBlockManager.removeDataByMap(dep.shuffleId, mapId)
shuffleBlockResolver.removeDataByMap(dep.shuffleId, mapId)
return None
}
} finally {

core/src/main/scala/org/apache/spark/storage/BlockId.scala (2 changes: 1 addition & 1 deletion)
@@ -54,7 +54,7 @@ case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId {
}

// Format of the shuffle block ids (including data and index) should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getBlockData().
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getBlockData().
@DeveloperApi
case class ShuffleBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId

@@ -431,10 +431,11 @@ private[spark] class BlockManager(
// As an optimization for map output fetches, if the block is for a shuffle, return it
// without acquiring a lock; the disk store never deletes (recent) items so this should work
if (blockId.isShuffle) {
val shuffleBlockManager = shuffleManager.shuffleBlockResolver
val shuffleBlockResolver = shuffleManager.shuffleBlockResolver
// TODO: This should gracefully handle case where local block is not available. Currently
// downstream code will throw an exception.
Option(shuffleBlockManager.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
Option(
shuffleBlockResolver.getBlockData(blockId.asInstanceOf[ShuffleBlockId]).nioByteBuffer())
} else {
doGetLocal(blockId, asBlockResult = false).asInstanceOf[Option[ByteBuffer]]
}

@@ -55,7 +55,7 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon

/** Looks up a file by hashing it into one of our local subdirectories. */
// This method should be kept in sync with
// org.apache.spark.network.shuffle.StandaloneShuffleBlockManager#getFile().
// org.apache.spark.network.shuffle.ExternalShuffleBlockResolver#getFile().
def getFile(filename: String): File = {
// Figure out which local directory it hashes to, and which subdirectory in that
val hash = Utils.nonNegativeHash(filename)

@@ -27,7 +27,7 @@ import org.apache.spark.{SparkEnv, SparkContext, LocalSparkContext, SparkConf}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.shuffle.FileShuffleBlockManager
import org.apache.spark.shuffle.FileShuffleBlockResolver
import org.apache.spark.storage.{ShuffleBlockId, FileSegment}

class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
@@ -53,10 +53,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {

sc = new SparkContext("local", "test", conf)

val shuffleBlockManager =
SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockManager]
val shuffleBlockResolver =
SparkEnv.get.shuffleManager.shuffleBlockResolver.asInstanceOf[FileShuffleBlockResolver]

val shuffle1 = shuffleBlockManager.forMapTask(1, 1, 1, new JavaSerializer(conf),
val shuffle1 = shuffleBlockResolver.forMapTask(1, 1, 1, new JavaSerializer(conf),
new ShuffleWriteMetrics)
for (writer <- shuffle1.writers) {
writer.write("test1", "value")
@@ -69,7 +69,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
val shuffle1Segment = shuffle1.writers(0).fileSegment()
shuffle1.releaseWriters(success = true)

val shuffle2 = shuffleBlockManager.forMapTask(1, 2, 1, new JavaSerializer(conf),
val shuffle2 = shuffleBlockResolver.forMapTask(1, 2, 1, new JavaSerializer(conf),
new ShuffleWriteMetrics)

for (writer <- shuffle2.writers) {
@@ -88,7 +88,7 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
// of block based on remaining data in file : which could mess things up when there is
// concurrent read and writes happening to the same shuffle group.

val shuffle3 = shuffleBlockManager.forMapTask(1, 3, 1, new JavaSerializer(testConf),
val shuffle3 = shuffleBlockResolver.forMapTask(1, 3, 1, new JavaSerializer(testConf),
new ShuffleWriteMetrics)
for (writer <- shuffle3.writers) {
writer.write("test3", "value")
@@ -98,10 +98,10 @@ class HashShuffleManagerSuite extends FunSuite with LocalSparkContext {
writer.commitAndClose()
}
// check before we register.
checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)))
checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
shuffle3.releaseWriters(success = true)
checkSegments(shuffle2Segment, shuffleBlockManager.getBlockData(ShuffleBlockId(1, 2, 0)))
shuffleBlockManager.removeShuffle(1)
checkSegments(shuffle2Segment, shuffleBlockResolver.getBlockData(ShuffleBlockId(1, 2, 0)))
shuffleBlockResolver.removeShuffle(1)
}

def writeToFile(file: File, numBytes: Int) {

@@ -46,18 +46,18 @@
public class ExternalShuffleBlockHandler extends RpcHandler {
private final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockHandler.class);

private final ExternalShuffleBlockManager blockManager;
private final ExternalShuffleBlockResolver blockManager;
private final OneForOneStreamManager streamManager;

public ExternalShuffleBlockHandler(TransportConf conf) {
this(new OneForOneStreamManager(), new ExternalShuffleBlockManager(conf));
this(new OneForOneStreamManager(), new ExternalShuffleBlockResolver(conf));
}

/** Enables mocking out the StreamManager and BlockManager. */
@VisibleForTesting
ExternalShuffleBlockHandler(
OneForOneStreamManager streamManager,
ExternalShuffleBlockManager blockManager) {
ExternalShuffleBlockResolver blockManager) {
this.streamManager = streamManager;
this.blockManager = blockManager;
}

@@ -44,13 +44,13 @@
* Manages converting shuffle BlockIds into physical segments of local files, from a process outside
* of Executors. Each Executor must register its own configuration about where it stores its files
* (local dirs) and how (shuffle manager). The logic for retrieval of individual files is replicated
* from Spark's FileShuffleBlockManager and IndexShuffleBlockManager.
* from Spark's FileShuffleBlockResolver and IndexShuffleBlockResolver.
*
* Executors with shuffle file consolidation are not currently supported, as the index is stored in
* the Executor's memory, unlike the IndexShuffleBlockManager.
* the Executor's memory, unlike the IndexShuffleBlockResolver.
*/
public class ExternalShuffleBlockManager {
private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockManager.class);
public class ExternalShuffleBlockResolver {
private static final Logger logger = LoggerFactory.getLogger(ExternalShuffleBlockResolver.class);

// Map containing all registered executors' metadata.
private final ConcurrentMap<AppExecId, ExecutorShuffleInfo> executors;
@@ -60,15 +60,15 @@ public class ExternalShuffleBlockManager {

private final TransportConf conf;

public ExternalShuffleBlockManager(TransportConf conf) {
public ExternalShuffleBlockResolver(TransportConf conf) {
this(conf, Executors.newSingleThreadExecutor(
// Add `spark` prefix because it will run in NM in Yarn mode.
NettyUtils.createThreadFactory("spark-shuffle-directory-cleaner")));
}

// Allows tests to have more control over when directories are cleaned up.
@VisibleForTesting
ExternalShuffleBlockManager(TransportConf conf, Executor directoryCleaner) {
ExternalShuffleBlockResolver(TransportConf conf, Executor directoryCleaner) {
this.conf = conf;
this.executors = Maps.newConcurrentMap();
this.directoryCleaner = directoryCleaner;
@@ -168,7 +168,7 @@ private void deleteExecutorDirs(String[] dirs) {

/**
* Hash-based shuffle data is simply stored as one file per block.
* This logic is from FileShuffleBlockManager.
* This logic is from FileShuffleBlockResolver.
*/
// TODO: Support consolidated hash shuffle files
private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor, String blockId) {
@@ -178,7 +178,7 @@ private ManagedBuffer getHashBasedShuffleBlockData(ExecutorShuffleInfo executor,

/**
* Sort-based shuffle data uses an index called "shuffle_ShuffleId_MapId_0.index" into a data file
* called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockManager,
* called "shuffle_ShuffleId_MapId_0.data". This logic is from IndexShuffleBlockResolver,
* and the block id format is from ShuffleDataBlockId and ShuffleIndexBlockId.
*/
private ManagedBuffer getSortBasedShuffleBlockData(
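
An aside on the file naming quoted in the comment above ("shuffle_ShuffleId_MapId_0.index" / "shuffle_ShuffleId_MapId_0.data"): the small helper below is a hypothetical illustration, not part of this patch; the real resolver locates these files under the executor's registered local directories rather than building bare path strings.

// Illustrative sketch of the sort-shuffle file-name convention described in
// the comment above; hypothetical helper, not upstream Spark code.
object SortShuffleFileNaming {
  // Sort-based shuffle writes one output file per map task, so the reduce id
  // embedded in the name is always the no-op value 0 (NOOP_REDUCE_ID).
  private val NoopReduceId = 0

  def indexFile(shuffleId: Int, mapId: Int): String =
    s"shuffle_${shuffleId}_${mapId}_${NoopReduceId}.index"

  def dataFile(shuffleId: Int, mapId: Int): String =
    s"shuffle_${shuffleId}_${mapId}_${NoopReduceId}.data"
}

// Example: shuffleId = 2, mapId = 5 -> shuffle_2_5_0.index and shuffle_2_5_0.data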

@@ -45,14 +45,14 @@ public class ExternalShuffleBlockHandlerSuite {
TransportClient client = mock(TransportClient.class);

OneForOneStreamManager streamManager;
ExternalShuffleBlockManager blockManager;
ExternalShuffleBlockResolver blockResolver;
RpcHandler handler;

@Before
public void beforeEach() {
streamManager = mock(OneForOneStreamManager.class);
blockManager = mock(ExternalShuffleBlockManager.class);
handler = new ExternalShuffleBlockHandler(streamManager, blockManager);
blockResolver = mock(ExternalShuffleBlockResolver.class);
handler = new ExternalShuffleBlockHandler(streamManager, blockResolver);
}

@Test
Expand All @@ -62,7 +62,7 @@ public void testRegisterExecutor() {
ExecutorShuffleInfo config = new ExecutorShuffleInfo(new String[] {"/a", "/b"}, 16, "sort");
byte[] registerMessage = new RegisterExecutor("app0", "exec1", config).toByteArray();
handler.receive(client, registerMessage, callback);
verify(blockManager, times(1)).registerExecutor("app0", "exec1", config);
verify(blockResolver, times(1)).registerExecutor("app0", "exec1", config);

verify(callback, times(1)).onSuccess((byte[]) any());
verify(callback, never()).onFailure((Throwable) any());
Expand All @@ -75,12 +75,12 @@ public void testOpenShuffleBlocks() {

ManagedBuffer block0Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[3]));
ManagedBuffer block1Marker = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
when(blockManager.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
when(blockManager.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
when(blockResolver.getBlockData("app0", "exec1", "b0")).thenReturn(block0Marker);
when(blockResolver.getBlockData("app0", "exec1", "b1")).thenReturn(block1Marker);
byte[] openBlocks = new OpenBlocks("app0", "exec1", new String[] { "b0", "b1" }).toByteArray();
handler.receive(client, openBlocks, callback);
verify(blockManager, times(1)).getBlockData("app0", "exec1", "b0");
verify(blockManager, times(1)).getBlockData("app0", "exec1", "b1");
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b0");
verify(blockResolver, times(1)).getBlockData("app0", "exec1", "b1");

ArgumentCaptor<byte[]> response = ArgumentCaptor.forClass(byte[].class);
verify(callback, times(1)).onSuccess(response.capture());