diff --git a/R/pkg/R/mllib_fpm.R b/R/pkg/R/mllib_fpm.R
index 0cc7a16c302d..30bc51b93204 100644
--- a/R/pkg/R/mllib_fpm.R
+++ b/R/pkg/R/mllib_fpm.R
@@ -122,11 +122,12 @@ setMethod("spark.freqItemsets", signature(object = "FPGrowthModel"),
 # Get association rules.
 
 #' @return A \code{SparkDataFrame} with association rules.
-#'         The \code{SparkDataFrame} contains four columns:
+#'         The \code{SparkDataFrame} contains five columns:
 #'         \code{antecedent} (an array of the same type as the input column),
 #'         \code{consequent} (an array of the same type as the input column),
-#'         \code{condfidence} (confidence for the rule)
-#'         and \code{lift} (lift for the rule)
+#'         \code{confidence} (confidence for the rule),
+#'         \code{lift} (lift for the rule)
+#'         and \code{support} (support for the rule)
 #' @rdname spark.fpGrowth
 #' @aliases associationRules,FPGrowthModel-method
 #' @note spark.associationRules(FPGrowthModel) since 2.2.0
diff --git a/R/pkg/tests/fulltests/test_mllib_fpm.R b/R/pkg/tests/fulltests/test_mllib_fpm.R
index bc1e17538d41..78d26d332447 100644
--- a/R/pkg/tests/fulltests/test_mllib_fpm.R
+++ b/R/pkg/tests/fulltests/test_mllib_fpm.R
@@ -45,7 +45,8 @@ test_that("spark.fpGrowth", {
     antecedent = I(list(list("2"), list("3"))),
     consequent = I(list(list("1"), list("1"))),
     confidence = c(1, 1),
-    lift = c(1, 1)
+    lift = c(1, 1),
+    support = c(0.75, 0.5)
   )
 
   expect_equivalent(expected_association_rules, collect(spark.associationRules(model)))
diff --git a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
index e50d4255b1f3..f1a68edaed95 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
@@ -244,9 +244,9 @@ class FPGrowthModel private[ml] (
   @transient private var _cachedRules: DataFrame = _
 
   /**
-   * Get association rules fitted using the minConfidence. Returns a dataframe with four fields,
-   * "antecedent", "consequent", "confidence" and "lift", where "antecedent" and "consequent" are
-   * Array[T], whereas "confidence" and "lift" are Double.
+   * Get association rules fitted using the minConfidence. Returns a dataframe with five fields,
+   * "antecedent", "consequent", "confidence", "lift" and "support", where "antecedent" and
+   * "consequent" are Array[T], whereas "confidence", "lift" and "support" are Double.
    */
   @Since("2.2.0")
   @transient def associationRules: DataFrame = {
@@ -254,7 +254,8 @@ class FPGrowthModel private[ml] (
       _cachedRules
     } else {
       _cachedRules = AssociationRules
-        .getAssociationRulesFromFP(freqItemsets, "items", "freq", $(minConfidence), itemSupport)
+        .getAssociationRulesFromFP(freqItemsets, "items", "freq", $(minConfidence), itemSupport,
+          numTrainingRecords)
       _cachedMinConf = $(minConfidence)
       _cachedRules
     }
@@ -385,6 +386,8 @@ private[fpm] object AssociationRules {
    * @param freqCol column name for appearance count of the frequent itemsets
    * @param minConfidence minimum confidence for generating the association rules
    * @param itemSupport map containing an item and its support
+   * @param numTrainingRecords number of records in the training dataset
    * @return a DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double],
-   *         "lift" [Double]) containing the association rules.
+   *         "lift" [Double] and "support" [Double]) containing the association rules.
    */
@@ -393,21 +396,23 @@ private[fpm] object AssociationRules {
       itemsCol: String,
       freqCol: String,
       minConfidence: Double,
-      itemSupport: scala.collection.Map[T, Double]): DataFrame = {
-
+      itemSupport: scala.collection.Map[T, Double],
+      numTrainingRecords: Long): DataFrame = {
     val freqItemSetRdd = dataset.select(itemsCol, freqCol).rdd
       .map(row => new FreqItemset(row.getSeq[T](0).toArray, row.getLong(1)))
     val rows = new MLlibAssociationRules()
       .setMinConfidence(minConfidence)
       .run(freqItemSetRdd, itemSupport)
-      .map(r => Row(r.antecedent, r.consequent, r.confidence, r.lift.orNull))
+      .map(r => Row(r.antecedent, r.consequent, r.confidence, r.lift.orNull,
+        r.freqUnion / numTrainingRecords))
 
     val dt = dataset.schema(itemsCol).dataType
     val schema = StructType(Seq(
       StructField("antecedent", dt, nullable = false),
       StructField("consequent", dt, nullable = false),
       StructField("confidence", DoubleType, nullable = false),
-      StructField("lift", DoubleType)))
+      StructField("lift", DoubleType),
+      StructField("support", DoubleType, nullable = false)))
     val rules = dataset.sparkSession.createDataFrame(rows, schema)
     rules
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
index 43d256bbc46c..601c7da30ffe 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
@@ -124,7 +124,7 @@ object AssociationRules {
   class Rule[Item] private[fpm] (
       @Since("1.5.0") val antecedent: Array[Item],
       @Since("1.5.0") val consequent: Array[Item],
-      freqUnion: Double,
+      private[spark] val freqUnion: Double,
       freqAntecedent: Double,
       freqConsequent: Option[Double]) extends Serializable {
 
diff --git a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
index b75526a48371..d42ced0f8f91 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
@@ -39,9 +39,9 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
       val model = new FPGrowth().setMinSupport(0.5).fit(data)
       val generatedRules = model.setMinConfidence(0.5).associationRules
       val expectedRules = spark.createDataFrame(Seq(
-        (Array("2"), Array("1"), 1.0, 1.0),
-        (Array("1"), Array("2"), 0.75, 1.0)
-      )).toDF("antecedent", "consequent", "confidence", "lift")
+        (Array("2"), Array("1"), 1.0, 1.0, 0.75),
+        (Array("1"), Array("2"), 0.75, 1.0, 0.75)
+      )).toDF("antecedent", "consequent", "confidence", "lift", "support")
         .withColumn("antecedent", col("antecedent").cast(ArrayType(dt)))
         .withColumn("consequent", col("consequent").cast(ArrayType(dt)))
       assert(expectedRules.sort("antecedent").rdd.collect().sameElements(
@@ -61,6 +61,31 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
     }
   }
 
+  test("FPGrowth associationRules") {
+    val dataset = spark.createDataFrame(Seq(
+      (1, Array("1", "2")),
+      (2, Array("3")),
+      (3, Array("4", "5")),
+      (4, Array("1", "2", "3")),
+      (5, Array("2"))
+    )).toDF("id", "items")
+    val model = new FPGrowth().setMinSupport(0.1).setMinConfidence(0.1).fit(dataset)
+    val expectedRules = spark.createDataFrame(Seq(
+      (Array("2"), Array("1"), 0.6666666666666666, 1.6666666666666665, 0.4),
+      (Array("2"), Array("3"), 0.3333333333333333, 0.8333333333333333, 0.2),
+      (Array("3"), Array("1"), 0.5, 1.25, 0.2),
+      (Array("3"), Array("2"), 0.5, 0.8333333333333334, 0.2),
+      (Array("1", "3"), Array("2"), 1.0, 1.6666666666666667, 0.2),
+      (Array("1", "2"), Array("3"), 0.5, 1.25, 0.2),
+      (Array("4"), Array("5"), 1.0, 5.0, 0.2),
+      (Array("5"), Array("4"), 1.0, 5.0, 0.2),
+      (Array("1"), Array("3"), 0.5, 1.25, 0.2),
+      (Array("1"), Array("2"), 1.0, 1.6666666666666667, 0.4),
+      (Array("3", "2"), Array("1"), 1.0, 2.5, 0.2)
+    )).toDF("antecedent", "consequent", "confidence", "lift", "support")
+    assert(expectedRules.collect().toSet.equals(model.associationRules.collect().toSet))
+  }
+
   test("FPGrowth getFreqItems") {
     val model = new FPGrowth().setMinSupport(0.7).fit(dataset)
     val expectedFreq = spark.createDataFrame(Seq(
diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
index 7d933daf9e03..7a5591f3fbf7 100644
--- a/python/pyspark/ml/fpm.py
+++ b/python/pyspark/ml/fpm.py
@@ -180,15 +180,15 @@ class FPGrowth(JavaEstimator, _FPGrowthParams, JavaMLWritable, JavaMLReadable):
     only showing top 5 rows
     ...
     >>> fpm.associationRules.show(5)
-    +----------+----------+----------+----+
-    |antecedent|consequent|confidence|lift|
-    +----------+----------+----------+----+
-    |    [t, s]|       [y]|       1.0| 2.0|
-    |    [t, s]|       [x]|       1.0| 1.5|
-    |    [t, s]|       [z]|       1.0| 1.2|
-    |       [p]|       [r]|       1.0| 2.0|
-    |       [p]|       [z]|       1.0| 1.2|
-    +----------+----------+----------+----+
+    +----------+----------+----------+----+------------------+
+    |antecedent|consequent|confidence|lift|           support|
+    +----------+----------+----------+----+------------------+
+    |    [t, s]|       [y]|       1.0| 2.0|0.3333333333333333|
+    |    [t, s]|       [x]|       1.0| 1.5|0.3333333333333333|
+    |    [t, s]|       [z]|       1.0| 1.2|0.3333333333333333|
+    |       [p]|       [r]|       1.0| 2.0|0.3333333333333333|
+    |       [p]|       [z]|       1.0| 1.2|0.3333333333333333|
+    +----------+----------+----------+----+------------------+
     only showing top 5 rows
     ...
     >>> new_data = spark.createDataFrame([(["t", "s"], )], ["items"])
diff --git a/python/pyspark/ml/tests/test_algorithms.py b/python/pyspark/ml/tests/test_algorithms.py
index 2faf2d98f027..c948bd0c646d 100644
--- a/python/pyspark/ml/tests/test_algorithms.py
+++ b/python/pyspark/ml/tests/test_algorithms.py
@@ -226,8 +226,8 @@ def test_association_rules(self):
         fpm = fp.fit(self.data)
 
         expected_association_rules = self.spark.createDataFrame(
-            [([3], [1], 1.0, 1.0), ([2], [1], 1.0, 1.0)],
-            ["antecedent", "consequent", "confidence", "lift"]
+            [([3], [1], 1.0, 1.0, 0.5), ([2], [1], 1.0, 1.0, 0.75)],
+            ["antecedent", "consequent", "confidence", "lift", "support"]
         )
         actual_association_rules = fpm.associationRules
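
For reference, the new column composes with the existing ones as confidence(A => C) = support(A ∪ C) / support(A) and lift(A => C) = confidence(A => C) / support(C), with support(A => C) computed by this patch as `r.freqUnion / numTrainingRecords`. Below is a minimal, self-contained sketch of the new column end to end. It is illustrative only, not part of the patch: the object name, the four transactions, and the thresholds are invented here, chosen so that the expected supports match the 0.75 and 0.5 values asserted in the updated R test.

```scala
import org.apache.spark.ml.fpm.FPGrowth
import org.apache.spark.sql.SparkSession

// Hypothetical example object, not part of the Spark code base.
object SupportColumnSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("fpgrowth-support-sketch")
      .getOrCreate()
    import spark.implicits._

    // Four transactions: {1, 2} occurs in 3 of them, {1, 3} in 2.
    val data = Seq(
      Array("1", "2"),
      Array("1", "2"),
      Array("1", "2", "3"),
      Array("1", "3")
    ).toDF("items")

    val model = new FPGrowth()
      .setItemsCol("items")
      .setMinSupport(0.5)
      .setMinConfidence(0.8)
      .fit(data)

    // With this patch applied, associationRules carries a fifth column:
    //   support(A => C) = freq(A union C) / numTrainingRecords
    // Expected here: [2] => [1] with support 3/4 = 0.75,
    //                [3] => [1] with support 2/4 = 0.5.
    model.associationRules.show()

    spark.stop()
  }
}
```

Both surviving rules have confidence 1.0 (every basket containing the antecedent also contains "1"), and lift 1.0 because "1" appears in all four baskets, so the support column is the only one that distinguishes how often each rule actually fires.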