[SPARK-7263] Add new shuffle manager which stores shuffle blocks in Parquet

This commit adds a new Spark shuffle manager which reads and writes shuffle data in Apache
Parquet files. Parquet exposes a file interface (not a streaming interface) because it is
column-oriented and needs to seek within a file for metadata, e.g. schemas and statistics.
As such, this implementation fetches remote data into local, temporary blocks before the
data is passed to Parquet for reading.

This manager uses the following Spark configuration parameters to configure Parquet:
spark.shuffle.parquet.{compression, blocksize, pagesize, enabledictionary}.
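
For example (a minimal sketch using only the keys and helpers added in this patch;
the specific values are illustrative):

    import org.apache.spark.SparkConf
    import org.apache.spark.shuffle.parquet.ParquetShuffleConfig

    val conf = new SparkConf()
    // Equivalent to conf.set("spark.shuffle.manager", "parquet"); the alias is
    // resolved through SparkEnv.shuffleManagerAliases.
    ParquetShuffleConfig.enableParquetShuffle(conf)
    // Parquet tuning knobs; any key left unset falls back to the ParquetWriter default.
    conf.set("spark.shuffle.parquet.compression", "SNAPPY")
    conf.set("spark.shuffle.parquet.blocksize", (128 * 1024 * 1024).toString)
    conf.set("spark.shuffle.parquet.pagesize", (1024 * 1024).toString)
    conf.set("spark.shuffle.parquet.enabledictionary", "true")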

There is a spark.shuffle.parquet.fallback configuration option which allows users to
specify a fallback shuffle manager. If the Parquet manager finds that the classes
being shuffled carry no schema information, and therefore can't be used, it will
fall back to the specified manager. With this PR, only Avro IndexedRecords
are supported in the Parquet shuffle; however, it is straightforward to extend
this to other serialization systems that Parquet supports, e.g. Apache Thrift.
If no spark.shuffle.parquet.fallback is defined, any shuffle objects which are
not compatible with Parquet will cause an error to be thrown that lists the
incompatible objects.
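
Concretely (a sketch; conf is the SparkConf from the example above, and the short
aliases resolve through SparkEnv.shuffleManagerAliases):

    import org.apache.spark.shuffle.parquet.ParquetShuffleConfig

    // Either a short alias ("sort", "hash", "tungsten-sort") or a fully-qualified
    // ShuffleManager class name works here.
    ParquetShuffleConfig.setFallbackShuffleManager(conf, "sort")
    // Without this setting, getFallbackShuffleManager returns an ErrorShuffleManager,
    // which throws a NotImplementedError naming the incompatible classes.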

Because the ShuffleDependency forwards the key, value, and combiner class information,
a full schema can be generated before the first read/write. This leads to fewer
errors (since reflection on live objects isn't needed) and makes support for null
values possible without complex code.
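
To illustrate why the forwarded class names are enough (a hypothetical sketch;
buildShuffleSchema is not part of this patch, and only the Avro IndexedRecord
path is supported here):

    import org.apache.avro.Schema
    import org.apache.avro.specific.SpecificData

    // Hypothetical helper: recover the full Avro schema from a class name that
    // the ShuffleDependency forwarded, before the first record is read or written.
    def buildShuffleSchema(className: String): Schema = {
      val recordClass = Class.forName(className)
      // For generated Avro specific records this returns the complete schema,
      // so the shuffle files can be laid out (including nullable unions)
      // without reflecting over live objects.
      SpecificData.get().getSchema(recordClass)
    }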

When Parquet is used, the ExternalSorter (if one is needed) is set up not to spill to
disk. In the future, an ExternalSorter that can read/write Parquet would be needed.

Only record-level metrics are supported at this time. Byte-level metrics are not
currently supported and are complicated somewhat by column compression.
massie committed Sep 11, 2015
commit 0a4c0289176b09c0934d73411c42dca983daf264
8 changes: 8 additions & 0 deletions core/pom.xml
@@ -51,6 +51,14 @@
      <groupId>com.twitter</groupId>
      <artifactId>chill-java</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-avro</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.parquet</groupId>
      <artifactId>parquet-hadoop</artifactId>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
15 changes: 9 additions & 6 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -155,6 +155,13 @@ class SparkEnv (
object SparkEnv extends Logging {
  @volatile private var env: SparkEnv = _

  // Let the user specify short names for shuffle managers
  val shuffleManagerAliases = Map(
    "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
    "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
    "tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager",
    "parquet" -> "org.apache.spark.shuffle.parquet.ParquetShuffleManager")

  private[spark] val driverActorSystemName = "sparkDriver"
  private[spark] val executorActorSystemName = "sparkExecutor"

@@ -314,13 +321,9 @@ object SparkEnv extends Logging {
    new MapOutputTrackerMasterEndpoint(
      rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))

    // Let the user specify short names for shuffle managers
    val shortShuffleMgrNames = Map(
      "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager",
      "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager",
      "tungsten-sort" -> "org.apache.spark.shuffle.unsafe.UnsafeShuffleManager")
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
    val shuffleMgrClass = shuffleManagerAliases
      .getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

    val shuffleMemoryManager = ShuffleMemoryManager.create(conf, numUsableCores)
85 changes: 85 additions & 0 deletions core/src/main/scala/org/apache/spark/shuffle/parquet/ErrorShuffleManager.scala
@@ -0,0 +1,85 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.shuffle.parquet

import org.apache.spark.network.buffer.ManagedBuffer
import org.apache.spark.shuffle._
import org.apache.spark.storage.ShuffleBlockId
import org.apache.spark.{ShuffleDependency, TaskContext}

class ErrorShuffleManager extends ShuffleManager {

  private def throwError(error: String) = {
    throw new NotImplementedError(
      s"${ParquetShuffleConfig.fallbackShuffleManager} not defined: ${error}")
  }

  /**
   * Register a shuffle with the manager and obtain a handle for it to pass to tasks.
   */
  override def registerShuffle[K, V, C](shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
    throwError(s"Unable to register shuffle for keyClass=${dependency.keyClassName} " +
      s"valueClass=${dependency.valueClassName} combineClass=${dependency.combinerClassName}")
  }

  /**
   * Return a resolver capable of retrieving shuffle block data based on block coordinates.
   */
  override def shuffleBlockResolver: ShuffleBlockResolver = new ShuffleBlockResolver {

    override def stop(): Unit = {} // no-op

    /**
     * Retrieve the data for the specified block. If the data for that block is not available,
     * throws an unspecified exception.
     */
    override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
      throwError(s"Unable to get block data for ${blockId}")
    }
  }

  /** Shut down this ShuffleManager. */
  override def stop(): Unit = {} // no-op

  /**
   * Remove a shuffle's metadata from the ShuffleManager.
   * @return true if the metadata was removed successfully, otherwise false.
   */
  override def unregisterShuffle(shuffleId: Int): Boolean = {
    throwError("Unable to unregister shuffle")
  }

  /** Get a writer for a given partition. Called on executors by map tasks. */
  override def getWriter[K, V](handle: ShuffleHandle,
      mapId: Int,
      context: TaskContext): ShuffleWriter[K, V] = {
    throwError("Unable to get a writer")
  }

  /**
   * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive).
   * Called on executors by reduce tasks.
   */
  override def getReader[K, C](handle: ShuffleHandle,
      startPartition: Int,
      endPartition: Int,
      context: TaskContext): ShuffleReader[K, C] = {
    throwError("Unable to get a reader")
  }
}
128 changes: 128 additions & 0 deletions core/src/main/scala/org/apache/spark/shuffle/parquet/ParquetShuffleConfig.scala
@@ -0,0 +1,128 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.shuffle.parquet

import org.apache.parquet.hadoop.ParquetWriter
import org.apache.parquet.hadoop.metadata.CompressionCodecName

import org.apache.spark.shuffle.ShuffleManager
import org.apache.spark.util.Utils
import org.apache.spark.{SparkConf, SparkEnv}

object ParquetShuffleConfig {
  private val sparkManagerConfKey = "spark.shuffle.manager"
  private val parquetManagerAlias = "parquet"
  private val namespace = "spark.shuffle.parquet."
  private val compressionKey = namespace + "compression"
  private val blocksizeKey = namespace + "blocksize"
  private val pagesizeKey = namespace + "pagesize"
  private val enableDictionaryKey = namespace + "enabledictionary"
  private[parquet] val fallbackShuffleManager = namespace + "fallback"

  def isParquetShuffleEnabled: Boolean = {
    isParquetShuffleEnabled(SparkEnv.get.conf)
  }

  def isParquetShuffleEnabled(conf: SparkConf): Boolean = {
    val confValue = conf.get(sparkManagerConfKey, "")
    confValue == parquetManagerAlias || confValue == classOf[ParquetShuffleManager].getName
  }

  def enableParquetShuffle(): Unit = {
    enableParquetShuffle(SparkEnv.get.conf)
  }

  def enableParquetShuffle(conf: SparkConf): Unit = {
    conf.set(ParquetShuffleConfig.sparkManagerConfKey, classOf[ParquetShuffleManager].getName)
  }

  def getCompression: CompressionCodecName = {
    getCompression(SparkEnv.get.conf)
  }

  def getCompression(conf: SparkConf): CompressionCodecName = {
    val confValue = conf.get(compressionKey, null)
    if (confValue == null) {
      ParquetWriter.DEFAULT_COMPRESSION_CODEC_NAME
    } else {
      CompressionCodecName.fromConf(confValue)
    }
  }

  def getBlockSize: Int = {
    getBlockSize(SparkEnv.get.conf)
  }

  def getBlockSize(conf: SparkConf): Int = {
    val confValue = conf.get(blocksizeKey, null)
    if (confValue == null) {
      ParquetWriter.DEFAULT_BLOCK_SIZE
    } else {
      confValue.toInt
    }
  }

  def getPageSize: Int = {
    getPageSize(SparkEnv.get.conf)
  }

  def getPageSize(conf: SparkConf): Int = {
    val confValue = conf.get(pagesizeKey, null)
    if (confValue == null) {
      ParquetWriter.DEFAULT_PAGE_SIZE
    } else {
      confValue.toInt
    }
  }

  def isDictionaryEnabled: Boolean = {
    isDictionaryEnabled(SparkEnv.get.conf)
  }

  def isDictionaryEnabled(conf: SparkConf): Boolean = {
    val confValue = conf.get(enableDictionaryKey, null)
    if (confValue == null) {
      ParquetWriter.DEFAULT_IS_DICTIONARY_ENABLED
    } else {
      confValue.toBoolean
    }
  }

  def setFallbackShuffleManager(managerName: String): Unit = {
    setFallbackShuffleManager(SparkEnv.get.conf, managerName)
  }

  def setFallbackShuffleManager(conf: SparkConf, managerName: String): Unit = {
    conf.set(fallbackShuffleManager, managerName)
  }

  def getFallbackShuffleManager: ShuffleManager = {
    getFallbackShuffleManager(SparkEnv.get.conf)
  }

  def getFallbackShuffleManager(conf: SparkConf): ShuffleManager = {
    val confValue = conf.get(fallbackShuffleManager, null)
    if (confValue == null) {
      new ErrorShuffleManager
    } else {
      val fullName = SparkEnv.shuffleManagerAliases.getOrElse(confValue, confValue)
      val cls = Utils.classForName(fullName)
      cls.getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[ShuffleManager]
    }
  }

}