Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Convert implicit parameter to context bound
  • Loading branch information
ankurdave committed May 7, 2014
commit 04d3ae563519e501b37d93c7496f74e9e11f395d
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.graphx.impl

import scala.language.higherKinds
import scala.language.implicitConversions
import scala.reflect.ClassTag

import org.apache.spark.Logging
Expand All @@ -31,14 +32,14 @@ import org.apache.spark.graphx.util.collection.PrimitiveKeyOpenHashMap
* implicit evidence of membership in the `VertexPartitionBaseOpsConstructor` typeclass (for
* example, [[VertexPartition.VertexPartitionOpsConstructor]]).
*/
private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: VertexPartitionBase[X]]
(self: T[VD])
(implicit ev: VertexPartitionBaseOpsConstructor[T])
extends Logging {
private[graphx] abstract class VertexPartitionBaseOps
[VD: ClassTag, Self[X] <: VertexPartitionBase[X] : VertexPartitionBaseOpsConstructor]
(self: Self[VD])
extends Logging {

def withIndex(index: VertexIdToIndexMap): T[VD]
def withValues[VD2: ClassTag](values: Array[VD2]): T[VD2]
def withMask(mask: BitSet): T[VD]
def withIndex(index: VertexIdToIndexMap): Self[VD]
def withValues[VD2: ClassTag](values: Array[VD2]): Self[VD2]
def withMask(mask: BitSet): Self[VD]

/**
* Pass each vertex attribute along with the vertex id through a map
Expand All @@ -53,7 +54,7 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
* each of the entries in the original VertexRDD. The resulting
* VertexPartition retains the same index.
*/
def map[VD2: ClassTag](f: (VertexId, VD) => VD2): T[VD2] = {
def map[VD2: ClassTag](f: (VertexId, VD) => VD2): Self[VD2] = {
// Construct a view of the map transformation
val newValues = new Array[VD2](self.capacity)
var i = self.mask.nextSetBit(0)
Expand All @@ -73,7 +74,7 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
* RDD can be easily joined with the original vertex-set. Furthermore, the filter only
* modifies the bitmap index and so no new values are allocated.
*/
def filter(pred: (VertexId, VD) => Boolean): T[VD] = {
def filter(pred: (VertexId, VD) => Boolean): Self[VD] = {
// Allocate the array to store the results into
val newMask = new BitSet(self.capacity)
// Iterate over the active bits in the old mask and evaluate the predicate
Expand All @@ -91,7 +92,7 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
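* Hides vertices that are the same between this and other. For vertices that are different, keeps
* the values from `other`. The indices of `this` and `other` should be the same; if they differ,
* `other` is first rebuilt on this partition's index via `createUsingIndex`, which is slow.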
*/
def diff(other: T[VD]): T[VD] = {
def diff(other: Self[VD]): Self[VD] = {
if (self.index != other.index) {
logWarning("Diffing two VertexPartitions with different indexes is slow.")
diff(createUsingIndex(other.iterator))
Expand All @@ -104,14 +105,14 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
}
i = newMask.nextSetBit(i + 1)
}
ev.toOps(this.withValues(other.values)).withMask(newMask)
toOps(this.withValues(other.values)).withMask(newMask)
}
}

/** Left outer join another VertexPartition. */
def leftJoin[VD2: ClassTag, VD3: ClassTag]
(other: T[VD2])
(f: (VertexId, VD, Option[VD2]) => VD3): T[VD3] = {
(other: Self[VD2])
(f: (VertexId, VD, Option[VD2]) => VD3): Self[VD3] = {
if (self.index != other.index) {
logWarning("Joining two VertexPartitions with different indexes is slow.")
leftJoin(createUsingIndex(other.iterator))(f)
Expand All @@ -131,14 +132,14 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
/** Left outer join another iterator of messages. */
def leftJoin[VD2: ClassTag, VD3: ClassTag]
(other: Iterator[(VertexId, VD2)])
(f: (VertexId, VD, Option[VD2]) => VD3): T[VD3] = {
(f: (VertexId, VD, Option[VD2]) => VD3): Self[VD3] = {
leftJoin(createUsingIndex(other))(f)
}

/** Inner join another VertexPartition. */
def innerJoin[U: ClassTag, VD2: ClassTag]
(other: T[U])
(f: (VertexId, VD, U) => VD2): T[VD2] = {
(other: Self[U])
(f: (VertexId, VD, U) => VD2): Self[VD2] = {
if (self.index != other.index) {
logWarning("Joining two VertexPartitions with different indexes is slow.")
innerJoin(createUsingIndex(other.iterator))(f)
Expand All @@ -150,7 +151,7 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
newValues(i) = f(self.index.getValue(i), self.values(i), other.values(i))
i = newMask.nextSetBit(i + 1)
}
ev.toOps(this.withValues(newValues)).withMask(newMask)
this.withValues(newValues).withMask(newMask)
}
}

Expand All @@ -159,15 +160,15 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
*/
def innerJoin[U: ClassTag, VD2: ClassTag]
(iter: Iterator[Product2[VertexId, U]])
(f: (VertexId, VD, U) => VD2): T[VD2] = {
(f: (VertexId, VD, U) => VD2): Self[VD2] = {
innerJoin(createUsingIndex(iter))(f)
}

/**
* Has a similar effect to `aggregateUsingIndex((a, b) => a)`.
*/
def createUsingIndex[VD2: ClassTag](iter: Iterator[Product2[VertexId, VD2]])
: T[VD2] = {
: Self[VD2] = {
val newMask = new BitSet(self.capacity)
val newValues = new Array[VD2](self.capacity)
iter.foreach { case (vid, vdata) =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto on the foreach with case

Expand All @@ -177,14 +178,14 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
newValues(pos) = vdata
}
}
ev.toOps(this.withValues(newValues)).withMask(newMask)
this.withValues(newValues).withMask(newMask)
}

/**
* Similar to innerJoin, but vertices from the left side that don't appear in iter will remain in
* the partition, hidden by the bitmask.
*/
def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): T[VD] = {
def innerJoinKeepLeft(iter: Iterator[Product2[VertexId, VD]]): Self[VD] = {
val newMask = new BitSet(self.capacity)
val newValues = new Array[VD](self.capacity)
System.arraycopy(self.values, 0, newValues, 0, newValues.length)
Expand All @@ -195,12 +196,12 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
newValues(pos) = vdata
}
}
ev.toOps(this.withValues(newValues)).withMask(newMask)
this.withValues(newValues).withMask(newMask)
}

def aggregateUsingIndex[VD2: ClassTag](
iter: Iterator[Product2[VertexId, VD2]],
reduceFunc: (VD2, VD2) => VD2): T[VD2] = {
reduceFunc: (VD2, VD2) => VD2): Self[VD2] = {
val newMask = new BitSet(self.capacity)
val newValues = new Array[VD2](self.capacity)
iter.foreach { product =>
Expand All @@ -216,22 +217,29 @@ private[graphx] abstract class VertexPartitionBaseOps[VD: ClassTag, T[X] <: Vert
}
}
}
ev.toOps(this.withValues(newValues)).withMask(newMask)
this.withValues(newValues).withMask(newMask)
}

/**
* Construct a new VertexPartition whose index contains only the vertices in the mask.
*/
def reindex(): T[VD] = {
def reindex(): Self[VD] = {
val hashMap = new PrimitiveKeyOpenHashMap[VertexId, VD]
val arbitraryMerge = (a: VD, b: VD) => a
for ((k, v) <- self.iterator) {
hashMap.setMerge(k, v, arbitraryMerge)
}
ev.toOps(
ev.toOps(
this.withIndex(hashMap.keySet))
.withValues(hashMap._values))
.withMask(hashMap.keySet.getBitSet)
this.withIndex(hashMap.keySet).withValues(hashMap._values).withMask(hashMap.keySet.getBitSet)
}

/**
 * Implicitly re-wraps a `Self` partition as a `VertexPartitionBaseOps`, so that the methods
 * of this class, which all return a bare `Self`, can be chained inside this class (for example
 * `this.withValues(...).withMask(...)`). The wrapper is built by the
 * `VertexPartitionBaseOpsConstructor[Self]` instance supplied by the context bound on `Self`.
 */
private implicit def toOps[VD2: ClassTag](
    partition: Self[VD2]): VertexPartitionBaseOps[VD2, Self] = {
  val opsConstructor = implicitly[VertexPartitionBaseOpsConstructor[Self]]
  opsConstructor.toOps(partition)
}
}