Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Revert upgradeIterator to if-in-a-loop
  • Loading branch information
ankurdave committed May 7, 2014
commit c88b269b84f2e3ed97d428f8696d7bc11a2d8644
Original file line number Diff line number Diff line change
Expand Up @@ -293,32 +293,7 @@ class EdgePartition[
def upgradeIterator(
edgeIter: Iterator[Edge[ED]], includeSrc: Boolean = true, includeDst: Boolean = true)
: Iterator[EdgeTriplet[VD, ED]] = {
val tripletIter = new Iterator[EdgeTriplet[VD, ED]] {
private[this] val triplet = new EdgeTriplet[VD, ED]
override def hasNext = edgeIter.hasNext
override def next() = {
triplet.set(edgeIter.next())
}
}
val withSrc =
if (includeSrc) {
tripletIter.map { triplet =>
triplet.srcAttr = EdgePartition.this.vertices(triplet.srcId)
triplet
}
} else {
tripletIter
}
val withDst =
if (includeDst) {
withSrc.map { triplet =>
triplet.dstAttr = EdgePartition.this.vertices(triplet.dstId)
triplet
}
} else {
withSrc
}
withDst
new ReusingEdgeTripletIterator(edgeIter, this, includeSrc, includeDst)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ import org.apache.spark.graphx._
import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap

/**
* The Iterator type returned when constructing edge triplets. This class technically could be
* an anonymous class in GraphImpl.triplets, but we name it here explicitly so it is easier to
* debug / profile.
* The Iterator type returned when constructing edge triplets. This could be an anonymous class in
* EdgePartition.tripletIterator, but we name it here explicitly so it is easier to debug / profile.
*/
private[impl]
class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
Expand Down Expand Up @@ -54,3 +53,32 @@ class EdgeTripletIterator[VD: ClassTag, ED: ClassTag](
triplet
}
}

/**
* An Iterator type for internal use that reuses EdgeTriplet objects. This could be an anonymous
* class in EdgePartition.upgradeIterator, but we name it here explicitly so it is easier to debug /
* profile.
*/
private[impl]
class ReusingEdgeTripletIterator[VD: ClassTag, ED: ClassTag](
val edgeIter: Iterator[Edge[ED]],
val edgePartition: EdgePartition[ED, VD],
val includeSrc: Boolean,
val includeDst: Boolean)
extends Iterator[EdgeTriplet[VD, ED]] {

private val triplet = new EdgeTriplet[VD, ED]

override def hasNext = edgeIter.hasNext

override def next() = {
triplet.set(edgeIter.next())
if (includeSrc) {
triplet.srcAttr = edgePartition.vertices(triplet.srcId)
}
if (includeDst) {
triplet.dstAttr = edgePartition.vertices(triplet.dstId)
}
triplet
}
}