[SPARK-6487][MLlib] Add sequential pattern mining algorithm PrefixSpan to Spark MLlib #7258
Conversation
Can you make it a generic type so that it can accept more than just Array[Int] as input?
The Int data type is smaller than some other data types, such as String, so inside the algorithm we use Int to get better performance.
If a user has another data type, like String, they can encode the Strings to Ints and decode the results back to Strings. For example:
val sequences = Array(
  Array("3", "1", "3", "4", "5"),
  Array("2", "3", "1"),
  Array("3", "4", "4", "3"),
  Array("1", "3", "4", "5"),
  Array("2", "4", "1"),
  Array("6", "5", "3"))
val rdd = sc.parallelize(sequences, 2).cache()
// create coder and decoder
val letterMap = rdd.flatMap(x => x.distinct).distinct().zipWithIndex().mapValues(_.toInt).collect
val coder = letterMap.toMap
val decoder = letterMap.map(x => (x._2, x._1)).toMap
// code
val intRdd = rdd.map(x => x.map(y => coder(y)))
val prefixspan1 = new Prefixspan(intRdd, 2, 50)
val result = prefixspan1.run()
// decode
val stringResult = result.map(x => (x._1.map(y => decoder(y)), x._2))
I think this is a general task, and some other algorithms may need this functionality too, so could we add the coder and decoder as a separate module for all of them?
FPGrowth accepts a generic item type and encodes items into indices after the first step. We can do that in a follow-up PR.
The problem definition in the referenced paper suggests this should actually be Array[Array[Item]], where Item is a generic type. I agree that this extension can be deferred to a later PR and that the current code is fine as is.
Actually, can the change Array[Int] -> Array[Array[Int]] be made in this PR? It affects the type of a parameter exposed in a public API and would make the later generalization easier (for now we can just flatten the array and use the existing implementation, as sketched below).
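For illustration, the temporary shim could be as small as this (run and runInternal are hypothetical names, not the PR's actual API):

import org.apache.spark.rdd.RDD

// Expose itemset sequences publicly, flatten to the current format internally.
def run(data: RDD[Array[Array[Int]]]): RDD[(Array[Int], Long)] =
  runInternal(data.map(_.flatten))

// Placeholder for the existing single-item implementation.
def runInternal(data: RDD[Array[Int]]): RDD[(Array[Int], Long)] = ???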
@feynmanliang The generalization for non-temporal items would happen in a follow-up PR before 1.5. This is discussed on the JIRA page.
@mengxr This PR actually implements non-temporal (by temporal you mean consecutive, correct?) sequential pattern mining; see L86 and L149. My suggestion was to make the definition of a sequence consistent with the Han et al. paper (an itemset is a subset of items; a sequence is an ordered list of itemsets), since this PR currently defines a sequence to be an ordered list of singleton itemsets.
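To make the distinction concrete (values are illustrative):

// Current PR: a sequence is an ordered list of single items.
val singletons: Array[Int] = Array(1, 2, 3)
// Han et al.: a sequence is an ordered list of itemsets, e.g. <(1)(2,3)(4)>,
// where items 2 and 3 occur together in the same itemset.
val itemsets: Array[Array[Int]] = Array(Array(1), Array(2, 3), Array(4))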
Synced offline with @mengxr, Array[Int] OK for this PR.
OK, it will be fixed in a follow-up PR before 1.5. That's soon.
add to whitelist
@feynmanliang Could you take a look?
Test build #36686 has finished for PR 7258 at commit
Add ::Experimental:: here and @Experimental to class PrefixSpan.
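For reference, this annotation pattern in Spark 1.x looks roughly like the following (the constructor parameters shown are illustrative, not necessarily the PR's exact signature):

import org.apache.spark.annotation.Experimental

/**
 * :: Experimental ::
 * Parallel PrefixSpan algorithm for mining frequent sequential patterns.
 */
@Experimental
class PrefixSpan(private var minSupport: Double, private var maxPatternLength: Int)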
Fixed.
@zhangjiajin Thanks for making this PR minimal! I made some comments about code style. @feynmanliang will make a pass on the implementation. @jackylk Could you make a pass as well?
We should be consistent with FPGrowth and define minSupport to be a double in [0, 1] indicating the proportion of transactions containing the sequential pattern. (I realize that Han et al. define it as the total count, but Wikipedia, which is referenced below, follows the definition I am suggesting.)
This just means that we will need to multiply by sequences.count() before doing any of the filtering by minSupport.
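Concretely, the conversion might look like this sketch (sequences and patternCounts are assumed names for the input RDD and the candidate pattern counts):

// Turn the relative threshold into an absolute count once, up front.
val minCount = math.ceil(minSupport * sequences.count()).toLong
// Filtering then proceeds on absolute counts, as the code already does.
val frequent = patternCounts.filter { case (_, count) => count >= minCount }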
Fixed; changed the minSupport data type to a double in [0, 1].
This is too expensive (it creates many temporary objects). You only need the counts, so create a PrimitiveKeyOpenHashMap (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala) and count one by one.
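A sketch of the counting idea, assuming PrimitiveKeyOpenHashMap's changeValue(key, default, mergeFn) API; note the class is private[spark], so this compiles only inside Spark's own source tree:

import org.apache.spark.util.collection.PrimitiveKeyOpenHashMap

// Count how many sequences in this partition contain each item,
// without allocating an intermediate (item, 1) tuple per occurrence.
def countItems(sequences: Iterator[Array[Int]]): Iterator[(Int, Int)] = {
  val counts = new PrimitiveKeyOpenHashMap[Int, Int]()
  sequences.foreach { seq =>
    // distinct: an item is counted at most once per sequence
    seq.distinct.foreach(item => counts.changeValue(item, 1, _ + 1))
  }
  counts.iterator
}

Per-partition maps could then be combined via mapPartitions followed by reduceByKey.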
For example, the sequences are: , and the frequent items and counts are: (a:2), (b:2), (c:1), (d:3), (e:2), (f:1), (g:1). I have no idea how to use PrimitiveKeyOpenHashMap here.
@zhangjiajin Since you already collected the frequent items (length-1 patterns) to the driver, you don't need to keep the RDD of length-1 patterns. When generating the final patterns, recomputing the RDD is more expensive than parallelizing the collected ones. Another comment is to separate the local computation from the distributed one; it makes the implementation easier to read. We can create a private object called LocalPrefixSpan.
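A rough illustration of such a split: a local miner that recursively extends a prefix over an already-projected database. This is a sketch of the idea for single-item sequences, not the PR's actual LocalPrefixSpan:

import scala.collection.mutable

private object LocalPrefixSpan {
  // Emit (pattern, count) for every frequent extension of `prefix`.
  def run(
      minCount: Long,
      prefix: List[Int],
      database: Array[Array[Int]]): Iterator[(List[Int], Long)] = {
    // Count each item at most once per projected sequence.
    val counts = mutable.Map.empty[Int, Long].withDefaultValue(0L)
    database.foreach(seq => seq.distinct.foreach(item => counts(item) += 1))
    counts.iterator.filter(_._2 >= minCount).flatMap { case (item, count) =>
      val newPrefix = item :: prefix
      // Project: keep the suffix after the first occurrence of `item`.
      val projected = database.flatMap { seq =>
        val i = seq.indexOf(item)
        if (i >= 0 && i + 1 < seq.length) Some(seq.drop(i + 1)) else None
      }
      Iterator((newPrefix.reverse, count)) ++ run(minCount, newPrefix, projected)
    }
  }
}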
Test build #36936 has finished for PR 7258 at commit
@mengxr I don't know why method 2 does projection before filtering. I think method 2 is exactly what you want. The only functionality that needs to be added to the current code is step 4 (Do we have enough candidates to distribute the work? If not, go to step 1 and generate candidates with length + 1.)
Test build #37035 has finished for PR 7258 at commit
"the key of pair is pattern (a list of elements)," -> "the key of pair is sequential pattern (a list of items),"
Fixed.
@feynmanliang comments: Delete makePrefixProjectedDatabases and move the groupByKey() to the last call in this method (no need to include the two map()s on L161 and L163 since they don't do anything). Because the pair's key is an Array, groupByKey() doesn't work well, so the Array must be converted to a Seq before groupByKey and converted back after groupByKey.
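The root cause is that JVM arrays use reference equality and identity hash codes, so they cannot serve as shuffle keys directly. A sketch of the round-trip described above (projected is an assumed name and shape, not the PR's exact code):

import org.apache.spark.rdd.RDD

// Group projected postfixes by their prefix, converting the Array key
// to a Seq (value equality) for the shuffle and back afterwards.
def groupByPrefix(projected: RDD[(Array[Int], Array[Int])])
  : RDD[(Array[Int], Iterable[Array[Int]])] = {
  projected
    .map { case (prefix, postfix) => (prefix.toSeq, postfix) }
    .groupByKey()
    .map { case (prefix, postfixes) => (prefix.toArray, postfixes) }
}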
@zhangjiajin Yep, you're right. Thanks for pointing it out!
LGTM pending tests
Test build #37073 has finished for PR 7258 at commit
@zhangjiajin Let's merge this version and make improvements in follow-up PRs: 1) generalize the input to itemset sequences with a generic item type, and 2) performance improvements.
I will make JIRAs for each of them. @feynmanliang will work on 1). @zhangjiajin Could you work on 2) and do some performance benchmarking? Thanks!
Merged into master. Thanks for contributing PrefixSpan!
OK
[SPARK-6487][MLlib] Add sequential pattern mining algorithm PrefixSpan to Spark MLlib

Add parallel PrefixSpan algorithm and test file.
Support non-temporal sequences.

Author: zhangjiajin <[email protected]>
Author: zhang jiajin <[email protected]>

Closes #7258 from zhangjiajin/master and squashes the following commits:

ca9c4c8 [zhangjiajin] Modified the code according to the review comments.
574e56c [zhangjiajin] Add new object LocalPrefixSpan, and do some optimization.
ba5df34 [zhangjiajin] Fix a Scala style error.
4c60fb3 [zhangjiajin] Fix some Scala style errors.
1dd33ad [zhangjiajin] Modified the code according to the review comments.
89bc368 [zhangjiajin] Fixed a Scala style error.
a2eb14c [zhang jiajin] Delete PrefixspanSuite.scala
951fd42 [zhang jiajin] Delete Prefixspan.scala
575995f [zhangjiajin] Modified the code according to the review comments.
91fd7e6 [zhangjiajin] Add new algorithm PrefixSpan and test file.