2 changes: 2 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
@@ -194,4 +194,6 @@ abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType,
* This internal method is used to implement [[transform()]] and output [[predictionCol]].
*/
protected def predict(features: FeaturesType): Double

def transformInstance(features: FeaturesType): Double = {predict(features)}
}
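Illustrative usage sketch for the new PredictionModel.transformInstance (the fitted model and the `training` DataFrame are assumptions, not taken from this patch):

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors

// `training` is an assumed DataFrame of (label: Double, features: Vector) rows.
val lrModel = new LogisticRegression().setMaxIter(10).fit(training)
// Score one instance directly; transformInstance delegates to the protected predict(features),
// so no single-row DataFrame has to be constructed.
val prediction: Double = lrModel.transformInstance(Vectors.dense(0.1, -1.5, 3.0))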
2 changes: 2 additions & 0 deletions mllib/src/main/scala/org/apache/spark/ml/Transformer.scala
@@ -95,6 +95,8 @@ abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, OUT, T]]
*/
protected def createTransformFunc: IN => OUT

def transformInstance(input: IN) : OUT = {this.createTransformFunc(input)}

/**
* Returns the data type of the output column.
*/
36 changes: 18 additions & 18 deletions mllib/src/main/scala/org/apache/spark/ml/feature/Binarizer.scala
@@ -76,25 +76,9 @@ final class Binarizer @Since("1.4.0") (@Since("1.4.0") override val uid: String)
val outputSchema = transformSchema(dataset.schema, logging = true)
val schema = dataset.schema
val inputType = schema($(inputCol)).dataType
val td = $(threshold)

val binarizerDouble = udf { in: Double => if (in > td) 1.0 else 0.0 }
val binarizerVector = udf { (data: Vector) =>
val indices = ArrayBuilder.make[Int]
val values = ArrayBuilder.make[Double]

data.foreachActive { (index, value) =>
if (value > td) {
indices += index
values += 1.0
}
}

Vectors.sparse(data.size, indices.result(), values.result()).compressed
}

val binarizerDouble = udf{(x: Double) => transformInstance(x)}
val binarizerVector = udf{(x: Vector) => transformInstance(x)}
val metadata = outputSchema($(outputCol)).metadata

inputType match {
case DoubleType =>
dataset.select(col("*"), binarizerDouble(col($(inputCol))).as($(outputCol), metadata))
@@ -103,6 +87,22 @@ final class Binarizer @Since("1.4.0") (@Since("1.4.0") override val uid: String)
}
}

def transformInstance(in: Double): Double = {if (in > $(threshold)) 1.0 else 0.0}

def transformInstance(data: Vector): Vector = {
val indices = ArrayBuilder.make[Int]
val values = ArrayBuilder.make[Double]
val td = $(threshold)
data.foreachActive { (index, value) =>
if (value > td) {
indices += index
values += 1.0
}
}
Vectors.sparse(data.size, indices.result(), values.result()).compressed
}


@Since("1.4.0")
override def transformSchema(schema: StructType): StructType = {
val inputType = schema($(inputCol)).dataType
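Illustrative usage sketch for the two transformInstance overloads added above (threshold and values are assumptions):

import org.apache.spark.ml.feature.Binarizer
import org.apache.spark.ml.linalg.Vectors

val binarizer = new Binarizer()
  .setInputCol("feature")
  .setOutputCol("binarized")
  .setThreshold(0.5)

binarizer.transformInstance(0.7)                           // 1.0
binarizer.transformInstance(0.3)                           // 0.0
binarizer.transformInstance(Vectors.dense(0.2, 0.6, 0.9))  // vector with 1.0 where value > 0.5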
mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
@@ -78,14 +78,15 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String
@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema)
val bucketizer = udf { feature: Double =>
Bucketizer.binarySearchForBuckets($(splits), feature)
}
val bucketizer = udf { transformInstance _ }
val newCol = bucketizer(dataset($(inputCol)))
val newField = prepOutputField(dataset.schema)
dataset.withColumn($(outputCol), newCol, newField.metadata)
}

def transformInstance(feature: Double): Double =
{Bucketizer.binarySearchForBuckets($(splits), feature)}

private def prepOutputField(schema: StructType): StructField = {
val buckets = $(splits).sliding(2).map(bucket => bucket.mkString(", ")).toArray
val attr = new NominalAttribute(name = Some($(outputCol)), isOrdinal = Some(true),
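Illustrative usage sketch for Bucketizer.transformInstance (the splits are assumptions chosen for illustration):

import org.apache.spark.ml.feature.Bucketizer

val bucketizer = new Bucketizer()
  .setInputCol("value")
  .setOutputCol("bucket")
  .setSplits(Array(Double.NegativeInfinity, 0.0, 10.0, Double.PositiveInfinity))

bucketizer.transformInstance(-3.0)  // 0.0, i.e. the bucket [-inf, 0)
bucketizer.transformInstance(4.5)   // 1.0, i.e. the bucket [0, 10)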
mllib/src/main/scala/org/apache/spark/ml/feature/ChiSqSelector.scala
@@ -148,12 +148,13 @@ final class ChiSqSelectorModel private[ml] (
override def transform(dataset: Dataset[_]): DataFrame = {
val transformedSchema = transformSchema(dataset.schema, logging = true)
val newField = transformedSchema.last
val selector = udf(transformInstance _)
dataset.withColumn($(outputCol), selector(col($(featuresCol))), newField.metadata)
}

def transformInstance(v: Vector): Vector = {
// TODO: Make the transformer natively in ml framework to avoid extra conversion.
val transformer: Vector => Vector = v => chiSqSelector.transform(OldVectors.fromML(v)).asML

val selector = udf(transformer)
dataset.withColumn($(outputCol), selector(col($(featuresCol))), newField.metadata)
chiSqSelector.transform(OldVectors.fromML(v)).asML
}

@Since("1.6.0")
mllib/src/main/scala/org/apache/spark/ml/feature/CountVectorizer.scala
@@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.{Estimator, Model}
import org.apache.spark.ml.linalg.{Vectors, VectorUDT}
import org.apache.spark.ml.linalg.{Vector, Vectors, VectorUDT}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.util._
@@ -246,28 +246,29 @@ class CountVectorizerModel(
val dict = vocabulary.zipWithIndex.toMap
broadcastDict = Some(dataset.sparkSession.sparkContext.broadcast(dict))
}
val vectorizer = udf { transformInstance _ }
dataset.withColumn($(outputCol), vectorizer(col($(inputCol))))
}

def transformInstance(document: Seq[String]): Vector = {
val dictBr = broadcastDict.get
val minTf = $(minTF)
val vectorizer = udf { (document: Seq[String]) =>
val termCounts = new OpenHashMap[Int, Double]
var tokenCount = 0L
document.foreach { term =>
dictBr.value.get(term) match {
case Some(index) => termCounts.changeValue(index, 1.0, _ + 1.0)
case None => // ignore terms not in the vocabulary
}
tokenCount += 1
}
val effectiveMinTF = if (minTf >= 1.0) minTf else tokenCount * minTf
val effectiveCounts = if ($(binary)) {
termCounts.filter(_._2 >= effectiveMinTF).map(p => (p._1, 1.0)).toSeq
} else {
termCounts.filter(_._2 >= effectiveMinTF).toSeq
val termCounts = new OpenHashMap[Int, Double]
var tokenCount = 0L
document.foreach { term =>
dictBr.value.get(term) match {
case Some(index) => termCounts.changeValue(index, 1.0, _ + 1.0)
case None => // ignore terms not in the vocabulary
}

Vectors.sparse(dictBr.value.size, effectiveCounts)
tokenCount += 1
}
dataset.withColumn($(outputCol), vectorizer(col($(inputCol))))
val effectiveMinTF = if (minTf >= 1.0) minTf else tokenCount * minTf
val effectiveCounts = if ($(binary)) {
termCounts.filter(_._2 >= effectiveMinTF).map(p => (p._1, 1.0)).toSeq
} else {
termCounts.filter(_._2 >= effectiveMinTF).toSeq
}
Vectors.sparse(dictBr.value.size, effectiveCounts)
}

@Since("1.5.0")
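Illustrative usage sketch for CountVectorizerModel.transformInstance (the vocabulary and the `docs` DataFrame are assumptions). Note that transformInstance reads broadcastDict, which this diff still populates only inside transform(), so the sketch assumes transform() has run at least once:

import org.apache.spark.ml.feature.CountVectorizerModel

val cvModel = new CountVectorizerModel(Array("a", "b", "c"))
  .setInputCol("words")
  .setOutputCol("features")

cvModel.transform(docs)                                     // initializes the broadcast vocabulary
val counts = cvModel.transformInstance(Seq("a", "b", "a"))  // sparse vector: "a" -> 2.0, "b" -> 1.0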
25 changes: 20 additions & 5 deletions mllib/src/main/scala/org/apache/spark/ml/feature/HashingTF.scala
@@ -20,6 +20,7 @@ package org.apache.spark.ml.feature
import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.Transformer
import org.apache.spark.ml.attribute.AttributeGroup
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared.{HasInputCol, HasOutputCol}
import org.apache.spark.ml.util._
@@ -82,26 +83,40 @@ class HashingTF @Since("1.4.0") (@Since("1.4.0") override val uid: String)

/** @group setParam */
@Since("1.2.0")
def setNumFeatures(value: Int): this.type = set(numFeatures, value)
def setNumFeatures(value: Int): this.type = {
set(numFeatures, value)
hashingTF = new feature.HashingTF($(numFeatures)).setBinary($(binary))
this
}

/** @group getParam */
@Since("2.0.0")
def getBinary: Boolean = $(binary)

/** @group setParam */
@Since("2.0.0")
def setBinary(value: Boolean): this.type = set(binary, value)
def setBinary(value: Boolean): this.type = {
set(binary, value)
hashingTF = new feature.HashingTF($(numFeatures)).setBinary($(binary))
this
}

@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
val outputSchema = transformSchema(dataset.schema)
val hashingTF = new feature.HashingTF($(numFeatures)).setBinary($(binary))
// TODO: Make the hashingTF.transform natively in ml framework to avoid extra conversion.
val t = udf { terms: Seq[_] => hashingTF.transform(terms).asML }
val t = udf { transformInstance _ }
val metadata = outputSchema($(outputCol)).metadata
dataset.select(col("*"), t(col($(inputCol))).as($(outputCol), metadata))
}

/** Updated by the setters when parameters change */
private var hashingTF = new feature.HashingTF($(numFeatures)).setBinary($(binary))

def transformInstance(terms: Seq[_]): Vector = {
// TODO: Make the hashingTF.transform natively in ml framework to avoid extra conversion.
hashingTF.transform(terms).asML
}

@Since("1.4.0")
override def transformSchema(schema: StructType): StructType = {
val inputType = schema($(inputCol)).dataType
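Illustrative usage sketch for HashingTF.transformInstance (feature count and terms are assumptions); the setters above now rebuild the cached mllib HashingTF so transformInstance stays in sync with parameter changes:

import org.apache.spark.ml.feature.HashingTF

val hashingTF = new HashingTF()
  .setInputCol("words")
  .setOutputCol("tf")
  .setNumFeatures(1 << 10)  // also refreshes the private cached feature.HashingTF

val tf = hashingTF.transformInstance(Seq("spark", "ml", "spark"))  // term-frequency Vector of size 1024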
7 changes: 5 additions & 2 deletions mllib/src/main/scala/org/apache/spark/ml/feature/IDF.scala
@@ -133,11 +133,14 @@ class IDFModel private[ml] (
@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema, logging = true)
// TODO: Make the idfModel.transform natively in ml framework to avoid extra conversion.
val idf = udf { vec: Vector => idfModel.transform(OldVectors.fromML(vec)).asML }
val idf = udf { transformInstance _ }
dataset.withColumn($(outputCol), idf(col($(inputCol))))
}

def transformInstance(vec: Vector) : Vector = {
// TODO: Make the idfModel.transform natively in ml framework to avoid extra conversion.
idfModel.transform(OldVectors.fromML(vec)).asML}

@Since("1.4.0")
override def transformSchema(schema: StructType): StructType = {
validateAndTransformSchema(schema)
mllib/src/main/scala/org/apache/spark/ml/feature/MaxAbsScaler.scala
@@ -125,13 +125,15 @@ class MaxAbsScalerModel private[ml] (
@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema, logging = true)
val reScale = udf { transformInstance _ }
dataset.withColumn($(outputCol), reScale(col($(inputCol))))
}

def transformInstance(vector: Vector): Vector = {
// TODO: this looks hack, we may have to handle sparse and dense vectors separately.
val maxAbsUnzero = Vectors.dense(maxAbs.toArray.map(x => if (x == 0) 1 else x))
val reScale = udf { (vector: Vector) =>
val brz = vector.asBreeze / maxAbsUnzero.asBreeze
Vectors.fromBreeze(brz)
}
dataset.withColumn($(outputCol), reScale(col($(inputCol))))
val brz = vector.asBreeze / maxAbsUnzero.asBreeze
Vectors.fromBreeze(brz)
}

@Since("2.0.0")
31 changes: 15 additions & 16 deletions mllib/src/main/scala/org/apache/spark/ml/feature/MinMaxScaler.scala
@@ -173,25 +173,24 @@ class MinMaxScalerModel private[ml] (

@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
val reScale = udf { transformInstance _ }
dataset.withColumn($(outputCol), reScale(col($(inputCol))))
}

def transformInstance(vector: Vector): Vector = {
val originalRange = (originalMax.asBreeze - originalMin.asBreeze).toArray
val minArray = originalMin.toArray

val reScale = udf { (vector: Vector) =>
val scale = $(max) - $(min)

// 0 in sparse vector will probably be rescaled to non-zero
val values = vector.toArray
val size = values.length
var i = 0
while (i < size) {
val raw = if (originalRange(i) != 0) (values(i) - minArray(i)) / originalRange(i) else 0.5
values(i) = raw * scale + $(min)
i += 1
}
Vectors.dense(values)
val scale = $(max) - $(min)
// 0 in sparse vector will probably be rescaled to non-zero
val values = vector.toArray
val size = values.length
var i = 0
while (i < size) {
val raw = if (originalRange(i) != 0) (values(i) - minArray(i)) / originalRange(i) else 0.5
values(i) = raw * scale + $(min)
i += 1
}

dataset.withColumn($(outputCol), reScale(col($(inputCol))))
Vectors.dense(values)
}

@Since("1.5.0")
16 changes: 9 additions & 7 deletions mllib/src/main/scala/org/apache/spark/ml/feature/PCA.scala
@@ -149,15 +149,17 @@ class PCAModel private[ml] (
@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema, logging = true)
val pcaModel = new feature.PCAModel($(k),
OldMatrices.fromML(pc).asInstanceOf[OldDenseMatrix],
OldVectors.fromML(explainedVariance).asInstanceOf[OldDenseVector])
val pcaOp = udf{ transformInstance _ }
dataset.withColumn($(outputCol), pcaOp(col($(inputCol))))
}

// TODO: Make the transformer natively in ml framework to avoid extra conversion.
val transformer: Vector => Vector = v => pcaModel.transform(OldVectors.fromML(v)).asML
lazy val pcaModel = new feature.PCAModel($(k),
OldMatrices.fromML(pc).asInstanceOf[OldDenseMatrix],
OldVectors.fromML(explainedVariance).asInstanceOf[OldDenseVector])

val pcaOp = udf(transformer)
dataset.withColumn($(outputCol), pcaOp(col($(inputCol))))
def transformInstance(v: Vector): Vector = {
// TODO: Make the transformer natively in ml framework to avoid extra conversion.
pcaModel.transform(OldVectors.fromML(v)).asML
}

@Since("1.5.0")
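Illustrative usage sketch for PCAModel.transformInstance (the `df` DataFrame and k are assumptions); the lazy pcaModel wrapper above is materialized on first use:

import org.apache.spark.ml.feature.PCA
import org.apache.spark.ml.linalg.Vectors

// `df` is an assumed DataFrame with a "features" Vector column.
val model = new PCA()
  .setInputCol("features")
  .setOutputCol("pca")
  .setK(2)
  .fit(df)

// Project a single vector without a DataFrame round-trip.
val projected = model.transformInstance(Vectors.dense(1.0, 0.0, 3.0, 2.0))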
mllib/src/main/scala/org/apache/spark/ml/feature/StandardScaler.scala
@@ -164,13 +164,15 @@ class StandardScalerModel private[ml] (
@Since("2.0.0")
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema, logging = true)
val scaler = new feature.StandardScalerModel(std, mean, $(withStd), $(withMean))
val scale = udf(transformInstance _)
dataset.withColumn($(outputCol), scale(col($(inputCol))))
}

// TODO: Make the transformer natively in ml framework to avoid extra conversion.
val transformer: Vector => Vector = v => scaler.transform(OldVectors.fromML(v)).asML
private lazy val scaler = new feature.StandardScalerModel(std, mean, $(withStd), $(withMean))

val scale = udf(transformer)
dataset.withColumn($(outputCol), scale(col($(inputCol))))
def transformInstance(v: Vector) : Vector = {
// TODO: Make the transformer natively in ml framework to avoid extra conversion.
scaler.transform(OldVectors.fromML(v)).asML
}

@Since("1.4.0")