[SPARK-9043] Serialize key, value and combiner classes in ShuffleDependency

ShuffleManager implementations are currently not given type information for
the key, value and combiner classes. Serialization of shuffle objects relies
on the objects being Java-serializable, with methods defined for reading/writing
the object, or alternatively on Kryo serialization, which uses reflection.

Serialization systems like Avro, Thrift and Protobuf generate classes with
zero argument constructors and explicit schema information
(e.g. IndexedRecords in Avro have get, put and getSchema methods).

By serializing the key, value and combiner class names in ShuffleDependency,
shuffle implementations will have access to schema information when
registerShuffle() is called.
massie committed Sep 9, 2015
commit ed1afac5e4b43b6e9bae43ee3c18d6144d8e1729
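
To make the commit message concrete, here is a minimal sketch (not part of this PR) of how a pluggable shuffle manager could read the new fields from the ShuffleDependency it receives in registerShuffle(). ShuffleTypeInfo and describeShuffleTypes are hypothetical names used only for illustration.

import org.apache.spark.ShuffleDependency

object ShuffleTypeInfo {
  // Turn the class names recorded in the dependency back into Class objects.
  // The classes must be on the driver's classpath, as they already are for
  // any job that shuffles them.
  def describeShuffleTypes(dep: ShuffleDependency[_, _, _]): String = {
    val keyClass = Class.forName(dep.keyClassName)
    val valueClass = Class.forName(dep.valueClassName)
    val combinerClass = dep.combinerClassName.map(name => Class.forName(name))
    s"key=${keyClass.getName}, value=${valueClass.getName}, " +
      s"combiner=${combinerClass.map(_.getName).getOrElse("<none>")}"
  }
}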
2 changes: 1 addition & 1 deletion bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -78,7 +78,7 @@ object Bagel extends Logging {
val startTime = System.currentTimeMillis

val aggregated = agg(verts, aggregator)
val combinedMsgs = msgs.combineByKey(
val combinedMsgs = msgs.combineByKeyWithClassTag(
combiner.createCombiner _, combiner.mergeMsg _, combiner.mergeCombiners _, partitioner)
val grouped = combinedMsgs.groupWith(verts)
val superstep_ = superstep // Create a read-only copy of superstep for capture in closure
13 changes: 12 additions & 1 deletion core/src/main/scala/org/apache/spark/Dependency.scala
@@ -17,6 +17,8 @@

package org.apache.spark

import scala.reflect.ClassTag

import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
@@ -65,7 +67,7 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
* @param mapSideCombine whether to perform partial aggregation (also known as map-side combine)
*/
@DeveloperApi
class ShuffleDependency[K, V, C](
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
Contributor: FYI, this is a binary-incompatible change to a DeveloperAPI.

Contributor (author): According to the source for DeveloperApi, a Developer API is unstable and can change between minor releases.

@transient private val _rdd: RDD[_ <: Product2[K, V]],
val partitioner: Partitioner,
val serializer: Option[Serializer] = None,
@@ -76,6 +78,15 @@ class ShuffleDependency[K, V, C](

override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]

/**
* The key, value and combiner class names are serialized so that shuffle manager
* implementations can use the type information when registerShuffle() is called.
*/
val keyClassName: String = reflect.classTag[K].runtimeClass.getName
Contributor: how is this used? it might require the key class to have a 0-arg ctor right?

Contributor (author): Here's an example of how I use this in the Parquet shuffle manager to create a schema for the (key, value) or (key, combiner) pairs for the shuffle files.
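
The linked example is not reproduced here; the following is a rough sketch of the idea, assuming the key and value (or combiner) classes are Avro-generated IndexedRecords as the commit message mentions. PairSchemaSketch and the record/namespace names are illustrative only, not Spark or Parquet shuffle manager code.

import org.apache.avro.{Schema, SchemaBuilder}
import org.apache.avro.specific.SpecificData

object PairSchemaSketch {
  // Build an Avro record schema describing (key, value) or (key, combiner)
  // pairs from the class names recorded in a ShuffleDependency.
  def pairSchema(keyClassName: String, valueOrCombinerClassName: String): Schema = {
    val specificData = SpecificData.get()
    val keySchema = specificData.getSchema(Class.forName(keyClassName))
    val valueSchema = specificData.getSchema(Class.forName(valueOrCombinerClassName))
    SchemaBuilder.record("ShufflePair").namespace("example.shuffle")
      .fields()
      .name("key").`type`(keySchema).noDefault()
      .name("value").`type`(valueSchema).noDefault()
      .endRecord()
  }
}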

val valueClassName: String = reflect.classTag[V].runtimeClass.getName
// Note: It's possible that the combiner class tag is null, if the combineByKey
// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
val combinerClassName: Option[String] = Option(reflect.classTag[C]).map(_.runtimeClass.getName)
Contributor: these should be private[spark] right? We don't want to expose them if we don't have to.

Contributor (author): If we make this private[spark], that will prevent developers from creating pluggable ShuffleManager implementations; see ShuffleManager.registerShuffle(). The ShuffleDependency is annotated with @DeveloperApi, which should signal to developers that these attributes can (and likely will) change over time, hopefully minimally.

Another approach would be to change the registerShuffle signature to something like:

def registerShuffle[K, V, C](
      shuffleId: Int,
      numMaps: Int,
      dependency: ShuffleDependency[K, V, C],
      keyClassName: String,
      valueClassName: String,
      combinerClassName: Option[String]): ShuffleHandle

...instead of passing that information in the dependency.

Contributor (author): @andrewor14 Let me know which approach you prefer -- (a) keeping the class names public or (b) changing the registerShuffle arguments.

The first approach has the advantage that the data types are available everywhere the ShuffleDependency is used. The latter approach doesn't require the class names to be public -- they wouldn't exist at all (and would instead be passed to registerShuffle).

Contributor: let's not change the registerShuffle API.

val shuffleId: Int = _rdd.context.newShuffleId()

val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
@@ -239,7 +239,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
mapSideCombine: Boolean,
serializer: Serializer): JavaPairRDD[K, C] = {
implicit val ctag: ClassTag[C] = fakeClassTag
fromRDD(rdd.combineByKey(
fromRDD(rdd.combineByKeyWithClassTag(
createCombiner,
mergeValue,
mergeCombiners,
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -22,6 +22,7 @@ import scala.language.existentials
import java.io.{IOException, ObjectOutputStream}

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
@@ -74,7 +75,9 @@ private[spark] class CoGroupPartition(
* @param part partitioner used to partition the shuffle output
*/
@DeveloperApi
class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner)
class CoGroupedRDD[K: ClassTag](
@transient var rdds: Seq[RDD[_ <: Product2[K, _]]],
part: Partitioner)
extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) {

// For example, `(k, a) cogroup (k, b)` produces k -> Array(ArrayBuffer as, ArrayBuffer bs).
84 changes: 70 additions & 14 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -57,6 +57,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
with SparkHadoopMapReduceUtil
with Serializable
{

/**
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
@@ -70,12 +71,13 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* In addition, users can control the partitioning of the output RDD, and whether to perform
* map-side aggregation (if a mapper can produce multiple items with the same key).
*/
def combineByKey[C](createCombiner: V => C,
def combineByKeyWithClassTag[C](
Contributor: actually, why call this something else? Does it not compile if you just called this combineByKey as well?

Contributor (author): Because PairRDDFunctions is a stable API, we can't change the method signature of combineByKey. Adding the ClassTag would add an implicit argument. If we leave the old combineByKey methods and add new combineByKey methods with ClassTags, then we get compiler errors because the combineByKey symbol cannot be resolved.

If you know a way of doing this more cleanly, I would be happy to make that change.
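
A standalone toy sketch of the overload problem described above (PairOps and the stub bodies are hypothetical, not Spark code):

import scala.reflect.ClassTag

class PairOps[K, V] {
  // Existing stable signature: no ClassTag.
  def combineByKey[C](createCombiner: V => C,
                      mergeValue: (C, V) => C,
                      mergeCombiners: (C, C) => C): Unit = ()

  // Overload that differs only by an implicit ClassTag parameter list.
  def combineByKey[C](createCombiner: V => C,
                      mergeValue: (C, V) => C,
                      mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): Unit = ()
}

object OverloadDemo {
  val ops = new PairOps[String, Int]
  // The call below fails to compile: the compiler cannot resolve which
  // combineByKey overload is meant, which is why the new method was given
  // the distinct name combineByKeyWithClassTag instead.
  // ops.combineByKey((v: Int) => v.toString, (c: String, v: Int) => c, (c1: String, c2: String) => c1)
}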

createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = self.withScope {
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
@@ -103,13 +105,48 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}

/**
* Simplified version of combineByKey that hash-partitions the output RDD.
* This method is here for backward compatibility. It
* does not provide combiner classtag information to
* the shuffle.
Contributor: The javadoc should start with a description of what the method does. We should use the old one and add that it exists for backward compatibility after the first sentence.

*
* @see [[combineByKeyWithClassTag]]
*/
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
partitioner, mapSideCombine, serializer)(null)
}

/**
* This method is here for backward compatibility. It
* does not provide combiner classtag information to
* the shuffle.
*
* @see [[combineByKeyWithClassTag]]
*/
def combineByKey[C](createCombiner: V => C,
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int): RDD[(K, C)] = self.withScope {
combineByKey(createCombiner, mergeValue, mergeCombiners, new HashPartitioner(numPartitions))
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, numPartitions)(null)
}

/**
* Simplified version of combineByKeyWithClassTag that hash-partitions the output RDD.
*/
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
numPartitions: Int)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
new HashPartitioner(numPartitions))
}

/**
Expand All @@ -133,7 +170,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

// We will clean the combiner closure later in `combineByKey`
val cleanedSeqOp = self.context.clean(seqOp)
combineByKey[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner)
combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),
cleanedSeqOp, combOp, partitioner)
}

/**
@@ -182,7 +220,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray))

val cleanedFunc = self.context.clean(func)
combineByKey[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner)
combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),
cleanedFunc, cleanedFunc, partitioner)
}

/**
@@ -268,7 +307,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* "combiner" in MapReduce.
*/
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
combineByKey[V]((v: V) => v, func, func, partitioner)
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}

/**
@@ -392,7 +431,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
h1
}

combineByKey(createHLL, mergeValueHLL, mergeHLL, partitioner).mapValues(_.cardinality())
combineByKeyWithClassTag(createHLL, mergeValueHLL, mergeHLL, partitioner)
.mapValues(_.cardinality())
}

/**
@@ -466,7 +506,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKey[CompactBuffer[V]](
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
@@ -565,12 +605,28 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
}

/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the
* This method is here for backward compatibility. It
* does not provide combiner classtag information to
* the shuffle.
*
* @see [[combineByKeyWithClassTag]]
*/
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}

/**
* Simplified version of combineByKeyWithClassTag that hash-partitions the resulting RDD using the
* existing partitioner/parallelism level.
*/
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
: RDD[(K, C)] = self.withScope {
combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
}

/**
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -17,6 +17,8 @@

package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.serializer.Serializer
@@ -37,7 +39,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
*/
// TODO: Make this return RDD[Product2[K, C]] or have some way to configure mutable pairs
@DeveloperApi
class ShuffledRDD[K, V, C](
class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
@transient var prev: RDD[_ <: Product2[K, V]],
part: Partitioner)
extends RDD[(K, C)](prev.context, Nil) {
8 changes: 5 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -63,15 +63,17 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
}

override def getDependencies: Seq[Dependency[_]] = {
Seq(rdd1, rdd2).map { rdd =>
def rddDependency[T1: ClassTag, T2: ClassTag](rdd: RDD[_ <: Product2[T1, T2]])
: Dependency[_] = {
if (rdd.partitioner == Some(part)) {
logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logDebug("Adding shuffle dependency with " + rdd)
new ShuffleDependency(rdd, part, serializer)
new ShuffleDependency[T1, T2, Any](rdd, part, serializer)
}
}
Seq(rddDependency[K, V](rdd1), rddDependency[K, W](rdd2))
}

override def getPartitions: Array[Partition] = {
@@ -105,7 +107,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
seq
}
}
def integrate(depNum: Int, op: Product2[K, V] => Unit) = {
def integrate(depNum: Int, op: Product2[K, V] => Unit): Unit = {
dependencies(depNum) match {
case oneToOneDependency: OneToOneDependency[_] =>
val dependencyPartition = partition.narrowDeps(depNum).get.split
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/CheckpointSuite.scala
@@ -483,7 +483,7 @@ class FatPairRDD(parent: RDD[Int], _partitioner: Partitioner) extends RDD[(Int,
object CheckpointSuite {
// This is a custom cogroup function that does not use mapValues like
// the PairRDDFunctions.cogroup()
def cogroup[K, V](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner)
def cogroup[K: ClassTag, V: ClassTag](first: RDD[(K, V)], second: RDD[(K, V)], part: Partitioner)
: RDD[(K, Array[Iterable[V]])] = {
new CoGroupedRDD[K](
Seq(first.asInstanceOf[RDD[(K, _)]], second.asInstanceOf[RDD[(K, _)]]),
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.shuffle

import org.apache.spark._

case class KeyClass()

case class ValueClass()

case class CombinerClass()

class ShuffleDependencySuite extends SparkFunSuite with LocalSparkContext {

val conf = new SparkConf(loadDefaults = false)

test("key, value, and combiner classes correct in shuffle dependency without aggregation") {
sc = new SparkContext("local", "test", conf.clone())
val rdd = sc.parallelize(1 to 5, 4)
.map(key => (KeyClass(), ValueClass()))
.groupByKey()
val dep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
assert(!dep.mapSideCombine, "Test requires that no map-side aggregator is defined")
assert(dep.keyClassName == classOf[KeyClass].getName)
assert(dep.valueClassName == classOf[ValueClass].getName)
}

test("key, value, and combiner classes available in shuffle dependency with aggregation") {
sc = new SparkContext("local", "test", conf.clone())
val rdd = sc.parallelize(1 to 5, 4)
.map(key => (KeyClass(), ValueClass()))
.aggregateByKey(CombinerClass())({ case (a, b) => a }, { case (a, b) => a })
val dep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
assert(dep.mapSideCombine && dep.aggregator.isDefined, "Test requires map-side aggregation")
assert(dep.keyClassName == classOf[KeyClass].getName)
assert(dep.valueClassName == classOf[ValueClass].getName)
assert(dep.combinerClassName == Some(classOf[CombinerClass].getName))
}

test("combineByKey null combiner class tag handled correctly") {
sc = new SparkContext("local", "test", conf.clone())
val rdd = sc.parallelize(1 to 5, 4)
.map(key => (KeyClass(), ValueClass()))
.combineByKey((v: ValueClass) => v,
(c: AnyRef, v: ValueClass) => c,
(c1: AnyRef, c2: AnyRef) => c1)
val dep = rdd.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]]
assert(dep.keyClassName == classOf[KeyClass].getName)
assert(dep.valueClassName == classOf[ValueClass].getName)
assert(dep.combinerClassName == None)
}

}