diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala index ac61568af..1579f92ce 100644 --- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala +++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala @@ -76,7 +76,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) { val poolName = listener.stageIdToPool.get(s.stageId) val nameLink = - {s.name} + {s.name + " (%s)".format(s.rddInfo.name)} val description = listener.stageIdToDescription.get(s.stageId) .map(d =>
{d}
{nameLink}
).getOrElse(nameLink) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index 6d04bf790..2997f2e69 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -19,18 +19,20 @@ package org.apache.spark.graphx import scala.reflect.{classTag, ClassTag} +import org.apache.spark.Logging import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext} import org.apache.spark.graphx.impl.EdgePartition import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel /** - * `EdgeRDD[ED]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each partition + * `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each partition * for performance. */ -class EdgeRDD[@specialized ED: ClassTag]( - val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])]) - extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { +class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( + val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])]) + extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) + with Logging { partitionsRDD.setName("EdgeRDD") @@ -45,33 +47,41 @@ class EdgeRDD[@specialized ED: ClassTag]( partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD))) override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = { - val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context) - p.next._2.iterator.map(_.copy()) + val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context) + if (p.hasNext) { + p.next._2.iterator.map(_.copy()) + } else { + Iterator.empty + } } override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect() - override def persist(newLevel: StorageLevel): EdgeRDD[ED] = { + override def persist(newLevel: StorageLevel): EdgeRDD[ED, VD] = { partitionsRDD.persist(newLevel) this } /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ - override def persist(): EdgeRDD[ED] = persist(StorageLevel.MEMORY_ONLY) + override def persist(): EdgeRDD[ED, VD] = persist(StorageLevel.MEMORY_ONLY) /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */ - override def cache(): EdgeRDD[ED] = persist() + override def cache(): EdgeRDD[ED, VD] = persist() - override def unpersist(blocking: Boolean = true): EdgeRDD[ED] = { + override def unpersist(blocking: Boolean = true): EdgeRDD[ED, VD] = { partitionsRDD.unpersist(blocking) this } - private[graphx] def mapEdgePartitions[ED2: ClassTag]( - f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = { - new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter => - val (pid, ep) = iter.next() - Iterator(Tuple2(pid, f(pid, ep))) + private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag]( + f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = { + new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter => + if (iter.hasNext) { + val (pid, ep) = iter.next() + Iterator(Tuple2(pid, f(pid, ep))) + } else { + Iterator.empty + } }, preservesPartitioning = true)) } @@ -82,7 +92,7 @@ class EdgeRDD[@specialized ED: ClassTag]( * @param f the function from an edge to a new edge value * @return a new EdgeRDD containing the new edge values */ - def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2] = + def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] = mapEdgePartitions((pid, part) => part.map(f)) /** @@ -90,7 +100,13 @@ class EdgeRDD[@specialized ED: ClassTag]( * * @return a new EdgeRDD containing all the edges reversed */ - def reverse: EdgeRDD[ED] = mapEdgePartitions((pid, part) => part.reverse) + def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse) + + def filter( + epred: EdgeTriplet[VD, ED] => Boolean, + vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = { + mapEdgePartitions((pid, part) => part.filter(epred, vpred)) + } /** * Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same @@ -102,19 +118,27 @@ class EdgeRDD[@specialized ED: ClassTag]( * with values supplied by `f` */ def innerJoin[ED2: ClassTag, ED3: ClassTag] - (other: EdgeRDD[ED2]) - (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] = { + (other: EdgeRDD[ED2, _]) + (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = { val ed2Tag = classTag[ED2] val ed3Tag = classTag[ED3] - new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) { + new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) { (thisIter, otherIter) => - val (pid, thisEPart) = thisIter.next() - val (_, otherEPart) = otherIter.next() - Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag))) + if (thisIter.hasNext && otherIter.hasNext) { + val (pid, thisEPart) = thisIter.next() + val (_, otherEPart) = otherIter.next() + Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag))) + } else { + if (thisIter.hasNext != otherIter.hasNext) { + logError("innerJoin: Dropped non-empty edge partition from `%s`".format( + if (thisIter.hasNext) "this" else "other")) + } + Iterator.empty + } }) } private[graphx] def collectVertexIds(): RDD[VertexId] = { - partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) } + partitionsRDD.flatMap { case (_, p) => p.vidIterator } } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala index dfc6a8015..d9b19f88f 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala @@ -63,4 +63,6 @@ class EdgeTriplet[VD, ED] extends Edge[ED] { if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr } override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString() + + def toTuple = ((srcId, srcAttr), (dstId, dstAttr), attr) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index ef05623d7..3d3368901 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -58,7 +58,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab * along with their vertex data. * */ - val edges: EdgeRDD[ED] + val edges: EdgeRDD[ED, VD] /** * An RDD containing the edge triplets, which are edges along with the vertex data associated with diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala index dd380d8c1..e91eff11e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala @@ -33,8 +33,9 @@ class GraphKryoRegistrator extends KryoRegistrator { kryo.register(classOf[Edge[Object]]) kryo.register(classOf[MessageToPartition[Object]]) kryo.register(classOf[VertexBroadcastMsg[Object]]) + kryo.register(classOf[RoutingTableMessage]) kryo.register(classOf[(VertexId, Object)]) - kryo.register(classOf[EdgePartition[Object]]) + kryo.register(classOf[EdgePartition[Object, Object]]) kryo.register(classOf[BitSet]) kryo.register(classOf[VertexIdToIndexMap]) kryo.register(classOf[VertexAttributeBlock[Object]]) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala index 18858466d..5c943f932 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala @@ -60,8 +60,8 @@ object GraphLoader extends Logging { val startTime = System.currentTimeMillis // Parse the edge data table directly into edge partitions - val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (pid, iter) => - val builder = new EdgePartitionBuilder[Int] + val edges = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions).mapPartitionsWithIndex { (pid, iter) => + val builder = new EdgePartitionBuilder[Int, Int] iter.foreach { line => if (!line.isEmpty && line(0) != '#') { val lineArray = line.split("\\s+") @@ -78,7 +78,7 @@ object GraphLoader extends Logging { } } Iterator((pid, builder.toEdgePartition)) - }.cache() + }.cache().setName("GraphLoader.edgeListFile - raw edges (%s)".format(path)) edges.count() logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime)) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala index 377d9d6bd..297e56ee6 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala @@ -43,19 +43,19 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali * The in-degree of each vertex in the graph. * @note Vertices with no in-edges are not returned in the resulting RDD. */ - lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In) + lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In).setName("GraphOps.inDegrees") /** * The out-degree of each vertex in the graph. * @note Vertices with no out-edges are not returned in the resulting RDD. */ - lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out) + lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out).setName("GraphOps.outDegrees") /** * The degree of each vertex in the graph. * @note Vertices with no edges are not returned in the resulting RDD. */ - lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either) + lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either).setName("GraphOps.degrees") /** * Computes the neighboring vertex degrees. diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala index ac07a594a..4572eab28 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala @@ -18,6 +18,7 @@ package org.apache.spark.graphx import scala.reflect.ClassTag +import org.apache.spark.Logging /** @@ -52,7 +53,7 @@ import scala.reflect.ClassTag * }}} * */ -object Pregel { +object Pregel extends Logging { /** * Execute a Pregel-like iterative vertex-parallel abstraction. The @@ -142,6 +143,9 @@ object Pregel { // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g). activeMessages = messages.count() + + logInfo("Pregel finished iteration " + i) + // Unpersist the RDDs hidden by newly-materialized RDDs oldMessages.unpersist(blocking=false) newVerts.unpersist(blocking=false) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index d6788d4d4..79da7b82b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -23,9 +23,15 @@ import org.apache.spark._ import org.apache.spark.SparkContext._ import org.apache.spark.rdd._ import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.collection.PrimitiveVector import org.apache.spark.graphx.impl.MsgRDDFunctions -import org.apache.spark.graphx.impl.VertexPartition +import org.apache.spark.graphx.impl.RoutingTableMessage +import org.apache.spark.graphx.impl.RoutingTableMessageRDDFunctions._ +import org.apache.spark.graphx.impl.VertexRDDFunctions._ +import org.apache.spark.graphx.impl.RoutingTablePartition +import org.apache.spark.graphx.impl.ShippableVertexPartition +import org.apache.spark.graphx.impl.VertexAttributeBlock /** * Extends `RDD[(VertexId, VD)]` by ensuring that there is only one entry for each vertex and by @@ -50,13 +56,11 @@ import org.apache.spark.graphx.impl.VertexPartition * @tparam VD the vertex attribute associated with each vertex in the set. */ class VertexRDD[@specialized VD: ClassTag]( - val partitionsRDD: RDD[VertexPartition[VD]]) + val partitionsRDD: RDD[ShippableVertexPartition[VD]]) extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { require(partitionsRDD.partitioner.isDefined) - partitionsRDD.setName("VertexRDD") - /** * Construct a new VertexRDD that is indexed by only the visible vertices. The resulting * VertexRDD will be based on a different index and can no longer be quickly joined with this @@ -71,6 +75,16 @@ class VertexRDD[@specialized VD: ClassTag]( override protected def getPreferredLocations(s: Partition): Seq[String] = partitionsRDD.preferredLocations(s) + override def setName(_name: String): VertexRDD[VD] = { + if (partitionsRDD.name != null) { + partitionsRDD.setName(partitionsRDD.name + ", " + _name) + } else { + partitionsRDD.setName(_name) + } + this + } + setName("VertexRDD") + override def persist(newLevel: StorageLevel): VertexRDD[VD] = { partitionsRDD.persist(newLevel) this @@ -96,14 +110,14 @@ class VertexRDD[@specialized VD: ClassTag]( * Provides the `RDD[(VertexId, VD)]` equivalent output. */ override def compute(part: Partition, context: TaskContext): Iterator[(VertexId, VD)] = { - firstParent[VertexPartition[VD]].iterator(part, context).next.iterator + firstParent[ShippableVertexPartition[VD]].iterator(part, context).next.iterator } /** * Applies a function to each `VertexPartition` of this RDD and returns a new VertexRDD. */ private[graphx] def mapVertexPartitions[VD2: ClassTag]( - f: VertexPartition[VD] => VertexPartition[VD2]) + f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2]) : VertexRDD[VD2] = { val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true) new VertexRDD(newPartitionsRDD) @@ -214,10 +228,8 @@ class VertexRDD[@specialized VD: ClassTag]( case _ => new VertexRDD[VD3]( partitionsRDD.zipPartitions( - other.partitionBy(this.partitioner.get), preservesPartitioning = true) - { (part, msgs) => - val vertexPartition: VertexPartition[VD] = part.next() - Iterator(vertexPartition.leftJoin(msgs)(f)) + other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) { + (partIter, msgs) => partIter.map(_.leftJoin(msgs)(f)) } ) } @@ -260,10 +272,8 @@ class VertexRDD[@specialized VD: ClassTag]( case _ => new VertexRDD( partitionsRDD.zipPartitions( - other.partitionBy(this.partitioner.get), preservesPartitioning = true) - { (part, msgs) => - val vertexPartition: VertexPartition[VD] = part.next() - Iterator(vertexPartition.innerJoin(msgs)(f)) + other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) { + (partIter, msgs) => partIter.map(_.innerJoin(msgs)(f)) } ) } @@ -282,14 +292,24 @@ class VertexRDD[@specialized VD: ClassTag]( */ def aggregateUsingIndex[VD2: ClassTag]( messages: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] = { - val shuffled = MsgRDDFunctions.partitionForAggregation(messages, this.partitioner.get) + val shuffled = messages.copartitionWithVertices(this.partitioner.get) val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) => - val vertexPartition: VertexPartition[VD] = thisIter.next() - Iterator(vertexPartition.aggregateUsingIndex(msgIter, reduceFunc)) + thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc)) } new VertexRDD[VD2](parts) } + // Ship vertex attributes to edge partitions according to vertexPlacement + def shipVertexAttributes( + shipSrc: Boolean, shipDst: Boolean): RDD[(PartitionID, VertexAttributeBlock[VD])] = { + partitionsRDD.mapPartitions(_.flatMap(_.shipVertexAttributes(shipSrc, shipDst))) + } + + // Ship vertex attributes to edge partitions according to vertexPlacement + def shipVertexIds(): RDD[(PartitionID, Array[VertexId])] = { + partitionsRDD.mapPartitions(_.flatMap(_.shipVertexIds())) + } + } // end of VertexRDD @@ -299,24 +319,30 @@ class VertexRDD[@specialized VD: ClassTag]( object VertexRDD { /** - * Construct a `VertexRDD` from an RDD of vertex-attribute pairs. - * Duplicate entries are removed arbitrarily. + * Construct a standalone `VertexRDD` from an RDD of vertex-attribute pairs. Duplicate entries are + * removed arbitrarily. The resulting `VertexRDD` will not be set up for efficient joins with any + * [[EdgeRDD]]. * * @tparam VD the vertex attribute type * - * @param rdd the collection of vertex-attribute pairs + * @param vertices the collection of vertex-attribute pairs */ - def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)]): VertexRDD[VD] = { - val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match { - case Some(p) => rdd - case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size)) + def apply[VD: ClassTag](vertices: RDD[(VertexId, VD)]): VertexRDD[VD] = { + val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match { + case Some(p) => vertices + case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size)) } - val vertexPartitions = partitioned.mapPartitions( - iter => Iterator(VertexPartition(iter)), + val vertexPartitions = vPartitioned.mapPartitions( + iter => Iterator(ShippableVertexPartition(iter)), preservesPartitioning = true) new VertexRDD(vertexPartitions) } + def apply[VD: ClassTag]( + vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD): VertexRDD[VD] = { + VertexRDD(vertices, edges, defaultVal, (a, b) => b) + } + /** * Constructs a `VertexRDD` from an RDD of vertex-attribute pairs, merging duplicates using * `mergeFunc`. @@ -326,25 +352,53 @@ object VertexRDD { * @param rdd the collection of vertex-attribute pairs * @param mergeFunc the associative, commutative merge function. */ - def apply[VD: ClassTag](rdd: RDD[(VertexId, VD)], mergeFunc: (VD, VD) => VD): VertexRDD[VD] = { - val partitioned: RDD[(VertexId, VD)] = rdd.partitioner match { - case Some(p) => rdd - case None => rdd.partitionBy(new HashPartitioner(rdd.partitions.size)) + def apply[VD: ClassTag]( + vertices: RDD[(VertexId, VD)], edges: EdgeRDD[_, _], defaultVal: VD, mergeFunc: (VD, VD) => VD + ): VertexRDD[VD] = { + val vPartitioned: RDD[(VertexId, VD)] = vertices.partitioner match { + case Some(p) => vertices + case None => vertices.copartitionWithVertices(new HashPartitioner(vertices.partitions.size)) + } + val routingTables = createRoutingTables(edges, vPartitioned.partitioner.get) + val vertexPartitions = vPartitioned.zipPartitions(routingTables, preservesPartitioning = true) { + (vertexIter, routingTableIter) => + val routingTable = + if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty + Iterator(ShippableVertexPartition(vertexIter, routingTable, defaultVal)) } - val vertexPartitions = partitioned.mapPartitions( - iter => Iterator(VertexPartition(iter)), - preservesPartitioning = true) new VertexRDD(vertexPartitions) } /** - * Constructs a VertexRDD from the vertex IDs in `vids`, taking attributes from `rdd` and using - * `defaultVal` otherwise. + * Constructs a `VertexRDD` from an RDD of vertex-attribute pairs, merging duplicates using + * `mergeFunc`. + * + * @tparam VD the vertex attribute type + * + * @param rdd the collection of vertex-attribute pairs + * @param mergeFunc the associative, commutative merge function. */ - def apply[VD: ClassTag](vids: RDD[VertexId], rdd: RDD[(VertexId, VD)], defaultVal: VD) - : VertexRDD[VD] = { - VertexRDD(vids.map(vid => (vid, defaultVal))).leftJoin(rdd) { (vid, default, value) => - value.getOrElse(default) - } + def fromEdges[VD: ClassTag](edges: EdgeRDD[_, _], numPartitions: Int, defaultVal: VD): VertexRDD[VD] = { + val routingTables = createRoutingTables(edges, new HashPartitioner(numPartitions)) + val vertexPartitions = routingTables.mapPartitions({ routingTableIter => + val routingTable = + if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty + Iterator(ShippableVertexPartition(Iterator.empty, routingTable, defaultVal)) + }, preservesPartitioning = true) + new VertexRDD(vertexPartitions) + } + + + def createRoutingTables( + edges: EdgeRDD[_, _], vertexPartitioner: Partitioner): RDD[RoutingTablePartition] = { + // Determine which vertices each edge partition needs by creating a mapping from vid to pid. + val vid2pid = edges.partitionsRDD.mapPartitions(_.flatMap( + Function.tupled(RoutingTablePartition.edgePartitionToMsgs))) + .setName("VertexRDD.createRoutingTables - vid2pid (aggregation)") + + val numEdgePartitions = edges.partitions.size + vid2pid.copartitionWithVertices(vertexPartitioner).mapPartitions( + iter => Iterator(RoutingTablePartition.fromMsgs(numEdgePartitions, iter)), + preservesPartitioning = true) } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala index 2e05f5d4e..65042e00b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartition.scala @@ -17,7 +17,7 @@ package org.apache.spark.graphx.impl -import scala.reflect.ClassTag +import scala.reflect.{classTag, ClassTag} import org.apache.spark.graphx._ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap @@ -33,23 +33,60 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap * @tparam ED the edge attribute type. */ private[graphx] -class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag]( +class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED: ClassTag, VD: ClassTag]( val srcIds: Array[VertexId], val dstIds: Array[VertexId], val data: Array[ED], - val index: PrimitiveKeyOpenHashMap[VertexId, Int]) extends Serializable { + val index: PrimitiveKeyOpenHashMap[VertexId, Int], + // Must include all vids mentioned in srcIds and dstIds. Mask is never used. + val vertexPartition: VertexPartition[VD], + /** A set of vids of active vertices. May contain vids not in index due to join rewrite. */ + val activeSet: Option[VertexSet] = None + ) extends Serializable { + + /** Look up vid in activeSet, throwing an exception if it is None. */ + def isActive(vid: VertexId): Boolean = { + activeSet.get.contains(vid) + } + + /** The number of active vertices, if any exist. */ + def numActives: Option[Int] = activeSet.map(_.size) + + def withData[ED2: ClassTag](data_ : Array[ED2]): EdgePartition[ED2, VD] = { + new EdgePartition(srcIds, dstIds, data_, index, vertexPartition, activeSet) + } + + def withVertexPartition[VD2: ClassTag]( + vertexPartition_ : VertexPartition[VD2]): EdgePartition[ED, VD2] = { + new EdgePartition(srcIds, dstIds, data, index, vertexPartition_, activeSet) + } + + def withActiveSet(iter: Iterator[VertexId]): EdgePartition[ED, VD] = { + val newActiveSet = new VertexSet + iter.foreach(newActiveSet.add(_)) + new EdgePartition(srcIds, dstIds, data, index, vertexPartition, Some(newActiveSet)) + } + + def withActiveSet(activeSet_ : Option[VertexSet]): EdgePartition[ED, VD] = { + new EdgePartition(srcIds, dstIds, data, index, vertexPartition, activeSet_) + } + + // iter must contain only vertices already in the vertex partition + def updateVertices(iter: Iterator[(VertexId, VD)]): EdgePartition[ED, VD] = { + this.withVertexPartition(vertexPartition.innerJoinKeepLeft(iter)) + } /** * Reverse all the edges in this partition. * * @return a new edge partition with all edges reversed. */ - def reverse: EdgePartition[ED] = { - val builder = new EdgePartitionBuilder(size) + def reverse: EdgePartition[ED, VD] = { + val builder = new EdgePartitionBuilder(size)(classTag[ED], classTag[VD]) for (e <- iterator) { builder.add(e.dstId, e.srcId, e.attr) } - builder.toEdgePartition + builder.toEdgePartition.withVertexPartition(vertexPartition).withActiveSet(activeSet) } /** @@ -64,7 +101,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * @return a new edge partition with the result of the function `f` * applied to each edge */ - def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2] = { + def map[ED2: ClassTag](f: Edge[ED] => ED2): EdgePartition[ED2, VD] = { val newData = new Array[ED2](data.size) val edge = new Edge[ED]() val size = data.size @@ -76,7 +113,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) newData(i) = f(edge) i += 1 } - new EdgePartition(srcIds, dstIds, newData, index) + this.withData(newData) } /** @@ -91,7 +128,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * @tparam ED2 the type of the new attribute * @return a new edge partition with the attribute values replaced */ - def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2] = { + def map[ED2: ClassTag](iter: Iterator[ED2]): EdgePartition[ED2, VD] = { // Faster than iter.toArray, because the expected size is known. val newData = new Array[ED2](data.size) var i = 0 @@ -100,7 +137,19 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) i += 1 } assert(newData.size == i) - new EdgePartition(srcIds, dstIds, newData, index) + this.withData(newData) + } + + def filter( + epred: EdgeTriplet[VD, ED] => Boolean, + vpred: (VertexId, VD) => Boolean): EdgePartition[ED, VD] = { + val filtered = tripletIterator().filter(et => + vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et)) + val builder = new EdgePartitionBuilder[ED, VD] + for (e <- filtered) { + builder.add(e.srcId, e.dstId, e.attr) + } + builder.toEdgePartition.withVertexPartition(vertexPartition).withActiveSet(activeSet) } /** @@ -119,8 +168,8 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * @param merge a commutative associative merge operation * @return a new edge partition without duplicate edges */ - def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED] = { - val builder = new EdgePartitionBuilder[ED] + def groupEdges(merge: (ED, ED) => ED): EdgePartition[ED, VD] = { + val builder = new EdgePartitionBuilder[ED, VD] var currSrcId: VertexId = null.asInstanceOf[VertexId] var currDstId: VertexId = null.asInstanceOf[VertexId] var currAttr: ED = null.asInstanceOf[ED] @@ -141,7 +190,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) if (size > 0) { builder.add(currSrcId, currDstId, currAttr) } - builder.toEdgePartition + builder.toEdgePartition.withVertexPartition(vertexPartition).withActiveSet(activeSet) } /** @@ -155,9 +204,9 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) * once. */ def innerJoin[ED2: ClassTag, ED3: ClassTag] - (other: EdgePartition[ED2]) - (f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3] = { - val builder = new EdgePartitionBuilder[ED3] + (other: EdgePartition[ED2, _]) + (f: (VertexId, VertexId, ED, ED2) => ED3): EdgePartition[ED3, VD] = { + val builder = new EdgePartitionBuilder[ED3, VD] var i = 0 var j = 0 // For i = index of each edge in `this`... @@ -175,7 +224,7 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) } i += 1 } - builder.toEdgePartition + builder.toEdgePartition.withVertexPartition(vertexPartition).withActiveSet(activeSet) } /** @@ -211,6 +260,44 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) } } + // TODO: make a version that reuses objects + def tripletIterator( + includeSrc: Boolean = true, includeDst: Boolean = true): Iterator[EdgeTriplet[VD, ED]] = { + new EdgeTripletIterator(vertexPartition.index, vertexPartition.values, this, + includeSrc, includeDst) + } + + def upgradeIterator( + edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, includeDst: Boolean = true) + : Iterator[EdgeTriplet[VD, ED]] = { + val tripletIter = new Iterator[EdgeTriplet[VD, ED]] { + private[this] val triplet = new EdgeTriplet[VD, ED] + override def hasNext = edgeIter.hasNext + override def next() = { + triplet.set(edgeIter.next()) + } + } + val withSrc = + if (includeSrc) { + tripletIter.map { triplet => + triplet.srcAttr = EdgePartition.this.vertexPartition(triplet.srcId) + triplet + } + } else { + tripletIter + } + val withDst = + if (includeDst) { + withSrc.map { triplet => + triplet.dstAttr = EdgePartition.this.vertexPartition(triplet.dstId) + triplet + } + } else { + withSrc + } + withDst + } + /** * Get an iterator over the edges in this partition whose source vertex ids match srcIdPred. The * iterator is generated using an index scan, so it is efficient at skipping edges that don't @@ -243,4 +330,6 @@ class EdgePartition[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) edge } } + + def vidIterator: Iterator[VertexId] = vertexPartition.index.iterator } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala index 63ccccb05..80d4c3378 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgePartitionBuilder.scala @@ -22,10 +22,12 @@ import scala.util.Sorting import org.apache.spark.graphx._ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap -import org.apache.spark.util.collection.PrimitiveVector +import org.apache.spark.util.collection.{BitSet, OpenHashSet, PrimitiveVector} private[graphx] -class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: Int = 64) { +class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag, VD: ClassTag]( + size: Int = 64, + defaultVertexAttr: VD = null.asInstanceOf[VD]) { var edges = new PrimitiveVector[Edge[ED]](size) /** Add a new edge to the partition. */ @@ -33,7 +35,7 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I edges += Edge(src, dst, d) } - def toEdgePartition: EdgePartition[ED] = { + def toEdgePartition: EdgePartition[ED, VD] = { val edgeArray = edges.trim().array Sorting.quickSort(edgeArray)(Edge.lexicographicOrdering) val srcIds = new Array[VertexId](edgeArray.size) @@ -57,6 +59,12 @@ class EdgePartitionBuilder[@specialized(Long, Int, Double) ED: ClassTag](size: I i += 1 } } - new EdgePartition(srcIds, dstIds, data, index) + + val vidsIter = srcIds.iterator ++ dstIds.iterator + val vertexIds = new OpenHashSet[VertexId] + vidsIter.foreach(vid => vertexIds.add(vid)) + val vertexPartition = new VertexPartition( + vertexIds, new Array[VD](vertexIds.capacity), new BitSet(vertexIds.capacity)) + new EdgePartition(srcIds, dstIds, data, index, vertexPartition) } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala index 220a89d73..a04d5c078 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/EdgeTripletIterator.scala @@ -31,7 +31,9 @@ private[impl] class EdgeTripletIterator[VD: ClassTag, ED: ClassTag]( val vidToIndex: VertexIdToIndexMap, val vertexArray: Array[VD], - val edgePartition: EdgePartition[ED]) + val edgePartition: EdgePartition[ED, VD], + val includeSrc: Boolean, + val includeDst: Boolean) extends Iterator[EdgeTriplet[VD, ED]] { // Current position in the array. @@ -44,9 +46,13 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag]( override def next() = { val triplet = new EdgeTriplet[VD, ED] triplet.srcId = edgePartition.srcIds(pos) - triplet.srcAttr = vmap(triplet.srcId) + if (includeSrc) { + triplet.srcAttr = vmap(triplet.srcId) + } triplet.dstId = edgePartition.dstIds(pos) - triplet.dstAttr = vmap(triplet.dstId) + if (includeDst) { + triplet.dstAttr = vmap(triplet.dstId) + } triplet.attr = edgePartition.data(pos) pos += 1 triplet diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index c2b510a31..31f75df8a 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -19,6 +19,7 @@ package org.apache.spark.graphx.impl import scala.reflect.{classTag, ClassTag} +import org.apache.spark.Logging import org.apache.spark.util.collection.PrimitiveVector import org.apache.spark.{HashPartitioner, Partitioner} import org.apache.spark.SparkContext._ @@ -44,29 +45,25 @@ import org.apache.spark.util.ClosureCleaner */ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( @transient val vertices: VertexRDD[VD], - @transient val edges: EdgeRDD[ED], - @transient val routingTable: RoutingTable, - @transient val replicatedVertexView: ReplicatedVertexView[VD]) - extends Graph[VD, ED] with Serializable { + @transient private val replicatedVertexView: ReplicatedVertexView[VD, ED]) + extends Graph[VD, ED] with Serializable with Logging { /** Default constructor is provided to support serialization */ - protected def this() = this(null, null, null, null) + protected def this() = this(null, null) + + @transient override val edges: EdgeRDD[ED, VD] = replicatedVertexView.edges /** Return a RDD that brings edges together with their source and destination vertices. */ - @transient override val triplets: RDD[EdgeTriplet[VD, ED]] = { - val vdTag = classTag[VD] - val edTag = classTag[ED] - edges.partitionsRDD.zipPartitions( - replicatedVertexView.get(true, true), true) { (ePartIter, vPartIter) => - val (pid, ePart) = ePartIter.next() - val (_, vPart) = vPartIter.next() - new EdgeTripletIterator(vPart.index, vPart.values, ePart)(vdTag, edTag) - } + @transient override lazy val triplets: RDD[EdgeTriplet[VD, ED]] = { + replicatedVertexView.upgrade(vertices, true, true) + replicatedVertexView.edges.partitionsRDD.mapPartitions(_.flatMap { + case (pid, part) => part.tripletIterator() + }) } override def persist(newLevel: StorageLevel): Graph[VD, ED] = { vertices.persist(newLevel) - edges.persist(newLevel) + replicatedVertexView.edges.persist(newLevel) this } @@ -74,14 +71,14 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = { vertices.unpersist(blocking) - replicatedVertexView.unpersist(blocking) this } override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = { - val numPartitions = edges.partitions.size + val numPartitions = replicatedVertexView.edges.partitions.size val edTag = classTag[ED] - val newEdges = new EdgeRDD(edges.map { e => + val vdTag = classTag[VD] + val newEdges = new EdgeRDD(replicatedVertexView.edges.map { e => val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions) // Should we be using 3-tuple or an optimized class @@ -89,20 +86,20 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( } .partitionBy(new HashPartitioner(numPartitions)) .mapPartitionsWithIndex( { (pid, iter) => - val builder = new EdgePartitionBuilder[ED]()(edTag) + val builder = new EdgePartitionBuilder[ED, VD]()(edTag, vdTag) iter.foreach { message => val data = message.data builder.add(data._1, data._2, data._3) } val edgePartition = builder.toEdgePartition Iterator((pid, edgePartition)) - }, preservesPartitioning = true).cache()) - GraphImpl(vertices, newEdges) + }, preservesPartitioning = true)) + GraphImpl.fromExistingRDDs(vertices, newEdges) } override def reverse: Graph[VD, ED] = { - val newETable = edges.mapEdgePartitions((pid, part) => part.reverse) - new GraphImpl(vertices, newETable, routingTable, replicatedVertexView) + val newEdges = edges.mapEdgePartitions((pid, part) => part.reverse) + new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges)) } override def mapVertices[VD2: ClassTag](f: (VertexId, VD) => VD2): Graph[VD2, ED] = { @@ -110,45 +107,31 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( // The map preserves type, so we can use incremental replication val newVerts = vertices.mapVertexPartitions(_.map(f)).cache() val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts) - val newReplicatedVertexView = new ReplicatedVertexView[VD2]( - changedVerts, edges, routingTable, - Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]])) - new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView) + val newReplicatedVertexView = replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]] + .updateVertices(changedVerts) + new GraphImpl(newVerts, newReplicatedVertexView) } else { // The map does not preserve type, so we must re-replicate all vertices - GraphImpl(vertices.mapVertexPartitions(_.map(f)), edges, routingTable) + GraphImpl(vertices.mapVertexPartitions(_.map(f)), replicatedVertexView.edges) } } override def mapEdges[ED2: ClassTag]( f: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] = { - val newETable = edges.mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator))) - new GraphImpl(vertices, newETable , routingTable, replicatedVertexView) + val newEdges = replicatedVertexView.edges + .mapEdgePartitions((pid, part) => part.map(f(pid, part.iterator))) + new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges)) } override def mapTriplets[ED2: ClassTag]( f: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]): Graph[VD, ED2] = { - val newEdgePartitions = - edges.partitionsRDD.zipPartitions(replicatedVertexView.get(true, true), true) { - (ePartIter, vTableReplicatedIter) => - val (ePid, edgePartition) = ePartIter.next() - val (vPid, vPart) = vTableReplicatedIter.next() - assert(!vTableReplicatedIter.hasNext) - assert(ePid == vPid) - val et = new EdgeTriplet[VD, ED] - val inputIterator = edgePartition.iterator.map { e => - et.set(e) - et.srcAttr = vPart(e.srcId) - et.dstAttr = vPart(e.dstId) - et - } - // Apply the user function to the vertex partition - val outputIter = f(ePid, inputIterator) - // Consume the iterator to update the edge attributes - val newEdgePartition = edgePartition.map(outputIter) - Iterator((ePid, newEdgePartition)) - } - new GraphImpl(vertices, new EdgeRDD(newEdgePartitions), routingTable, replicatedVertexView) + val mapUsesSrcAttr = accessesVertexAttr(f, "srcAttr") + val mapUsesDstAttr = accessesVertexAttr(f, "dstAttr") + replicatedVertexView.upgrade(vertices, mapUsesSrcAttr, mapUsesDstAttr) + val newEdges = replicatedVertexView.edges.mapEdgePartitions { (pid, part) => + part.map(f(pid, part.tripletIterator(mapUsesSrcAttr, mapUsesDstAttr))) + } + new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges)) } override def subgraph( @@ -156,38 +139,23 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( vpred: (VertexId, VD) => Boolean = (a, b) => true): Graph[VD, ED] = { // Filter the vertices, reusing the partitioner and the index from this graph val newVerts = vertices.mapVertexPartitions(_.filter(vpred)) - - // Filter the edges - val edTag = classTag[ED] - val newEdges = new EdgeRDD[ED](triplets.filter { et => - vpred(et.srcId, et.srcAttr) && vpred(et.dstId, et.dstAttr) && epred(et) - }.mapPartitionsWithIndex( { (pid, iter) => - val builder = new EdgePartitionBuilder[ED]()(edTag) - iter.foreach { et => builder.add(et.srcId, et.dstId, et.attr) } - val edgePartition = builder.toEdgePartition - Iterator((pid, edgePartition)) - }, preservesPartitioning = true)).cache() - - // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been - // removed will be ignored, since we only refer to replicated vertices when they are adjacent to - // an edge. - new GraphImpl(newVerts, newEdges, new RoutingTable(newEdges, newVerts), replicatedVertexView) - } // end of subgraph + // Filter the triplets + replicatedVertexView.upgrade(vertices, true, true) + val newEdges = replicatedVertexView.edges.filter(epred, vpred) + new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges)) + } override def mask[VD2: ClassTag, ED2: ClassTag] ( other: Graph[VD2, ED2]): Graph[VD, ED] = { val newVerts = vertices.innerJoin(other.vertices) { (vid, v, w) => v } - val newEdges = edges.innerJoin(other.edges) { (src, dst, v, w) => v } - // Reuse the previous ReplicatedVertexView unmodified. The replicated vertices that have been - // removed will be ignored, since we only refer to replicated vertices when they are adjacent to - // an edge. - new GraphImpl(newVerts, newEdges, routingTable, replicatedVertexView) + val newEdges = replicatedVertexView.edges.innerJoin(other.edges) { (src, dst, v, w) => v } + new GraphImpl(newVerts, replicatedVertexView.withEdges(newEdges)) } override def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] = { - ClosureCleaner.clean(merge) - val newETable = edges.mapEdgePartitions((pid, part) => part.groupEdges(merge)) - new GraphImpl(vertices, newETable, routingTable, replicatedVertexView) + val newEdges = replicatedVertexView.edges.mapEdgePartitions( + (pid, part) => part.groupEdges(merge)) + new GraphImpl(vertices, replicatedVertexView.withEdges(newEdges)) } // /////////////////////////////////////////////////////////////////////////////////////////////// @@ -199,68 +167,58 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( reduceFunc: (A, A) => A, activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None): VertexRDD[A] = { - ClosureCleaner.clean(mapFunc) - ClosureCleaner.clean(reduceFunc) + vertices.cache() // For each vertex, replicate its attribute only to partitions where it is // in the relevant position in an edge. val mapUsesSrcAttr = accessesVertexAttr(mapFunc, "srcAttr") val mapUsesDstAttr = accessesVertexAttr(mapFunc, "dstAttr") - val vs = activeSetOpt match { + replicatedVertexView.upgrade(vertices, mapUsesSrcAttr, mapUsesDstAttr) + val view = activeSetOpt match { case Some((activeSet, _)) => - replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr, activeSet) + replicatedVertexView.withActiveSet(activeSet) case None => - replicatedVertexView.get(mapUsesSrcAttr, mapUsesDstAttr) + replicatedVertexView } val activeDirectionOpt = activeSetOpt.map(_._2) // Map and combine. - val preAgg = edges.partitionsRDD.zipPartitions(vs, true) { (ePartIter, vPartIter) => - val (ePid, edgePartition) = ePartIter.next() - val (vPid, vPart) = vPartIter.next() - assert(!vPartIter.hasNext) - assert(ePid == vPid) - // Choose scan method - val activeFraction = vPart.numActives.getOrElse(0) / edgePartition.indexSize.toFloat - val edgeIter = activeDirectionOpt match { - case Some(EdgeDirection.Both) => - if (activeFraction < 0.8) { - edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId)) - .filter(e => vPart.isActive(e.dstId)) - } else { - edgePartition.iterator.filter(e => vPart.isActive(e.srcId) && vPart.isActive(e.dstId)) - } - case Some(EdgeDirection.Either) => - // TODO: Because we only have a clustered index on the source vertex ID, we can't filter - // the index here. Instead we have to scan all edges and then do the filter. - edgePartition.iterator.filter(e => vPart.isActive(e.srcId) || vPart.isActive(e.dstId)) - case Some(EdgeDirection.Out) => - if (activeFraction < 0.8) { - edgePartition.indexIterator(srcVertexId => vPart.isActive(srcVertexId)) - } else { - edgePartition.iterator.filter(e => vPart.isActive(e.srcId)) - } - case Some(EdgeDirection.In) => - edgePartition.iterator.filter(e => vPart.isActive(e.dstId)) - case _ => // None - edgePartition.iterator - } - - // Scan edges and run the map function - val et = new EdgeTriplet[VD, ED] - val mapOutputs = edgeIter.flatMap { e => - et.set(e) - if (mapUsesSrcAttr) { - et.srcAttr = vPart(e.srcId) - } - if (mapUsesDstAttr) { - et.dstAttr = vPart(e.dstId) + val preAgg = view.edges.partitionsRDD.mapPartitions(_.flatMap { + case (pid, edgePartition) => + // Choose scan method + val activeFraction = edgePartition.numActives.getOrElse(0) / edgePartition.indexSize.toFloat + val edgeIter = activeDirectionOpt match { + case Some(EdgeDirection.Both) => + if (activeFraction < 0.8) { + edgePartition.indexIterator(srcVertexId => edgePartition.isActive(srcVertexId)) + .filter(e => edgePartition.isActive(e.dstId)) + } else { + edgePartition.iterator.filter(e => + edgePartition.isActive(e.srcId) && edgePartition.isActive(e.dstId)) + } + case Some(EdgeDirection.Either) => + // TODO: Because we only have a clustered index on the source vertex ID, we can't filter + // the index here. Instead we have to scan all edges and then do the filter. + edgePartition.iterator.filter(e => + edgePartition.isActive(e.srcId) || edgePartition.isActive(e.dstId)) + case Some(EdgeDirection.Out) => + if (activeFraction < 0.8) { + edgePartition.indexIterator(srcVertexId => edgePartition.isActive(srcVertexId)) + } else { + edgePartition.iterator.filter(e => edgePartition.isActive(e.srcId)) + } + case Some(EdgeDirection.In) => + edgePartition.iterator.filter(e => edgePartition.isActive(e.dstId)) + case _ => // None + edgePartition.iterator } - mapFunc(et) - } - // Note: This doesn't allow users to send messages to arbitrary vertices. - vPart.aggregateUsingIndex(mapOutputs, reduceFunc).iterator - } + + // Scan edges and run the map function + val mapOutputs = edgePartition.upgradeIterator(edgeIter, mapUsesSrcAttr, mapUsesDstAttr) + .flatMap(mapFunc(_)) + // Note: This doesn't allow users to send messages to arbitrary vertices. + edgePartition.vertexPartition.aggregateUsingIndex(mapOutputs, reduceFunc).iterator + }).setName("GraphImpl.mapReduceTriplets - preAgg") // do the final reduction reusing the index map vertices.aggregateUsingIndex(preAgg, reduceFunc) @@ -268,20 +226,18 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( override def outerJoinVertices[U: ClassTag, VD2: ClassTag] (other: RDD[(VertexId, U)]) - (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] = - { + (updateF: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED] = { if (classTag[VD] equals classTag[VD2]) { // updateF preserves type, so we can use incremental replication val newVerts = vertices.leftJoin(other)(updateF) val changedVerts = vertices.asInstanceOf[VertexRDD[VD2]].diff(newVerts) - val newReplicatedVertexView = new ReplicatedVertexView[VD2]( - changedVerts, edges, routingTable, - Some(replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2]])) - new GraphImpl(newVerts, edges, routingTable, newReplicatedVertexView) + val newReplicatedVertexView = + replicatedVertexView.asInstanceOf[ReplicatedVertexView[VD2, ED]].updateVertices(changedVerts) + new GraphImpl(newVerts, newReplicatedVertexView) } else { // updateF does not preserve type, so we must re-replicate all vertices val newVerts = vertices.leftJoin(other)(updateF) - GraphImpl(newVerts, edges, routingTable) + GraphImpl(newVerts, replicatedVertexView.edges) } } @@ -300,13 +256,12 @@ object GraphImpl { def apply[VD: ClassTag, ED: ClassTag]( edges: RDD[Edge[ED]], - defaultVertexAttr: VD): GraphImpl[VD, ED] = - { + defaultVertexAttr: VD): GraphImpl[VD, ED] = { fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr) } def fromEdgePartitions[VD: ClassTag, ED: ClassTag]( - edgePartitions: RDD[(PartitionID, EdgePartition[ED])], + edgePartitions: RDD[(PartitionID, EdgePartition[ED, VD])], defaultVertexAttr: VD): GraphImpl[VD, ED] = { fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr) } @@ -314,43 +269,26 @@ object GraphImpl { def apply[VD: ClassTag, ED: ClassTag]( vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]], - defaultVertexAttr: VD): GraphImpl[VD, ED] = - { - val edgeRDD = createEdgeRDD(edges).cache() - - // Get the set of all vids - val partitioner = Partitioner.defaultPartitioner(vertices) - val vPartitioned = vertices.partitionBy(partitioner) - val vidsFromEdges = collectVertexIdsFromEdges(edgeRDD, partitioner) - val vids = vPartitioned.zipPartitions(vidsFromEdges) { (vertexIter, vidsFromEdgesIter) => - vertexIter.map(_._1) ++ vidsFromEdgesIter.map(_._1) - } - - val vertexRDD = VertexRDD(vids, vPartitioned, defaultVertexAttr) - + defaultVertexAttr: VD): GraphImpl[VD, ED] = { + val edgeRDD = createEdgeRDD(edges)(classTag[ED], classTag[VD]).cache() + val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr) GraphImpl(vertexRDD, edgeRDD) } def apply[VD: ClassTag, ED: ClassTag]( vertices: VertexRDD[VD], - edges: EdgeRDD[ED]): GraphImpl[VD, ED] = { - // Cache RDDs that are referenced multiple times - edges.cache() - - GraphImpl(vertices, edges, new RoutingTable(edges, vertices)) + edges: EdgeRDD[ED, _]): GraphImpl[VD, ED] = { + // Convert the vertex partitions in edges to the correct type + val newEdges = edges.mapEdgePartitions( + (pid, part) => part.withVertexPartition(part.vertexPartition.map( + (vid, attr) => null.asInstanceOf[VD]))) + GraphImpl.fromExistingRDDs(vertices, newEdges) } - def apply[VD: ClassTag, ED: ClassTag]( + def fromExistingRDDs[VD: ClassTag, ED: ClassTag]( vertices: VertexRDD[VD], - edges: EdgeRDD[ED], - routingTable: RoutingTable): GraphImpl[VD, ED] = { - // Cache RDDs that are referenced multiple times. `routingTable` is cached by default, so we - // don't cache it explicitly. - vertices.cache() - edges.cache() - - new GraphImpl( - vertices, edges, routingTable, new ReplicatedVertexView(vertices, edges, routingTable)) + edges: EdgeRDD[ED, VD]): GraphImpl[VD, ED] = { + new GraphImpl(vertices, new ReplicatedVertexView(edges)) } /** @@ -361,10 +299,10 @@ object GraphImpl { * pair: the key is the partition id, and the value is an EdgePartition object containing all the * edges in a partition. */ - private def createEdgeRDD[ED: ClassTag]( - edges: RDD[Edge[ED]]): EdgeRDD[ED] = { + private def createEdgeRDD[ED: ClassTag, VD: ClassTag]( + edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = { val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) => - val builder = new EdgePartitionBuilder[ED] + val builder = new EdgePartitionBuilder[ED, VD] iter.foreach { e => builder.add(e.srcId, e.dstId, e.attr) } @@ -374,23 +312,11 @@ object GraphImpl { } private def fromEdgeRDD[VD: ClassTag, ED: ClassTag]( - edges: EdgeRDD[ED], + edges: EdgeRDD[ED, VD], defaultVertexAttr: VD): GraphImpl[VD, ED] = { edges.cache() - // Get the set of all vids - val vids = collectVertexIdsFromEdges(edges, new HashPartitioner(edges.partitions.size)) - // Create the VertexRDD. - val vertices = VertexRDD(vids.mapValues(x => defaultVertexAttr)) - GraphImpl(vertices, edges) + val vertices = VertexRDD.fromEdges(edges, edges.partitions.size, defaultVertexAttr) + fromExistingRDDs(vertices, edges) } - /** Collects all vids mentioned in edges and partitions them by partitioner. */ - private def collectVertexIdsFromEdges( - edges: EdgeRDD[_], - partitioner: Partitioner): RDD[(VertexId, Int)] = { - // TODO: Consider doing map side distinct before shuffle. - new ShuffledRDD[VertexId, Int, (VertexId, Int)]( - edges.collectVertexIds.map(vid => (vid, 0)), partitioner) - .setSerializer(new VertexIdMsgSerializer) - } } // end of object GraphImpl diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala index 9d4f3750c..b3f5fe6c0 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/MessageToPartition.scala @@ -88,7 +88,6 @@ class MsgRDDFunctions[T: ClassTag](self: RDD[MessageToPartition[T]]) { } - private[graphx] object MsgRDDFunctions { implicit def rdd2PartitionRDDFunctions[T: ClassTag](rdd: RDD[MessageToPartition[T]]) = { @@ -98,18 +97,28 @@ object MsgRDDFunctions { implicit def rdd2vertexMessageRDDFunctions[T: ClassTag](rdd: RDD[VertexBroadcastMsg[T]]) = { new VertexBroadcastMsgRDDFunctions(rdd) } +} - def partitionForAggregation[T: ClassTag](msgs: RDD[(VertexId, T)], partitioner: Partitioner) = { - val rdd = new ShuffledRDD[VertexId, T, (VertexId, T)](msgs, partitioner) +private[graphx] +class VertexRDDFunctions[VD: ClassTag](self: RDD[(VertexId, VD)]) { + def copartitionWithVertices(partitioner: Partitioner): RDD[(VertexId, VD)] = { + val rdd = new ShuffledRDD[VertexId, VD, (VertexId, VD)](self, partitioner) // Set a custom serializer if the data is of int or double type. - if (classTag[T] == ClassTag.Int) { + if (classTag[VD] == ClassTag.Int) { rdd.setSerializer(new IntAggMsgSerializer) - } else if (classTag[T] == ClassTag.Long) { + } else if (classTag[VD] == ClassTag.Long) { rdd.setSerializer(new LongAggMsgSerializer) - } else if (classTag[T] == ClassTag.Double) { + } else if (classTag[VD] == ClassTag.Double) { rdd.setSerializer(new DoubleAggMsgSerializer) } rdd } } + +private[graphx] +object VertexRDDFunctions { + implicit def rdd2VertexRDDFunctions[VD: ClassTag](rdd: RDD[(VertexId, VD)]) = { + new VertexRDDFunctions(rdd) + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala index a8154b63c..9edbeff37 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala @@ -19,6 +19,7 @@ package org.apache.spark.graphx.impl import scala.reflect.{classTag, ClassTag} +import org.apache.spark.Logging import org.apache.spark.SparkContext._ import org.apache.spark.rdd.RDD import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet} @@ -38,175 +39,74 @@ import org.apache.spark.graphx._ * [[org.apache.spark.graphx.Pregel]] for an example. */ private[impl] -class ReplicatedVertexView[VD: ClassTag]( - updatedVerts: VertexRDD[VD], - edges: EdgeRDD[_], - routingTable: RoutingTable, - prevViewOpt: Option[ReplicatedVertexView[VD]] = None) { - - /** - * Within each edge partition, create a local map from vid to an index into the attribute - * array. Each map contains a superset of the vertices that it will receive, because it stores - * vids from both the source and destination of edges. It must always include both source and - * destination vids because some operations, such as GraphImpl.mapReduceTriplets, rely on this. - */ - private val localVertexIdMap: RDD[(Int, VertexIdToIndexMap)] = prevViewOpt match { - case Some(prevView) => - prevView.localVertexIdMap - case None => - edges.partitionsRDD.mapPartitions(_.map { - case (pid, epart) => - val vidToIndex = new VertexIdToIndexMap - epart.foreach { e => - vidToIndex.add(e.srcId) - vidToIndex.add(e.dstId) - } - (pid, vidToIndex) - }, preservesPartitioning = true).cache().setName("ReplicatedVertexView localVertexIdMap") - } - - private lazy val bothAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(true, true) - private lazy val srcAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(true, false) - private lazy val dstAttrOnly: RDD[(PartitionID, VertexPartition[VD])] = create(false, true) - private lazy val noAttrs: RDD[(PartitionID, VertexPartition[VD])] = create(false, false) - - def unpersist(blocking: Boolean = true): ReplicatedVertexView[VD] = { - bothAttrs.unpersist(blocking) - srcAttrOnly.unpersist(blocking) - dstAttrOnly.unpersist(blocking) - noAttrs.unpersist(blocking) - // Don't unpersist localVertexIdMap because a future ReplicatedVertexView may be using it - // without modification - this +class ReplicatedVertexView[VD: ClassTag, ED: ClassTag]( + var edges: EdgeRDD[ED, VD], + var hasSrcId: Boolean = false, + var hasDstId: Boolean = false) extends Logging { + + def withEdges[VD2: ClassTag, ED2: ClassTag]( + edges_ : EdgeRDD[ED2, VD2]): ReplicatedVertexView[VD2, ED2] = { + new ReplicatedVertexView(edges_, hasSrcId, hasDstId) } - def get(includeSrc: Boolean, includeDst: Boolean): RDD[(PartitionID, VertexPartition[VD])] = { - (includeSrc, includeDst) match { - case (true, true) => bothAttrs - case (true, false) => srcAttrOnly - case (false, true) => dstAttrOnly - case (false, false) => noAttrs + def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean) { + val shipSrc = includeSrc && !hasSrcId + val shipDst = includeDst && !hasDstId + if (shipSrc || shipDst) { + val shippedVerts: RDD[(Int, VertexAttributeBlock[VD])] = + vertices.shipVertexAttributes(shipSrc, shipDst) + .setName("ReplicatedVertexView.upgrade(%s, %s) - shippedVerts %s %s (broadcast)".format( + includeSrc, includeDst, shipSrc, shipDst)) + .partitionBy(edges.partitioner.get) + val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) { + (ePartIter, shippedVertsIter) => ePartIter.map { + case (pid, edgePartition) => + (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator))) + } + }) + edges = newEdges + hasSrcId = includeSrc + hasDstId = includeDst } } - def get( - includeSrc: Boolean, - includeDst: Boolean, - actives: VertexRDD[_]): RDD[(PartitionID, VertexPartition[VD])] = { + def withActiveSet(actives: VertexRDD[_]): ReplicatedVertexView[VD, ED] = { // Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is // also shipped there. - val shippedActives = routingTable.get(true, true) - .zipPartitions(actives.partitionsRDD)(ReplicatedVertexView.buildActiveBuffer(_, _)) + val shippedActives = actives.shipVertexIds() + .setName("ReplicatedVertexView.withActiveSet - shippedActives (broadcast)") .partitionBy(edges.partitioner.get) + // Update the view with shippedActives, setting activeness flags in the resulting // VertexPartitions - get(includeSrc, includeDst).zipPartitions(shippedActives) { (viewIter, shippedActivesIter) => - val (pid, vPart) = viewIter.next() - val newPart = vPart.replaceActives(shippedActivesIter.flatMap(_._2.iterator)) - Iterator((pid, newPart)) - } - } + val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedActives) { + (ePartIter, shippedActivesIter) => ePartIter.map { + case (pid, edgePartition) => + (pid, edgePartition.withActiveSet(shippedActivesIter.flatMap(_._2.iterator))) + } + }) - private def create(includeSrc: Boolean, includeDst: Boolean) - : RDD[(PartitionID, VertexPartition[VD])] = { - val vdTag = classTag[VD] + new ReplicatedVertexView(newEdges, hasSrcId, hasDstId) + } + def updateVertices(updates: VertexRDD[VD]): ReplicatedVertexView[VD, ED] = { // Ship vertex attributes to edge partitions according to vertexPlacement - val verts = updatedVerts.partitionsRDD - val shippedVerts = routingTable.get(includeSrc, includeDst) - .zipPartitions(verts)(ReplicatedVertexView.buildBuffer(_, _)(vdTag)) + val shippedVerts = updates.shipVertexAttributes(hasSrcId, hasDstId) + .setName("ReplicatedVertexView.updateVertices - shippedVerts %s %s (broadcast)".format( + hasSrcId, hasDstId)) .partitionBy(edges.partitioner.get) - // TODO: Consider using a specialized shuffler. - - prevViewOpt match { - case Some(prevView) => - // Update prevView with shippedVerts, setting staleness flags in the resulting - // VertexPartitions - prevView.get(includeSrc, includeDst).zipPartitions(shippedVerts) { - (prevViewIter, shippedVertsIter) => - val (pid, prevVPart) = prevViewIter.next() - val newVPart = prevVPart.innerJoinKeepLeft(shippedVertsIter.flatMap(_._2.iterator)) - Iterator((pid, newVPart)) - }.cache().setName("ReplicatedVertexView delta %s %s".format(includeSrc, includeDst)) - - case None => - // Within each edge partition, place the shipped vertex attributes into the correct - // locations specified in localVertexIdMap - localVertexIdMap.zipPartitions(shippedVerts) { (mapIter, shippedVertsIter) => - val (pid, vidToIndex) = mapIter.next() - assert(!mapIter.hasNext) - // Populate the vertex array using the vidToIndex map - val vertexArray = vdTag.newArray(vidToIndex.capacity) - for ((_, block) <- shippedVertsIter) { - for (i <- 0 until block.vids.size) { - val vid = block.vids(i) - val attr = block.attrs(i) - val ind = vidToIndex.getPos(vid) - vertexArray(ind) = attr - } - } - val newVPart = new VertexPartition( - vidToIndex, vertexArray, vidToIndex.getBitSet)(vdTag) - Iterator((pid, newVPart)) - }.cache().setName("ReplicatedVertexView %s %s".format(includeSrc, includeDst)) - } - } -} -private object ReplicatedVertexView { - protected def buildBuffer[VD: ClassTag]( - pid2vidIter: Iterator[Array[Array[VertexId]]], - vertexPartIter: Iterator[VertexPartition[VD]]) = { - val pid2vid: Array[Array[VertexId]] = pid2vidIter.next() - val vertexPart: VertexPartition[VD] = vertexPartIter.next() - - Iterator.tabulate(pid2vid.size) { pid => - val vidsCandidate = pid2vid(pid) - val size = vidsCandidate.length - val vids = new PrimitiveVector[VertexId](pid2vid(pid).size) - val attrs = new PrimitiveVector[VD](pid2vid(pid).size) - var i = 0 - while (i < size) { - val vid = vidsCandidate(i) - if (vertexPart.isDefined(vid)) { - vids += vid - attrs += vertexPart(vid) - } - i += 1 + // Update prevView with shippedVerts, setting staleness flags in the resulting + // VertexPartitions + val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) { + (ePartIter, shippedVertsIter) => ePartIter.map { + case (pid, edgePartition) => + (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator))) } - (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array)) - } - } + }) - protected def buildActiveBuffer( - pid2vidIter: Iterator[Array[Array[VertexId]]], - activePartIter: Iterator[VertexPartition[_]]) - : Iterator[(Int, Array[VertexId])] = { - val pid2vid: Array[Array[VertexId]] = pid2vidIter.next() - val activePart: VertexPartition[_] = activePartIter.next() - - Iterator.tabulate(pid2vid.size) { pid => - val vidsCandidate = pid2vid(pid) - val size = vidsCandidate.length - val actives = new PrimitiveVector[VertexId](vidsCandidate.size) - var i = 0 - while (i < size) { - val vid = vidsCandidate(i) - if (activePart.isDefined(vid)) { - actives += vid - } - i += 1 - } - (pid, actives.trim().array) - } + new ReplicatedVertexView(newEdges, hasSrcId, hasDstId) } } - -private[graphx] -class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexId], val attrs: Array[VD]) - extends Serializable { - def iterator: Iterator[(VertexId, VD)] = - (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) } -} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala deleted file mode 100644 index fe44e1ee0..000000000 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTable.scala +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.graphx.impl - -import org.apache.spark.SparkContext._ -import org.apache.spark.graphx._ -import org.apache.spark.rdd.RDD -import org.apache.spark.storage.StorageLevel -import org.apache.spark.util.collection.PrimitiveVector - -/** - * Stores the locations of edge-partition join sites for each vertex attribute; that is, the routing - * information for shipping vertex attributes to edge partitions. This is always cached because it - * may be used multiple times in ReplicatedVertexView -- once to ship the vertex attributes and - * (possibly) once to ship the active-set information. - */ -private[impl] -class RoutingTable(edges: EdgeRDD[_], vertices: VertexRDD[_]) { - - val bothAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(true, true) - val srcAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(true, false) - val dstAttrOnly: RDD[Array[Array[VertexId]]] = createPid2Vid(false, true) - val noAttrs: RDD[Array[Array[VertexId]]] = createPid2Vid(false, false) - - def get(includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] = - (includeSrcAttr, includeDstAttr) match { - case (true, true) => bothAttrs - case (true, false) => srcAttrOnly - case (false, true) => dstAttrOnly - case (false, false) => noAttrs - } - - private def createPid2Vid( - includeSrcAttr: Boolean, includeDstAttr: Boolean): RDD[Array[Array[VertexId]]] = { - // Determine which vertices each edge partition needs by creating a mapping from vid to pid. - val vid2pid: RDD[(VertexId, PartitionID)] = edges.partitionsRDD.mapPartitions { iter => - val (pid: PartitionID, edgePartition: EdgePartition[_]) = iter.next() - val numEdges = edgePartition.size - val vSet = new VertexSet - if (includeSrcAttr) { // Add src vertices to the set. - var i = 0 - while (i < numEdges) { - vSet.add(edgePartition.srcIds(i)) - i += 1 - } - } - if (includeDstAttr) { // Add dst vertices to the set. - var i = 0 - while (i < numEdges) { - vSet.add(edgePartition.dstIds(i)) - i += 1 - } - } - vSet.iterator.map { vid => (vid, pid) } - } - - val numPartitions = vertices.partitions.size - vid2pid.partitionBy(vertices.partitioner.get).mapPartitions { iter => - val pid2vid = Array.fill(numPartitions)(new PrimitiveVector[VertexId]) - for ((vid, pid) <- iter) { - pid2vid(pid) += vid - } - - Iterator(pid2vid.map(_.trim().array)) - }.cache().setName("RoutingTable %s %s".format(includeSrcAttr, includeDstAttr)) - } -} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTableMessage.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTableMessage.scala new file mode 100644 index 000000000..f188819c9 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTableMessage.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.impl + +import scala.reflect.{classTag, ClassTag} + +import org.apache.spark.Partitioner +import org.apache.spark.graphx.{PartitionID, VertexId} +import org.apache.spark.rdd.{ShuffledRDD, RDD} + +/** See RoutingTablePartition.edgePartitionToMsgs */ +private[graphx] +class RoutingTableMessage( + var vid: VertexId, + var pid: PartitionID, + var position: Byte) + extends Product2[VertexId, (PartitionID, Byte)] with Serializable { + + override def _1 = vid + + override def _2 = (pid, position) + + override def canEqual(that: Any): Boolean = that.isInstanceOf[RoutingTableMessage] +} + +private[graphx] +class RoutingTableMessageRDDFunctions(self: RDD[RoutingTableMessage]) { + def copartitionWithVertices(partitioner: Partitioner): RDD[RoutingTableMessage] = { + new ShuffledRDD[VertexId, (PartitionID, Byte), RoutingTableMessage](self, partitioner) + .setSerializer(new RoutingTableMessageSerializer) + } +} + +private[graphx] +object RoutingTableMessageRDDFunctions { + implicit def rdd2RoutingTableMessageRDDFunctions(rdd: RDD[RoutingTableMessage]) = { + new RoutingTableMessageRDDFunctions(rdd) + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala new file mode 100644 index 000000000..d7706d092 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.impl + +import scala.reflect.ClassTag + +import org.apache.spark.Logging +import org.apache.spark.graphx._ +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap +import org.apache.spark.util.collection.BitSet +import org.apache.spark.util.collection.PrimitiveVector + +private[graphx] object RoutingTablePartition { + + val empty: RoutingTablePartition = new RoutingTablePartition(Array.empty) + + def edgePartitionToMsgs(pid: PartitionID, edgePartition: EdgePartition[_, _]) + : Iterator[RoutingTableMessage] = { + // Determine which positions each vertex id appears in using a map where the low 2 bits + // represent src and dst + val map = new PrimitiveKeyOpenHashMap[VertexId, Byte] + edgePartition.srcIds.iterator.foreach { srcId => + map.changeValue(srcId, 0x1, (b: Byte) => (b | 0x1).toByte) + } + edgePartition.dstIds.iterator.foreach { dstId => + map.changeValue(dstId, 0x2, (b: Byte) => (b | 0x2).toByte) + } + map.iterator.map { case (vid, position) => new RoutingTableMessage(vid, pid, position) } + } + + def fromMsgs(numEdgePartitions: Int, iter: Iterator[RoutingTableMessage]) + : RoutingTablePartition = { + val pid2vid = Array.fill(numEdgePartitions)(new PrimitiveVector[VertexId]) + val srcFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean]) + val dstFlags = Array.fill(numEdgePartitions)(new PrimitiveVector[Boolean]) + for (msg <- iter) { + pid2vid(msg.pid) += msg.vid + srcFlags(msg.pid) += (msg.position & 0x1) != 0 + dstFlags(msg.pid) += (msg.position & 0x2) != 0 + } + + new RoutingTablePartition(pid2vid.zipWithIndex.map { + case (vids, pid) => (vids.trim().array, toBitSet(srcFlags(pid)), toBitSet(dstFlags(pid))) + }) + } + + private def toBitSet(flags: PrimitiveVector[Boolean]): BitSet = { + val bitset = new BitSet(flags.size) + var i = 0 + while (i < flags.size) { + if (flags(i)) { + bitset.set(i) + } + i += 1 + } + bitset + } +} + +private[graphx] class RoutingTablePartition( + private val routingTable: Array[(Array[VertexId], BitSet, BitSet)]) { + + val numEdgePartitions: Int = routingTable.size + + def partitionSize(pid: PartitionID): Int = routingTable(pid)._1.size + + def iterator: Iterator[VertexId] = routingTable.iterator.flatMap(_._1.iterator) + + def foreachWithinEdgePartition + (pid: PartitionID, includeSrc: Boolean, includeDst: Boolean) + (f: VertexId => Unit) { + val (vidsCandidate, srcVids, dstVids) = routingTable(pid) + val size = vidsCandidate.length + if (includeSrc && includeDst) { + // Avoid checks for performance + vidsCandidate.iterator.foreach(f) + } else if (!includeSrc && !includeDst) { + // Do nothing + } else { + val relevantVids = if (includeSrc) srcVids else dstVids + relevantVids.iterator.foreach { i => f(vidsCandidate(i)) } + } + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala index 2f2c524df..d89aecef9 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/Serializers.scala @@ -24,6 +24,33 @@ import org.apache.spark.SparkConf import org.apache.spark.graphx._ import org.apache.spark.serializer._ +private[graphx] +class RoutingTableMessageSerializer extends Serializer with Serializable { + override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { + + override def serializeStream(s: OutputStream) = new ShuffleSerializationStream(s) { + def writeObject[T](t: T) = { + val msg = t.asInstanceOf[RoutingTableMessage] + writeVarLong(msg.vid, optimizePositive = false) + writeUnsignedVarInt(msg.pid) + // TODO: Write only the bottom two bits of msg.position + s.write(msg.position) + this + } + } + + override def deserializeStream(s: InputStream) = new ShuffleDeserializationStream(s) { + override def readObject[T](): T = { + val a = readVarLong(optimizePositive = false) + val b = readUnsignedVarInt() + val c = s.read() + if (c == -1) throw new EOFException + new RoutingTableMessage(a, b, c.toByte).asInstanceOf[T] + } + } + } +} + private[graphx] class VertexIdMsgSerializer extends Serializer with Serializable { override def newInstance(): SerializerInstance = new ShuffleSerializerInstance { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala new file mode 100644 index 000000000..b3db0c0b1 --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ShippableVertexPartition.scala @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.impl + +import scala.reflect.ClassTag + +import org.apache.spark.Logging +import org.apache.spark.graphx._ +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap +import org.apache.spark.util.collection.BitSet +import org.apache.spark.util.collection.PrimitiveVector + +private[graphx] +object ShippableVertexPartition { + def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): ShippableVertexPartition[VD] = + apply(iter, RoutingTablePartition.empty, null.asInstanceOf[VD]) + + def apply[VD: ClassTag]( + iter: Iterator[(VertexId, VD)], routingTable: RoutingTablePartition, defaultVal: VD) + : ShippableVertexPartition[VD] = { + val fullIter = iter ++ routingTable.iterator.map(vid => (vid, defaultVal)) + val (index, values, mask) = VertexPartitionBase.initFrom(fullIter, (a: VD, b: VD) => a) + new ShippableVertexPartition(index, values, mask, routingTable) + } + + // For using ops directly on particular subclasses of VertexPartitionBase + implicit def shippablePartitionToOps[VD: ClassTag](partition: ShippableVertexPartition[VD]) = + new ShippableVertexPartitionOps(partition) + + // For using ops on generic VertexPartitionBase types via a context bound + implicit object ShippableVertexPartitionOpsConstructor extends VertexPartitionBaseOpsConstructor[ShippableVertexPartition] { + def toOps[VD: ClassTag](partition: ShippableVertexPartition[VD]): VertexPartitionBaseOps[VD, ShippableVertexPartition] = + shippablePartitionToOps(partition) + } +} + +private[graphx] +class ShippableVertexPartition[VD: ClassTag]( + val index: VertexIdToIndexMap, + val values: Array[VD], + val mask: BitSet, + /** + * Stores the locations of edge-partition join sites for each vertex attribute; that is, the + * routing information for shipping vertex attributes to edge partitions. + */ + val routingTable: RoutingTablePartition) + extends VertexPartitionBase[VD] { + + // Ship vertex attributes to edge partitions according to vertexPlacement + def shipVertexAttributes( + shipSrc: Boolean, shipDst: Boolean): Iterator[(PartitionID, VertexAttributeBlock[VD])] = { + Iterator.tabulate(routingTable.numEdgePartitions) { pid => + val initialSize = if (shipSrc && shipDst) routingTable.partitionSize(pid) else 64 + val vids = new PrimitiveVector[VertexId](initialSize) + val attrs = new PrimitiveVector[VD](initialSize) + var i = 0 + routingTable.foreachWithinEdgePartition(pid, shipSrc, shipDst) { vid => + if (isDefined(vid)) { + vids += vid + attrs += this(vid) + } + i += 1 + } + (pid, new VertexAttributeBlock(vids.trim().array, attrs.trim().array)) + } + } + + // Ship active sets to edge partitions using vertexPlacement, but ignoring includeSrc and + // includeDst. These flags govern attribute shipping, but the activeness of a vertex must be + // shipped to all edges mentioning that vertex, regardless of whether the vertex attribute is + // also shipped there. + def shipVertexIds(): Iterator[(PartitionID, Array[VertexId])] = { + Iterator.tabulate(routingTable.numEdgePartitions) { pid => + val vids = new PrimitiveVector[VertexId](routingTable.partitionSize(pid)) + var i = 0 + routingTable.foreachWithinEdgePartition(pid, true, true) { vid => + if (isDefined(vid)) { + vids += vid + } + i += 1 + } + (pid, vids.trim().array) + } + } +} + +private[graphx] class ShippableVertexPartitionOps[VD: ClassTag](self: ShippableVertexPartition[VD]) + extends VertexPartitionBaseOps[VD, ShippableVertexPartition](self) { + + def withIndex(index: VertexIdToIndexMap): ShippableVertexPartition[VD] = { + new ShippableVertexPartition(index, self.values, self.mask, self.routingTable) + } + + def withValues[VD2: ClassTag](values: Array[VD2]): ShippableVertexPartition[VD2] = { + new ShippableVertexPartition(self.index, values, self.mask, self.routingTable) + } + + def withMask(mask: BitSet): ShippableVertexPartition[VD] = { + new ShippableVertexPartition(self.index, self.values, mask, self.routingTable) + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexAttributeBlock.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexAttributeBlock.scala new file mode 100644 index 000000000..aede5ff4e --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexAttributeBlock.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.impl + +import scala.reflect.{classTag, ClassTag} + +import org.apache.spark.Logging +import org.apache.spark.SparkContext._ +import org.apache.spark.rdd.RDD +import org.apache.spark.util.collection.{PrimitiveVector, OpenHashSet} + +import org.apache.spark.graphx._ + +private[graphx] +class VertexAttributeBlock[VD: ClassTag](val vids: Array[VertexId], val attrs: Array[VD]) + extends Serializable { + def iterator: Iterator[(VertexId, VD)] = + (0 until vids.size).iterator.map { i => (vids(i), attrs(i)) } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala index 7a54b413d..880d50557 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartition.scala @@ -25,254 +25,40 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap import org.apache.spark.util.collection.BitSet private[graphx] object VertexPartition { - - def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]): VertexPartition[VD] = { - val map = new PrimitiveKeyOpenHashMap[VertexId, VD] - iter.foreach { case (k, v) => - map(k) = v - } - new VertexPartition(map.keySet, map._values, map.keySet.getBitSet) + def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)]) + : VertexPartition[VD] = { + val (index, values, mask) = VertexPartitionBase.initFrom(iter) + new VertexPartition(index, values, mask) } - def apply[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD) - : VertexPartition[VD] = - { - val map = new PrimitiveKeyOpenHashMap[VertexId, VD] - iter.foreach { case (k, v) => - map.setMerge(k, v, mergeFunc) - } - new VertexPartition(map.keySet, map._values, map.keySet.getBitSet) + // For using ops directly on particular subclasses of VertexPartitionBase + implicit def partitionToOps[VD: ClassTag](partition: VertexPartition[VD]) = + new VertexPartitionOps(partition) + + // For using ops on generic VertexPartitionBase types via a context bound + implicit object VertexPartitionOpsConstructor extends VertexPartitionBaseOpsConstructor[VertexPartition] { + def toOps[VD: ClassTag](partition: VertexPartition[VD]): VertexPartitionBaseOps[VD, VertexPartition] = + partitionToOps(partition) } } - - -private[graphx] -class VertexPartition[@specialized(Long, Int, Double) VD: ClassTag]( +private[graphx] class VertexPartition[VD: ClassTag]( val index: VertexIdToIndexMap, val values: Array[VD], - val mask: BitSet, - /** A set of vids of active vertices. May contain vids not in index due to join rewrite. */ - private val activeSet: Option[VertexSet] = None) - extends Logging { - - val capacity: Int = index.capacity - - def size: Int = mask.cardinality() - - /** Return the vertex attribute for the given vertex ID. */ - def apply(vid: VertexId): VD = values(index.getPos(vid)) - - def isDefined(vid: VertexId): Boolean = { - val pos = index.getPos(vid) - pos >= 0 && mask.get(pos) - } - - /** Look up vid in activeSet, throwing an exception if it is None. */ - def isActive(vid: VertexId): Boolean = { - activeSet.get.contains(vid) - } - - /** The number of active vertices, if any exist. */ - def numActives: Option[Int] = activeSet.map(_.size) - - /** - * Pass each vertex attribute along with the vertex id through a map - * function and retain the original RDD's partitioning and index. - * - * @tparam VD2 the type returned by the map function - * - * @param f the function applied to each vertex id and vertex - * attribute in the RDD - * - * @return a new VertexPartition with values obtained by applying `f` to - * each of the entries in the original VertexRDD. The resulting - * VertexPartition retains the same index. - */ - def map[VD2: ClassTag](f: (VertexId, VD) => VD2): VertexPartition[VD2] = { - // Construct a view of the map transformation - val newValues = new Array[VD2](capacity) - var i = mask.nextSetBit(0) - while (i >= 0) { - newValues(i) = f(index.getValue(i), values(i)) - i = mask.nextSetBit(i + 1) - } - new VertexPartition[VD2](index, newValues, mask) - } - - /** - * Restrict the vertex set to the set of vertices satisfying the given predicate. - * - * @param pred the user defined predicate - * - * @note The vertex set preserves the original index structure which means that the returned - * RDD can be easily joined with the original vertex-set. Furthermore, the filter only - * modifies the bitmap index and so no new values are allocated. - */ - def filter(pred: (VertexId, VD) => Boolean): VertexPartition[VD] = { - // Allocate the array to store the results into - val newMask = new BitSet(capacity) - // Iterate over the active bits in the old mask and evaluate the predicate - var i = mask.nextSetBit(0) - while (i >= 0) { - if (pred(index.getValue(i), values(i))) { - newMask.set(i) - } - i = mask.nextSetBit(i + 1) - } - new VertexPartition(index, values, newMask) - } + val mask: BitSet) + extends VertexPartitionBase[VD] - /** - * Hides vertices that are the same between this and other. For vertices that are different, keeps - * the values from `other`. The indices of `this` and `other` must be the same. - */ - def diff(other: VertexPartition[VD]): VertexPartition[VD] = { - if (index != other.index) { - logWarning("Diffing two VertexPartitions with different indexes is slow.") - diff(createUsingIndex(other.iterator)) - } else { - val newMask = mask & other.mask - var i = newMask.nextSetBit(0) - while (i >= 0) { - if (values(i) == other.values(i)) { - newMask.unset(i) - } - i = newMask.nextSetBit(i + 1) - } - new VertexPartition(index, other.values, newMask) - } - } - - /** Left outer join another VertexPartition. */ - def leftJoin[VD2: ClassTag, VD3: ClassTag] - (other: VertexPartition[VD2]) - (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { - if (index != other.index) { - logWarning("Joining two VertexPartitions with different indexes is slow.") - leftJoin(createUsingIndex(other.iterator))(f) - } else { - val newValues = new Array[VD3](capacity) - - var i = mask.nextSetBit(0) - while (i >= 0) { - val otherV: Option[VD2] = if (other.mask.get(i)) Some(other.values(i)) else None - newValues(i) = f(index.getValue(i), values(i), otherV) - i = mask.nextSetBit(i + 1) - } - new VertexPartition(index, newValues, mask) - } - } - - /** Left outer join another iterator of messages. */ - def leftJoin[VD2: ClassTag, VD3: ClassTag] - (other: Iterator[(VertexId, VD2)]) - (f: (VertexId, VD, Option[VD2]) => VD3): VertexPartition[VD3] = { - leftJoin(createUsingIndex(other))(f) - } - - /** Inner join another VertexPartition. */ - def innerJoin[U: ClassTag, VD2: ClassTag](other: VertexPartition[U]) - (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = { - if (index != other.index) { - logWarning("Joining two VertexPartitions with different indexes is slow.") - innerJoin(createUsingIndex(other.iterator))(f) - } else { - val newMask = mask & other.mask - val newValues = new Array[VD2](capacity) - var i = newMask.nextSetBit(0) - while (i >= 0) { - newValues(i) = f(index.getValue(i), values(i), other.values(i)) - i = newMask.nextSetBit(i + 1) - } - new VertexPartition(index, newValues, newMask) - } - } +private[graphx] class VertexPartitionOps[VD: ClassTag](self: VertexPartition[VD]) + extends VertexPartitionBaseOps[VD, VertexPartition](self) { - /** - * Inner join an iterator of messages. - */ - def innerJoin[U: ClassTag, VD2: ClassTag] - (iter: Iterator[Product2[VertexId, U]]) - (f: (VertexId, VD, U) => VD2): VertexPartition[VD2] = { - innerJoin(createUsingIndex(iter))(f) + def withIndex(index: VertexIdToIndexMap): VertexPartition[VD] = { + new VertexPartition(index, self.values, self.mask) } - /** - * Similar effect as aggregateUsingIndex((a, b) => a) - */ - def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]]) - : VertexPartition[VD2] = { - val newMask = new BitSet(capacity) - val newValues = new Array[VD2](capacity) - iter.foreach { case (vid, vdata) => - val pos = index.getPos(vid) - if (pos >= 0) { - newMask.set(pos) - newValues(pos) = vdata - } - } - new VertexPartition[VD2](index, newValues, newMask) + def withValues[VD2: ClassTag](values: Array[VD2]): VertexPartition[VD2] = { + new VertexPartition(self.index, values, self.mask) } - /** - * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in - * the partition, hidden by the bitmask. - */ - def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): VertexPartition[VD] = { - val newMask = new BitSet(capacity) - val newValues = new Array[VD](capacity) - System.arraycopy(values, 0, newValues, 0, newValues.length) - iter.foreach { case (vid, vdata) => - val pos = index.getPos(vid) - if (pos >= 0) { - newMask.set(pos) - newValues(pos) = vdata - } - } - new VertexPartition(index, newValues, newMask) + def withMask(mask: BitSet): VertexPartition[VD] = { + new VertexPartition(self.index, self.values, mask) } - - def aggregateUsingIndex[VD2: ClassTag]( - iter: Iterator[Product2[VertexId, VD2]], - reduceFunc: (VD2, VD2) => VD2): VertexPartition[VD2] = { - val newMask = new BitSet(capacity) - val newValues = new Array[VD2](capacity) - iter.foreach { product => - val vid = product._1 - val vdata = product._2 - val pos = index.getPos(vid) - if (pos >= 0) { - if (newMask.get(pos)) { - newValues(pos) = reduceFunc(newValues(pos), vdata) - } else { // otherwise just store the new value - newMask.set(pos) - newValues(pos) = vdata - } - } - } - new VertexPartition[VD2](index, newValues, newMask) - } - - def replaceActives(iter: Iterator[VertexId]): VertexPartition[VD] = { - val newActiveSet = new VertexSet - iter.foreach(newActiveSet.add(_)) - new VertexPartition(index, values, mask, Some(newActiveSet)) - } - - /** - * Construct a new VertexPartition whose index contains only the vertices in the mask. - */ - def reindex(): VertexPartition[VD] = { - val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD] - val arbitraryMerge = (a: VD, b: VD) => a - for ((k, v) <- this.iterator) { - hashMap.setMerge(k, v, arbitraryMerge) - } - new VertexPartition(hashMap.keySet, hashMap._values, hashMap.keySet.getBitSet) - } - - def iterator: Iterator[(VertexId, VD)] = - mask.iterator.map(ind => (index.getValue(ind), values(ind))) - - def vidIterator: Iterator[VertexId] = mask.iterator.map(ind => index.getValue(ind)) } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala new file mode 100644 index 000000000..84a18450d --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.impl + +import scala.reflect.ClassTag + +import org.apache.spark.Logging +import org.apache.spark.graphx._ +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap +import org.apache.spark.util.collection.BitSet +import org.apache.spark.util.collection.PrimitiveVector + +private[graphx] object VertexPartitionBase { + // Same effect as initFrom(iter, (a, b) => b) + def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)]) + : (VertexIdToIndexMap, Array[VD], BitSet) = { + val map = new PrimitiveKeyOpenHashMap[VertexId, VD] + iter.foreach { case (k, v) => + map(k) = v + } + (map.keySet, map._values, map.keySet.getBitSet) + } + + def initFrom[VD: ClassTag](iter: Iterator[(VertexId, VD)], mergeFunc: (VD, VD) => VD) + : (VertexIdToIndexMap, Array[VD], BitSet) = { + val map = new PrimitiveKeyOpenHashMap[VertexId, VD] + iter.foreach { case (k, v) => + map.setMerge(k, v, mergeFunc) + } + (map.keySet, map._values, map.keySet.getBitSet) + } +} + +private[graphx] +abstract class VertexPartitionBase[ + @specialized(Long, Int, Double) VD: ClassTag] { + + def index: VertexIdToIndexMap + def values: Array[VD] + def mask: BitSet + + val capacity: Int = index.capacity + + def size: Int = mask.cardinality() + + /** Return the vertex attribute for the given vertex ID. */ + def apply(vid: VertexId): VD = values(index.getPos(vid)) + + def isDefined(vid: VertexId): Boolean = { + val pos = index.getPos(vid) + pos >= 0 && mask.get(pos) + } + + def iterator: Iterator[(VertexId, VD)] = + mask.iterator.map(ind => (index.getValue(ind), values(ind))) +} + +trait VertexPartitionBaseOpsConstructor[T[X] <: VertexPartitionBase[X]] { + def toOps[VD: ClassTag](partition: T[VD]): VertexPartitionBaseOps[VD, T] +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala new file mode 100644 index 000000000..13ebc528a --- /dev/null +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.graphx.impl + +import scala.reflect.ClassTag + +import org.apache.spark.Logging +import org.apache.spark.graphx._ +import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap +import org.apache.spark.util.collection.BitSet +import org.apache.spark.util.collection.PrimitiveVector + +abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: VertexPartitionBase[X]] + (self: T[VD]) + (implicit ev: VertexPartitionBaseOpsConstructor[T]) + extends Logging { + + def withIndex(index: VertexIdToIndexMap): T[VD] + def withValues[VD2: ClassTag](values: Array[VD2]): T[VD2] + def withMask(mask: BitSet): T[VD] + + /** + * Pass each vertex attribute along with the vertex id through a map + * function and retain the original RDD's partitioning and index. + * + * @tparam VD2 the type returned by the map function + * + * @param f the function applied to each vertex id and vertex + * attribute in the RDD + * + * @return a new VertexPartition with values obtained by applying `f` to + * each of the entries in the original VertexRDD. The resulting + * VertexPartition retains the same index. + */ + def map[VD2: ClassTag](f: (VertexId, VD) => VD2): T[VD2] = { + // Construct a view of the map transformation + val newValues = new Array[VD2](self.capacity) + var i = self.mask.nextSetBit(0) + while (i >= 0) { + newValues(i) = f(self.index.getValue(i), self.values(i)) + i = self.mask.nextSetBit(i + 1) + } + this.withValues(newValues) + } + + /** + * Restrict the vertex set to the set of vertices satisfying the given predicate. + * + * @param pred the user defined predicate + * + * @note The vertex set preserves the original index structure which means that the returned + * RDD can be easily joined with the original vertex-set. Furthermore, the filter only + * modifies the bitmap index and so no new values are allocated. + */ + def filter(pred: (VertexId, VD) => Boolean): T[VD] = { + // Allocate the array to store the results into + val newMask = new BitSet(self.capacity) + // Iterate over the active bits in the old mask and evaluate the predicate + var i = self.mask.nextSetBit(0) + while (i >= 0) { + if (pred(self.index.getValue(i), self.values(i))) { + newMask.set(i) + } + i = self.mask.nextSetBit(i + 1) + } + this.withMask(newMask) + } + + /** + * Hides vertices that are the same between this and other. For vertices that are different, keeps + * the values from `other`. The indices of `this` and `other` must be the same. + */ + def diff(other: T[VD]): T[VD] = { + if (self.index != other.index) { + logWarning("Diffing two VertexPartitions with different indexes is slow.") + diff(createUsingIndex(other.iterator)) + } else { + val newMask = self.mask & other.mask + var i = newMask.nextSetBit(0) + while (i >= 0) { + if (self.values(i) == other.values(i)) { + newMask.unset(i) + } + i = newMask.nextSetBit(i + 1) + } + ev.toOps(this.withValues(other.values)).withMask(newMask) + } + } + + /** Left outer join another VertexPartition. */ + def leftJoin[VD2: ClassTag, VD3: ClassTag] + (other: T[VD2]) + (f: (VertexId, VD, Option[VD2]) => VD3): T[VD3] = { + if (self.index != other.index) { + logWarning("Joining two VertexPartitions with different indexes is slow.") + leftJoin(createUsingIndex(other.iterator))(f) + } else { + val newValues = new Array[VD3](self.capacity) + + var i = self.mask.nextSetBit(0) + while (i >= 0) { + val otherV: Option[VD2] = if (other.mask.get(i)) Some(other.values(i)) else None + newValues(i) = f(self.index.getValue(i), self.values(i), otherV) + i = self.mask.nextSetBit(i + 1) + } + this.withValues(newValues) + } + } + + /** Left outer join another iterator of messages. */ + def leftJoin[VD2: ClassTag, VD3: ClassTag] + (other: Iterator[(VertexId, VD2)]) + (f: (VertexId, VD, Option[VD2]) => VD3): T[VD3] = { + leftJoin(createUsingIndex(other))(f) + } + + /** Inner join another VertexPartition. */ + def innerJoin[U: ClassTag, VD2: ClassTag] + (other: T[U]) + (f: (VertexId, VD, U) => VD2): T[VD2] = { + if (self.index != other.index) { + logWarning("Joining two VertexPartitions with different indexes is slow.") + innerJoin(createUsingIndex(other.iterator))(f) + } else { + val newMask = self.mask & other.mask + val newValues = new Array[VD2](self.capacity) + var i = newMask.nextSetBit(0) + while (i >= 0) { + newValues(i) = f(self.index.getValue(i), self.values(i), other.values(i)) + i = newMask.nextSetBit(i + 1) + } + ev.toOps(this.withValues(newValues)).withMask(newMask) + } + } + + /** + * Inner join an iterator of messages. + */ + def innerJoin[U: ClassTag, VD2: ClassTag] + (iter: Iterator[Product2[VertexId, U]]) + (f: (VertexId, VD, U) => VD2): T[VD2] = { + innerJoin(createUsingIndex(iter))(f) + } + + /** + * Similar effect as aggregateUsingIndex((a, b) => a) + */ + def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]]) + : T[VD2] = { + val newMask = new BitSet(self.capacity) + val newValues = new Array[VD2](self.capacity) + iter.foreach { case (vid, vdata) => + val pos = self.index.getPos(vid) + if (pos >= 0) { + newMask.set(pos) + newValues(pos) = vdata + } + } + ev.toOps(this.withValues(newValues)).withMask(newMask) + } + + /** + * Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in + * the partition, hidden by the bitmask. + */ + def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): T[VD] = { + val newMask = new BitSet(self.capacity) + val newValues = new Array[VD](self.capacity) + System.arraycopy(self.values, 0, newValues, 0, newValues.length) + iter.foreach { case (vid, vdata) => + val pos = self.index.getPos(vid) + if (pos >= 0) { + newMask.set(pos) + newValues(pos) = vdata + } + } + ev.toOps(this.withValues(newValues)).withMask(newMask) + } + + def aggregateUsingIndex[VD2: ClassTag]( + iter: Iterator[Product2[VertexId, VD2]], + reduceFunc: (VD2, VD2) => VD2): T[VD2] = { + val newMask = new BitSet(self.capacity) + val newValues = new Array[VD2](self.capacity) + iter.foreach { product => + val vid = product._1 + val vdata = product._2 + val pos = self.index.getPos(vid) + if (pos >= 0) { + if (newMask.get(pos)) { + newValues(pos) = reduceFunc(newValues(pos), vdata) + } else { // otherwise just store the new value + newMask.set(pos) + newValues(pos) = vdata + } + } + } + ev.toOps(this.withValues(newValues)).withMask(newMask) + } + + /** + * Construct a new VertexPartition whose index contains only the vertices in the mask. + */ + def reindex(): T[VD] = { + val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD] + val arbitraryMerge = (a: VD, b: VD) => a + for ((k, v) <- self.iterator) { + hashMap.setMerge(k, v, arbitraryMerge) + } + ev.toOps( + ev.toOps( + this.withIndex(hashMap.keySet)) + .withValues(hashMap._values)) + .withMask(hashMap.keySet.getBitSet) + } +} diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala index fa533a512..4a3c3344d 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala @@ -58,12 +58,14 @@ object Analytics extends Logging { var outFname = "" var numEPart = 4 var partitionStrategy: Option[PartitionStrategy] = None + var numIterOpt: Option[Int] = None options.foreach{ case ("tol", v) => tol = v.toFloat case ("output", v) => outFname = v case ("numEPart", v) => numEPart = v.toInt case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v)) + case ("numIter", v) => numIterOpt = Some(v.toInt) case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) } @@ -80,7 +82,10 @@ object Analytics extends Logging { println("GRAPHX: Number of vertices " + graph.vertices.count) println("GRAPHX: Number of edges " + graph.edges.count) - val pr = graph.pageRank(tol).vertices.cache() + val pr = (numIterOpt match { + case Some(numIter) => PageRank.run(graph, numIter) + case None => PageRank.runUntilConvergence(graph, tol) + }).vertices.cache() println("GRAPHX: Total rank: " + pr.map(_._2).reduce(_ + _)) diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala index 28d34dd9a..4346d483e 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala @@ -110,7 +110,7 @@ class GraphSuite extends FunSuite with LocalSparkContext { val p = 100 val verts = 1 to n val graph = Graph.fromEdgeTuples(sc.parallelize(verts.flatMap(x => - verts.filter(y => y % x == 0).map(y => (x: VertexId, y: VertexId))), p), 0) + verts.withFilter(y => y % x == 0).map(y => (x: VertexId, y: VertexId))), p), 0) assert(graph.edges.partitions.length === p) val partitionedGraph = graph.partitionBy(EdgePartition2D) assert(graph.edges.partitions.length === p) @@ -120,7 +120,13 @@ class GraphSuite extends FunSuite with LocalSparkContext { val part = iter.next()._2 Iterator((part.srcIds ++ part.dstIds).toSet) }.collect - assert(verts.forall(id => partitionSets.count(_.contains(id)) <= bound)) + if (!verts.forall(id => partitionSets.count(_.contains(id)) <= bound)) { + val numFailures = verts.count(id => partitionSets.count(_.contains(id)) > bound) + val failure = verts.maxBy(id => partitionSets.count(_.contains(id))) + fail(("Replication bound test failed for %d/%d vertices. " + + "Example: vertex %d replicated to %d (> %f) partitions.").format( + numFailures, n, failure, partitionSets.count(_.contains(failure)), bound)) + } // This should not be true for the default hash partitioning val partitionSetsUnpartitioned = graph.edges.partitionsRDD.mapPartitions { iter => val part = iter.next()._2 @@ -287,4 +293,16 @@ class GraphSuite extends FunSuite with LocalSparkContext { } } + test("more edge partitions than vertex partitions") { + withSpark { sc => + val verts = sc.parallelize(List((1: VertexId, "a"), (2: VertexId, "b")), 1) + val edges = sc.parallelize(List(Edge(1, 2, 0), Edge(2, 1, 0)), 2) + val graph = Graph(verts, edges) + val triplets = graph.triplets.map(et => (et.srcId, et.dstId, et.srcAttr, et.dstAttr)) + .collect.toSet + assert(triplets === + Set((1: VertexId, 2: VertexId, "a", "b"), (2: VertexId, 1: VertexId, "b", "a"))) + } + } + } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala index e135d1d7a..d2e0c01bc 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala @@ -26,10 +26,16 @@ import org.apache.spark.graphx._ class EdgePartitionSuite extends FunSuite { + def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A, Int] = { + val builder = new EdgePartitionBuilder[A, Int] + for ((src, dst, attr) <- xs) { builder.add(src: VertexId, dst: VertexId, attr) } + builder.toEdgePartition + } + test("reverse") { val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0)) val reversedEdges = List(Edge(0, 2, 0), Edge(1, 0, 0), Edge(2, 1, 0)) - val builder = new EdgePartitionBuilder[Int] + val builder = new EdgePartitionBuilder[Int, Nothing] for (e <- edges) { builder.add(e.srcId, e.dstId, e.attr) } @@ -40,7 +46,7 @@ class EdgePartitionSuite extends FunSuite { test("map") { val edges = List(Edge(0, 1, 0), Edge(1, 2, 0), Edge(2, 0, 0)) - val builder = new EdgePartitionBuilder[Int] + val builder = new EdgePartitionBuilder[Int, Nothing] for (e <- edges) { builder.add(e.srcId, e.dstId, e.attr) } @@ -49,11 +55,22 @@ class EdgePartitionSuite extends FunSuite { edges.map(e => e.copy(attr = e.srcId + e.dstId))) } + test("filter") { + val edges = List(Edge(0, 1, 0), Edge(0, 2, 0), Edge(2, 0, 0)) + val builder = new EdgePartitionBuilder[Int, Int] + for (e <- edges) { + builder.add(e.srcId, e.dstId, e.attr) + } + val edgePartition = builder.toEdgePartition + val filtered = edgePartition.filter(et => et.srcId == 0, (vid, attr) => vid == 0 || vid == 1) + assert(filtered.tripletIterator().toList.map(et => (et.srcId, et.dstId)) === List((0L, 1L))) + } + test("groupEdges") { val edges = List( Edge(0, 1, 1), Edge(1, 2, 2), Edge(2, 0, 4), Edge(0, 1, 8), Edge(1, 2, 16), Edge(2, 0, 32)) val groupedEdges = List(Edge(0, 1, 9), Edge(1, 2, 18), Edge(2, 0, 36)) - val builder = new EdgePartitionBuilder[Int] + val builder = new EdgePartitionBuilder[Int, Nothing] for (e <- edges) { builder.add(e.srcId, e.dstId, e.attr) } @@ -61,11 +78,19 @@ class EdgePartitionSuite extends FunSuite { assert(edgePartition.groupEdges(_ + _).iterator.map(_.copy()).toList === groupedEdges) } + test("upgradeIterator") { + val edges = List((0, 1, 0), (1, 0, 0)) + val verts = List((0L, 1), (1L, 2)) + val part = makeEdgePartition(edges).updateVertices(verts.iterator) + assert(part.upgradeIterator(part.iterator).map(_.toTuple).toList === + part.tripletIterator().toList.map(_.toTuple)) + } + test("indexIterator") { val edgesFrom0 = List(Edge(0, 1, 0)) val edgesFrom1 = List(Edge(1, 0, 0), Edge(1, 2, 0)) val sortedEdges = edgesFrom0 ++ edgesFrom1 - val builder = new EdgePartitionBuilder[Int] + val builder = new EdgePartitionBuilder[Int, Nothing] for (e <- Random.shuffle(sortedEdges)) { builder.add(e.srcId, e.dstId, e.attr) } @@ -77,11 +102,6 @@ class EdgePartitionSuite extends FunSuite { } test("innerJoin") { - def makeEdgePartition[A: ClassTag](xs: Iterable[(Int, Int, A)]): EdgePartition[A] = { - val builder = new EdgePartitionBuilder[A] - for ((src, dst, attr) <- xs) { builder.add(src: VertexId, dst: VertexId, attr) } - builder.toEdgePartition - } val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0)) val bList = List((0, 1, 0), (1, 0, 0), (1, 1, 0), (3, 4, 0), (5, 5, 0)) val a = makeEdgePartition(aList) @@ -90,4 +110,14 @@ class EdgePartitionSuite extends FunSuite { assert(a.innerJoin(b) { (src, dst, a, b) => a }.iterator.map(_.copy()).toList === List(Edge(0, 1, 0), Edge(1, 0, 0), Edge(5, 5, 0))) } + + test("isActive, numActives, replaceActives") { + val ep = new EdgePartitionBuilder[Nothing, Nothing].toEdgePartition + .withActiveSet(Iterator(0L, 2L, 0L)) + assert(ep.isActive(0)) + assert(!ep.isActive(1)) + assert(ep.isActive(2)) + assert(!ep.isActive(-1)) + assert(ep.numActives == Some(2)) + } } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala index 9cbb2d2ac..ed3d5401f 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgeTripletIteratorSuite.scala @@ -26,7 +26,7 @@ import org.apache.spark.graphx._ class EdgeTripletIteratorSuite extends FunSuite { test("iterator.toList") { - val builder = new EdgePartitionBuilder[Int] + val builder = new EdgePartitionBuilder[Int, Int] builder.add(1, 2, 0) builder.add(1, 3, 0) builder.add(1, 4, 0) @@ -36,7 +36,7 @@ class EdgeTripletIteratorSuite extends FunSuite { vidmap.add(3) vidmap.add(4) val vs = Array.fill(vidmap.capacity)(0) - val iter = new EdgeTripletIterator[Int, Int](vidmap, vs, builder.toEdgePartition) + val iter = new EdgeTripletIterator[Int, Int](vidmap, vs, builder.toEdgePartition, true, true) val result = iter.toList.map(et => (et.srcId, et.dstId)) assert(result === Seq((1, 2), (1, 3), (1, 4))) } diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala index a048d13fd..8bf1384d5 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala @@ -30,17 +30,6 @@ class VertexPartitionSuite extends FunSuite { assert(!vp.isDefined(-1)) } - test("isActive, numActives, replaceActives") { - val vp = VertexPartition(Iterator((0L, 1), (1L, 1))) - .filter { (vid, attr) => vid == 0 } - .replaceActives(Iterator(0, 2, 0)) - assert(vp.isActive(0)) - assert(!vp.isActive(1)) - assert(vp.isActive(2)) - assert(!vp.isActive(-1)) - assert(vp.numActives == Some(2)) - } - test("map") { val vp = VertexPartition(Iterator((0L, 1), (1L, 1))).map { (vid, attr) => 2 } assert(vp(0) === 2)