From 91fd7e66d0c363e68bc9ebe2bf3e03c26ef348d2 Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Tue, 7 Jul 2015 15:30:10 +0800 Subject: [PATCH 01/12] Add new algorithm PrefixSpan and test file. --- .../apache/spark/mllib/fpm/Prefixspan.scala | 183 ++++++++++++++++++ .../spark/mllib/fpm/PrefixspanSuite.scala | 47 +++++ 2 files changed, 230 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/fpm/Prefixspan.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixspanSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/Prefixspan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/Prefixspan.scala new file mode 100644 index 000000000000..c110a37fce16 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/Prefixspan.scala @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.fpm + +import org.apache.spark.rdd.RDD + +/** + * + * A parallel PrefixSpan algorithm to mine sequential pattern. + * The PrefixSpan algorithm is described in + * [[http://web.engr.illinois.edu/~hanj/pdf/span01.pdf]]. 
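+ *
+ * An illustrative usage sketch (hypothetical, mirroring the test suite; here
+ * `sequences` is an RDD[Array[Int]] of input sequences):
+ * {{{
+ *   val prefixspan = new Prefixspan(sequences, minSupport = 2, maxPatternLength = 50)
+ *   val patterns: RDD[(Seq[Int], Int)] = prefixspan.run()
+ * }}}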
+ * + * @param sequences original sequences data + * @param minSupport the minimal support level of the sequential pattern, any pattern appears + * more than minSupport times will be output + * @param maxPatternLength the maximal length of the sequential pattern, any pattern appears + * less than maxPatternLength will be output + * + * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining + * (Wikipedia)]] + */ +class Prefixspan( + val sequences: RDD[Array[Int]], + val minSupport: Int = 2, + val maxPatternLength: Int = 50) extends java.io.Serializable { + + /** + * Calculate sequential patterns: + * a) find and collect length-one patterns + * b) for each length-one patterns and each sequence, + * emit (pattern (prefix), suffix sequence) as key-value pairs + * c) group by key and then map value iterator to array + * d) local PrefixSpan on each prefix + * @return sequential patterns + */ + def run(): RDD[(Seq[Int], Int)] = { + val (patternsOneLength, prefixAndCandidates) = findPatternsLengthOne() + val repartitionedRdd = repartitionSequences(prefixAndCandidates) + val nextPatterns = getPatternsInLocal(repartitionedRdd) + val allPatterns = patternsOneLength.map(x => (Seq(x._1), x._2)) ++ nextPatterns + allPatterns + } + + /** + * Find the patterns that it's length is one + * @return length-one patterns and projection table + */ + private def findPatternsLengthOne(): (RDD[(Int, Int)], RDD[(Seq[Int], Array[Int])]) = { + val patternsOneLength = sequences + .map(_.distinct) + .flatMap(p => p) + .map((_, 1)) + .reduceByKey(_ + _) + + val removedElements: Array[Int] = patternsOneLength + .filter(_._2 < minSupport) + .map(_._1) + .collect() + + val savedElements = patternsOneLength.filter(_._2 >= minSupport) + + val savedElementsArray = savedElements + .map(_._1) + .collect() + + val filteredSequences = + if (removedElements.isEmpty) { + sequences + } else { + sequences.map { p => + p.filter { x => !removedElements.contains(x) } + } + } + + val prefixAndCandidates = filteredSequences.flatMap { x => + savedElementsArray.map { y => + val sub = getSuffix(y, x) + (Seq(y), sub) + } + } + + (savedElements, prefixAndCandidates) + } + + /** + * Re-partition the RDD data, to get better balance and performance. + * @param data patterns and projected sequences data before re-partition + * @return patterns and projected sequences data after re-partition + */ + private def repartitionSequences( + data: RDD[(Seq[Int], Array[Int])]): RDD[(Seq[Int], Array[Array[Int]])] = { + val dataRemovedEmptyLine = data.filter(x => x._2.nonEmpty) + val dataMerged = dataRemovedEmptyLine + .groupByKey() + .map(x => (x._1, x._2.toArray)) + dataMerged + } + + /** + * calculate the patterns in local. + * @param data patterns and projected sequences data data + * @return patterns + */ + private def getPatternsInLocal( + data: RDD[(Seq[Int], Array[Array[Int]])]): RDD[(Seq[Int], Int)] = { + val result = data.flatMap { x => + getPatternsWithPrefix(x._1, x._2) + } + result + } + + /** + * calculate the patterns with one prefix in local. 
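+ * For example (illustrative): with minSupport = 2, prefix Seq(3) and projected
+ * data Array(Array(1, 3, 4, 5), Array(4, 5)), items 4 and 5 each occur in two
+ * projected sequences, so (Seq(3, 4), 2) and (Seq(3, 5), 2) are emitted before
+ * recursing into their projected databases.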
+ * @param prefix prefix + * @param data patterns and projected sequences data + * @return patterns + */ + private def getPatternsWithPrefix( + prefix: Seq[Int], + data: Array[Array[Int]]): Array[(Seq[Int], Int)] = { + val elements = data + .map(x => x.distinct) + .flatMap(x => x) + .groupBy(x => x) + .map(x => (x._1, x._2.length)) + + val selectedSingleElements = elements.filter(x => x._2 >= minSupport) + + val selectedElements = selectedSingleElements + .map(x => (prefix ++ Seq(x._1), x._2)) + .toArray + + val cleanedSearchSpace = data + .map(x => x.filter(y => selectedSingleElements.contains(y))) + + val newSearchSpace = selectedSingleElements.map { x => + val sub = cleanedSearchSpace.map(y => getSuffix(x._1, y)).filter(_.nonEmpty) + (prefix ++ Seq(x._1), sub) + }.filter(x => x._2.nonEmpty) + .toArray + + val continueProcess = newSearchSpace.nonEmpty && prefix.length + 1 < maxPatternLength + + if (continueProcess) { + val nextPatterns = newSearchSpace + .map(x => getPatternsWithPrefix(x._1, x._2)) + .reduce(_ ++ _) + selectedElements ++ nextPatterns + } else { + selectedElements + } + } + + /** + * calculate suffix sequence following a prefix in a sequence + * @param prefix prefix + * @param sequence original sequence + * @return suffix sequence + */ + private def getSuffix(prefix: Int, sequence: Array[Int]): Array[Int] = { + val index = sequence.indexOf(prefix) + if (index == -1) { + Array() + } else { + sequence.takeRight(sequence.length - index - 1) + } + } +} \ No newline at end of file diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixspanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixspanSuite.scala new file mode 100644 index 000000000000..770a0c0906f9 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixspanSuite.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.mllib.fpm + +import org.apache.spark.SparkFunSuite +import org.apache.spark.mllib.util.MLlibTestSparkContext + +class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { + + test("Prefixspan sequences mining using Integer type") { + val sequences = Array( + Array(3, 1, 3, 4, 5), + Array(2, 3, 1), + Array(3, 4, 4, 3), + Array(1, 3, 4, 5), + Array(2, 4, 1), + Array(6, 5, 3)) + + val rdd = sc.parallelize(sequences, 2).cache() + + val prefixspan1 = new Prefixspan(rdd, 2, 50) + val result1 = prefixspan1.run() + assert(result1.count() == 19) + + val prefixspan2 = new Prefixspan(rdd, 3, 50) + val result2 = prefixspan2.run() + assert(result2.count() == 5) + + val prefixspan3 = new Prefixspan(rdd, 2, 2) + val result3 = prefixspan3.run() + assert(result3.count() == 14) + } +} From 575995f69dadad825d97f2248599eb62c1743fe7 Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Wed, 8 Jul 2015 17:07:37 +0800 Subject: [PATCH 02/12] Modified the code according to the review comments. --- .../apache/spark/mllib/fpm/PrefixSpan.scala | 209 ++++++++++++++++++ .../spark/mllib/fpm/PrefixSpanSuite.scala | 69 ++++++ 2 files changed, 278 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala new file mode 100644 index 000000000000..70218e2742da --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.fpm + +import org.apache.spark.annotation.Experimental +import org.apache.spark.rdd.RDD + +/** + * + * :: Experimental :: + * + * A parallel PrefixSpan algorithm to mine sequential pattern. + * The PrefixSpan algorithm is described in + * [[http://doi.org/10.1109/ICDE.2001.914830]]. + * + * @param minSupport the minimal support level of the sequential pattern, any pattern appears + * more than (minSupport * size-of-the-dataset) times will be output + * @param maxPatternLength the maximal length of the sequential pattern, any pattern appears + * less than maxPatternLength will be output + * + * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining + * (Wikipedia)]] + */ +@Experimental +class PrefixSpan( + private var minSupport: Double, + private var maxPatternLength: Int) extends java.io.Serializable { + + private var absMinSupport: Int = 0 + + /** + * Constructs a default instance with default parameters + * {minSupport: `0.1`, maxPatternLength: 10}. 
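+ *
+ * An illustrative usage sketch (hypothetical, mirroring the test suite; here
+ * `rdd` is an RDD[Array[Int]] of input sequences):
+ * {{{
+ *   val prefixspan = new PrefixSpan()
+ *     .setMinSupport(0.34)
+ *     .setMaxPatternLength(50)
+ *   val patterns = prefixspan.run(rdd)
+ * }}}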
+ */ + def this() = this(0.1, 10) + + /** + * Sets the minimal support level (default: `0.1`). + */ + def setMinSupport(minSupport: Double): this.type = { + this.minSupport = minSupport + this + } + + /** + * Sets maximal pattern length. + */ + def setMaxPatternLength(maxPatternLength: Int): this.type = { + this.maxPatternLength = maxPatternLength + this + } + + /** + * Calculate sequential patterns: + * a) find and collect length-one patterns + * b) for each length-one patterns and each sequence, + * emit (pattern (prefix), suffix sequence) as key-value pairs + * c) group by key and then map value iterator to array + * d) local PrefixSpan on each prefix + * @return sequential patterns + */ + def run(sequences: RDD[Array[Int]]): RDD[(Seq[Int], Int)] = { + absMinSupport = getAbsoluteMinSupport(sequences) + val (lengthOnePatternsAndCounts, prefixAndCandidates) = + findLengthOnePatterns(sequences) + val repartitionedRdd = makePrefixProjectedDatabases(prefixAndCandidates) + val nextPatterns = getPatternsInLocal(repartitionedRdd) + val allPatterns = lengthOnePatternsAndCounts.map(x => (Seq(x._1), x._2)) ++ nextPatterns + allPatterns + } + + private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Int = { + val result = if (minSupport <= 0) { + 0 + }else { + val count = sequences.count() + val support = if (minSupport <= 1) minSupport else 1 + (support * count).toInt + } + result + } + + /** + * Find the patterns that it's length is one + * @param sequences original sequences data + * @return length-one patterns and projection table + */ + private def findLengthOnePatterns( + sequences: RDD[Array[Int]]): (RDD[(Int, Int)], RDD[(Seq[Int], Array[Int])]) = { + val LengthOnePatternAndCounts = sequences + .flatMap(_.distinct.map((_, 1))) + .reduceByKey(_ + _) + val infrequentLengthOnePatterns: Array[Int] = LengthOnePatternAndCounts + .filter(_._2 < absMinSupport) + .map(_._1) + .collect() + val frequentLengthOnePatterns = LengthOnePatternAndCounts + .filter(_._2 >= absMinSupport) + val frequentLengthOnePatternsArray = frequentLengthOnePatterns + .map(_._1) + .collect() + val filteredSequences = + if (infrequentLengthOnePatterns.isEmpty) { + sequences + } else { + sequences.map { p => + p.filter { x => !infrequentLengthOnePatterns.contains(x) } + } + } + val prefixAndCandidates = filteredSequences.flatMap { x => + frequentLengthOnePatternsArray.map { y => + val sub = getSuffix(y, x) + (Seq(y), sub) + } + }.filter(x => x._2.nonEmpty) + (frequentLengthOnePatterns, prefixAndCandidates) + } + + /** + * Re-partition the RDD data, to get better balance and performance. + * @param data patterns and projected sequences data before re-partition + * @return patterns and projected sequences data after re-partition + */ + private def makePrefixProjectedDatabases( + data: RDD[(Seq[Int], Array[Int])]): RDD[(Seq[Int], Array[Array[Int]])] = { + val dataMerged = data + .groupByKey() + .mapValues(_.toArray) + dataMerged + } + + /** + * calculate the patterns in local. + * @param data patterns and projected sequences data data + * @return patterns + */ + private def getPatternsInLocal( + data: RDD[(Seq[Int], Array[Array[Int]])]): RDD[(Seq[Int], Int)] = { + val result = data.flatMap { x => + getPatternsWithPrefix(x._1, x._2) + } + result + } + + /** + * calculate the patterns with one prefix in local. 
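+ * Note (illustrative): counts here are compared against absMinSupport, so with
+ * 6 input sequences and minSupport = 0.34, a pattern must occur in at least
+ * (0.34 * 6).toInt = 2 projected sequences to be kept.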
+ * @param prefix prefix + * @param projectedDatabase patterns and projected sequences data + * @return patterns + */ + private def getPatternsWithPrefix( + prefix: Seq[Int], + projectedDatabase: Array[Array[Int]]): Array[(Seq[Int], Int)] = { + val prefixAndCounts = projectedDatabase + .flatMap(_.distinct) + .groupBy(x => x) + .mapValues(_.length) + val frequentPrefixExtensions = prefixAndCounts.filter(x => x._2 >= absMinSupport) + val frequentPrefixesAndCounts = frequentPrefixExtensions + .map(x => (prefix ++ Seq(x._1), x._2)) + .toArray + val cleanedSearchSpace = projectedDatabase + .map(x => x.filter(y => frequentPrefixExtensions.contains(y))) + val prefixProjectedDatabases = frequentPrefixExtensions.map { x => + val sub = cleanedSearchSpace.map(y => getSuffix(x._1, y)).filter(_.nonEmpty) + (prefix ++ Seq(x._1), sub) + }.filter(x => x._2.nonEmpty) + .toArray + val continueProcess = prefixProjectedDatabases.nonEmpty && prefix.length + 1 < maxPatternLength + if (continueProcess) { + val nextPatterns = prefixProjectedDatabases + .map(x => getPatternsWithPrefix(x._1, x._2)) + .reduce(_ ++ _) + frequentPrefixesAndCounts ++ nextPatterns + } else { + frequentPrefixesAndCounts + } + } + + /** + * calculate suffix sequence following a prefix in a sequence + * @param prefix prefix + * @param sequence original sequence + * @return suffix sequence + */ + private def getSuffix(prefix: Int, sequence: Array[Int]): Array[Int] = { + val index = sequence.indexOf(prefix) + if (index == -1) { + Array() + } else { + sequence.drop(index + 1) + } + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala new file mode 100644 index 000000000000..7796f1298891 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.mllib.fpm + +import org.apache.spark.SparkFunSuite +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.rdd.RDD + +class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { + + test("Prefixspan sequences mining using Integer type") { + val sequences = Array( + Array(3, 1, 3, 4, 5), + Array(2, 3, 1), + Array(3, 4, 4, 3), + Array(1, 3, 4, 5), + Array(2, 4, 1), + Array(6, 5, 3)) + + val rdd = sc.parallelize(sequences, 2).cache() + + def formatResultString(data: RDD[(Seq[Int], Int)]): String = { + data.map(x => x._1.mkString(",") + ": " + x._2) + .collect() + .sortWith(_<_) + .mkString("; ") + } + + val prefixspan = new PrefixSpan() + .setMinSupport(0.34) + .setMaxPatternLength(50) + val result1 = prefixspan.run(rdd) + val len1 = result1.count().toInt + val actualValue1 = formatResultString(result1) + val expectedValue1 = + "1,3,4,5: 2; 1,3,4: 2; 1,3,5: 2; 1,3: 2; 1,4,5: 2;" + + " 1,4: 2; 1,5: 2; 1: 4; 2,1: 2; 2: 2; 3,1: 2; 3,3: 2;" + + " 3,4,5: 2; 3,4: 3; 3,5: 2; 3: 5; 4,5: 2; 4: 4; 5: 3" + assert(expectedValue1 == actualValue1) + + prefixspan.setMinSupport(0.5).setMaxPatternLength(50) + val result2 = prefixspan.run(rdd) + val expectedValue2 = "1: 4; 3,4: 3; 3: 5; 4: 4; 5: 3" + val actualValue2 = formatResultString(result2) + assert(expectedValue2 == actualValue2) + + prefixspan.setMinSupport(0.34).setMaxPatternLength(2) + val result3 = prefixspan.run(rdd) + val actualValue3 = formatResultString(result3) + val expectedValue3 = + "1,3: 2; 1,4: 2; 1,5: 2; 1: 4; 2,1: 2; 2: 2; 3,1: 2;" + + " 3,3: 2; 3,4: 3; 3,5: 2; 3: 5; 4,5: 2; 4: 4; 5: 3" + assert(expectedValue3 == actualValue3) + } +} From 951fd424ff189f9bf5619a84f3f19e942f592396 Mon Sep 17 00:00:00 2001 From: zhang jiajin Date: Wed, 8 Jul 2015 18:22:16 +0800 Subject: [PATCH 03/12] Delete Prefixspan.scala Use PrefixSpan.scala instead of Prefixspan.scala. Delete Prefixspan.scala --- .../apache/spark/mllib/fpm/Prefixspan.scala | 183 ------------------ 1 file changed, 183 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/fpm/Prefixspan.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/Prefixspan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/Prefixspan.scala deleted file mode 100644 index c110a37fce16..000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/Prefixspan.scala +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.fpm - -import org.apache.spark.rdd.RDD - -/** - * - * A parallel PrefixSpan algorithm to mine sequential pattern. - * The PrefixSpan algorithm is described in - * [[http://web.engr.illinois.edu/~hanj/pdf/span01.pdf]]. 
- * - * @param sequences original sequences data - * @param minSupport the minimal support level of the sequential pattern, any pattern appears - * more than minSupport times will be output - * @param maxPatternLength the maximal length of the sequential pattern, any pattern appears - * less than maxPatternLength will be output - * - * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining - * (Wikipedia)]] - */ -class Prefixspan( - val sequences: RDD[Array[Int]], - val minSupport: Int = 2, - val maxPatternLength: Int = 50) extends java.io.Serializable { - - /** - * Calculate sequential patterns: - * a) find and collect length-one patterns - * b) for each length-one patterns and each sequence, - * emit (pattern (prefix), suffix sequence) as key-value pairs - * c) group by key and then map value iterator to array - * d) local PrefixSpan on each prefix - * @return sequential patterns - */ - def run(): RDD[(Seq[Int], Int)] = { - val (patternsOneLength, prefixAndCandidates) = findPatternsLengthOne() - val repartitionedRdd = repartitionSequences(prefixAndCandidates) - val nextPatterns = getPatternsInLocal(repartitionedRdd) - val allPatterns = patternsOneLength.map(x => (Seq(x._1), x._2)) ++ nextPatterns - allPatterns - } - - /** - * Find the patterns that it's length is one - * @return length-one patterns and projection table - */ - private def findPatternsLengthOne(): (RDD[(Int, Int)], RDD[(Seq[Int], Array[Int])]) = { - val patternsOneLength = sequences - .map(_.distinct) - .flatMap(p => p) - .map((_, 1)) - .reduceByKey(_ + _) - - val removedElements: Array[Int] = patternsOneLength - .filter(_._2 < minSupport) - .map(_._1) - .collect() - - val savedElements = patternsOneLength.filter(_._2 >= minSupport) - - val savedElementsArray = savedElements - .map(_._1) - .collect() - - val filteredSequences = - if (removedElements.isEmpty) { - sequences - } else { - sequences.map { p => - p.filter { x => !removedElements.contains(x) } - } - } - - val prefixAndCandidates = filteredSequences.flatMap { x => - savedElementsArray.map { y => - val sub = getSuffix(y, x) - (Seq(y), sub) - } - } - - (savedElements, prefixAndCandidates) - } - - /** - * Re-partition the RDD data, to get better balance and performance. - * @param data patterns and projected sequences data before re-partition - * @return patterns and projected sequences data after re-partition - */ - private def repartitionSequences( - data: RDD[(Seq[Int], Array[Int])]): RDD[(Seq[Int], Array[Array[Int]])] = { - val dataRemovedEmptyLine = data.filter(x => x._2.nonEmpty) - val dataMerged = dataRemovedEmptyLine - .groupByKey() - .map(x => (x._1, x._2.toArray)) - dataMerged - } - - /** - * calculate the patterns in local. - * @param data patterns and projected sequences data data - * @return patterns - */ - private def getPatternsInLocal( - data: RDD[(Seq[Int], Array[Array[Int]])]): RDD[(Seq[Int], Int)] = { - val result = data.flatMap { x => - getPatternsWithPrefix(x._1, x._2) - } - result - } - - /** - * calculate the patterns with one prefix in local. 
- * @param prefix prefix - * @param data patterns and projected sequences data - * @return patterns - */ - private def getPatternsWithPrefix( - prefix: Seq[Int], - data: Array[Array[Int]]): Array[(Seq[Int], Int)] = { - val elements = data - .map(x => x.distinct) - .flatMap(x => x) - .groupBy(x => x) - .map(x => (x._1, x._2.length)) - - val selectedSingleElements = elements.filter(x => x._2 >= minSupport) - - val selectedElements = selectedSingleElements - .map(x => (prefix ++ Seq(x._1), x._2)) - .toArray - - val cleanedSearchSpace = data - .map(x => x.filter(y => selectedSingleElements.contains(y))) - - val newSearchSpace = selectedSingleElements.map { x => - val sub = cleanedSearchSpace.map(y => getSuffix(x._1, y)).filter(_.nonEmpty) - (prefix ++ Seq(x._1), sub) - }.filter(x => x._2.nonEmpty) - .toArray - - val continueProcess = newSearchSpace.nonEmpty && prefix.length + 1 < maxPatternLength - - if (continueProcess) { - val nextPatterns = newSearchSpace - .map(x => getPatternsWithPrefix(x._1, x._2)) - .reduce(_ ++ _) - selectedElements ++ nextPatterns - } else { - selectedElements - } - } - - /** - * calculate suffix sequence following a prefix in a sequence - * @param prefix prefix - * @param sequence original sequence - * @return suffix sequence - */ - private def getSuffix(prefix: Int, sequence: Array[Int]): Array[Int] = { - val index = sequence.indexOf(prefix) - if (index == -1) { - Array() - } else { - sequence.takeRight(sequence.length - index - 1) - } - } -} \ No newline at end of file From a2eb14c7fb6abb70eaa046baf78da205c7a4ca7d Mon Sep 17 00:00:00 2001 From: zhang jiajin Date: Wed, 8 Jul 2015 18:23:31 +0800 Subject: [PATCH 04/12] Delete PrefixspanSuite.scala Use PrefixSpanSuite.scala instead of PrefixspanSuite.scala, Delete PrefixspanSuite.scala. --- .../spark/mllib/fpm/PrefixspanSuite.scala | 47 ------------------- 1 file changed, 47 deletions(-) delete mode 100644 mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixspanSuite.scala diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixspanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixspanSuite.scala deleted file mode 100644 index 770a0c0906f9..000000000000 --- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixspanSuite.scala +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.spark.mllib.fpm - -import org.apache.spark.SparkFunSuite -import org.apache.spark.mllib.util.MLlibTestSparkContext - -class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { - - test("Prefixspan sequences mining using Integer type") { - val sequences = Array( - Array(3, 1, 3, 4, 5), - Array(2, 3, 1), - Array(3, 4, 4, 3), - Array(1, 3, 4, 5), - Array(2, 4, 1), - Array(6, 5, 3)) - - val rdd = sc.parallelize(sequences, 2).cache() - - val prefixspan1 = new Prefixspan(rdd, 2, 50) - val result1 = prefixspan1.run() - assert(result1.count() == 19) - - val prefixspan2 = new Prefixspan(rdd, 3, 50) - val result2 = prefixspan2.run() - assert(result2.count() == 5) - - val prefixspan3 = new Prefixspan(rdd, 2, 2) - val result3 = prefixspan3.run() - assert(result3.count() == 14) - } -} From 89bc368f76c40ad0090a928cec49cd9d28ce666e Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Wed, 8 Jul 2015 18:50:38 +0800 Subject: [PATCH 05/12] Fixed a Scala style error. --- .../main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 70218e2742da..4a78a25f39a4 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -84,10 +84,10 @@ class PrefixSpan( allPatterns } - private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Int = { + private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Int = { val result = if (minSupport <= 0) { 0 - }else { + } else { val count = sequences.count() val support = if (minSupport <= 1) minSupport else 1 (support * count).toInt From 1dd33ad82499b9ad1b446b96f2f88519ffbe9a1b Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Thu, 9 Jul 2015 22:40:29 +0800 Subject: [PATCH 06/12] Modified the code according to the review comments. --- .../apache/spark/mllib/fpm/PrefixSpan.scala | 211 +++++++++++------- .../spark/mllib/fpm/PrefixSpanSuite.scala | 98 ++++++-- 2 files changed, 201 insertions(+), 108 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 4a78a25f39a4..05f8c4186aaf 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -17,8 +17,10 @@ package org.apache.spark.mllib.fpm +import org.apache.spark.Logging import org.apache.spark.annotation.Experimental import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel /** * @@ -37,15 +39,13 @@ import org.apache.spark.rdd.RDD * (Wikipedia)]] */ @Experimental -class PrefixSpan( +class PrefixSpan private ( private var minSupport: Double, - private var maxPatternLength: Int) extends java.io.Serializable { - - private var absMinSupport: Int = 0 + private var maxPatternLength: Int) extends Logging with Serializable { /** * Constructs a default instance with default parameters - * {minSupport: `0.1`, maxPatternLength: 10}. + * {minSupport: `0.1`, maxPatternLength: `10`}. */ def this() = this(0.1, 10) @@ -53,149 +53,192 @@ class PrefixSpan( * Sets the minimal support level (default: `0.1`). */ def setMinSupport(minSupport: Double): this.type = { + require(minSupport >= 0 && minSupport <= 1) this.minSupport = minSupport this } /** - * Sets maximal pattern length. 
+ * Sets maximal pattern length (default: `10`). */ def setMaxPatternLength(maxPatternLength: Int): this.type = { + require(maxPatternLength >= 1) this.maxPatternLength = maxPatternLength this } /** - * Calculate sequential patterns: - * a) find and collect length-one patterns - * b) for each length-one patterns and each sequence, - * emit (pattern (prefix), suffix sequence) as key-value pairs - * c) group by key and then map value iterator to array - * d) local PrefixSpan on each prefix - * @return sequential patterns + * Find the complete set of sequential patterns in the input sequences. + * @param sequences input data set, contains a set of sequences, + * a sequence is an ordered list of elements. + * @return a set of sequential pattern pairs, + * the key of pair is pattern (a list of elements), + * the value of pair is the pattern's support value. */ - def run(sequences: RDD[Array[Int]]): RDD[(Seq[Int], Int)] = { - absMinSupport = getAbsoluteMinSupport(sequences) + def run(sequences: RDD[Array[Int]]): RDD[(Array[Int], Long)] = { + if (sequences.getStorageLevel == StorageLevel.NONE) { + logWarning("Input data is not cached.") + } + val minCount = getAbsoluteMinSupport(sequences) val (lengthOnePatternsAndCounts, prefixAndCandidates) = - findLengthOnePatterns(sequences) + findLengthOnePatterns(minCount, sequences) val repartitionedRdd = makePrefixProjectedDatabases(prefixAndCandidates) - val nextPatterns = getPatternsInLocal(repartitionedRdd) - val allPatterns = lengthOnePatternsAndCounts.map(x => (Seq(x._1), x._2)) ++ nextPatterns + val nextPatterns = getPatternsInLocal(minCount, repartitionedRdd) + val allPatterns = lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)) ++ nextPatterns allPatterns } - private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Int = { - val result = if (minSupport <= 0) { - 0 - } else { - val count = sequences.count() - val support = if (minSupport <= 1) minSupport else 1 - (support * count).toInt - } - result + /** + * Get the absolute minimum support value (sequences count * minSupport). + * @param sequences input data set, contains a set of sequences, + * @return absolute minimum support value, + */ + private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Long = { + if (minSupport == 0) 0L else (sequences.count() * minSupport).toLong } /** - * Find the patterns that it's length is one + * Generates frequent items by filtering the input data using minimal support level. 
+ * @param minCount the absolute minimum support * @param sequences original sequences data - * @return length-one patterns and projection table + * @return array of frequent pattern ordered by their frequencies */ - private def findLengthOnePatterns( - sequences: RDD[Array[Int]]): (RDD[(Int, Int)], RDD[(Seq[Int], Array[Int])]) = { - val LengthOnePatternAndCounts = sequences - .flatMap(_.distinct.map((_, 1))) + private def getFreqItemAndCounts( + minCount: Long, + sequences: RDD[Array[Int]]): RDD[(Int, Long)] = { + sequences.flatMap(_.distinct.map((_, 1L))) .reduceByKey(_ + _) - val infrequentLengthOnePatterns: Array[Int] = LengthOnePatternAndCounts - .filter(_._2 < absMinSupport) - .map(_._1) - .collect() - val frequentLengthOnePatterns = LengthOnePatternAndCounts - .filter(_._2 >= absMinSupport) - val frequentLengthOnePatternsArray = frequentLengthOnePatterns - .map(_._1) - .collect() - val filteredSequences = - if (infrequentLengthOnePatterns.isEmpty) { - sequences - } else { - sequences.map { p => - p.filter { x => !infrequentLengthOnePatterns.contains(x) } - } - } - val prefixAndCandidates = filteredSequences.flatMap { x => - frequentLengthOnePatternsArray.map { y => + .filter(_._2 >= minCount) + } + + /** + * Generates frequent items by filtering the input data using minimal support level. + * @param minCount the absolute minimum support + * @param sequences sequences data + * @return array of frequent pattern ordered by their frequencies + */ + private def getFreqItemAndCounts( + minCount: Long, + sequences: Array[Array[Int]]): Array[(Int, Long)] = { + sequences.flatMap(_.distinct) + .groupBy(x => x) + .mapValues(_.length.toLong) + .filter(_._2 >= minCount) + .toArray + } + + /** + * Get the frequent prefixes' projected database. + * @param frequentPrefixes frequent prefixes + * @param sequences sequences data + * @return prefixes and projected database + */ + private def getPatternAndProjectedDatabase( + frequentPrefixes: Array[Int], + sequences: RDD[Array[Int]]): RDD[(Array[Int], Array[Int])] = { + val filteredSequences = sequences.map { p => + p.filter (frequentPrefixes.contains(_) ) + } + filteredSequences.flatMap { x => + frequentPrefixes.map { y => val sub = getSuffix(y, x) - (Seq(y), sub) + (Array(y), sub) } }.filter(x => x._2.nonEmpty) - (frequentLengthOnePatterns, prefixAndCandidates) } /** - * Re-partition the RDD data, to get better balance and performance. + * Get the frequent prefixes' projected database. 
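+ * For example (illustrative): with prePrefix Array(3), frequentPrefixes
+ * Array(4, 5) and the single sequence Array(1, 4, 5), only the pair
+ * (Array(3, 4), Array(Array(5))) survives; the projection on 5 is empty and
+ * is dropped.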
+ * @param prePrefix the frequent prefixes' prefix + * @param frequentPrefixes frequent prefixes + * @param sequences sequences data + * @return prefixes and projected database + */ + private def getPatternAndProjectedDatabase( + prePrefix: Array[Int], + frequentPrefixes: Array[Int], + sequences: Array[Array[Int]]): Array[(Array[Int], Array[Array[Int]])] = { + val filteredProjectedDatabase = sequences + .map(x => x.filter(frequentPrefixes.contains(_))) + frequentPrefixes.map { x => + val sub = filteredProjectedDatabase.map(y => getSuffix(x, y)).filter(_.nonEmpty) + (prePrefix ++ Array(x), sub) + }.filter(x => x._2.nonEmpty) + } + + /** + * Find the patterns that it's length is one + * @param minCount the absolute minimum support + * @param sequences original sequences data + * @return length-one patterns and projection table + */ + private def findLengthOnePatterns( + minCount: Long, + sequences: RDD[Array[Int]]): (RDD[(Int, Long)], RDD[(Array[Int], Array[Int])]) = { + val frequentLengthOnePatternAndCounts = getFreqItemAndCounts(minCount, sequences) + val prefixAndProjectedDatabase = getPatternAndProjectedDatabase( + frequentLengthOnePatternAndCounts.keys.collect(), sequences) + (frequentLengthOnePatternAndCounts, prefixAndProjectedDatabase) + } + + /** + * Constructs prefix-projected databases from (prefix, suffix) pairs. * @param data patterns and projected sequences data before re-partition * @return patterns and projected sequences data after re-partition */ private def makePrefixProjectedDatabases( - data: RDD[(Seq[Int], Array[Int])]): RDD[(Seq[Int], Array[Array[Int]])] = { - val dataMerged = data + data: RDD[(Array[Int], Array[Int])]): RDD[(Array[Int], Array[Array[Int]])] = { + data.map(x => (x._1.toSeq, x._2)) .groupByKey() - .mapValues(_.toArray) - dataMerged + .map(x => (x._1.toArray, x._2.toArray)) } /** * calculate the patterns in local. + * @param minCount the absolute minimum support * @param data patterns and projected sequences data data * @return patterns */ private def getPatternsInLocal( - data: RDD[(Seq[Int], Array[Array[Int]])]): RDD[(Seq[Int], Int)] = { - val result = data.flatMap { x => - getPatternsWithPrefix(x._1, x._2) + minCount: Long, + data: RDD[(Array[Int], Array[Array[Int]])]): RDD[(Array[Int], Long)] = { + data.flatMap { x => + getPatternsWithPrefix(minCount, x._1, x._2) } - result } /** * calculate the patterns with one prefix in local. 
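+ * For example (illustrative): with maxPatternLength = 2 and prefix Array(3),
+ * length-two patterns such as (Array(3, 4), count) are still emitted, but the
+ * recursion stops because prefix.length + 1 is no longer less than
+ * maxPatternLength.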
+ * @param minCount the absolute minimum support * @param prefix prefix * @param projectedDatabase patterns and projected sequences data * @return patterns */ private def getPatternsWithPrefix( - prefix: Seq[Int], - projectedDatabase: Array[Array[Int]]): Array[(Seq[Int], Int)] = { - val prefixAndCounts = projectedDatabase - .flatMap(_.distinct) - .groupBy(x => x) - .mapValues(_.length) - val frequentPrefixExtensions = prefixAndCounts.filter(x => x._2 >= absMinSupport) - val frequentPrefixesAndCounts = frequentPrefixExtensions - .map(x => (prefix ++ Seq(x._1), x._2)) - .toArray - val cleanedSearchSpace = projectedDatabase - .map(x => x.filter(y => frequentPrefixExtensions.contains(y))) - val prefixProjectedDatabases = frequentPrefixExtensions.map { x => - val sub = cleanedSearchSpace.map(y => getSuffix(x._1, y)).filter(_.nonEmpty) - (prefix ++ Seq(x._1), sub) - }.filter(x => x._2.nonEmpty) - .toArray + minCount: Long, + prefix: Array[Int], + projectedDatabase: Array[Array[Int]]): Array[(Array[Int], Long)] = { + val frequentPrefixAndCounts = getFreqItemAndCounts(minCount, projectedDatabase) + val frequentPatternAndCounts = frequentPrefixAndCounts + .map(x => (prefix ++ Array(x._1), x._2)) + val prefixProjectedDatabases = getPatternAndProjectedDatabase( + prefix, frequentPrefixAndCounts.map(_._1), projectedDatabase) + val continueProcess = prefixProjectedDatabases.nonEmpty && prefix.length + 1 < maxPatternLength if (continueProcess) { val nextPatterns = prefixProjectedDatabases - .map(x => getPatternsWithPrefix(x._1, x._2)) + .map(x => getPatternsWithPrefix(minCount, x._1, x._2)) .reduce(_ ++ _) - frequentPrefixesAndCounts ++ nextPatterns + frequentPatternAndCounts ++ nextPatterns } else { - frequentPrefixesAndCounts + frequentPatternAndCounts } } /** * calculate suffix sequence following a prefix in a sequence * @param prefix prefix - * @param sequence original sequence + * @param sequence sequence * @return suffix sequence */ private def getSuffix(prefix: Int, sequence: Array[Int]): Array[Int] = { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala index 7796f1298891..133fa3b41a75 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala @@ -22,48 +22,98 @@ import org.apache.spark.rdd.RDD class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { - test("Prefixspan sequences mining using Integer type") { + test("PrefixSpan using Integer type") { + + /* + library("arulesSequences") + prefixSpanSeqs = read_baskets("prefixSpanSeqs", info = c("sequenceID","eventID","SIZE")) + freqItemSeq = cspade( + prefixSpanSeqs, + parameter = list(support = 2 / length(unique(transactionInfo(prefixSpanSeqs)$sequenceID)), maxlen = 2 )) + resSeq = as(freqItemSeq, "data.frame") + resSeq + */ + val sequences = Array( - Array(3, 1, 3, 4, 5), - Array(2, 3, 1), - Array(3, 4, 4, 3), Array(1, 3, 4, 5), + Array(2, 3, 1), Array(2, 4, 1), + Array(3, 1, 3, 4, 5), + Array(3, 4, 4, 3), Array(6, 5, 3)) val rdd = sc.parallelize(sequences, 2).cache() - def formatResultString(data: RDD[(Seq[Int], Int)]): String = { - data.map(x => x._1.mkString(",") + ": " + x._2) - .collect() - .sortWith(_<_) - .mkString("; ") + def compareResult( + expectedValue: Array[(Array[Int], Long)], + actualValue: Array[(Array[Int], Long)]): Boolean = { + val sortedExpectedValue = expectedValue.sortWith{ (x, y) => + x._1.mkString(",") + 
":" + x._2 < y._1.mkString(",") + ":" + y._2 + } + val sortedActualValue = actualValue.sortWith{ (x, y) => + x._1.mkString(",") + ":" + x._2 < y._1.mkString(",") + ":" + y._2 + } + sortedExpectedValue.zip(sortedActualValue) + .map(x => x._1._1.mkString(",") == x._2._1.mkString(",") && x._1._2 == x._2._2) + .reduce(_&&_) } val prefixspan = new PrefixSpan() .setMinSupport(0.34) .setMaxPatternLength(50) val result1 = prefixspan.run(rdd) - val len1 = result1.count().toInt - val actualValue1 = formatResultString(result1) - val expectedValue1 = - "1,3,4,5: 2; 1,3,4: 2; 1,3,5: 2; 1,3: 2; 1,4,5: 2;" + - " 1,4: 2; 1,5: 2; 1: 4; 2,1: 2; 2: 2; 3,1: 2; 3,3: 2;" + - " 3,4,5: 2; 3,4: 3; 3,5: 2; 3: 5; 4,5: 2; 4: 4; 5: 3" - assert(expectedValue1 == actualValue1) + val expectedValue1 = Array( + (Array(1), 4L), + (Array(1,3),2L), + (Array(1,3,4), 2L), + (Array(1,3,4,5), 2L), + (Array(1,3,5), 2L), + (Array(1,4), 2L), + (Array(1,4,5), 2L), + (Array(1,5), 2L), + (Array(2), 2L), + (Array(2,1), 2L), + (Array(3), 5L), + (Array(3,1), 2L), + (Array(3,3), 2L), + (Array(3,4), 3L), + (Array(3,4,5), 2L), + (Array(3,5), 2L), + (Array(4), 4L), + (Array(4,5), 2L), + (Array(5), 3L) + ) + assert(compareResult(expectedValue1, result1.collect())) prefixspan.setMinSupport(0.5).setMaxPatternLength(50) val result2 = prefixspan.run(rdd) - val expectedValue2 = "1: 4; 3,4: 3; 3: 5; 4: 4; 5: 3" - val actualValue2 = formatResultString(result2) - assert(expectedValue2 == actualValue2) + val expectedValue2 = Array( + (Array(1), 4L), + (Array(3), 5L), + (Array(3,4), 3L), + (Array(4), 4L), + (Array(5), 3L) + ) + assert(compareResult(expectedValue2, result2.collect())) prefixspan.setMinSupport(0.34).setMaxPatternLength(2) val result3 = prefixspan.run(rdd) - val actualValue3 = formatResultString(result3) - val expectedValue3 = - "1,3: 2; 1,4: 2; 1,5: 2; 1: 4; 2,1: 2; 2: 2; 3,1: 2;" + - " 3,3: 2; 3,4: 3; 3,5: 2; 3: 5; 4,5: 2; 4: 4; 5: 3" - assert(expectedValue3 == actualValue3) + val expectedValue3 = Array( + (Array(1), 4L), + (Array(1,3), 2L), + (Array(1,4), 2L), + (Array(1,5), 2L), + (Array(2,1), 2L), + (Array(2), 2L), + (Array(3), 5L), + (Array(3,1), 2L), + (Array(3,3), 2L), + (Array(3,4), 3L), + (Array(3,5), 2L), + (Array(4), 4L), + (Array(4,5), 2L), + (Array(5), 3L) + ) + assert(compareResult(expectedValue3, result3.collect())) } } From 4c60fb36148206abd67fe51cea667ee3d63e490e Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Thu, 9 Jul 2015 23:01:45 +0800 Subject: [PATCH 07/12] Fix some Scala style errors. 
--- .../spark/mllib/fpm/PrefixSpanSuite.scala | 49 ++++++++++--------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala index 133fa3b41a75..0cbbf3741d02 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala @@ -29,7 +29,8 @@ class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { prefixSpanSeqs = read_baskets("prefixSpanSeqs", info = c("sequenceID","eventID","SIZE")) freqItemSeq = cspade( prefixSpanSeqs, - parameter = list(support = 2 / length(unique(transactionInfo(prefixSpanSeqs)$sequenceID)), maxlen = 2 )) + parameter = list(support = + 2 / length(unique(transactionInfo(prefixSpanSeqs)$sequenceID)), maxlen = 2 )) resSeq = as(freqItemSeq, "data.frame") resSeq */ @@ -64,23 +65,23 @@ class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { val result1 = prefixspan.run(rdd) val expectedValue1 = Array( (Array(1), 4L), - (Array(1,3),2L), - (Array(1,3,4), 2L), - (Array(1,3,4,5), 2L), - (Array(1,3,5), 2L), - (Array(1,4), 2L), - (Array(1,4,5), 2L), - (Array(1,5), 2L), + (Array(1, 3), 2L), + (Array(1, 3, 4), 2L), + (Array(1, 3, 4, 5), 2L), + (Array(1, 3, 5), 2L), + (Array(1, 4), 2L), + (Array(1, 4, 5), 2L), + (Array(1, 5), 2L), (Array(2), 2L), - (Array(2,1), 2L), + (Array(2, 1), 2L), (Array(3), 5L), - (Array(3,1), 2L), - (Array(3,3), 2L), - (Array(3,4), 3L), - (Array(3,4,5), 2L), - (Array(3,5), 2L), + (Array(3, 1), 2L), + (Array(3, 3), 2L), + (Array(3, 4), 3L), + (Array(3, 4, 5), 2L), + (Array(3, 5), 2L), (Array(4), 4L), - (Array(4,5), 2L), + (Array(4, 5), 2L), (Array(5), 3L) ) assert(compareResult(expectedValue1, result1.collect())) @@ -90,7 +91,7 @@ class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { val expectedValue2 = Array( (Array(1), 4L), (Array(3), 5L), - (Array(3,4), 3L), + (Array(3, 4), 3L), (Array(4), 4L), (Array(5), 3L) ) @@ -100,18 +101,18 @@ class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { val result3 = prefixspan.run(rdd) val expectedValue3 = Array( (Array(1), 4L), - (Array(1,3), 2L), + (Array(1, 3), 2L), (Array(1,4), 2L), - (Array(1,5), 2L), - (Array(2,1), 2L), + (Array(1, 5), 2L), + (Array(2, 1), 2L), (Array(2), 2L), (Array(3), 5L), - (Array(3,1), 2L), - (Array(3,3), 2L), - (Array(3,4), 3L), - (Array(3,5), 2L), + (Array(3, 1), 2L), + (Array(3, 3), 2L), + (Array(3, 4), 3L), + (Array(3, 5), 2L), (Array(4), 4L), - (Array(4,5), 2L), + (Array(4, 5), 2L), (Array(5), 3L) ) assert(compareResult(expectedValue3, result3.collect())) From ba5df346543e9aee119bd781b257860b65bbe7df Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Thu, 9 Jul 2015 23:10:25 +0800 Subject: [PATCH 08/12] Fix a Scala style error. 
--- .../test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala index 0cbbf3741d02..e4bc77849bd2 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala @@ -102,7 +102,7 @@ class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { val expectedValue3 = Array( (Array(1), 4L), (Array(1, 3), 2L), - (Array(1,4), 2L), + (Array(1, 4), 2L), (Array(1, 5), 2L), (Array(2, 1), 2L), (Array(2), 2L), From 574e56ccfb271d0ed86c3eba95d1a11a8688495d Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Fri, 10 Jul 2015 19:49:06 +0800 Subject: [PATCH 09/12] Add new object LocalPrefixSpan, and do some optimization. --- .../spark/mllib/fpm/LocalPrefixSpan.scala | 129 ++++++++++++++++++ .../apache/spark/mllib/fpm/PrefixSpan.scala | 127 ++++------------- .../spark/mllib/fpm/PrefixSpanSuite.scala | 4 +- 3 files changed, 158 insertions(+), 102 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala new file mode 100644 index 000000000000..dc555001b777 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.fpm + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental + +/** + * + * :: Experimental :: + * + * Calculate all patterns of a projected database in local. + */ +@Experimental +private[fpm] object LocalPrefixSpan extends Logging with Serializable { + + /** + * Calculate all patterns of a projected database in local. + * @param minCount minimum count + * @param maxPatternLength maximum pattern length + * @param prefix prefix + * @param projectedDatabase the projected dabase + * @return a set of sequential pattern pairs, + * the key of pair is pattern (a list of elements), + * the value of pair is the pattern's count. 
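+ * For example (illustrative):
+ * {{{
+ *   LocalPrefixSpan.run(2L, 10, Array(3), Array(Array(4, 5), Array(4, 5)))
+ *   // yields (Array(3, 4), 2L), (Array(3, 5), 2L) and (Array(3, 4, 5), 2L)
+ * }}}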
+ */ + def run( + minCount: Long, + maxPatternLength: Int, + prefix: Array[Int], + projectedDatabase: Array[Array[Int]]): Array[(Array[Int], Long)] = { + getPatternsWithPrefix(minCount, maxPatternLength, prefix, projectedDatabase) + } + + /** + * calculate suffix sequence following a prefix in a sequence + * @param prefix prefix + * @param sequence sequence + * @return suffix sequence + */ + def getSuffix(prefix: Int, sequence: Array[Int]): Array[Int] = { + val index = sequence.indexOf(prefix) + if (index == -1) { + Array() + } else { + sequence.drop(index + 1) + } + } + + /** + * Generates frequent items by filtering the input data using minimal count level. + * @param minCount the absolute minimum count + * @param sequences sequences data + * @return array of item and count pair + */ + private def getFreqItemAndCounts( + minCount: Long, + sequences: Array[Array[Int]]): Array[(Int, Long)] = { + sequences.flatMap(_.distinct) + .groupBy(x => x) + .mapValues(_.length.toLong) + .filter(_._2 >= minCount) + .toArray + } + + /** + * Get the frequent prefixes' projected database. + * @param prePrefix the frequent prefixes' prefix + * @param frequentPrefixes frequent prefixes + * @param sequences sequences data + * @return prefixes and projected database + */ + private def getPatternAndProjectedDatabase( + prePrefix: Array[Int], + frequentPrefixes: Array[Int], + sequences: Array[Array[Int]]): Array[(Array[Int], Array[Array[Int]])] = { + val filteredProjectedDatabase = sequences + .map(x => x.filter(frequentPrefixes.contains(_))) + frequentPrefixes.map { x => + val sub = filteredProjectedDatabase.map(y => getSuffix(x, y)).filter(_.nonEmpty) + (prePrefix ++ Array(x), sub) + }.filter(x => x._2.nonEmpty) + } + + /** + * Calculate all patterns of a projected database in local. + * @param minCount the minimum count + * @param maxPatternLength maximum pattern length + * @param prefix prefix + * @param projectedDatabase projected database + * @return patterns + */ + private def getPatternsWithPrefix( + minCount: Long, + maxPatternLength: Int, + prefix: Array[Int], + projectedDatabase: Array[Array[Int]]): Array[(Array[Int], Long)] = { + val frequentPrefixAndCounts = getFreqItemAndCounts(minCount, projectedDatabase) + val frequentPatternAndCounts = frequentPrefixAndCounts + .map(x => (prefix ++ Array(x._1), x._2)) + val prefixProjectedDatabases = getPatternAndProjectedDatabase( + prefix, frequentPrefixAndCounts.map(_._1), projectedDatabase) + + val continueProcess = prefixProjectedDatabases.nonEmpty && prefix.length + 1 < maxPatternLength + if (continueProcess) { + val nextPatterns = prefixProjectedDatabases + .map(x => getPatternsWithPrefix(minCount, maxPatternLength, x._1, x._2)) + .reduce(_ ++ _) + frequentPatternAndCounts ++ nextPatterns + } else { + frequentPatternAndCounts + } + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 05f8c4186aaf..2239aa529695 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -53,7 +53,8 @@ class PrefixSpan private ( * Sets the minimal support level (default: `0.1`). 
*/ def setMinSupport(minSupport: Double): this.type = { - require(minSupport >= 0 && minSupport <= 1) + require(minSupport >= 0 && minSupport <= 1, + "The minimum support value must be between 0 and 1, including 0 and 1.") this.minSupport = minSupport this } @@ -62,7 +63,8 @@ class PrefixSpan private ( * Sets maximal pattern length (default: `10`). */ def setMaxPatternLength(maxPatternLength: Int): this.type = { - require(maxPatternLength >= 1) + require(maxPatternLength >= 1, + "The maximum pattern length value must be greater than 0.") this.maxPatternLength = maxPatternLength this } @@ -73,35 +75,38 @@ class PrefixSpan private ( * a sequence is an ordered list of elements. * @return a set of sequential pattern pairs, * the key of pair is pattern (a list of elements), - * the value of pair is the pattern's support value. + * the value of pair is the pattern's count. */ def run(sequences: RDD[Array[Int]]): RDD[(Array[Int], Long)] = { if (sequences.getStorageLevel == StorageLevel.NONE) { logWarning("Input data is not cached.") } - val minCount = getAbsoluteMinSupport(sequences) + val minCount = getMinCount(sequences) val (lengthOnePatternsAndCounts, prefixAndCandidates) = findLengthOnePatterns(minCount, sequences) - val repartitionedRdd = makePrefixProjectedDatabases(prefixAndCandidates) - val nextPatterns = getPatternsInLocal(minCount, repartitionedRdd) - val allPatterns = lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)) ++ nextPatterns + val projectedDatabase = makePrefixProjectedDatabases(prefixAndCandidates) + val nextPatterns = getPatternsInLocal(minCount, projectedDatabase) + val lengthOnePatternsAndCountsRdd = + sequences.sparkContext.parallelize( + lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2))) + val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns allPatterns } /** - * Get the absolute minimum support value (sequences count * minSupport). + * Get the minimum count (sequences count * minSupport). * @param sequences input data set, contains a set of sequences, - * @return absolute minimum support value, + * @return minimum count, */ - private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Long = { - if (minSupport == 0) 0L else (sequences.count() * minSupport).toLong + private def getMinCount(sequences: RDD[Array[Int]]): Long = { + if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong } /** - * Generates frequent items by filtering the input data using minimal support level. - * @param minCount the absolute minimum support + * Generates frequent items by filtering the input data using minimal count level. + * @param minCount the absolute minimum count * @param sequences original sequences data - * @return array of frequent pattern ordered by their frequencies + * @return array of item and count pair */ private def getFreqItemAndCounts( minCount: Long, @@ -111,22 +116,6 @@ class PrefixSpan private ( .filter(_._2 >= minCount) } - /** - * Generates frequent items by filtering the input data using minimal support level. - * @param minCount the absolute minimum support - * @param sequences sequences data - * @return array of frequent pattern ordered by their frequencies - */ - private def getFreqItemAndCounts( - minCount: Long, - sequences: Array[Array[Int]]): Array[(Int, Long)] = { - sequences.flatMap(_.distinct) - .groupBy(x => x) - .mapValues(_.length.toLong) - .filter(_._2 >= minCount) - .toArray - } - /** * Get the frequent prefixes' projected database. 
* @param frequentPrefixes frequent prefixes @@ -141,44 +130,25 @@ class PrefixSpan private ( } filteredSequences.flatMap { x => frequentPrefixes.map { y => - val sub = getSuffix(y, x) + val sub = LocalPrefixSpan.getSuffix(y, x) (Array(y), sub) - } - }.filter(x => x._2.nonEmpty) - } - - /** - * Get the frequent prefixes' projected database. - * @param prePrefix the frequent prefixes' prefix - * @param frequentPrefixes frequent prefixes - * @param sequences sequences data - * @return prefixes and projected database - */ - private def getPatternAndProjectedDatabase( - prePrefix: Array[Int], - frequentPrefixes: Array[Int], - sequences: Array[Array[Int]]): Array[(Array[Int], Array[Array[Int]])] = { - val filteredProjectedDatabase = sequences - .map(x => x.filter(frequentPrefixes.contains(_))) - frequentPrefixes.map { x => - val sub = filteredProjectedDatabase.map(y => getSuffix(x, y)).filter(_.nonEmpty) - (prePrefix ++ Array(x), sub) - }.filter(x => x._2.nonEmpty) + }.filter(_._2.nonEmpty) + } } /** * Find the patterns that it's length is one - * @param minCount the absolute minimum support + * @param minCount the minimum count * @param sequences original sequences data * @return length-one patterns and projection table */ private def findLengthOnePatterns( minCount: Long, - sequences: RDD[Array[Int]]): (RDD[(Int, Long)], RDD[(Array[Int], Array[Int])]) = { + sequences: RDD[Array[Int]]): (Array[(Int, Long)], RDD[(Array[Int], Array[Int])]) = { val frequentLengthOnePatternAndCounts = getFreqItemAndCounts(minCount, sequences) val prefixAndProjectedDatabase = getPatternAndProjectedDatabase( frequentLengthOnePatternAndCounts.keys.collect(), sequences) - (frequentLengthOnePatternAndCounts, prefixAndProjectedDatabase) + (frequentLengthOnePatternAndCounts.collect(), prefixAndProjectedDatabase) } /** @@ -195,7 +165,7 @@ class PrefixSpan private ( /** * calculate the patterns in local. - * @param minCount the absolute minimum support + * @param minCount the absolute minimum count * @param data patterns and projected sequences data data * @return patterns */ @@ -203,50 +173,7 @@ class PrefixSpan private ( minCount: Long, data: RDD[(Array[Int], Array[Array[Int]])]): RDD[(Array[Int], Long)] = { data.flatMap { x => - getPatternsWithPrefix(minCount, x._1, x._2) - } - } - - /** - * calculate the patterns with one prefix in local. 
@@ -203,50 +173,7 @@ class PrefixSpan private (
       minCount: Long,
       data: RDD[(Array[Int], Array[Array[Int]])]): RDD[(Array[Int], Long)] = {
     data.flatMap { x =>
-      getPatternsWithPrefix(minCount, x._1, x._2)
-    }
-  }
-
-  /**
-   * calculate the patterns with one prefix in local.
-   * @param minCount the absolute minimum support
-   * @param prefix prefix
-   * @param projectedDatabase patterns and projected sequences data
-   * @return patterns
-   */
-  private def getPatternsWithPrefix(
-      minCount: Long,
-      prefix: Array[Int],
-      projectedDatabase: Array[Array[Int]]): Array[(Array[Int], Long)] = {
-    val frequentPrefixAndCounts = getFreqItemAndCounts(minCount, projectedDatabase)
-    val frequentPatternAndCounts = frequentPrefixAndCounts
-      .map(x => (prefix ++ Array(x._1), x._2))
-    val prefixProjectedDatabases = getPatternAndProjectedDatabase(
-      prefix, frequentPrefixAndCounts.map(_._1), projectedDatabase)
-
-    val continueProcess = prefixProjectedDatabases.nonEmpty && prefix.length + 1 < maxPatternLength
-    if (continueProcess) {
-      val nextPatterns = prefixProjectedDatabases
-        .map(x => getPatternsWithPrefix(minCount, x._1, x._2))
-        .reduce(_ ++ _)
-      frequentPatternAndCounts ++ nextPatterns
-    } else {
-      frequentPatternAndCounts
-    }
-  }
-
-  /**
-   * calculate suffix sequence following a prefix in a sequence
-   * @param prefix prefix
-   * @param sequence sequence
-   * @return suffix sequence
-   */
-  private def getSuffix(prefix: Int, sequence: Array[Int]): Array[Int] = {
-    val index = sequence.indexOf(prefix)
-    if (index == -1) {
-      Array()
-    } else {
-      sequence.drop(index + 1)
+      LocalPrefixSpan.run(minCount, maxPatternLength, x._1, x._2)
     }
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
index e4bc77849bd2..413436d3db85 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
@@ -60,7 +60,7 @@ class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext {
     }

     val prefixspan = new PrefixSpan()
-      .setMinSupport(0.34)
+      .setMinSupport(0.33)
       .setMaxPatternLength(50)
     val result1 = prefixspan.run(rdd)
     val expectedValue1 = Array(
@@ -97,7 +97,7 @@
     )
     assert(compareResult(expectedValue2, result2.collect()))

-    prefixspan.setMinSupport(0.34).setMaxPatternLength(2)
+    prefixspan.setMinSupport(0.33).setMaxPatternLength(2)
     val result3 = prefixspan.run(rdd)
     val expectedValue3 = Array(
       (Array(1), 4L),
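With the suffix helper relocated to LocalPrefixSpan, its contract is worth pinning down. The following REPL-style calls use invented values and assume the package-private access that PrefixSpan itself relies on:

    LocalPrefixSpan.getSuffix(2, Array(1, 2, 3, 2, 4))  // Array(3, 2, 4): everything after the first 2
    LocalPrefixSpan.getSuffix(5, Array(1, 2, 3))        // Array(): 5 does not occur
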
From ca9c4c8fa84202d8d533c51c277138461ba096a7 Mon Sep 17 00:00:00 2001
From: zhangjiajin
Date: Sat, 11 Jul 2015 10:40:24 +0800
Subject: [PATCH 10/12] Modified the code according to the review comments.

---
 .../spark/mllib/fpm/LocalPrefixSpan.scala | 50 +++++++------------
 .../apache/spark/mllib/fpm/PrefixSpan.scala | 42 ++++------------
 2 files changed, 27 insertions(+), 65 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
index dc555001b777..39c48b084e55 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
@@ -30,13 +30,13 @@ import org.apache.spark.annotation.Experimental
 private[fpm] object LocalPrefixSpan extends Logging with Serializable {

   /**
-   * Calculate all patterns of a projected database in local.
+   * Calculate all patterns of a projected database.
    * @param minCount minimum count
    * @param maxPatternLength maximum pattern length
    * @param prefix prefix
    * @param projectedDatabase the projected dabase
    * @return a set of sequential pattern pairs,
-   *         the key of pair is pattern (a list of elements),
+   *         the key of pair is sequential pattern (a list of items),
    *         the value of pair is the pattern's count.
    */
   def run(
@@ -44,7 +44,21 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
       maxPatternLength: Int,
       prefix: Array[Int],
       projectedDatabase: Array[Array[Int]]): Array[(Array[Int], Long)] = {
-    getPatternsWithPrefix(minCount, maxPatternLength, prefix, projectedDatabase)
+    val frequentPrefixAndCounts = getFreqItemAndCounts(minCount, projectedDatabase)
+    val frequentPatternAndCounts = frequentPrefixAndCounts
+      .map(x => (prefix ++ Array(x._1), x._2))
+    val prefixProjectedDatabases = getPatternAndProjectedDatabase(
+      prefix, frequentPrefixAndCounts.map(_._1), projectedDatabase)
+
+    val continueProcess = prefixProjectedDatabases.nonEmpty && prefix.length + 1 < maxPatternLength
+    if (continueProcess) {
+      val nextPatterns = prefixProjectedDatabases
+        .map(x => run(minCount, maxPatternLength, x._1, x._2))
+        .reduce(_ ++ _)
+      frequentPatternAndCounts ++ nextPatterns
+    } else {
+      frequentPatternAndCounts
+    }
   }

   /**
@@ -96,34 +110,4 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
       (prePrefix ++ Array(x), sub)
     }.filter(x => x._2.nonEmpty)
   }
-
-  /**
-   * Calculate all patterns of a projected database in local.
-   * @param minCount the minimum count
-   * @param maxPatternLength maximum pattern length
-   * @param prefix prefix
-   * @param projectedDatabase projected database
-   * @return patterns
-   */
-  private def getPatternsWithPrefix(
-      minCount: Long,
-      maxPatternLength: Int,
-      prefix: Array[Int],
-      projectedDatabase: Array[Array[Int]]): Array[(Array[Int], Long)] = {
-    val frequentPrefixAndCounts = getFreqItemAndCounts(minCount, projectedDatabase)
-    val frequentPatternAndCounts = frequentPrefixAndCounts
-      .map(x => (prefix ++ Array(x._1), x._2))
-    val prefixProjectedDatabases = getPatternAndProjectedDatabase(
-      prefix, frequentPrefixAndCounts.map(_._1), projectedDatabase)
-
-    val continueProcess = prefixProjectedDatabases.nonEmpty && prefix.length + 1 < maxPatternLength
-    if (continueProcess) {
-      val nextPatterns = prefixProjectedDatabases
-        .map(x => getPatternsWithPrefix(minCount, maxPatternLength, x._1, x._2))
-        .reduce(_ ++ _)
-      frequentPatternAndCounts ++ nextPatterns
-    } else {
-      frequentPatternAndCounts
-    }
-  }
 }
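Inlining the recursion into run makes LocalPrefixSpan self-contained; a hand trace on an invented projected database helps check it. Take prefix = Array(1), minCount = 2, maxPatternLength = 3, projectedDatabase = Array(Array(2, 3), Array(2), Array(2, 3)):

    // Frame 1: prefix = Array(1)
    //   item counts in the projected database: 2 -> 3, 3 -> 2 (both >= minCount)
    //   emit (Array(1, 2), 3) and (Array(1, 3), 2)
    //   projected database for item 2: Array(Array(3), Array(3)); for item 3: empty
    //   prefix.length + 1 = 2 < maxPatternLength, so recurse on (Array(1, 2), ...)
    // Frame 2: prefix = Array(1, 2)
    //   item counts: 3 -> 2; emit (Array(1, 2, 3), 2)
    //   prefix.length + 1 = 3 is not < maxPatternLength, so the recursion stops
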
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 2239aa529695..9d8c60ef0fc4 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -82,10 +82,15 @@ class PrefixSpan private (
       logWarning("Input data is not cached.")
     }
     val minCount = getMinCount(sequences)
-    val (lengthOnePatternsAndCounts, prefixAndCandidates) =
-      findLengthOnePatterns(minCount, sequences)
-    val projectedDatabase = makePrefixProjectedDatabases(prefixAndCandidates)
-    val nextPatterns = getPatternsInLocal(minCount, projectedDatabase)
+    val lengthOnePatternsAndCounts =
+      getFreqItemAndCounts(minCount, sequences).collect()
+    val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
+      lengthOnePatternsAndCounts.map(_._1), sequences)
+    val groupedProjectedDatabase = prefixAndProjectedDatabase
+      .map(x => (x._1.toSeq, x._2))
+      .groupByKey()
+      .map(x => (x._1.toArray, x._2.toArray))
+    val nextPatterns = getPatternsInLocal(minCount, groupedProjectedDatabase)
     val lengthOnePatternsAndCountsRdd =
       sequences.sparkContext.parallelize(
         lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
@@ -122,7 +127,7 @@ class PrefixSpan private (
    * @param sequences sequences data
    * @return prefixes and projected database
    */
-  private def getPatternAndProjectedDatabase(
+  private def getPrefixAndProjectedDatabase(
       frequentPrefixes: Array[Int],
       sequences: RDD[Array[Int]]): RDD[(Array[Int], Array[Int])] = {
     val filteredSequences = sequences.map { p =>
@@ -136,33 +141,6 @@ class PrefixSpan private (
     }
   }

-  /**
-   * Find the patterns that it's length is one
-   * @param minCount the minimum count
-   * @param sequences original sequences data
-   * @return length-one patterns and projection table
-   */
-  private def findLengthOnePatterns(
-      minCount: Long,
-      sequences: RDD[Array[Int]]): (Array[(Int, Long)], RDD[(Array[Int], Array[Int])]) = {
-    val frequentLengthOnePatternAndCounts = getFreqItemAndCounts(minCount, sequences)
-    val prefixAndProjectedDatabase = getPatternAndProjectedDatabase(
-      frequentLengthOnePatternAndCounts.keys.collect(), sequences)
-    (frequentLengthOnePatternAndCounts.collect(), prefixAndProjectedDatabase)
-  }
-
-  /**
-   * Constructs prefix-projected databases from (prefix, suffix) pairs.
-   * @param data patterns and projected sequences data before re-partition
-   * @return patterns and projected sequences data after re-partition
-   */
-  private def makePrefixProjectedDatabases(
-      data: RDD[(Array[Int], Array[Int])]): RDD[(Array[Int], Array[Array[Int]])] = {
-    data.map(x => (x._1.toSeq, x._2))
-      .groupByKey()
-      .map(x => (x._1.toArray, x._2.toArray))
-  }
-
   /**
    * calculate the patterns in local.
    * @param minCount the absolute minimum count
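One detail in the grouping now inlined into run deserves a remark: the prefix is converted with .toSeq before groupByKey and back with .toArray afterwards. The patch does not say why, but the likely reason is JVM array equality:

    Array(1, 2) == Array(1, 2)              // false: arrays compare by reference
    Array(1, 2).toSeq == Array(1, 2).toSeq  // true: Seq compares by contents

Keying the shuffle on raw arrays would scatter identical prefixes into distinct groups, so the round-trip through Seq is what makes the grouping correct.
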
From 22b0ef463beb0e0fe9cc696989245da79722a3a6 Mon Sep 17 00:00:00 2001
From: zhangjiajin
Date: Tue, 14 Jul 2015 10:21:04 +0800
Subject: [PATCH 11/12] Add feature: Collect enough frequent prefixes before projection in PrefixSpan.

---
 .../apache/spark/mllib/fpm/PrefixSpan.scala | 75 ++++++++++++++++---
 1 file changed, 65 insertions(+), 10 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 9d8c60ef0fc4..82d864b44fa6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -43,6 +43,8 @@ class PrefixSpan private (
     private var minSupport: Double,
     private var maxPatternLength: Int) extends Logging with Serializable {

+  private val minPatternsBeforeShuffle: Int = 20
+
   /**
    * Constructs a default instance with default parameters
    * {minSupport: `0.1`, maxPatternLength: `10`}.
@@ -86,16 +88,69 @@ class PrefixSpan private (
       getFreqItemAndCounts(minCount, sequences).collect()
     val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
       lengthOnePatternsAndCounts.map(_._1), sequences)
-    val groupedProjectedDatabase = prefixAndProjectedDatabase
-      .map(x => (x._1.toSeq, x._2))
-      .groupByKey()
-      .map(x => (x._1.toArray, x._2.toArray))
-    val nextPatterns = getPatternsInLocal(minCount, groupedProjectedDatabase)
-    val lengthOnePatternsAndCountsRdd =
-      sequences.sparkContext.parallelize(
-        lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
-    val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
-    allPatterns
+
+    var patternsCount = lengthOnePatternsAndCounts.length
+    var allPatternAndCounts = sequences.sparkContext.parallelize(
+      lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
+    var currentProjectedDatabase = prefixAndProjectedDatabase
+    while (patternsCount <= minPatternsBeforeShuffle &&
+        currentProjectedDatabase.count() != 0) {
+      val (nextPatternAndCounts, nextProjectedDatabase) =
+        getPatternCountsAndProjectedDatabase(minCount, currentProjectedDatabase)
+      patternsCount = nextPatternAndCounts.count().toInt
+      currentProjectedDatabase = nextProjectedDatabase
+      allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+    }
+    if (patternsCount > 0) {
+      val groupedProjectedDatabase = currentProjectedDatabase
+        .map(x => (x._1.toSeq, x._2))
+        .groupByKey()
+        .map(x => (x._1.toArray, x._2.toArray))
+      val nextPatternAndCounts = getPatternsInLocal(minCount, groupedProjectedDatabase)
+      allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
+    }
+    allPatternAndCounts
+  }
+
+  /**
+   * Get the patterns and counts, and the next projected database.
+   * @param minCount minimum count
+   * @param prefixAndProjectedDatabase prefix and projected database
+   * @return patterns and counts, and the next projected database:
+   *         (RDD[(pattern, count)], RDD[(prefix, projected database)])
+   */
+  private def getPatternCountsAndProjectedDatabase(
+      minCount: Long,
+      prefixAndProjectedDatabase: RDD[(Array[Int], Array[Int])]):
+    (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = {
+    val prefixAndFrequentItemAndCounts = prefixAndProjectedDatabase.flatMap { x =>
+      x._2.distinct.map(y => ((x._1.toSeq, y), 1L))
+    }.reduceByKey(_+_)
+      .filter(_._2 >= minCount)
+    val patternAndCounts = prefixAndFrequentItemAndCounts
+      .map(x => (x._1._1.toArray ++ Array(x._1._2), x._2))
+    val prefixLength = prefixAndProjectedDatabase.take(1)(0)._1.length
+    if (prefixLength + 1 >= maxPatternLength) {
+      (patternAndCounts, prefixAndProjectedDatabase.filter(x => false))
+    } else {
+      val frequentItemsMap = prefixAndFrequentItemAndCounts
+        .keys.map(x => (x._1, x._2))
+        .groupByKey()
+        .mapValues(_.toSet)
+        .collect
+        .toMap
+      val nextPrefixAndProjectedDatabase = prefixAndProjectedDatabase
+        .filter(x => frequentItemsMap.contains(x._1))
+        .flatMap { x =>
+          val frequentItemSet = frequentItemsMap(x._1)
+          val filteredSequence = x._2.filter(frequentItemSet.contains(_))
+          val subProjectedDatabase = frequentItemSet.map { y =>
+            (y, LocalPrefixSpan.getSuffix(y, filteredSequence))
+          }.filter(_._2.nonEmpty)
+          subProjectedDatabase.map(y => (x._1 ++ Array(y._1), y._2))
+        }
+      (patternAndCounts, nextPrefixAndProjectedDatabase)
+    }
   }
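The new getPatternCountsAndProjectedDatabase counts every candidate one-item extension in a single distributed pass: each (prefix, suffix) pair contributes one unit per distinct item in its suffix, and reduceByKey sums per (prefix, item) key. A local mimic with toy collections standing in for the RDD operations:

    val prefixAndProjected = Seq(
      (Seq(1), Array(2, 3)),
      (Seq(1), Array(2)),
      (Seq(2), Array(3, 3)))
    val counts = prefixAndProjected
      .flatMap { case (prefix, suffix) =>
        suffix.distinct.map(item => ((prefix, item), 1L))
      }
      .groupBy(_._1)                 // stands in for reduceByKey
      .mapValues(_.map(_._2).sum)
    // counts: (Seq(1), 2) -> 2, (Seq(1), 3) -> 1, (Seq(2), 3) -> 1

Only extensions meeting minCount survive, and only their projected databases feed the next round, so the driver loop in run can keep expanding cheaply until more than minPatternsBeforeShuffle patterns exist before it falls back to the groupByKey-plus-local phase.
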
From 078d4101f56c68c6f191de57f9e542a80f2c89b5 Mon Sep 17 00:00:00 2001
From: zhangjiajin
Date: Tue, 14 Jul 2015 10:46:05 +0800
Subject: [PATCH 12/12] fix a scala style error.

---
 .../src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 82d864b44fa6..33e381e6d4d6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -125,7 +125,7 @@ class PrefixSpan private (
     (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = {
     val prefixAndFrequentItemAndCounts = prefixAndProjectedDatabase.flatMap { x =>
       x._2.distinct.map(y => ((x._1.toSeq, y), 1L))
-    }.reduceByKey(_+_)
+    }.reduceByKey(_ + _)
       .filter(_._2 >= minCount)
     val patternAndCounts = prefixAndFrequentItemAndCounts
       .map(x => (x._1._1.toArray ++ Array(x._1._2), x._2))
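For reference, end-to-end usage of the API as it stands at the end of the series; this sketch assumes an active SparkContext named sc, and the sequences are toy values:

    import org.apache.spark.rdd.RDD

    val sequences: RDD[Array[Int]] = sc.parallelize(Seq(
      Array(1, 3, 4, 5),
      Array(2, 3, 1),
      Array(2, 4, 1)), 2).cache()  // cached, to avoid run()'s warning

    val prefixSpan = new PrefixSpan()
      .setMinSupport(0.5)
      .setMaxPatternLength(5)
    val patterns: RDD[(Array[Int], Long)] = prefixSpan.run(sequences)
    patterns.collect().foreach { case (pattern, count) =>
      println(pattern.mkString("<", ", ", ">") + ": " + count)
    }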