-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-8997][MLlib]Performance improvements in LocalPrefixSpan #7360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
70b93e3
2e00cba
f055d82
9212256
91e4357
59db2f5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,8 +20,6 @@ package org.apache.spark.mllib.fpm | |
| import org.apache.spark.Logging | ||
| import org.apache.spark.annotation.Experimental | ||
|
|
||
| import scala.collection.mutable.ArrayBuffer | ||
|
|
||
| /** | ||
| * | ||
| * :: Experimental :: | ||
|
|
@@ -36,80 +34,71 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable { | |
| * @param minCount minimum count | ||
| * @param maxPatternLength maximum pattern length | ||
| * @param prefix prefix | ||
| * @param projectedDatabase the projected dabase | ||
| * @param database the projected database | ||
| * @return a set of sequential pattern pairs, | ||
| * the key of pair is sequential pattern (a list of items), | ||
| * the value of pair is the pattern's count. | ||
| */ | ||
| def run( | ||
| minCount: Long, | ||
| maxPatternLength: Int, | ||
| prefix: ArrayBuffer[Int], | ||
| projectedDatabase: Array[Array[Int]]): Iterator[(Array[Int], Long)] = { | ||
| val frequentPrefixAndCounts = getFreqItemAndCounts(minCount, projectedDatabase) | ||
| val frequentPatternAndCounts = frequentPrefixAndCounts | ||
| .map(x => ((prefix :+ x._1).toArray, x._2)) | ||
| val prefixProjectedDatabases = getPatternAndProjectedDatabase( | ||
| prefix, frequentPrefixAndCounts.map(_._1), projectedDatabase) | ||
| prefix: List[Int], | ||
| database: Iterable[Array[Int]]): Iterator[(Array[Int], Long)] = { | ||
|
|
||
| if (database.isEmpty) return Iterator.empty | ||
|
|
||
| val frequentItemAndCounts = getFreqItemAndCounts(minCount, database) | ||
| val frequentItems = frequentItemAndCounts.map(_._1) | ||
| val frequentPatternAndCounts = frequentItemAndCounts | ||
| .map { case (item, count) => ((item :: prefix).reverse.toArray, count) } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We don't need to reverse in |
||
|
|
||
| if (prefixProjectedDatabases.nonEmpty && prefix.length + 1 < maxPatternLength) { | ||
| frequentPatternAndCounts.iterator ++ prefixProjectedDatabases.flatMap { | ||
| case (nextPrefix, projDB) => run(minCount, maxPatternLength, nextPrefix, projDB) | ||
| val filteredProjectedDatabase = database.map(x => x.filter(frequentItems.contains(_))) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Move this to the |
||
|
|
||
| if (prefix.length + 1 < maxPatternLength) { | ||
| frequentPatternAndCounts ++ frequentItems.flatMap { item => | ||
| val nextProjected = project(filteredProjectedDatabase, item) | ||
| run(minCount, maxPatternLength, item :: prefix, nextProjected) | ||
| } | ||
| } else { | ||
| frequentPatternAndCounts.iterator | ||
| frequentPatternAndCounts | ||
| } | ||
| } | ||
|
|
||
| /** | ||
| * calculate suffix sequence following a prefix in a sequence | ||
| * @param prefix prefix | ||
| * @param sequence sequence | ||
| * Calculate suffix sequence immediately after the first occurrence of an item. | ||
| * @param item item to get suffix after | ||
| * @param sequence sequence to extract suffix from | ||
| * @return suffix sequence | ||
| */ | ||
| def getSuffix(prefix: Int, sequence: Array[Int]): Array[Int] = { | ||
| val index = sequence.indexOf(prefix) | ||
| def getSuffix(item: Int, sequence: Array[Int]): Array[Int] = { | ||
| val index = sequence.indexOf(item) | ||
| if (index == -1) { | ||
| Array() | ||
| } else { | ||
| sequence.drop(index + 1) | ||
| } | ||
| } | ||
|
|
||
| def project(database: Iterable[Array[Int]], prefix: Int): Iterable[Array[Int]] = { | ||
| database | ||
| .map(candidateSeq => getSuffix(prefix, candidateSeq)) | ||
| .filter(_.nonEmpty) | ||
| } | ||
|
|
||
| /** | ||
| * Generates frequent items by filtering the input data using minimal count level. | ||
| * @param minCount the absolute minimum count | ||
| * @param sequences sequences data | ||
| * @return array of item and count pair | ||
| * @param minCount the minimum count for an item to be frequent | ||
| * @param database database of sequences | ||
| * @return item and count pairs | ||
| */ | ||
| private def getFreqItemAndCounts( | ||
| minCount: Long, | ||
| sequences: Array[Array[Int]]): Array[(Int, Long)] = { | ||
| sequences.flatMap(_.distinct) | ||
| database: Iterable[Array[Int]]): Iterator[(Int, Long)] = { | ||
| database.flatMap(_.distinct) | ||
| .foldRight(Map[Int, Long]().withDefaultValue(0L)) { case (item, ctr) => | ||
| ctr + (item -> (ctr(item) + 1)) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Use a mutable |
||
| } | ||
| .filter(_._2 >= minCount) | ||
| .toArray | ||
| } | ||
|
|
||
| /** | ||
| * Get the frequent prefixes' projected database. | ||
| * @param prefix the frequent prefixes' prefix | ||
| * @param frequentPrefixes frequent next prefixes | ||
| * @param projDB projected database for given prefix | ||
| * @return extensions of prefix by one item and corresponding projected databases | ||
| */ | ||
| private def getPatternAndProjectedDatabase( | ||
| prefix: ArrayBuffer[Int], | ||
| frequentPrefixes: Array[Int], | ||
| projDB: Array[Array[Int]]): Array[(ArrayBuffer[Int], Array[Array[Int]])] = { | ||
| val filteredProjectedDatabase = projDB.map(x => x.filter(frequentPrefixes.contains(_))) | ||
| frequentPrefixes.map { nextItem => | ||
| val nextProjDB = filteredProjectedDatabase | ||
| .map(candidateSeq => getSuffix(nextItem, candidateSeq)) | ||
| .filter(_.nonEmpty) | ||
| (prefix :+ nextItem, nextProjDB) | ||
| }.filter(x => x._2.nonEmpty) | ||
| .iterator | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -153,7 +153,7 @@ class PrefixSpan private ( | |
| minCount: Long, | ||
| data: RDD[(Array[Int], Array[Array[Int]])]): RDD[(Array[Int], Long)] = { | ||
| data.flatMap { case (prefix, projDB) => | ||
| LocalPrefixSpan.run(minCount, maxPatternLength, prefix.to[ArrayBuffer], projDB) | ||
| LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList, projDB) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. call |
||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`database` should be an `Array[Array[Int]]`. We need to access it multiple times. The return type should be `Iterator[(List[Int], Long)]`.