diff --git a/R/pkg/R/mllib_fpm.R b/R/pkg/R/mllib_fpm.R
index e2394906d801..4ad34fe82328 100644
--- a/R/pkg/R/mllib_fpm.R
+++ b/R/pkg/R/mllib_fpm.R
@@ -116,10 +116,11 @@ setMethod("spark.freqItemsets", signature(object = "FPGrowthModel"),
 # Get association rules.
 
 #' @return A \code{SparkDataFrame} with association rules.
-#'         The \code{SparkDataFrame} contains three columns:
+#'         The \code{SparkDataFrame} contains four columns:
 #'         \code{antecedent} (an array of the same type as the input column),
 #'         \code{consequent} (an array of the same type as the input column),
-#'         and \code{condfidence} (confidence).
+#'         \code{confidence} (confidence for the rule),
+#'         and \code{lift} (lift for the rule).
 #' @rdname spark.fpGrowth
 #' @aliases associationRules,FPGrowthModel-method
 #' @note spark.associationRules(FPGrowthModel) since 2.2.0
diff --git a/R/pkg/tests/fulltests/test_mllib_fpm.R b/R/pkg/tests/fulltests/test_mllib_fpm.R
index 69dda52f0c27..d80f66a25de1 100644
--- a/R/pkg/tests/fulltests/test_mllib_fpm.R
+++ b/R/pkg/tests/fulltests/test_mllib_fpm.R
@@ -44,7 +44,8 @@ test_that("spark.fpGrowth", {
   expected_association_rules <- data.frame(
     antecedent = I(list(list("2"), list("3"))),
     consequent = I(list(list("1"), list("1"))),
-    confidence = c(1, 1)
+    confidence = c(1, 1),
+    lift = c(1, 1)
   )
 
   expect_equivalent(expected_association_rules, collect(spark.associationRules(model)))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
index 85c483c387ad..840a89b76d26 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
@@ -20,6 +20,8 @@ package org.apache.spark.ml.fpm
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.fs.Path
+import org.json4s.{DefaultFormats, JObject}
+import org.json4s.JsonDSL._
 
 import org.apache.spark.annotation.{Experimental, Since}
 import org.apache.spark.ml.{Estimator, Model}
@@ -34,6 +36,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.util.VersionUtils
 
 /**
  * Common params for FPGrowth and FPGrowthModel
@@ -175,7 +178,8 @@ class FPGrowth @Since("2.2.0") (
     if (handlePersistence) {
       items.persist(StorageLevel.MEMORY_AND_DISK)
     }
-
+    val inputRowCount = items.count()
+    instr.logNumExamples(inputRowCount)
     val parentModel = mllibFP.run(items)
     val rows = parentModel.freqItemsets.map(f => Row(f.items, f.freq))
     val schema = StructType(Seq(
@@ -187,7 +191,8 @@ class FPGrowth @Since("2.2.0") (
       items.unpersist()
     }
 
-    copyValues(new FPGrowthModel(uid, frequentItems)).setParent(this)
+    copyValues(new FPGrowthModel(uid, frequentItems, parentModel.itemSupport, inputRowCount))
+      .setParent(this)
   }
 
   @Since("2.2.0")
@@ -217,7 +222,9 @@ object FPGrowth extends DefaultParamsReadable[FPGrowth] {
 @Experimental
 class FPGrowthModel private[ml] (
     @Since("2.2.0") override val uid: String,
-    @Since("2.2.0") @transient val freqItemsets: DataFrame)
+    @Since("2.2.0") @transient val freqItemsets: DataFrame,
+    private val itemSupport: scala.collection.Map[Any, Double],
+    private val numTrainingRecords: Long)
   extends Model[FPGrowthModel] with FPGrowthParams with MLWritable {
 
   /** @group setParam */
@@ -241,9 +248,9 @@ class FPGrowthModel private[ml] (
   @transient private var _cachedRules: DataFrame = _
 
   /**
-   * Get association rules fitted using the minConfidence. Returns a dataframe
-   * with three fields, "antecedent", "consequent" and "confidence", where "antecedent" and
-   * "consequent" are Array[T] and "confidence" is Double.
+   * Get association rules fitted using the minConfidence. Returns a dataframe with four fields,
+   * "antecedent", "consequent", "confidence" and "lift", where "antecedent" and "consequent" are
+   * Array[T], whereas "confidence" and "lift" are Double.
    */
   @Since("2.2.0")
   @transient def associationRules: DataFrame = {
@@ -251,7 +258,7 @@ class FPGrowthModel private[ml] (
       _cachedRules
     } else {
       _cachedRules = AssociationRules
-        .getAssociationRulesFromFP(freqItemsets, "items", "freq", $(minConfidence))
+        .getAssociationRulesFromFP(freqItemsets, "items", "freq", $(minConfidence), itemSupport)
       _cachedMinConf = $(minConfidence)
       _cachedRules
     }
@@ -301,7 +308,7 @@ class FPGrowthModel private[ml] (
 
   @Since("2.2.0")
   override def copy(extra: ParamMap): FPGrowthModel = {
-    val copied = new FPGrowthModel(uid, freqItemsets)
+    val copied = new FPGrowthModel(uid, freqItemsets, itemSupport, numTrainingRecords)
     copyValues(copied, extra).setParent(this.parent)
   }
 
@@ -323,7 +330,8 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] {
   class FPGrowthModelWriter(instance: FPGrowthModel) extends MLWriter {
 
     override protected def saveImpl(path: String): Unit = {
-      DefaultParamsWriter.saveMetadata(instance, path, sc)
+      val extraMetadata: JObject = Map("numTrainingRecords" -> instance.numTrainingRecords)
+      DefaultParamsWriter.saveMetadata(instance, path, sc, extraMetadata = Some(extraMetadata))
       val dataPath = new Path(path, "data").toString
       instance.freqItemsets.write.parquet(dataPath)
     }
@@ -335,10 +343,28 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] {
     private val className = classOf[FPGrowthModel].getName
 
     override def load(path: String): FPGrowthModel = {
+      implicit val format = DefaultFormats
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
+      val (major, minor) = VersionUtils.majorMinorVersion(metadata.sparkVersion)
+      val numTrainingRecords = if (major.toInt < 2 || (major.toInt == 2 && minor.toInt < 4)) {
+        // 2.3 and before don't store the count
+        0L
+      } else {
+        // 2.4+
+        (metadata.metadata \ "numTrainingRecords").extract[Long]
+      }
       val dataPath = new Path(path, "data").toString
       val frequentItems = sparkSession.read.parquet(dataPath)
-      val model = new FPGrowthModel(metadata.uid, frequentItems)
+      val itemSupport = if (numTrainingRecords == 0L) {
+        Map.empty[Any, Double]
+      } else {
+        frequentItems.rdd.flatMap {
+          case Row(items: Seq[_], count: Long) if items.length == 1 =>
+            Some(items.head -> count.toDouble / numTrainingRecords)
+          case _ => None
+        }.collectAsMap()
+      }
+      val model = new FPGrowthModel(metadata.uid, frequentItems, itemSupport, numTrainingRecords)
       metadata.getAndSetParams(model)
       model
     }
@@ -354,27 +380,30 @@ private[fpm] object AssociationRules {
   * @param itemsCol column name for frequent itemsets
   * @param freqCol column name for appearance count of the frequent itemsets
   * @param minConfidence minimum confidence for generating the association rules
-  * @return a DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double])
-  *         containing the association rules.
+  * @param itemSupport map containing an item and its support
+  * @return a DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double],
+  *         "lift" [Double]) containing the association rules.
   */
  def getAssociationRulesFromFP[T: ClassTag](
      dataset: Dataset[_],
      itemsCol: String,
      freqCol: String,
-      minConfidence: Double): DataFrame = {
+      minConfidence: Double,
+      itemSupport: scala.collection.Map[T, Double]): DataFrame = {
 
    val freqItemSetRdd = dataset.select(itemsCol, freqCol).rdd
      .map(row => new FreqItemset(row.getSeq[T](0).toArray, row.getLong(1)))
    val rows = new MLlibAssociationRules()
      .setMinConfidence(minConfidence)
-      .run(freqItemSetRdd)
-      .map(r => Row(r.antecedent, r.consequent, r.confidence))
+      .run(freqItemSetRdd, itemSupport)
+      .map(r => Row(r.antecedent, r.consequent, r.confidence, r.lift.orNull))
 
    val dt = dataset.schema(itemsCol).dataType
    val schema = StructType(Seq(
      StructField("antecedent", dt, nullable = false),
      StructField("consequent", dt, nullable = false),
-      StructField("confidence", DoubleType, nullable = false)))
+      StructField("confidence", DoubleType, nullable = false),
+      StructField("lift", DoubleType)))
    val rules = dataset.sparkSession.createDataFrame(rows, schema)
    rules
  }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
index acb83ac31aff..43d256bbc46c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
@@ -56,11 +56,24 @@ class AssociationRules private[fpm] (
   /**
    * Computes the association rules with confidence above `minConfidence`.
    * @param freqItemsets frequent itemset model obtained from [[FPGrowth]]
-   * @return a `Set[Rule[Item]]` containing the association rules.
+   * @return an `RDD[Rule[Item]]` containing the association rules.
    *
    */
   @Since("1.5.0")
   def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]]): RDD[Rule[Item]] = {
+    run(freqItemsets, Map.empty[Item, Double])
+  }
+
+  /**
+   * Computes the association rules with confidence above `minConfidence`.
+   * @param freqItemsets frequent itemset model obtained from [[FPGrowth]]
+   * @param itemSupport map containing an item and its support
+   * @return an `RDD[Rule[Item]]` containing the association rules. The rules will also be able
+   *         to compute the lift metric.
+   */
+  @Since("2.4.0")
+  def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]],
+      itemSupport: scala.collection.Map[Item, Double]): RDD[Rule[Item]] = {
     // For candidate rule X => Y, generate (X, (Y, freq(X union Y)))
     val candidates = freqItemsets.flatMap { itemset =>
       val items = itemset.items
@@ -76,8 +89,13 @@ class AssociationRules private[fpm] (
     // Join to get (X, ((Y, freq(X union Y)), freq(X))), generate rules, and filter by confidence
     candidates.join(freqItemsets.map(x => (x.items.toSeq, x.freq)))
       .map { case (antecendent, ((consequent, freqUnion), freqAntecedent)) =>
-        new Rule(antecendent.toArray, consequent.toArray, freqUnion, freqAntecedent)
-      }.filter(_.confidence >= minConfidence)
+        new Rule(antecendent.toArray,
+          consequent.toArray,
+          freqUnion,
+          freqAntecedent,
+          // the consequent always contains exactly one element
+          itemSupport.get(consequent.head))
+      }.filter(_.confidence >= minConfidence)
   }
 
   /**
@@ -107,14 +125,21 @@ object AssociationRules {
      @Since("1.5.0") val antecedent: Array[Item],
      @Since("1.5.0") val consequent: Array[Item],
      freqUnion: Double,
-      freqAntecedent: Double) extends Serializable {
+      freqAntecedent: Double,
+      freqConsequent: Option[Double]) extends Serializable {
 
    /**
     * Returns the confidence of the rule.
* */ @Since("1.5.0") - def confidence: Double = freqUnion.toDouble / freqAntecedent + def confidence: Double = freqUnion / freqAntecedent + + /** + * Returns the lift of the rule. + */ + @Since("2.4.0") + def lift: Option[Double] = freqConsequent.map(fCons => confidence / fCons) require(antecedent.toSet.intersect(consequent.toSet).isEmpty, { val sharedItems = antecedent.toSet.intersect(consequent.toSet) @@ -142,7 +167,7 @@ object AssociationRules { override def toString: String = { s"${antecedent.mkString("{", ",", "}")} => " + - s"${consequent.mkString("{", ",", "}")}: ${confidence}" + s"${consequent.mkString("{", ",", "}")}: (confidence: $confidence; lift: $lift)" } } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala index 4f2b7e6f0764..3a1bc35186dc 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala @@ -48,9 +48,14 @@ import org.apache.spark.storage.StorageLevel * @tparam Item item type */ @Since("1.3.0") -class FPGrowthModel[Item: ClassTag] @Since("1.3.0") ( - @Since("1.3.0") val freqItemsets: RDD[FreqItemset[Item]]) +class FPGrowthModel[Item: ClassTag] @Since("2.4.0") ( + @Since("1.3.0") val freqItemsets: RDD[FreqItemset[Item]], + @Since("2.4.0") val itemSupport: Map[Item, Double]) extends Saveable with Serializable { + + @Since("1.3.0") + def this(freqItemsets: RDD[FreqItemset[Item]]) = this(freqItemsets, Map.empty) + /** * Generates association rules for the `Item`s in [[freqItemsets]]. * @param confidence minimal confidence of the rules produced @@ -58,7 +63,7 @@ class FPGrowthModel[Item: ClassTag] @Since("1.3.0") ( @Since("1.5.0") def generateAssociationRules(confidence: Double): RDD[AssociationRules.Rule[Item]] = { val associationRules = new AssociationRules(confidence) - associationRules.run(freqItemsets) + associationRules.run(freqItemsets, itemSupport) } /** @@ -213,9 +218,12 @@ class FPGrowth private[spark] ( val minCount = math.ceil(minSupport * count).toLong val numParts = if (numPartitions > 0) numPartitions else data.partitions.length val partitioner = new HashPartitioner(numParts) - val freqItems = genFreqItems(data, minCount, partitioner) - val freqItemsets = genFreqItemsets(data, minCount, freqItems, partitioner) - new FPGrowthModel(freqItemsets) + val freqItemsCount = genFreqItems(data, minCount, partitioner) + val freqItemsets = genFreqItemsets(data, minCount, freqItemsCount.map(_._1), partitioner) + val itemSupport = freqItemsCount.map { + case (item, cnt) => item -> cnt.toDouble / count + }.toMap + new FPGrowthModel(freqItemsets, itemSupport) } /** @@ -231,12 +239,12 @@ class FPGrowth private[spark] ( * Generates frequent items by filtering the input data using minimal support level. 
   * @param minCount minimum count for frequent itemsets
   * @param partitioner partitioner used to distribute items
-   * @return array of frequent pattern ordered by their frequencies
+   * @return array of frequent patterns and their frequencies, ordered by descending frequency
   */
  private def genFreqItems[Item: ClassTag](
      data: RDD[Array[Item]],
      minCount: Long,
-      partitioner: Partitioner): Array[Item] = {
+      partitioner: Partitioner): Array[(Item, Long)] = {
    data.flatMap { t =>
      val uniq = t.toSet
      if (t.length != uniq.size) {
@@ -248,7 +256,6 @@ class FPGrowth private[spark] (
      .filter(_._2 >= minCount)
      .collect()
      .sortBy(-_._2)
-      .map(_._1)
  }
 
  /**
diff --git a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
index 87f8b9034dde..b75526a48371 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
@@ -39,9 +39,9 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
      val model = new FPGrowth().setMinSupport(0.5).fit(data)
      val generatedRules = model.setMinConfidence(0.5).associationRules
      val expectedRules = spark.createDataFrame(Seq(
-        (Array("2"), Array("1"), 1.0),
-        (Array("1"), Array("2"), 0.75)
-      )).toDF("antecedent", "consequent", "confidence")
+        (Array("2"), Array("1"), 1.0, 1.0),
+        (Array("1"), Array("2"), 0.75, 1.0)
+      )).toDF("antecedent", "consequent", "confidence", "lift")
        .withColumn("antecedent", col("antecedent").cast(ArrayType(dt)))
        .withColumn("consequent", col("consequent").cast(ArrayType(dt)))
      assert(expectedRules.sort("antecedent").rdd.collect().sameElements(
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 4f250c9943ed..62f8b1af50a6 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,10 @@ object MimaExcludes {
 
  // Exclude rules for 2.4.x
  lazy val v24excludes = v23excludes ++ Seq(
+    // [SPARK-10697][ML] Add lift to Association rules
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.fpm.FPGrowthModel.this"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.fpm.AssociationRules#Rule.this"),
+
    // [SPARK-25044] Address translation of LMF closure primitive args to Object in Scala 2.12
    ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.expressions.UserDefinedFunction$"),
    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.apply"),
diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
index f9394421e0cc..c2b29b73460f 100644
--- a/python/pyspark/ml/fpm.py
+++ b/python/pyspark/ml/fpm.py
@@ -145,10 +145,11 @@ def freqItemsets(self):
    @since("2.2.0")
    def associationRules(self):
        """
-        DataFrame with three columns:
+        DataFrame with four columns:
        * `antecedent` - Array of the same type as the input column.
        * `consequent` - Array of the same type as the input column.
        * `confidence` - Confidence for the rule (`DoubleType`).
+        * `lift` - Lift for the rule (`DoubleType`).
""" return self._call_java("associationRules") diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 5c87d1de4139..625d9927f706 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -2158,8 +2158,8 @@ def test_association_rules(self): fpm = fp.fit(self.data) expected_association_rules = self.spark.createDataFrame( - [([3], [1], 1.0), ([2], [1], 1.0)], - ["antecedent", "consequent", "confidence"] + [([3], [1], 1.0, 1.0), ([2], [1], 1.0, 1.0)], + ["antecedent", "consequent", "confidence", "lift"] ) actual_association_rules = fpm.associationRules