@@ -76,7 +76,7 @@ private[ui] class StageTable(stages: Seq[StageInfo], parent: JobProgressUI) {
     val poolName = listener.stageIdToPool.get(s.stageId)
     val nameLink =
       <a href={"%s/stages/stage?id=%s".format(UIUtils.prependBaseUri(basePath), s.stageId)}>
-        {s.name}
+        {s.name + " (%s)".format(s.rddInfo.name)}
       </a>
     val description = listener.stageIdToDescription.get(s.stageId)
       .map(d => <div><em>{d}</em></div><div>{nameLink}</div>).getOrElse(nameLink)

graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala (72 changes: 48 additions & 24 deletions)

@@ -19,18 +19,20 @@ package org.apache.spark.graphx
 
 import scala.reflect.{classTag, ClassTag}
 
+import org.apache.spark.Logging
 import org.apache.spark.{OneToOneDependency, Partition, Partitioner, TaskContext}
 import org.apache.spark.graphx.impl.EdgePartition
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
 /**
- * `EdgeRDD[ED]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each partition
+ * `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each partition
  * for performance.
  */
-class EdgeRDD[@specialized ED: ClassTag](
-    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED])])
-  extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
+class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
+    val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
+  extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD)))
+  with Logging {
 
   partitionsRDD.setName("EdgeRDD")
@@ -45,33 +47,41 @@ class EdgeRDD[@specialized ED: ClassTag](
     partitionsRDD.partitioner.orElse(Some(Partitioner.defaultPartitioner(partitionsRDD)))
 
   override def compute(part: Partition, context: TaskContext): Iterator[Edge[ED]] = {
-    val p = firstParent[(PartitionID, EdgePartition[ED])].iterator(part, context)
-    p.next._2.iterator.map(_.copy())
+    val p = firstParent[(PartitionID, EdgePartition[ED, VD])].iterator(part, context)
+    if (p.hasNext) {
+      p.next._2.iterator.map(_.copy())
+    } else {
+      Iterator.empty
+    }
   }
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
 
-  override def persist(newLevel: StorageLevel): EdgeRDD[ED] = {
+  override def persist(newLevel: StorageLevel): EdgeRDD[ED, VD] = {
     partitionsRDD.persist(newLevel)
     this
   }
 
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def persist(): EdgeRDD[ED] = persist(StorageLevel.MEMORY_ONLY)
+  override def persist(): EdgeRDD[ED, VD] = persist(StorageLevel.MEMORY_ONLY)
 
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def cache(): EdgeRDD[ED] = persist()
+  override def cache(): EdgeRDD[ED, VD] = persist()
 
-  override def unpersist(blocking: Boolean = true): EdgeRDD[ED] = {
+  override def unpersist(blocking: Boolean = true): EdgeRDD[ED, VD] = {
     partitionsRDD.unpersist(blocking)
     this
   }
 
-  private[graphx] def mapEdgePartitions[ED2: ClassTag](
-      f: (PartitionID, EdgePartition[ED]) => EdgePartition[ED2]): EdgeRDD[ED2] = {
-    new EdgeRDD[ED2](partitionsRDD.mapPartitions({ iter =>
-      val (pid, ep) = iter.next()
-      Iterator(Tuple2(pid, f(pid, ep)))
+  private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
+      f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
+    new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
+      if (iter.hasNext) {
+        val (pid, ep) = iter.next()
+        Iterator(Tuple2(pid, f(pid, ep)))
+      } else {
+        Iterator.empty
+      }
     }, preservesPartitioning = true))
   }

@@ -82,15 +92,21 @@ class EdgeRDD[@specialized ED: ClassTag](
    * @param f the function from an edge to a new edge value
    * @return a new EdgeRDD containing the new edge values
    */
-  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2] =
+  def mapValues[ED2: ClassTag](f: Edge[ED] => ED2): EdgeRDD[ED2, VD] =
     mapEdgePartitions((pid, part) => part.map(f))
 
   /**
    * Reverse all the edges in this RDD.
    *
    * @return a new EdgeRDD containing all the edges reversed
    */
-  def reverse: EdgeRDD[ED] = mapEdgePartitions((pid, part) => part.reverse)
+  def reverse: EdgeRDD[ED, VD] = mapEdgePartitions((pid, part) => part.reverse)
+
+  def filter(
+      epred: EdgeTriplet[VD, ED] => Boolean,
+      vpred: (VertexId, VD) => Boolean): EdgeRDD[ED, VD] = {
+    mapEdgePartitions((pid, part) => part.filter(epred, vpred))
+  }
 
   /**
    * Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
@@ -102,19 +118,27 @@ class EdgeRDD[@specialized ED: ClassTag](
    * with values supplied by `f`
    */
   def innerJoin[ED2: ClassTag, ED3: ClassTag]
-      (other: EdgeRDD[ED2])
-      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] = {
+      (other: EdgeRDD[ED2, _])
+      (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
     val ed2Tag = classTag[ED2]
     val ed3Tag = classTag[ED3]
-    new EdgeRDD[ED3](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
+    new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
       (thisIter, otherIter) =>
-        val (pid, thisEPart) = thisIter.next()
-        val (_, otherEPart) = otherIter.next()
-        Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
+        if (thisIter.hasNext && otherIter.hasNext) {
+          val (pid, thisEPart) = thisIter.next()
+          val (_, otherEPart) = otherIter.next()
+          Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
+        } else {
+          if (thisIter.hasNext != otherIter.hasNext) {
+            logError("innerJoin: Dropped non-empty edge partition from `%s`".format(
+              if (thisIter.hasNext) "this" else "other"))
+          }
+          Iterator.empty
+        }
     })
   }
 
   private[graphx] def collectVertexIds(): RDD[VertexId] = {
-    partitionsRDD.flatMap { case (_, p) => Array.concat(p.srcIds, p.dstIds) }
+    partitionsRDD.flatMap { case (_, p) => p.vidIterator }
   }
 }
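
Taken together, the new VD type parameter threads the vertex attribute type through every edge transformation. Below is a rough sketch, not part of the diff, of how the reworked API composes; the SparkContext `sc` and the toy graph are assumed.

import org.apache.spark.SparkContext
import org.apache.spark.graphx._

def edgeRddSketch(sc: SparkContext): Unit = {
  // Toy graph: vertex attributes are Int (default 0), edge attributes are Double.
  val raw = sc.parallelize(Seq(Edge(1L, 2L, 1.0), Edge(2L, 3L, 2.0)))
  val graph: Graph[Int, Double] = Graph.fromEdges(raw, defaultValue = 0)

  // `graph.edges` is now an EdgeRDD[Double, Int]; mapValues changes ED while VD rides along.
  val doubled: EdgeRDD[Double, Int] = graph.edges.mapValues(e => e.attr * 2)
  val reversed = graph.edges.reverse  // same types, edge directions flipped

  // innerJoin accepts any vertex type on `other` thanks to EdgeRDD[ED2, _];
  // both operands here derive from the same graph, so they share a partitioning.
  val summed = doubled.innerJoin(graph.edges) { (src, dst, a, b) => a + b }
  println(summed.count())
  println(reversed.count())
}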

graphx/src/main/scala/org/apache/spark/graphx/EdgeTriplet.scala

@@ -63,4 +63,6 @@ class EdgeTriplet[VD, ED] extends Edge[ED] {
     if (srcId == vid) srcAttr else { assert(dstId == vid); dstAttr }
 
   override def toString = ((srcId, srcAttr), (dstId, dstAttr), attr).toString()
+
+  def toTuple = ((srcId, srcAttr), (dstId, dstAttr), attr)
 }
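
The new `toTuple` flattens a triplet for plain-RDD processing. A small sketch, assuming a `Graph[String, Double]` named `graph` (not part of this diff):

// Keep only edges above a weight threshold, as (srcName, dstName) pairs.
val heavyEdges = graph.triplets
  .map(_.toTuple)  // ((srcId, srcAttr), (dstId, dstAttr), attr)
  .filter { case (_, _, weight) => weight > 0.5 }
  .map { case ((_, srcName), (_, dstName), _) => (srcName, dstName) }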

graphx/src/main/scala/org/apache/spark/graphx/Graph.scala (2 changes: 1 addition & 1 deletion)

@@ -58,7 +58,7 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable {
    * along with their vertex data.
    *
    */
-  val edges: EdgeRDD[ED]
+  val edges: EdgeRDD[ED, VD]
 
   /**
    * An RDD containing the edge triplets, which are edges along with the vertex data associated with

graphx/src/main/scala/org/apache/spark/graphx/GraphKryoRegistrator.scala

@@ -33,8 +33,9 @@ class GraphKryoRegistrator extends KryoRegistrator {
     kryo.register(classOf[Edge[Object]])
     kryo.register(classOf[MessageToPartition[Object]])
     kryo.register(classOf[VertexBroadcastMsg[Object]])
+    kryo.register(classOf[RoutingTableMessage])
     kryo.register(classOf[(VertexId, Object)])
-    kryo.register(classOf[EdgePartition[Object]])
+    kryo.register(classOf[EdgePartition[Object, Object]])
     kryo.register(classOf[BitSet])
     kryo.register(classOf[VertexIdToIndexMap])
     kryo.register(classOf[VertexAttributeBlock[Object]])
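
For completeness, a sketch of wiring the registrator in through the standard SparkConf keys (the app name is made up):

import org.apache.spark.SparkConf

// Route serialization through Kryo and register the GraphX classes above,
// including the new EdgePartition[Object, Object] and RoutingTableMessage.
val conf = new SparkConf()
  .setAppName("graphx-example")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")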

graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala

@@ -60,8 +60,8 @@ object GraphLoader extends Logging {
     val startTime = System.currentTimeMillis
 
     // Parse the edge data table directly into edge partitions
-    val edges = sc.textFile(path, minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
-      val builder = new EdgePartitionBuilder[Int]
+    val edges = sc.textFile(path, minEdgePartitions).coalesce(minEdgePartitions).mapPartitionsWithIndex { (pid, iter) =>
+      val builder = new EdgePartitionBuilder[Int, Int]
       iter.foreach { line =>
         if (!line.isEmpty && line(0) != '#') {
           val lineArray = line.split("\\s+")
@@ -78,7 +78,7 @@ object GraphLoader extends Logging {
         }
       }
       Iterator((pid, builder.toEdgePartition))
-    }.cache()
+    }.cache().setName("GraphLoader.edgeListFile - raw edges (%s)".format(path))
     edges.count()
 
     logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))
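
A usage sketch for the updated loader; the cached raw-edge RDD now shows up in the storage UI under the name set above. The file path and partition count are hypothetical, and `sc` is an existing SparkContext:

import org.apache.spark.graphx.GraphLoader

// Each line of the (hypothetical) input is "srcId dstId"; '#' lines are comments.
val graph = GraphLoader.edgeListFile(sc, "hdfs:///data/edges.txt", minEdgePartitions = 8)
println(graph.edges.count())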

graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala (6 changes: 3 additions & 3 deletions)

@@ -43,19 +43,19 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Serializable {
    * The in-degree of each vertex in the graph.
    * @note Vertices with no in-edges are not returned in the resulting RDD.
    */
-  lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In)
+  lazy val inDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.In).setName("GraphOps.inDegrees")
 
   /**
    * The out-degree of each vertex in the graph.
    * @note Vertices with no out-edges are not returned in the resulting RDD.
    */
-  lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out)
+  lazy val outDegrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Out).setName("GraphOps.outDegrees")
 
   /**
    * The degree of each vertex in the graph.
    * @note Vertices with no edges are not returned in the resulting RDD.
    */
-  lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either)
+  lazy val degrees: VertexRDD[Int] = degreesRDD(EdgeDirection.Either).setName("GraphOps.degrees")
 
   /**
    * Computes the neighboring vertex degrees.
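
The degree RDDs now carry descriptive names, which surface in the web UI and in `toDebugString`. A quick sketch, assuming a graph named `graph`:

val inDeg = graph.inDegrees
println(inDeg.name)  // prints "GraphOps.inDegrees"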

graphx/src/main/scala/org/apache/spark/graphx/Pregel.scala (6 changes: 5 additions & 1 deletion)

@@ -18,6 +18,7 @@
 package org.apache.spark.graphx
 
 import scala.reflect.ClassTag
+import org.apache.spark.Logging
 
 
 /**
@@ -52,7 +53,7 @@ import scala.reflect.ClassTag
  * }}}
  *
  */
-object Pregel {
+object Pregel extends Logging {
 
   /**
    * Execute a Pregel-like iterative vertex-parallel abstraction. The
@@ -142,6 +143,9 @@ object Pregel {
       // hides oldMessages (depended on by newVerts), newVerts (depended on by messages), and the
      // vertices of prevG (depended on by newVerts, oldMessages, and the vertices of g).
      activeMessages = messages.count()
+
+      logInfo("Pregel finished iteration " + i)
+
      // Unpersist the RDDs hidden by newly-materialized RDDs
      oldMessages.unpersist(blocking=false)
      newVerts.unpersist(blocking=false)
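
For context, a standard Pregel invocation that would now log one line per iteration. The sketch below is the usual single-source shortest-paths skeleton, assuming a `Graph[Int, Double]` named `graph` and a made-up source vertex:

import org.apache.spark.graphx._

val sourceId: VertexId = 1L  // hypothetical source
val init = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

val sssp = Pregel(init, Double.PositiveInfinity)(
  (id, dist, newDist) => math.min(dist, newDist),  // vertex program: keep the best distance
  triplet =>                                       // send messages along improving edges
    if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
      Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
    } else {
      Iterator.empty
    },
  (a, b) => math.min(a, b)                         // merge concurrent messages
)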