@@ -45,7 +45,7 @@ public static void main(String[] args) {
 
     AssociationRules arules = new AssociationRules()
       .setMinConfidence(0.8);
-    JavaRDD<AssociationRules.Rule<String>> results = arules.run(freqItemsets);
+    JavaRDD<AssociationRules.Rule<String>> results = arules.run(freqItemsets, 50L);
 
     for (AssociationRules.Rule<String> rule : results.collect()) {
       System.out.println(

@@ -39,7 +39,7 @@ object AssociationRulesExample {
 
     val ar = new AssociationRules()
       .setMinConfidence(0.8)
-    val results = ar.run(freqItemsets)
+    val results = ar.run(freqItemsets, 50L)
 
     results.collect().foreach { rule =>
       println("[" + rule.antecedent.mkString(",")

@@ -24,7 +24,7 @@ import org.apache.spark.rdd.RDD
  * A Wrapper of FPGrowthModel to provide helper method for Python
  */
 private[python] class FPGrowthModelWrapper(model: FPGrowthModel[Any])
-  extends FPGrowthModel(model.freqItemsets) {
+  extends FPGrowthModel(model.freqItemsets, model.dataSize) {
 
   def getFreqItemsets: RDD[Array[Any]] = {
     SerDe.fromTuple2RDD(model.freqItemsets.map(x => (x.javaItems, x.freq)))

@@ -63,7 +63,7 @@ class AssociationRules private[fpm] (
   *
   */
  @Since("1.5.0")
- def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]]): RDD[Rule[Item]] = {
+ def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]], dataSize: Long): RDD[Rule[Item]] = {
    // For candidate rule X => Y, generate (X, (Y, freq(X union Y)))
    val candidates = freqItemsets.flatMap { itemset =>
      val items = itemset.items
@@ -79,15 +79,15 @@
    // Join to get (X, ((Y, freq(X union Y)), freq(X))), generate rules, and filter by confidence
    candidates.join(freqItemsets.map(x => (x.items.toSeq, x.freq)))
      .map { case (antecendent, ((consequent, freqUnion), freqAntecedent)) =>
-       new Rule(antecendent.toArray, consequent.toArray, freqUnion, freqAntecedent)
+       new Rule(antecendent.toArray, consequent.toArray, freqUnion, freqAntecedent, dataSize)
      }.filter(_.confidence >= minConfidence)
  }
 
  /** Java-friendly version of [[run]]. */
  @Since("1.5.0")
- def run[Item](freqItemsets: JavaRDD[FreqItemset[Item]]): JavaRDD[Rule[Item]] = {
+ def run[Item](freqItemsets: JavaRDD[FreqItemset[Item]], dataSize: Long): JavaRDD[Rule[Item]] = {
    val tag = fakeClassTag[Item]
-   run(freqItemsets.rdd)(tag)
+   run(freqItemsets.rdd, dataSize)(tag)
  }
 }
 
@@ -111,7 +111,8 @@ object AssociationRules {
      @Since("1.5.0") val antecedent: Array[Item],
      @Since("1.5.0") val consequent: Array[Item],
      freqUnion: Double,
-     freqAntecedent: Double) extends Serializable {
+     freqAntecedent: Double,
+     dataSize: Long) extends Serializable {
 
    /**
     * Returns the confidence of the rule.
@@ -120,6 +121,13 @@
    @Since("1.5.0")
    def confidence: Double = freqUnion.toDouble / freqAntecedent
 
+   /**
+    * Returns the support of the rule. Current implementation would return the number of
+    * co-occurrence of antecedent and consequent.
+    */
+   @Since("2.1.0")
+   def support: Double = freqUnion.toDouble / dataSize
+

Contributor Author (@hhbyyh):

This is intentionally typed as Double. In the future, it could be a fraction value (< 1.0).

Member:

I don't think the meaning of this should ever be overloaded. Support is a count.

Contributor Author (@hhbyyh, Jun 14, 2016):

Two major considerations:

  1. In most definitions and textbooks, support is a fraction value in [0.0, 1.0]. It's possible for us to align with that in the future.
  2. The current implementation of AssociationRules actually allows both freqUnion: Double and freqAntecedent: Double to be fraction values in [0.0, 1.0], although they are both counts now. I don't want to destroy that flexibility and break the API in the future.

Member:

Ah right, we use support as a fraction. Well, then it's best to be consistent and return it as a fraction of the data set size. I can't imagine having a method sometimes return a value with one set of semantics and sometimes another. Just make two methods.

freqUnion, however, appears to be a count only, and is even explicitly called a 'frequency'.
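
To make the "two methods" suggestion concrete, here is a minimal sketch that keeps the count and the fraction separate; the class and method names are hypothetical, not part of this PR:

// Hypothetical sketch of the separation suggested above; not part of the PR.
class RuleStats(freqUnion: Long, freqAntecedent: Long, dataSize: Long) {
  // Confidence: fraction of transactions containing the antecedent that also
  // contain the consequent.
  def confidence: Double = freqUnion.toDouble / freqAntecedent
  // Support as a count: absolute number of transactions containing both sides.
  def supportCount: Long = freqUnion
  // Support as a fraction: the same quantity relative to the whole data set.
  def support: Double = freqUnion.toDouble / dataSize
}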

Contributor Author (@hhbyyh, Jun 14, 2016):

I suppose freqUnion was made a Double on purpose for the same reason (flexibility for the future).

Making support a fraction now requires that we keep the dataset size in FPGrowthModel and AssociationRules. Yet that would introduce an API change in the constructor. I thought we should avoid breaking the API between 2.0 and 2.1.

Member:

Dunno, that seems like a mistake to me. It should be a Long if it's a count, and the class should expose alternative factory methods to accept input of different types if needed. Overloading one argument seems like a hack, and I'd prefer not to extend it (or fix it).

See SPARK-15930, which concerns adding the input size just for this reason, I assume. We haven't released 2.0, and so could in theory still put in a change to the constructor. I agree, though: we might have to deprecate the existing one, add a new one, and still deal with calls to the old constructor, which would mean it's not possible to compute values that are a fraction of the whole data set. This in turn may argue for clearly separating inputs/outputs that are counts vs. percentages.
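
For illustration, a rough sketch of the deprecation route weighed above, under the assumption that the old constructor survives with a sentinel size; PatchedFPGrowthModel and supportOf are hypothetical names, not part of this PR:

import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset
import org.apache.spark.rdd.RDD

// Hypothetical compatibility sketch, not part of the PR: add the size to the
// primary constructor and keep the old signature as a deprecated auxiliary one.
class PatchedFPGrowthModel[Item](
    val freqItemsets: RDD[FreqItemset[Item]],
    val dataSize: Long) extends Serializable {

  // Old signature kept for source compatibility; the sentinel marks the size as unknown.
  @deprecated("pass the input data size so support can be computed as a fraction", "2.0.0")
  def this(freqItemsets: RDD[FreqItemset[Item]]) = this(freqItemsets, -1L)

  // With the old constructor, a fraction-of-data-set value simply cannot be computed.
  def supportOf(freqUnion: Long): Double = {
    require(dataSize > 0, "support as a fraction requires the total input size")
    freqUnion.toDouble / dataSize
  }
}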

Contributor Author (@hhbyyh):

I find it hard to just deprecate the old constructor and still keep support as a fraction if no dataset size is passed in.

Member:

Yes, it's not possible to implement in that case. There's an argument for just adding the parameter and removing the old constructor for 2.0 in order to support this without the convolutions. I'd love to get a thumbs up from @jkbradley or @mengxr though.

Contributor:

support should be a fraction, to be consistent with the semantics of minSupport in FPGrowth and PrefixSpan. There should be a compatible way to add support. Rule is not a case class and its constructor is package-private, so this should be easy to add. Another approach is to add the total number of records to the model, so people can calculate the support easily.

    require(antecedent.toSet.intersect(consequent.toSet).isEmpty, {
      val sharedItems = antecedent.toSet.intersect(consequent.toSet)
      s"A valid association rule must have disjoint antecedent and " +

mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala (19 changes: 11 additions & 8 deletions)

@@ -49,7 +49,8 @@ import org.apache.spark.storage.StorageLevel
  */
 @Since("1.3.0")
 class FPGrowthModel[Item: ClassTag] @Since("1.3.0") (
-    @Since("1.3.0") val freqItemsets: RDD[FreqItemset[Item]])
+    @Since("1.3.0") val freqItemsets: RDD[FreqItemset[Item]],
+    @Since("2.0.0") val dataSize: Long)
   extends Saveable with Serializable {
   /**
    * Generates association rules for the [[Item]]s in [[freqItemsets]].
@@ -58,7 +59,7 @@
   @Since("1.5.0")
   def generateAssociationRules(confidence: Double): RDD[AssociationRules.Rule[Item]] = {
     val associationRules = new AssociationRules(confidence)
-    associationRules.run(freqItemsets)
+    associationRules.run(freqItemsets, dataSize)
   }
 
   /**
@@ -102,7 +103,8 @@ object FPGrowthModel extends Loader[FPGrowthModel[_]] {
       val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
 
       val metadata = compact(render(
-        ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)))
+        ("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~
+          ("dataSize" -> model.dataSize)))
       sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
 
       // Get the type of item class
@@ -128,19 +130,20 @@
       val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
       assert(className == thisClassName)
       assert(formatVersion == thisFormatVersion)
-
+      val dataSize = (metadata \ "dataSize").extract[Long]
       val freqItemsets = spark.read.parquet(Loader.dataPath(path))
       val sample = freqItemsets.select("items").head().get(0)
-      loadImpl(freqItemsets, sample)
+      loadImpl(freqItemsets, sample, dataSize)
     }
 
-    def loadImpl[Item: ClassTag](freqItemsets: DataFrame, sample: Item): FPGrowthModel[Item] = {
+    def loadImpl[Item: ClassTag](freqItemsets: DataFrame, sample: Item,
+        dataSize: Long): FPGrowthModel[Item] = {
       val freqItemsetsRDD = freqItemsets.select("items", "freq").rdd.map { x =>
         val items = x.getAs[Seq[Item]](0).toArray
         val freq = x.getLong(1)
         new FreqItemset(items, freq)
       }
-      new FPGrowthModel(freqItemsetsRDD)
+      new FPGrowthModel(freqItemsetsRDD, dataSize)
     }
   }
 }
@@ -215,7 +218,7 @@ class FPGrowth private (
     val partitioner = new HashPartitioner(numParts)
     val freqItems = genFreqItems(data, minCount, partitioner)
     val freqItemsets = genFreqItemsets(data, minCount, freqItems, partitioner)
-    new FPGrowthModel(freqItemsets)
+    new FPGrowthModel(freqItemsets, count)
   }
 
   /** Java-friendly version of [[run]]. */

@@ -36,6 +36,7 @@ public void runAssociationRules() {
       new FreqItemset<String>(new String[]{"a", "b"}, 12L)
     ));
 
-    JavaRDD<AssociationRules.Rule<String>> results = (new AssociationRules()).run(freqItemsets);
+    JavaRDD<AssociationRules.Rule<String>> results = (new AssociationRules()).run(
+      freqItemsets, 50L);
   }
 }

@@ -38,7 +38,7 @@ class AssociationRulesSuite extends SparkFunSuite with MLlibTestSparkContext {
 
     val results1 = ar
       .setMinConfidence(0.9)
-      .run(freqItemsets)
+      .run(freqItemsets, 10L)
       .collect()
 
     /* Verify results using the `R` code:
@@ -67,7 +67,7 @@
 
     val results2 = ar
       .setMinConfidence(0)
-      .run(freqItemsets)
+      .run(freqItemsets, 10L)
       .collect()
 
     /* Verify results using the `R` code:
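
For reference, a usage sketch against the API as changed in this PR. Only the ("a","b") count of 12 is visible above; the singleton counts (15, 35), the total transaction count of 50, and the existing SparkContext sc are illustrative assumptions:

import org.apache.spark.mllib.fpm.AssociationRules
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset

// Toy frequent itemsets; freq values are absolute counts, as in the suite above.
val freqItemsets = sc.parallelize(Seq(
  new FreqItemset(Array("a"), 15L),
  new FreqItemset(Array("b"), 35L),
  new FreqItemset(Array("a", "b"), 12L)
))

val rules = new AssociationRules()
  .setMinConfidence(0.8)
  .run(freqItemsets, 50L) // 50L: assumed total number of input transactions

rules.collect().foreach { rule =>
  // For the rule a => b: confidence = 12/15 = 0.8, support = 12/50 = 0.24.
  println(rule.antecedent.mkString(",") + " => " + rule.consequent.mkString(",") +
    ": confidence=" + rule.confidence + ", support=" + rule.support)
}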