2 changes: 2 additions & 0 deletions docs/_data/menu-ml.yaml
@@ -8,6 +8,8 @@
url: ml-clustering.html
- text: Collaborative filtering
url: ml-collaborative-filtering.html
- text: Frequent Pattern Mining
url: ml-frequent-pattern-mining.html
- text: Model selection and tuning
url: ml-tuning.html
- text: Advanced topics
75 changes: 75 additions & 0 deletions docs/ml-frequent-pattern-mining.md
@@ -0,0 +1,75 @@
---
layout: global
title: Frequent Pattern Mining
displayTitle: Frequent Pattern Mining
---

Mining frequent items, itemsets, subsequences, or other substructures is usually among the
first steps in analyzing a large-scale dataset, and has been an active research topic in
data mining for years.
We refer users to Wikipedia's [association rule learning](http://en.wikipedia.org/wiki/Association_rule_learning)
for more information.

**Table of Contents**

* This will become a table of contents (this text will be scraped).
{:toc}

## FP-Growth

The FP-growth algorithm is described in the paper
[Han et al., Mining frequent patterns without candidate generation](http://dx.doi.org/10.1145/335191.335372),
where "FP" stands for frequent pattern.
Given a dataset of transactions, the first step of FP-growth is to calculate item frequencies and identify frequent items.
Unlike [Apriori-like](http://en.wikipedia.org/wiki/Apriori_algorithm) algorithms designed for the same purpose,
the second step of FP-growth uses a suffix tree (FP-tree) structure to encode transactions without generating candidate sets
explicitly, which are usually expensive to generate.
After the second step, the frequent itemsets can be extracted from the FP-tree.
In `spark.mllib`, we implemented a parallel version of FP-growth called PFP,
as described in [Li et al., PFP: Parallel FP-growth for query recommendation](http://dx.doi.org/10.1145/1454008.1454027).
PFP distributes the work of growing FP-trees based on the suffices of transactions,
> **Contributor:** suffixes

and hence more scalable than a single-machine implementation.
> **Contributor:** is more scalable

We refer users to the papers for more details.

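To make the first step concrete, here is a minimal sketch of the frequency-counting pass in plain Scala (a toy illustration, not the Spark implementation):

```scala
// Count item frequencies over toy transactions and keep the items whose
// count meets the minimum support threshold (FP-growth's first pass).
val transactions = Seq(Seq("1", "2", "5"), Seq("1", "2", "3", "5"), Seq("1", "2"))
val minSupport = 0.5
val minCount = minSupport * transactions.size // 1.5 for these three transactions

val freqItems = transactions
  .flatten                                    // every item occurrence
  .groupBy(identity)                          // item -> its occurrences
  .map { case (item, occs) => (item, occs.size) }
  .filter { case (_, count) => count >= minCount }
// Map("1" -> 3, "2" -> 3, "5" -> 2); "3" appears only once and is dropped.
```

The second step then compresses the transactions into the FP-tree using only these frequent items and mines the tree recursively; that is the part the paper (and the PFP variant) work out in detail.
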
`spark.ml`'s FP-growth implementation takes the following (hyper-)parameters:

* `minSupport`: the minimum support for an itemset to be identified as frequent.
For example, if an item appears in 3 out of 5 transactions, it has a support of 3/5 = 0.6.
* `minConfidence`: minimum confidence for generating Association Rule. The parameter will not affect the mining
for frequent itemsets,, but specify the minimum confidence for generating association rules from frequent itemsets.
> **Contributor:** It might be good to give an example for confidence as well, since one has been given for support.

> **Contributor:** also, there are two commas after itemsets

* `numPartitions`: the number of partitions used to distribute the work. By default the param is not set, and
partition number of the input dataset is used.
> **Contributor:** the number of partitions of the input dataset

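An analogous example for confidence (a minimal sketch over the same three transactions used in the examples below, not part of the PR itself):

```scala
// Hypothetical illustration of support and confidence.
val transactions = Seq(Set("1", "2", "5"), Set("1", "2", "3", "5"), Set("1", "2"))

// support(X): fraction of transactions containing every item of X.
def support(itemset: Set[String]): Double =
  transactions.count(t => itemset.subsetOf(t)).toDouble / transactions.size

// confidence(A => B) = support(A union B) / support(A): among transactions that
// contain the antecedent A, the fraction that also contain the consequent B.
val ruleConfidence = support(Set("2", "5") ++ Set("1")) / support(Set("2", "5"))
// support({1,2,5}) = 2/3 and support({2,5}) = 2/3, so confidence({2,5} => {1}) = 1.0.
```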

The `FPGrowthModel` provides:

* `freqItemsets`: frequent itemsets in the format of DataFrame("items"[Array], "freq"[Long])
* `associationRules`: association rules generated with confidence above `minConfidence`, in the format of
DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double]).
* `transform`: The transform method examines the input items in `itemsCol` against all the association rules and
summarize the consequents as prediction. The prediction column has the same data type as the
`itemsCol` and does not contain existing items in the `itemsCol`.

> **Contributor:** I don't think this really explains what transform does, or maybe it's just me? I would have said something like: "The transform method will produce predictionCol containing all the consequents of the association rules containing the items in itemsCol as their antecedents. The prediction column..."

> **Contributor Author:** Thanks for the suggestion. I do wish to have a better illustration here. But the two "containing"s in your version make it less straightforward, and actually the items in itemsCol should contain the antecedents of the association rules. I extended it to a longer version: "For each record in itemsCol, the transform method will compare its items against the antecedents of each association rule. If the record contains all the antecedents of a specific association rule, the rule will be considered applicable and its consequents will be added to the prediction result. The transform method will summarize the consequents from all the applicable rules as the prediction."

> **Contributor:** even better 👍

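The rule-application logic can be sketched as follows (hypothetical rule values; the actual implementation collects the mined rules, broadcasts them, and applies the same logic inside a UDF):

```scala
// Rules as (antecedent, consequent) pairs -- hypothetical values.
val rules: Seq[(Set[String], Set[String])] = Seq(
  (Set("1", "2"), Set("5")), // {1, 2} => {5}
  (Set("5"), Set("3"))       // {5} => {3}
)

def predict(items: Set[String]): Seq[String] =
  rules
    .filter { case (antecedent, _) => antecedent.subsetOf(items) }   // applicable rules
    .flatMap { case (_, consequent) => consequent.filterNot(items) } // drop existing items
    .distinct

predict(Set("1", "2"))      // Seq("5"): only {1, 2} => {5} applies
predict(Set("1", "2", "5")) // Seq("3"): "5" is already present, so only "3" is predicted
```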

**Examples**

<div class="codetabs">

<div data-lang="scala" markdown="1">
Refer to the [Scala API docs](api/scala/index.html#org.apache.spark.ml.fpm.FPGrowth) for more details.

{% include_example scala/org/apache/spark/examples/ml/FPGrowthExample.scala %}
</div>

<div data-lang="java" markdown="1">
Refer to the [Java API docs](api/java/org/apache/spark/ml/fpm/FPGrowth.html) for more details.

{% include_example java/org/apache/spark/examples/ml/JavaFPGrowthExample.java %}
</div>

<div data-lang="python" markdown="1">
Refer to the [Python API docs](api/python/pyspark.ml.html#pyspark.ml.fpm.FPGrowth) for more details.

{% include_example python/ml/fpgrowth_example.py %}
</div>
> **Member:** add R please

> **Contributor Author:** Sure. Added reference to R example. Manually checked on generated doc.


</div>
77 changes: 77 additions & 0 deletions examples/src/main/java/org/apache/spark/examples/ml/JavaFPGrowthExample.java
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.ml;

// $example on$
import java.util.Arrays;
import java.util.List;

import org.apache.spark.ml.fpm.FPGrowth;
import org.apache.spark.ml.fpm.FPGrowthModel;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.*;
// $example off$

/**
* An example demonstrating FPGrowth.
* Run with
* <pre>
* bin/run-example ml.JavaFPGrowthExample
* </pre>
*/
public class JavaFPGrowthExample {
public static void main(String[] args) {
SparkSession spark = SparkSession
.builder()
.appName("JavaFPGrowthExample")
.getOrCreate();

// $example on$
List<Row> data = Arrays.asList(
RowFactory.create(Arrays.asList("1 2 5".split(" "))),
RowFactory.create(Arrays.asList("1 2 3 5".split(" "))),
RowFactory.create(Arrays.asList("1 2".split(" ")))
);
StructType schema = new StructType(new StructField[]{ new StructField(
"items", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
});
Dataset<Row> itemsDF = spark.createDataFrame(data, schema);

FPGrowthModel model = new FPGrowth()
.setItemsCol("items")
.setMinSupport(0.5)
.setMinConfidence(0.6)
.fit(itemsDF);

// Display frequent itemsets.
model.freqItemsets().show();

// Display generated association rules.
model.associationRules().show();

// transform examines the input items against all the association rules and summarize the
// consequents as prediction
model.transform(itemsDF).show();
// $example off$

spark.stop();
}
}
48 changes: 48 additions & 0 deletions examples/src/main/python/ml/fpgrowth_example.py
@@ -0,0 +1,48 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# $example on$
from pyspark.ml.fpm import FPGrowth
# $example off$
from pyspark.sql import SparkSession

"""
An example demonstrating FPGrowth.
Run with:
bin/spark-submit examples/src/main/python/ml/fpgrowth_example.py
"""

if __name__ == "__main__":
spark = SparkSession\
.builder\
.appName("FPGrowthExample")\
.getOrCreate()

# $example on$
df = spark.createDataFrame([
(0, [1, 2, 5]),
(1, [1, 2, 3, 5]),
(2, [1, 2])
], ["id", "items"])

fpGrowth = FPGrowth(itemsCol="items", minSupport=0.5, minConfidence=0.6)
fpGrowthModel = fpGrowth.fit(df)

> **Member:** Can we add an associationRules example? After all, this is the biggest advantage for the Python user.

> **Contributor Author:** definitely. Thanks.
fpGrowthModel.transform(df).show()
# $example off$

spark.stop()
67 changes: 67 additions & 0 deletions examples/src/main/scala/org/apache/spark/examples/ml/FPGrowthExample.scala
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.ml

// scalastyle:off println

// $example on$
import org.apache.spark.ml.fpm.FPGrowth
// $example off$
import org.apache.spark.sql.SparkSession

/**
* An example demonstrating FP-Growth.
* Run with
* {{{
* bin/run-example ml.FPGrowthExample
* }}}
*/
object FPGrowthExample {

> **Member:** nit: remove blank line

def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder
.appName(s"${this.getClass.getSimpleName}")
.getOrCreate()
import spark.implicits._

// $example on$
val dataset = spark.createDataset(Seq(
"1 2 5",
"1 2 3 5",
"1 2")
).map(t => t.split(" ")).toDF("items")

val fpgrowth = new FPGrowth().setItemsCol("items").setMinSupport(0.5).setMinConfidence(0.6)
val model = fpgrowth.fit(dataset)

// Display frequent itemsets.
model.freqItemsets.show()

// Display generated association rules.
model.associationRules.show()

// transform examines the input items against all the association rules and summarize the
// consequents as prediction
model.transform(dataset).show()
// $example off$

spark.stop()
}
}
// scalastyle:on println
34 changes: 14 additions & 20 deletions mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
@@ -17,7 +17,6 @@

package org.apache.spark.ml.fpm

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

import org.apache.hadoop.fs.Path
@@ -54,7 +53,7 @@ private[fpm] trait FPGrowthParams extends Params with HasPredictionCol {

/**
* Minimal support level of the frequent pattern. [0.0, 1.0]. Any pattern that appears
* more than (minSupport * size-of-the-dataset) times will be output
* more than (minSupport * size-of-the-dataset) times will be output in the frequent itemsets.
* Default: 0.3
* @group param
*/
@@ -82,8 +81,8 @@ private[fpm] trait FPGrowthParams extends Params with HasPredictionCol {
def getNumPartitions: Int = $(numPartitions)

/**
* Minimal confidence for generating Association Rule.
* Note that minConfidence has no effect during fitting.
* Minimal confidence for generating Association Rule. MinConfidence will not affect the mining
> **Member:** lower case `minConfidence`?

> **Member:** ping

> **Contributor Author:** got it.

* for frequent itemsets, but will affect the association rules generation.
* Default: 0.8
* @group param
*/
@@ -118,7 +117,7 @@ private[fpm] trait FPGrowthParams extends Params with HasPredictionCol {
* Recommendation</a>. PFP distributes computation in such a way that each worker executes an
* independent group of mining tasks. The FP-Growth algorithm is described in
* <a href="http://dx.doi.org/10.1145/335191.335372">Han et al., Mining frequent patterns without
* candidate generation</a>. Note null values in the feature column are ignored during fit().
* candidate generation</a>. Note null values in the itemsCol column are ignored during fit().
*
* @see <a href="http://en.wikipedia.org/wiki/Association_rule_learning">
* Association rule learning (Wikipedia)</a>
@@ -167,7 +166,6 @@ class FPGrowth @Since("2.2.0") (
}
val parentModel = mllibFP.run(items)
val rows = parentModel.freqItemsets.map(f => Row(f.items, f.freq))

val schema = StructType(Seq(
StructField("items", dataset.schema($(itemsCol)).dataType, nullable = false),
StructField("freq", LongType, nullable = false)))
@@ -196,7 +194,7 @@ object FPGrowth extends DefaultParamsReadable[FPGrowth] {
* :: Experimental ::
* Model fitted by FPGrowth.
*
* @param freqItemsets frequent items in the format of DataFrame("items"[Seq], "freq"[Long])
* @param freqItemsets frequent itemsets in the format of DataFrame("items"[Array], "freq"[Long])
*/
@Since("2.2.0")
@Experimental
@@ -232,7 +230,7 @@ class FPGrowthModel private[ml] (
* Then for each association rule, it will examine the input items against antecedents and
* summarize the consequents as prediction. The prediction column has the same data type as the
* input column(Array[T]) and will not contain existing items in the input column. The null
* values in the feature columns are treated as empty sets.
* values in the itemsCol columns are treated as empty sets.
* WARNING: internally it collects association rules to the driver and uses broadcast for
* efficiency. This may bring pressure to driver memory for large set of association rules.
*/
@@ -253,12 +251,8 @@
val predictUDF = udf((items: Seq[_]) => {
if (items != null) {
val itemset = items.toSet
brRules.value.flatMap(rule =>
if (items != null && rule._1.forall(item => itemset.contains(item))) {
rule._2.filter(item => !itemset.contains(item))
} else {
Seq.empty
}).distinct
brRules.value.filter(_._1.forall(itemset.contains))
.flatMap(_._2.filter(!itemset.contains(_))).distinct
> **Member:** Are we aware of code changes here?

> **Member:** So we don't need to handle a null item?

> **Contributor Author:** Hi @felixcheung, can you be more specific about the null item that concerns you? Thanks.

> **Member:** Right, two things. First, just calling out that while the PR says doc changes, there is this one code change here. Second, this code was previously checking `items != null`; do we not need to consider that now?

> **Contributor Author:** `items != null` is already checked two lines above. Please refer to the comments in the PR for the history of the code change. I can update the title to include the code change.

> **Member:** Let's update the PR/JIRA if a code change is required for the doc change; otherwise, let's leave the code change as a separate PR?

> **Contributor Author:** I guess that's the right way. I will revert the code change.

} else {
Seq.empty
}}, dt)
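
For reference, here is a simplified, standalone sketch of the two formulations discussed above (item types reduced to `String`; the real code is generic and reads the rules from a broadcast variable). Both compute the same prediction for a non-null input, since the inner `items != null` test in the old version is redundant once the outer check has passed:

```scala
type Rule = (Seq[String], Seq[String]) // (antecedent, consequent)

// Old formulation: branch inside flatMap, with the redundant null check dropped.
def predictOld(rules: Seq[Rule], itemset: Set[String]): Seq[String] =
  rules.flatMap { rule =>
    if (rule._1.forall(itemset.contains)) rule._2.filter(!itemset.contains(_))
    else Seq.empty
  }.distinct

// New formulation: select the applicable rules first, then collect their
// consequents, keeping only items not already present in the input.
def predictNew(rules: Seq[Rule], itemset: Set[String]): Seq[String] =
  rules.filter(_._1.forall(itemset.contains))
    .flatMap(_._2.filter(!itemset.contains(_))).distinct
```
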
@@ -320,13 +314,13 @@ private[fpm] object AssociationRules {

/**
* Computes the association rules with confidence above minConfidence.
* @param dataset DataFrame("items", "freq") containing frequent itemset obtained from
* algorithms like [[FPGrowth]].
* @param dataset DataFrame("items"[Array], "freq"[Long]) containing frequent itemsets obtained
* from algorithms like [[FPGrowth]].
* @param itemsCol column name for frequent itemsets
* @param freqCol column name for frequent itemsets count
* @param minConfidence minimum confidence for the result association rules
* @return a DataFrame("antecedent", "consequent", "confidence") containing the association
* rules.
* @param freqCol column name for appearance count of the frequent itemsets
* @param minConfidence minimum confidence for generating the association rules
* @return a DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double])
* containing the association rules.
*/
def getAssociationRulesFromFP[T: ClassTag](
dataset: Dataset[_],
mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark.ml.fpm
import org.apache.spark.SparkFunSuite
import org.apache.spark.ml.util.DefaultReadWriteTest
import org.apache.spark.mllib.util.MLlibTestSparkContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

@@ -91,6 +91,8 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
.setMinConfidence(0.5678)
assert(fpGrowth.getMinSupport === 0.4567)
assert(model.getMinConfidence === 0.5678)
// numPartitions should not have default value.
assert(fpGrowth.isDefined(fpGrowth.numPartitions) === false)
}

test("read/write") {