24 commits
559c099  [SPARK-18334] MinHash should use binary hash distance (Nov 7, 2016)
517a97b  Remove misleading documentation as requested (Yunni, Nov 8, 2016)
b546dbd  Add warning for multi-probe in MinHash (Nov 8, 2016)
a3cd928  Merge branch 'SPARK-18334-yunn-minhash-bug' of https://github.com/Yun… (Nov 8, 2016)
c8243c7  (1) Fix documentation as CR suggested (2) Fix typo in unit test (Nov 9, 2016)
6aac8b3  Fix typo in unit test (Nov 9, 2016)
9870743  [SPARK-18408] API Improvements for LSH (Nov 14, 2016)
0e9250b  (1) Fix description for numHashFunctions (2) Make numEntries in MinHa… (Nov 14, 2016)
adbbefe  Add assertion for hashFunction in BucketedRandomProjectionLSHSuite (Nov 14, 2016)
c115ed3  Revert AND-amplification for a future PR (Nov 14, 2016)
033ae5d  Code Review Comments (Nov 15, 2016)
c597f4c  Add unit tests to run on Jenkins. (Nov 16, 2016)
d759875  Add unit tests to run on Jenkins. (Nov 16, 2016)
596eb06  CR comments (Nov 17, 2016)
00d08bf  Merge branch 'master' of https://github.com/apache/spark into SPARK-1… (Nov 17, 2016)
3d0810f  Update comments (Nov 17, 2016)
257ef19  Add scaladoc for approximately min-wise independence (Yunni, Nov 18, 2016)
2c264b7  Change documentation reference (Yunni, Nov 18, 2016)
36ca278  Removing modulo numEntries (Nov 19, 2016)
4508393  Merge branch 'SPARK-18408-yunn-api-improvements' of https://github.co… (Nov 19, 2016)
939e9d5  Code Review Comments (Nov 22, 2016)
8b9403d  Minimize the test cases by directly using artificial models (Nov 22, 2016)
f0ebcb7  Code review comments (Nov 22, 2016)
e198080  Merge branch 'master' of https://github.com/apache/spark into SPARK-1… (Yunni, Nov 28, 2016)
@@ -34,9 +34,9 @@ import org.apache.spark.sql.types.StructType
/**
* :: Experimental ::
*
- * Params for [[RandomProjection]].
+ * Params for [[BucketedRandomProjectionLSH]].
*/
-private[ml] trait RandomProjectionParams extends Params {
+private[ml] trait BucketedRandomProjectionLSHParams extends Params {

/**
* The length of each hash bucket, a larger bucket lowers the false negative rate. The number of
@@ -58,8 +58,8 @@ private[ml] trait RandomProjectionParams extends Params {
/**
* :: Experimental ::
*
- * Model produced by [[RandomProjection]], where multiple random vectors are stored. The vectors
- * are normalized to be unit vectors and each vector is used in a hash function:
+ * Model produced by [[BucketedRandomProjectionLSH]], where multiple random vectors are stored. The
+ * vectors are normalized to be unit vectors and each vector is used in a hash function:
* `h_i(x) = floor(r_i.dot(x) / bucketLength)`
* where `r_i` is the i-th random unit vector. The number of buckets will be `(max L2 norm of input
* vectors) / bucketLength`.
@@ -68,18 +68,19 @@ private[ml] trait RandomProjectionParams extends Params {
*/
@Experimental
@Since("2.1.0")
-class RandomProjectionModel private[ml] (
+class BucketedRandomProjectionLSHModel private[ml](
override val uid: String,
@Since("2.1.0") val randUnitVectors: Array[Vector])
extends LSHModel[RandomProjectionModel] with RandomProjectionParams {
private[ml] val randUnitVectors: Array[Vector])
extends LSHModel[BucketedRandomProjectionLSHModel] with BucketedRandomProjectionLSHParams {

@Since("2.1.0")
-  override protected[ml] val hashFunction: (Vector) => Vector = {
+  override protected[ml] val hashFunction: Vector => Array[Vector] = {
key: Vector => {
val hashValues: Array[Double] = randUnitVectors.map({
randUnitVector => Math.floor(BLAS.dot(key, randUnitVector) / $(bucketLength))
})
-      Vectors.dense(hashValues)
+      // TODO: Output vectors of dimension numHashFunctions in SPARK-18450
+      hashValues.map(Vectors.dense(_))
}
}
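Note on the hunk above: `hashFunction` now returns `Array[Vector]`, one single-element vector per hash table, instead of packing every hash value into a single dense vector. A standalone sketch of that contract (the helper name `brpHash` and the sample values are illustrative; the real implementation uses Spark's private BLAS for the dot product):

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}

// h_i(x) = floor(r_i . x / bucketLength), one entry per random unit vector r_i.
def brpHash(randUnitVectors: Array[Vector], bucketLength: Double)(key: Vector): Array[Vector] = {
  randUnitVectors.map { r =>
    val dot = (0 until key.size).map(i => key(i) * r(i)).sum
    Vectors.dense(math.floor(dot / bucketLength))
  }
}

val rs = Array(Vectors.dense(1.0, 0.0), Vectors.dense(0.0, 1.0))
brpHash(rs, bucketLength = 2.0)(Vectors.dense(3.0, 5.0))
// Array([1.0], [2.0]): floor(3 / 2) = 1 and floor(5 / 2) = 2
```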

@@ -89,27 +90,29 @@ class RandomProjectionModel private[ml] (
}

@Since("2.1.0")
-  override protected[ml] def hashDistance(x: Vector, y: Vector): Double = {
+  override protected[ml] def hashDistance(x: Seq[Vector], y: Seq[Vector]): Double = {
// Since it's generated by hashing, it will be a pair of dense vectors.
-    x.toDense.values.zip(y.toDense.values).map(pair => math.abs(pair._1 - pair._2)).min
+    x.zip(y).map(vectorPair => Vectors.sqdist(vectorPair._1, vectorPair._2)).min
}
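The `min` in the hunk above is the OR-amplification step: a pair counts as close if it collides in at least one hash table. A self-contained sketch with made-up hash outputs:

```scala
import org.apache.spark.ml.linalg.{Vector, Vectors}

// Minimum squared distance across hash tables: 0.0 as soon as ANY table collides.
def hashDistance(x: Seq[Vector], y: Seq[Vector]): Double =
  x.zip(y).map { case (u, v) => Vectors.sqdist(u, v) }.min

val a = Seq(Vectors.dense(1.0), Vectors.dense(4.0))
val b = Seq(Vectors.dense(2.0), Vectors.dense(4.0))
hashDistance(a, b)  // 0.0: the second table collides, so the pair is a candidate
```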

@Since("2.1.0")
override def copy(extra: ParamMap): this.type = defaultCopy(extra)

@Since("2.1.0")
-  override def write: MLWriter = new RandomProjectionModel.RandomProjectionModelWriter(this)
+  override def write: MLWriter = {
+    new BucketedRandomProjectionLSHModel.BucketedRandomProjectionLSHModelWriter(this)
+  }
}

/**
* :: Experimental ::
*
- * This [[RandomProjection]] implements Locality Sensitive Hashing functions for Euclidean
- * distance metrics.
+ * This [[BucketedRandomProjectionLSH]] implements Locality Sensitive Hashing functions for
+ * Euclidean distance metrics.
*
* The input is dense or sparse vectors, each of which represents a point in the Euclidean
- * distance space. The output will be vectors of configurable dimension. Hash value in the same
- * dimension is calculated by the same hash function.
+ * distance space. The output will be vectors of configurable dimension. Hash values in the
+ * same dimension are calculated by the same hash function.
*
* References:
*
@@ -121,8 +124,9 @@ class RandomProjectionModel private[ml] (
*/
@Experimental
@Since("2.1.0")
-class RandomProjection(override val uid: String) extends LSH[RandomProjectionModel]
-  with RandomProjectionParams with HasSeed {
+class BucketedRandomProjectionLSH(override val uid: String)
+  extends LSH[BucketedRandomProjectionLSHModel]
+    with BucketedRandomProjectionLSHParams with HasSeed {

@Since("2.1.0")
override def setInputCol(value: String): this.type = super.setInputCol(value)
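For context, a minimal usage sketch of the renamed estimator; the data, column names, and the `spark` session are illustrative, and `approxNearestNeighbors` / `approxSimilarityJoin` are inherited from the LSH base class:

```scala
import org.apache.spark.ml.feature.BucketedRandomProjectionLSH
import org.apache.spark.ml.linalg.Vectors

val df = spark.createDataFrame(Seq(
  (0, Vectors.dense(1.0, 1.0)),
  (1, Vectors.dense(1.0, -1.0)),
  (2, Vectors.dense(-1.0, -1.0))
)).toDF("id", "features")

val brp = new BucketedRandomProjectionLSH()
  .setBucketLength(2.0)
  .setNumHashTables(3)
  .setInputCol("features")
  .setOutputCol("hashes")

val model = brp.fit(df)
model.transform(df).show()  // adds "hashes": one single-element vector per hash table
model.approxNearestNeighbors(df, Vectors.dense(1.0, 0.0), 2).show()
model.approxSimilarityJoin(df, df, 1.5).show()
```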
@@ -131,11 +135,11 @@ class RandomProjection(override val uid: String) extends LSH[RandomProjectionMod
override def setOutputCol(value: String): this.type = super.setOutputCol(value)

@Since("2.1.0")
-  override def setOutputDim(value: Int): this.type = super.setOutputDim(value)
+  override def setNumHashTables(value: Int): this.type = super.setNumHashTables(value)

@Since("2.1.0")
def this() = {
-    this(Identifiable.randomUID("random projection"))
+    this(Identifiable.randomUID("brp-lsh"))
}

/** @group setParam */
@@ -147,15 +151,16 @@ class RandomProjection(override val uid: String) extends LSH[RandomProjectionMod
def setSeed(value: Long): this.type = set(seed, value)

@Since("2.1.0")
-  override protected[this] def createRawLSHModel(inputDim: Int): RandomProjectionModel = {
+  override protected[this] def createRawLSHModel(
+    inputDim: Int): BucketedRandomProjectionLSHModel = {
val rand = new Random($(seed))
val randUnitVectors: Array[Vector] = {
-      Array.fill($(outputDim)) {
+      Array.fill($(numHashTables)) {
val randArray = Array.fill(inputDim)(rand.nextGaussian())
Vectors.fromBreeze(normalize(breeze.linalg.Vector(randArray)))
}
}
-    new RandomProjectionModel(uid, randUnitVectors)
+    new BucketedRandomProjectionLSHModel(uid, randUnitVectors)
}
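Side note on `createRawLSHModel` above: sampling i.i.d. Gaussians and normalizing is the standard way to draw a direction uniformly on the unit sphere, since the Gaussian vector's distribution is rotationally invariant. A dependency-free sketch of the same draw (`randomUnitVector` is a hypothetical helper; the real code goes through breeze's `normalize`):

```scala
import scala.util.Random
import org.apache.spark.ml.linalg.{Vector, Vectors}

def randomUnitVector(dim: Int, rand: Random): Vector = {
  val g = Array.fill(dim)(rand.nextGaussian())
  val norm = math.sqrt(g.map(x => x * x).sum)
  Vectors.dense(g.map(_ / norm))  // L2 norm 1, direction uniform on the sphere
}

val r = randomUnitVector(3, new Random(42L))
```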

@Since("2.1.0")
@@ -169,23 +174,25 @@ class RandomProjection(override val uid: String) extends LSH[RandomProjectionMod
}

@Since("2.1.0")
-object RandomProjection extends DefaultParamsReadable[RandomProjection] {
+object BucketedRandomProjectionLSH extends DefaultParamsReadable[BucketedRandomProjectionLSH] {

@Since("2.1.0")
-  override def load(path: String): RandomProjection = super.load(path)
+  override def load(path: String): BucketedRandomProjectionLSH = super.load(path)
}

@Since("2.1.0")
-object RandomProjectionModel extends MLReadable[RandomProjectionModel] {
+object BucketedRandomProjectionLSHModel extends MLReadable[BucketedRandomProjectionLSHModel] {

@Since("2.1.0")
-  override def read: MLReader[RandomProjectionModel] = new RandomProjectionModelReader
+  override def read: MLReader[BucketedRandomProjectionLSHModel] = {
+    new BucketedRandomProjectionLSHModelReader
+  }

@Since("2.1.0")
-  override def load(path: String): RandomProjectionModel = super.load(path)
+  override def load(path: String): BucketedRandomProjectionLSHModel = super.load(path)

-  private[RandomProjectionModel] class RandomProjectionModelWriter(instance: RandomProjectionModel)
-    extends MLWriter {
+  private[BucketedRandomProjectionLSHModel] class BucketedRandomProjectionLSHModelWriter(
+      instance: BucketedRandomProjectionLSHModel) extends MLWriter {

// TODO: Save using the existing format of Array[Vector] once SPARK-12878 is resolved.
private case class Data(randUnitVectors: Matrix)
@@ -203,20 +210,22 @@ object RandomProjectionModel extends MLReadable[RandomProjectionModel] {
}
}

-  private class RandomProjectionModelReader extends MLReader[RandomProjectionModel] {
+  private class BucketedRandomProjectionLSHModelReader
+    extends MLReader[BucketedRandomProjectionLSHModel] {

/** Checked against metadata when loading model */
-    private val className = classOf[RandomProjectionModel].getName
+    private val className = classOf[BucketedRandomProjectionLSHModel].getName

-    override def load(path: String): RandomProjectionModel = {
+    override def load(path: String): BucketedRandomProjectionLSHModel = {
val metadata = DefaultParamsReader.loadMetadata(path, sc, className)

val dataPath = new Path(path, "data").toString
val data = sparkSession.read.parquet(dataPath)
val Row(randUnitVectors: Matrix) = MLUtils.convertMatrixColumnsToML(data, "randUnitVectors")
.select("randUnitVectors")
.head()
-      val model = new RandomProjectionModel(metadata.uid, randUnitVectors.rowIter.toArray)
+      val model = new BucketedRandomProjectionLSHModel(metadata.uid,
+        randUnitVectors.rowIter.toArray)

DefaultParamsReader.getAndSetParams(model, metadata)
model
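The reader above undoes the writer's packing: pending SPARK-12878, `randUnitVectors` is persisted as a single dense `Matrix` and recovered row by row via `rowIter`. A self-contained sketch of that round trip (the explicit column-major packing here is illustrative; the writer's actual flattening may differ in detail):

```scala
import org.apache.spark.ml.linalg.{Matrices, Vector, Vectors}

val randUnitVectors = Array(Vectors.dense(1.0, 0.0), Vectors.dense(0.0, 1.0))
val numRows = randUnitVectors.length
val numCols = randUnitVectors.head.size

// DenseMatrix values are column-major: entry (i, j) sits at j * numRows + i.
val values = Array.ofDim[Double](numRows * numCols)
for (i <- 0 until numRows; j <- 0 until numCols) {
  values(j * numRows + i) = randUnitVectors(i)(j)
}
val packed = Matrices.dense(numRows, numCols, values)

// rowIter yields one Vector per row, restoring the original array.
val unpacked: Array[Vector] = packed.rowIter.toArray
assert(unpacked.sameElements(randUnitVectors))
```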