From e6935ebf841300a83f58afcdcc2f00fe94d53bc0 Mon Sep 17 00:00:00 2001
From: zero323
Date: Mon, 6 Mar 2017 03:58:11 +0100
Subject: [PATCH 01/22] Initial implementation

---
 python/pyspark/ml/fpm.py          | 74 +++++++++++++++++++++++++++++++
 python/pyspark/ml/param/shared.py | 52 ++++++++++++++++++++++
 python/pyspark/ml/tests.py        | 40 +++++++++++++++++
 3 files changed, 166 insertions(+)
 create mode 100644 python/pyspark/ml/fpm.py

diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
new file mode 100644
index 000000000000..f22f26b42bdd
--- /dev/null
+++ b/python/pyspark/ml/fpm.py
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark import keyword_only
+from pyspark.ml.util import *
+from pyspark.ml.wrapper import JavaEstimator, JavaModel
+from pyspark.ml.param.shared import *
+
+__all__ = ["FPGrowth", "FPGrowthModel"]
+
+
+class FPGrowthModel(JavaModel, JavaMLWritable, JavaMLReadable):
+    """
+    .. versionadded:: 2.2.0
+    """
+    @property
+    @since("2.2.0")
+    def freqItemsets(self):
+        """
+        """
+        return self._call_java("freqItemsets")
+
+    @property
+    @since("2.2.0")
+    def associationRules(self):
+        """
+        """
+        return self._call_java("associationRules")
+
+
+class FPGrowth(JavaEstimator, HasFeaturesCol, HasPredictionCol,
+               HasSupport, HasConfidence, JavaMLWritable, JavaMLReadable):
+    """
+    A parallel FP-growth algorithm to mine frequent itemsets.
+
+    .. versionadded:: 2.2.0
+    """
+    @keyword_only
+    def __init__(self, minSupport=0.3, minConfidence=0.8, featuresCol="features",
+                 predictionCol="prediction", numPartitions=None):
+        """
+        """
+        super(FPGrowth, self).__init__()
+        self._java_obj = self._new_java_obj("org.apache.spark.ml.fpm.FPGrowth", self.uid)
+        self._setDefault(minSupport=0.3, minConfidence=0.8,
+                         featuresCol="features", predictionCol="prediction")
+        kwargs = self._input_kwargs
+        self.setParams(**kwargs)
+
+    @keyword_only
+    @since("2.2.0")
+    def setParams(self, minSupport=0.3, minConfidence=0.8, featuresCol="features",
+                  predictionCol="prediction", numPartitions=None):
+        """
+        """
+        kwargs = self._input_kwargs
+        return self._set(**kwargs)
+
+    def _create_model(self, java_model):
+        return FPGrowthModel(java_model)
diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py
index 163a0e2b3a96..0a938caa383d 100644
--- a/python/pyspark/ml/param/shared.py
+++ b/python/pyspark/ml/param/shared.py
@@ -696,3 +696,55 @@ def getCacheNodeIds(self):
         """
         return self.getOrDefault(self.cacheNodeIds)

+
+class HasSupport(Params):
+    """
+    Mixin for param support: [0.0, 1.0].
+    """
+
+    minSupport = Param(
+        Params._dummy(),
+        "minSupport",
+        "Minimal support level of the frequent pattern. [0.0, 1.0]. Any pattern that appears more "
+        "than (minSupport * size-of-the-dataset) times will be output",
+        typeConverter=TypeConverters.toFloat)
+
+    def setMinSupport(self, value):
+        """
+        Sets the value of :py:attr:`minSupport`.
+        """
+        if not 0 <= value <= 1:
+            ValueError("Support must be in range [0, 1]")
+        return self._set(minSupport=value)
+
+    def getMinSupport(self):
+        """
+        Gets the value of minSupport or its default value.
+        """
+        return self.getOrDefault(self.minSupport)
+
+
+class HasConfidence(Params):
+    """
+    Mixin for param confidence: [0.0, 1.0].
+    """
+
+    minConfidence = Param(
+        Params._dummy(),
+        "minConfidence",
+        "Minimal confidence for generating Association Rule. [0.0, 1.0]",
+        typeConverter=TypeConverters.toFloat)
+
+    def setMinConfidence(self, value):
+        """
+        Sets the value of :py:attr:`minConfidence`.
+        """
+        if not 0 <= value <= 1:
+            ValueError("Confidence must be in range [0, 1]")
+        return self._set(minConfidence=value)
+
+    def getMinConfidence(self):
+        """
+        Gets the value of minConfidence or its default value.
+        """
+        return self.getOrDefault(self.minConfidence)
\ No newline at end of file
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index cc559db58720..cb5beebc942e 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -60,6 +60,7 @@
 from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, \
     GeneralizedLinearRegression
 from pyspark.ml.tuning import *
+from pyspark.ml.fpm import FPGrowth, FPGrowthModel
 from pyspark.ml.wrapper import JavaParams, JavaWrapper
 from pyspark.ml.common import _java2py, _py2java
 from pyspark.serializers import PickleSerializer
@@ -1243,6 +1244,45 @@ def test_tweedie_distribution(self):

         self.assertTrue(np.isclose(model2.intercept, 0.6667, atol=1E-4))

+
+class FPGrowthTests(SparkSessionTestCase):
+    def setUp(self):
+        self.shuffle_partitions = self.spark.conf.get("spark.sql.shuffle.partitions")
+        self.spark.conf.set("spark.sql.shuffle.partitions", "1")
+        self.data = self.spark.createDataFrame(
+            [([1, 2], ), ([1, 2], ), ([1, 2, 3], ), ([1, 3], )],
+            ["features"])
+
+    def test_association_rules(self):
+        fp = FPGrowth()
+        fpm = fp.fit(self.data)
+
+        expected_association_rules = self.spark.createDataFrame(
+            [([3], [1], 1.0), ([2], [1], 1.0)],
+            ["antecedent", "consequent", "confidence"]
+        )
+        actual_association_rules = fpm.associationRules
+
+        self.assertEqual(actual_association_rules.subtract(expected_association_rules).count(), 0)
+        self.assertEqual(expected_association_rules.subtract(actual_association_rules).count(), 0)
+
+    def test_freq_itemsets(self):
+        fp = FPGrowth()
+        fpm = fp.fit(self.data)
+
+        expected_freq_itemsets = self.spark.createDataFrame(
+            [([1], 4), ([2], 3), ([2, 1], 3), ([3], 2), ([3, 1], 2)],
+            ["items", "freq"]
+        )
+        actual_freq_itemsets = fpm.freqItemsets
+
+        self.assertEqual(actual_freq_itemsets.subtract(expected_freq_itemsets).count(), 0)
+        self.assertEqual(expected_freq_itemsets.subtract(actual_freq_itemsets).count(), 0)
+
+    def tearDown(self):
+        self.spark.conf.set("spark.sql.shuffle.partitions", self.shuffle_partitions)
+        del self.data
+
+
 class ALSTest(SparkSessionTestCase):

     def test_storage_levels(self):

From 28d072bca6bc60d15062138156ba030bfa347b22 Mon Sep 17 00:00:00 2001
From: zero323
Date: Fri, 10 Mar 2017 23:57:56 +0100
Subject: [PATCH 02/22] Add docstring for FPGrowth and FPGrowthModel

---
 python/pyspark/ml/fpm.py | 75 ++++++++++++++++++++++++++++++++++++----
 1 file changed, 69 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
index f22f26b42bdd..7447163e7c38 100644
--- a/python/pyspark/ml/fpm.py
+++ b/python/pyspark/ml/fpm.py
@@ -24,28 +24,87 @@


 class FPGrowthModel(JavaModel, JavaMLWritable, JavaMLReadable):
-    """
+    """Model fitted by FPGrowth.
+
     .. versionadded:: 2.2.0
     """
     @property
     @since("2.2.0")
     def freqItemsets(self):
-        """
+        """DataFrame with two columns:
+        * `items` - Itemset of the same type as the input column.
+        * `freq` - Frequency of the itemset (`LongType`).
         """
         return self._call_java("freqItemsets")

     @property
     @since("2.2.0")
     def associationRules(self):
-        """
-        """
+        """Data with three columns:
+        * `antecedent` - Array of the same type as the input column.
+        * `consequent` - Single element array of the same type as the input column.
+        * `confidence` - Confidence for the rule (`DoubleType`)."""
         return self._call_java("associationRules")


 class FPGrowth(JavaEstimator, HasFeaturesCol, HasPredictionCol,
                HasSupport, HasConfidence, JavaMLWritable, JavaMLReadable):
-    """
-    A parallel FP-growth algorithm to mine frequent itemsets.
+    """A parallel FP-growth algorithm to mine frequent itemsets
+
+    * Li et al., PFP: Parallel FP-Growth for Query Recommendation [LI2008]_
+    * Han et al., Mining frequent patterns without candidate generation [HAN2000]_
+
+    .. [LI2008] http://dx.doi.org/10.1145/1454008.1454027
+    .. [HAN2000] http://dx.doi.org/10.1145/335191.335372
+
+    .. note:: Internally `transform` `collects` and `broadcasts` association rules.
+
+    >>> from pyspark.sql.functions import split
+    >>> data = (spark.read
+    ...     .text("data/mllib/sample_fpgrowth.txt")
+    ...     .select(split("value", "\s+").alias("features")))
+    >>> data.show(truncate=False)
+    +------------------------+
+    |features                |
+    +------------------------+
+    |[r, z, h, k, p]         |
+    |[z, y, x, w, v, u, t, s]|
+    |[s, x, o, n, r]         |
+    |[x, z, y, m, t, s, q, e]|
+    |[z]                     |
+    |[x, z, y, r, q, t, p]   |
+    +------------------------+
+    >>> fp = FPGrowth(minSupport=0.2, minConfidence=0.7)
+    >>> fpm = fp.fit(data)
+    >>> fpm.freqItemsets.show(5)
+    +---------+----+
+    |    items|freq|
+    +---------+----+
+    |      [s]|   3|
+    |   [s, x]|   3|
+    |[s, x, z]|   2|
+    |   [s, z]|   2|
+    |      [r]|   3|
+    +---------+----+
+    only showing top 5 rows
+    >>> fpm.associationRules.show(5)
+    +----------+----------+----------+
+    |antecedent|consequent|confidence|
+    +----------+----------+----------+
+    |    [t, s]|       [y]|       1.0|
+    |    [t, s]|       [x]|       1.0|
+    |    [t, s]|       [z]|       1.0|
+    |       [p]|       [r]|       1.0|
+    |       [p]|       [z]|       1.0|
+    +----------+----------+----------+
+    only showing top 5 rows
+    >>> new_data = spark.createDataFrame([(["t", "s"], )], ["features"])
+    >>> fpm.transform(new_data).show(1, False)
+    +--------+---------------------+
+    |features|prediction           |
+    +--------+---------------------+
+    |[t, s]  |[y, x, z, x, y, x, z]|
+    +--------+---------------------+

     .. versionadded:: 2.2.0
     """
@@ -53,6 +112,8 @@ class FPGrowth(JavaEstimator, HasFeaturesCol, HasPredictionCol,
     def __init__(self, minSupport=0.3, minConfidence=0.8, featuresCol="features",
                  predictionCol="prediction", numPartitions=None):
         """
+        __init__(self, minSupport=0.3, minConfidence=0.8, featuresCol="features", \
+                 predictionCol="prediction", numPartitions=None)
         """
         super(FPGrowth, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.fpm.FPGrowth", self.uid)
@@ -66,6 +127,8 @@ def __init__(self, minSupport=0.3, minConfidence=0.8, featuresCol="features",
     def setParams(self, minSupport=0.3, minConfidence=0.8, featuresCol="features",
                   predictionCol="prediction", numPartitions=None):
         """
+        setParams(self, minSupport=0.3, minConfidence=0.8, featuresCol="features", \
+                  predictionCol="prediction", numPartitions=None)
         """
         kwargs = self._input_kwargs
         return self._set(**kwargs)

From a1920ab90a7cf4dd857085b74ffd17c62f23753a Mon Sep 17 00:00:00 2001
From: zero323
Date: Tue, 14 Mar 2017 14:57:40 +0100
Subject: [PATCH 03/22] Correct transform doctest to reflect SPARK-19940

---
 python/pyspark/ml/fpm.py | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
index 7447163e7c38..f69dcda66e60 100644
--- a/python/pyspark/ml/fpm.py
+++ b/python/pyspark/ml/fpm.py
@@ -99,12 +99,8 @@ class FPGrowth(JavaEstimator, HasFeaturesCol, HasPredictionCol,
     +----------+----------+----------+
     only showing top 5 rows
     >>> new_data = spark.createDataFrame([(["t", "s"], )], ["features"])
-    >>> fpm.transform(new_data).show(1, False)
-    +--------+---------------------+
-    |features|prediction           |
-    +--------+---------------------+
-    |[t, s]  |[y, x, z, x, y, x, z]|
-    +--------+---------------------+
+    >>> sorted(fpm.transform(new_data).first().prediction)
+    ['x', 'y', 'z']

     .. versionadded:: 2.2.0
     """

From d73f2545528de8f9cf00664ddb161d16ca85891a Mon Sep 17 00:00:00 2001
From: zero323
Date: Thu, 16 Mar 2017 01:15:28 +0100
Subject: [PATCH 04/22] Move fpm specific Params from param.shared to fpm

---
 python/pyspark/ml/fpm.py          | 53 +++++++++++++++++++++++++++++++
 python/pyspark/ml/param/shared.py | 52 ------------------------------
 2 files changed, 53 insertions(+), 52 deletions(-)

diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
index f69dcda66e60..7eb7093573e7 100644
--- a/python/pyspark/ml/fpm.py
+++ b/python/pyspark/ml/fpm.py
@@ -23,6 +23,33 @@
 __all__ = ["FPGrowth", "FPGrowthModel"]


+class HasSupport(Params):
+    """
+    Mixin for param support: [0.0, 1.0].
+    """
+
+    minSupport = Param(
+        Params._dummy(),
+        "minSupport",
+        "Minimal support level of the frequent pattern. [0.0, 1.0]. Any pattern that appears more "
+        "than (minSupport * size-of-the-dataset) times will be output",
+        typeConverter=TypeConverters.toFloat)
+
+    def setMinSupport(self, value):
+        """
+        Sets the value of :py:attr:`minSupport`.
+        """
+        if not 0 <= value <= 1:
+            ValueError("Support must be in range [0, 1]")
+        return self._set(minSupport=value)
+
+    def getMinSupport(self):
+        """
+        Gets the value of minSupport or its default value.
+        """
+        return self.getOrDefault(self.minSupport)
+
+
 class FPGrowthModel(JavaModel, JavaMLWritable, JavaMLReadable):
     """Model fitted by FPGrowth.

@@ -47,6 +74,32 @@ def associationRules(self):
         return self._call_java("associationRules")


+class HasConfidence(Params):
+    """
+    Mixin for param confidence: [0.0, 1.0].
+    """
+
+    minConfidence = Param(
+        Params._dummy(),
+        "minConfidence",
+        "Minimal confidence for generating Association Rule. [0.0, 1.0]",
+        typeConverter=TypeConverters.toFloat)
+
+    def setMinConfidence(self, value):
+        """
+        Sets the value of :py:attr:`minConfidence`.
+        """
+        if not 0 <= value <= 1:
+            ValueError("Confidence must be in range [0, 1]")
+        return self._set(minConfidence=value)
+
+    def getMinConfidence(self):
+        """
+        Gets the value of minConfidence or its default value.
+        """
+        return self.getOrDefault(self.minConfidence)
+
+
 class FPGrowth(JavaEstimator, HasFeaturesCol, HasPredictionCol,
                HasSupport, HasConfidence, JavaMLWritable, JavaMLReadable):
     """A parallel FP-growth algorithm to mine frequent itemsets
diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py
index 0a938caa383d..163a0e2b3a96 100644
--- a/python/pyspark/ml/param/shared.py
+++ b/python/pyspark/ml/param/shared.py
@@ -696,55 +696,3 @@ def getCacheNodeIds(self):
         """
         return self.getOrDefault(self.cacheNodeIds)

-
-class HasSupport(Params):
-    """
-    Mixin for param support: [0.0, 1.0].
-    """
-
-    minSupport = Param(
-        Params._dummy(),
-        "minSupport",
-        "Minimal support level of the frequent pattern. [0.0, 1.0]. Any pattern that appears more "
-        "than (minSupport * size-of-the-dataset) times will be output",
-        typeConverter=TypeConverters.toFloat)
-
-    def setMinSupport(self, value):
-        """
-        Sets the value of :py:attr:`minSupport`.
-        """
-        if not 0 <= value <= 1:
-            ValueError("Support must be in range [0, 1]")
-        return self._set(minSupport=value)
-
-    def getMinSupport(self):
-        """
-        Gets the value of minSupport or its default value.
-        """
-        return self.getOrDefault(self.minSupport)
-
-
-class HasConfidence(Params):
-    """
-    Mixin for param confidence: [0.0, 1.0].
-    """
-
-    minConfidence = Param(
-        Params._dummy(),
-        "minConfidence",
-        "Minimal confidence for generating Association Rule. [0.0, 1.0]",
-        typeConverter=TypeConverters.toFloat)
-
-    def setMinConfidence(self, value):
-        """
-        Sets the value of :py:attr:`minConfidence`.
-        """
-        if not 0 <= value <= 1:
-            ValueError("Confidence must be in range [0, 1]")
-        return self._set(minConfidence=value)
-
-    def getMinConfidence(self):
-        """
-        Gets the value of minConfidence or its default value.
-        """
-        return self.getOrDefault(self.minConfidence)
\ No newline at end of file

From 52604971e939a3367a3dcf70057b49fbaebf3e16 Mon Sep 17 00:00:00 2001
From: zero323
Date: Thu, 16 Mar 2017 22:13:50 +0100
Subject: [PATCH 05/22] Rename featuresCol to itemsCol (SPARK-19899)

---
 python/pyspark/ml/fpm.py   | 90 ++++++++++++++++++++++++--------------
 python/pyspark/ml/tests.py |  2 +-
 2 files changed, 58 insertions(+), 34 deletions(-)

diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
index 7eb7093573e7..a7ed8c968d32 100644
--- a/python/pyspark/ml/fpm.py
+++ b/python/pyspark/ml/fpm.py
@@ -50,30 +50,6 @@ def getMinSupport(self):
         return self.getOrDefault(self.minSupport)


-class FPGrowthModel(JavaModel, JavaMLWritable, JavaMLReadable):
-    """Model fitted by FPGrowth.
-
-    .. versionadded:: 2.2.0
-    """
-    @property
-    @since("2.2.0")
-    def freqItemsets(self):
-        """DataFrame with two columns:
-        * `items` - Itemset of the same type as the input column.
-        * `freq` - Frequency of the itemset (`LongType`).
-        """
-        return self._call_java("freqItemsets")
-
-    @property
-    @since("2.2.0")
-    def associationRules(self):
-        """Data with three columns:
-        * `antecedent` - Array of the same type as the input column.
-        * `consequent` - Single element array of the same type as the input column.
-        * `confidence` - Confidence for the rule (`DoubleType`)."""
-        return self._call_java("associationRules")
-
-
 class HasConfidence(Params):
     """
     Mixin for param confidence: [0.0, 1.0].
@@ -100,7 +76,55 @@ def getMinConfidence(self):
         return self.getOrDefault(self.minConfidence)


+class HasItemsCol(Params):
+    """
+    Mixin for param itemsCol: items column name.
+    """
+
+    itemsCol = Param(Params._dummy(), "itemsCol", "items column name.", typeConverter=TypeConverters.toString)
+
+    def __init__(self):
+        super(HasItemsCol, self).__init__()
+        self._setDefault(itemsCol='items')
+
+    def setItemsCol(self, value):
+        """
+        Sets the value of :py:attr:`itemsCol`.
+        """
+        return self._set(itemsCol=value)
+
+    def getItemsCol(self):
+        """
+        Gets the value of itemsCol or its default value.
+        """
+        return self.getOrDefault(self.itemsCol)
+
+
+class FPGrowthModel(JavaModel, JavaMLWritable, JavaMLReadable):
+    """Model fitted by FPGrowth.
+
+    .. versionadded:: 2.2.0
+    """
+    @property
+    @since("2.2.0")
+    def freqItemsets(self):
+        """DataFrame with two columns:
+        * `items` - Itemset of the same type as the input column.
+        * `freq` - Frequency of the itemset (`LongType`).
+        """
+        return self._call_java("freqItemsets")
+
+    @property
+    @since("2.2.0")
+    def associationRules(self):
+        """Data with three columns:
+        * `antecedent` - Array of the same type as the input column.
+        * `consequent` - Single element array of the same type as the input column.
+        * `confidence` - Confidence for the rule (`DoubleType`)."""
+        return self._call_java("associationRules")
+
+
-class FPGrowth(JavaEstimator, HasFeaturesCol, HasPredictionCol,
+class FPGrowth(JavaEstimator, HasItemsCol, HasPredictionCol,
                HasSupport, HasConfidence, JavaMLWritable, JavaMLReadable):
     """A parallel FP-growth algorithm to mine frequent itemsets

     * Li et al., PFP: Parallel FP-Growth for Query Recommendation [LI2008]_
     * Han et al., Mining frequent patterns without candidate generation [HAN2000]_

     .. [LI2008] http://dx.doi.org/10.1145/1454008.1454027
     .. [HAN2000] http://dx.doi.org/10.1145/335191.335372

     .. note:: Internally `transform` `collects` and `broadcasts` association rules.

     >>> from pyspark.sql.functions import split
     >>> data = (spark.read
     ...     .text("data/mllib/sample_fpgrowth.txt")
-    ...     .select(split("value", "\s+").alias("features")))
+    ...     .select(split("value", "\s+").alias("items")))
     >>> data.show(truncate=False)
     +------------------------+
-    |features                |
+    |items                   |
     +------------------------+
     |[r, z, h, k, p]         |
     |[z, y, x, w, v, u, t, s]|
     |[s, x, o, n, r]         |
     |[x, z, y, m, t, s, q, e]|
     |[z]                     |
     |[x, z, y, r, q, t, p]   |
     +------------------------+
     >>> fp = FPGrowth(minSupport=0.2, minConfidence=0.7)
     >>> fpm = fp.fit(data)
     >>> fpm.freqItemsets.show(5)
     +---------+----+
     |    items|freq|
     +---------+----+
     |      [s]|   3|
     |   [s, x]|   3|
     |[s, x, z]|   2|
     |   [s, z]|   2|
     |      [r]|   3|
     +---------+----+
     only showing top 5 rows
     >>> fpm.associationRules.show(5)
     +----------+----------+----------+
     |antecedent|consequent|confidence|
     +----------+----------+----------+
     |    [t, s]|       [y]|       1.0|
     |    [t, s]|       [x]|       1.0|
     |    [t, s]|       [z]|       1.0|
     |       [p]|       [r]|       1.0|
     |       [p]|       [z]|       1.0|
     +----------+----------+----------+
     only showing top 5 rows
-    >>> new_data = spark.createDataFrame([(["t", "s"], )], ["features"])
+    >>> new_data = spark.createDataFrame([(["t", "s"], )], ["items"])
     >>> sorted(fpm.transform(new_data).first().prediction)
     ['x', 'y', 'z']

     .. versionadded:: 2.2.0
     """
     @keyword_only
-    def __init__(self, minSupport=0.3, minConfidence=0.8, featuresCol="features",
+    def __init__(self, minSupport=0.3, minConfidence=0.8, itemsCol="items",
                  predictionCol="prediction", numPartitions=None):
         """
-        __init__(self, minSupport=0.3, minConfidence=0.8, featuresCol="features", \
+        __init__(self, minSupport=0.3, minConfidence=0.8, itemsCol="items", \
                  predictionCol="prediction", numPartitions=None)
         """
         super(FPGrowth, self).__init__()
         self._java_obj = self._new_java_obj("org.apache.spark.ml.fpm.FPGrowth", self.uid)
         self._setDefault(minSupport=0.3, minConfidence=0.8,
-                         featuresCol="features", predictionCol="prediction")
+                         itemsCol="items", predictionCol="prediction")
         kwargs = self._input_kwargs
         self.setParams(**kwargs)

     @keyword_only
     @since("2.2.0")
-    def setParams(self, minSupport=0.3, minConfidence=0.8, featuresCol="features",
+    def setParams(self, minSupport=0.3, minConfidence=0.8, itemsCol="items",
                   predictionCol="prediction", numPartitions=None):
         """
-        setParams(self, minSupport=0.3, minConfidence=0.8, featuresCol="features", \
+        setParams(self, minSupport=0.3, minConfidence=0.8, itemsCol="items", \
                   predictionCol="prediction", numPartitions=None)
         """
         kwargs = self._input_kwargs
diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py
index cb5beebc942e..a24373632bff 100755
--- a/python/pyspark/ml/tests.py
+++ b/python/pyspark/ml/tests.py
@@ -1250,7 +1250,7 @@ def setUp(self):
         self.spark.conf.set("spark.sql.shuffle.partitions", "1")
         self.data = self.spark.createDataFrame(
             [([1, 2], ), ([1, 2], ), ([1, 2, 3], ), ([1, 3], )],
-            ["features"])
+            ["items"])

     def test_association_rules(self):
         fp = FPGrowth()

From feb6af9c4b2734d29f0f29370792d0d2efbc56ef Mon Sep 17 00:00:00 2001
From: zero323
Date: Mon, 20 Mar 2017 19:52:57 +0100
Subject: [PATCH 06/22] Fix style

---
 python/pyspark/ml/fpm.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
index a7ed8c968d32..d5eb0515b97e 100644
--- a/python/pyspark/ml/fpm.py
+++ b/python/pyspark/ml/fpm.py
@@ -81,7 +81,8 @@ class HasItemsCol(Params):
     Mixin for param itemsCol: items column name.
     """

-    itemsCol = Param(Params._dummy(), "itemsCol", "items column name.", typeConverter=TypeConverters.toString)
+    itemsCol = Param(Params._dummy(), "itemsCol",
+                     "items column name.", typeConverter=TypeConverters.toString)

     def __init__(self):
         super(HasItemsCol, self).__init__()

From 159f2ad15c59150923507cfde19103a70ed8d0c5 Mon Sep 17 00:00:00 2001
From: zero323
Date: Thu, 23 Mar 2017 21:19:35 +0100
Subject: [PATCH 07/22] Add missing raise

---
 python/pyspark/ml/fpm.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py
index d5eb0515b97e..8367b5730ce6 100644
--- a/python/pyspark/ml/fpm.py
+++ b/python/pyspark/ml/fpm.py
@@ -39,8 +39,8 @@ def setMinSupport(self, value):
         """
         Sets the value of :py:attr:`minSupport`.
         """
-        if not 0 <= value <= 1:
-            ValueError("Support must be in range [0, 1]")
+        if not (0 <= value <= 1):
+            raise ValueError("Support must be in range [0, 1]")
         return self._set(minSupport=value)

     def getMinSupport(self):
@@ -65,8 +65,8 @@ def setMinConfidence(self, value):
         """
         Sets the value of :py:attr:`minConfidence`.
""" - if not 0 <= value <= 1: - ValueError("Confidence must be in range [0, 1]") + if not (0 <= value <= 1): + raise ValueError("Confidence must be in range [0, 1]") return self._set(minConfidence=value) def getMinConfidence(self): From 154b5bac6f061ebfa925ac9b39e932a64fa6ab01 Mon Sep 17 00:00:00 2001 From: zero323 Date: Thu, 23 Mar 2017 21:21:33 +0100 Subject: [PATCH 08/22] Note that confidence is not used for fitting --- python/pyspark/ml/fpm.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index 8367b5730ce6..f6e93dbe82bb 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -58,7 +58,8 @@ class HasConfidence(Params): minConfidence = Param( Params._dummy(), "minConfidence", - "Minimal confidence for generating Association Rule. [0.0, 1.0]", + """"Minimal confidence for generating Association Rule. [0.0, 1.0] + Note that minConfidence has no effect during fitting.""", typeConverter=TypeConverters.toFloat) def setMinConfidence(self, value): From df6777fb467499086f1566d1bad0f7594d01e40b Mon Sep 17 00:00:00 2001 From: zero323 Date: Thu, 23 Mar 2017 21:26:25 +0100 Subject: [PATCH 09/22] Remove dot from itemscol description --- python/pyspark/ml/fpm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index f6e93dbe82bb..11c752d25be2 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -83,7 +83,7 @@ class HasItemsCol(Params): """ itemsCol = Param(Params._dummy(), "itemsCol", - "items column name.", typeConverter=TypeConverters.toString) + "items column name", typeConverter=TypeConverters.toString) def __init__(self): super(HasItemsCol, self).__init__() From 90918c5992a21ebd29503c7477311b647f491937 Mon Sep 17 00:00:00 2001 From: zero323 Date: Thu, 23 Mar 2017 21:26:56 +0100 Subject: [PATCH 10/22] Remove __init__ from HasItemsCol --- python/pyspark/ml/fpm.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index 11c752d25be2..76cb070ebd89 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -85,10 +85,6 @@ class HasItemsCol(Params): itemsCol = Param(Params._dummy(), "itemsCol", "items column name", typeConverter=TypeConverters.toString) - def __init__(self): - super(HasItemsCol, self).__init__() - self._setDefault(itemsCol='items') - def setItemsCol(self, value): """ Sets the value of :py:attr:`itemsCol`. From 43c9dccc7b8ef16cb1b5ef12f6656f18375e1381 Mon Sep 17 00:00:00 2001 From: zero323 Date: Thu, 23 Mar 2017 21:30:02 +0100 Subject: [PATCH 11/22] Add experimental notes --- python/pyspark/ml/fpm.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index 76cb070ebd89..0f7caa939426 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -101,6 +101,8 @@ def getItemsCol(self): class FPGrowthModel(JavaModel, JavaMLWritable, JavaMLReadable): """Model fitted by FPGrowth. + .. note:: Experimental + .. versionadded:: 2.2.0 """ @property @@ -177,6 +179,8 @@ class FPGrowth(JavaEstimator, HasItemsCol, HasPredictionCol, >>> sorted(fpm.transform(new_data).first().prediction) ['x', 'y', 'z'] + .. note:: Experimental + .. 
versionadded:: 2.2.0 """ @keyword_only From aa45479fa4cbad8bf1bd2ac2cabe083777a44c0b Mon Sep 17 00:00:00 2001 From: zero323 Date: Thu, 23 Mar 2017 21:35:32 +0100 Subject: [PATCH 12/22] Place docstring quotes in separate lines --- python/pyspark/ml/fpm.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index 0f7caa939426..9d82dd7327a1 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -108,7 +108,8 @@ class FPGrowthModel(JavaModel, JavaMLWritable, JavaMLReadable): @property @since("2.2.0") def freqItemsets(self): - """DataFrame with two columns: + """ + DataFrame with two columns: * `items` - Itemset of the same type as the input column. * `freq` - Frequency of the itemset (`LongType`). """ @@ -117,10 +118,12 @@ def freqItemsets(self): @property @since("2.2.0") def associationRules(self): - """Data with three columns: + """ + Data with three columns: * `antecedent` - Array of the same type as the input column. * `consequent` - Single element array of the same type as the input column. - * `confidence` - Confidence for the rule (`DoubleType`).""" + * `confidence` - Confidence for the rule (`DoubleType`). + """ return self._call_java("associationRules") From d4ae39ae992836451df7e5f8cebbd2c7a0229cc0 Mon Sep 17 00:00:00 2001 From: zero323 Date: Thu, 23 Mar 2017 22:03:53 +0100 Subject: [PATCH 13/22] Add since import --- python/pyspark/ml/fpm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index 9d82dd7327a1..e21a27ed22a9 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -15,7 +15,7 @@ # limitations under the License. # -from pyspark import keyword_only +from pyspark import keyword_only, since from pyspark.ml.util import * from pyspark.ml.wrapper import JavaEstimator, JavaModel from pyspark.ml.param.shared import * From 6740581588046e6716ab1ecf303885e4ea4e12d5 Mon Sep 17 00:00:00 2001 From: zero323 Date: Thu, 23 Mar 2017 22:13:43 +0100 Subject: [PATCH 14/22] Remove set spark.sql.shuffle.partition from FPGrowthTests --- python/pyspark/ml/tests.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index a24373632bff..5a1b23413332 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1246,8 +1246,6 @@ def test_tweedie_distribution(self): class FPGrowthTests(SparkSessionTestCase): def setUp(self): - self.shuffle_partitions = self.spark.conf.get("spark.sql.shuffle.partitions") - self.spark.conf.set("spark.sql.shuffle.partitions", "1") self.data = self.spark.createDataFrame( [([1, 2], ), ([1, 2], ), ([1, 2, 3], ), ([1, 3], )], ["items"]) @@ -1279,7 +1277,6 @@ def test_freq_itemsets(self): self.assertEqual(expected_freq_itemsets.subtract(actual_freq_itemsets).count(), 0) def tearDown(self): - self.spark.conf.set("spark.sql.shuffle.partitions", self.shuffle_partitions) del self.data From 33c8971780d3bca2a429b52488e4e8db13f11283 Mon Sep 17 00:00:00 2001 From: zero323 Date: Thu, 23 Mar 2017 22:14:27 +0100 Subject: [PATCH 15/22] Add super().setUp() in FPGrowthTests --- python/pyspark/ml/tests.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index 5a1b23413332..e0a94ca30b95 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -1246,6 +1246,7 @@ def test_tweedie_distribution(self): class FPGrowthTests(SparkSessionTestCase): def setUp(self): + 
super(FPGrowthTests, self).setUp() self.data = self.spark.createDataFrame( [([1, 2], ), ([1, 2], ), ([1, 2, 3], ), ([1, 3], )], ["items"]) From eb4ec2613cc3c6367a1a7b632c584cf4d8ad9711 Mon Sep 17 00:00:00 2001 From: zero323 Date: Thu, 23 Mar 2017 22:32:26 +0100 Subject: [PATCH 16/22] Sort imports in ml.tests --- python/pyspark/ml/tests.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/python/pyspark/ml/tests.py b/python/pyspark/ml/tests.py index e0a94ca30b95..527db9b66793 100755 --- a/python/pyspark/ml/tests.py +++ b/python/pyspark/ml/tests.py @@ -42,7 +42,7 @@ import array as pyarray import numpy as np from numpy import ( - array, array_equal, zeros, inf, random, exp, dot, all, mean, abs, arange, tile, ones) + abs, all, arange, array, array_equal, dot, exp, inf, mean, ones, random, tile, zeros) from numpy import sum as array_sum import inspect @@ -50,19 +50,20 @@ from pyspark.ml import Estimator, Model, Pipeline, PipelineModel, Transformer from pyspark.ml.classification import * from pyspark.ml.clustering import * +from pyspark.ml.common import _java2py, _py2java from pyspark.ml.evaluation import BinaryClassificationEvaluator, RegressionEvaluator from pyspark.ml.feature import * -from pyspark.ml.linalg import Vector, SparseVector, DenseVector, VectorUDT,\ - DenseMatrix, SparseMatrix, Vectors, Matrices, MatrixUDT, _convert_to_vector +from pyspark.ml.fpm import FPGrowth, FPGrowthModel +from pyspark.ml.linalg import ( + DenseMatrix, DenseMatrix, DenseVector, Matrices, MatrixUDT, + SparseMatrix, SparseVector, Vector, VectorUDT, Vectors, _convert_to_vector) from pyspark.ml.param import Param, Params, TypeConverters -from pyspark.ml.param.shared import HasMaxIter, HasInputCol, HasSeed +from pyspark.ml.param.shared import HasInputCol, HasMaxIter, HasSeed from pyspark.ml.recommendation import ALS -from pyspark.ml.regression import LinearRegression, DecisionTreeRegressor, \ - GeneralizedLinearRegression +from pyspark.ml.regression import ( + DecisionTreeRegressor, GeneralizedLinearRegression, LinearRegression) from pyspark.ml.tuning import * -from pyspark.ml.fpm import FPGrowth, FPGrowthModel from pyspark.ml.wrapper import JavaParams, JavaWrapper -from pyspark.ml.common import _java2py, _py2java from pyspark.serializers import PickleSerializer from pyspark.sql import DataFrame, Row, SparkSession from pyspark.sql.functions import rand From 3c7f4f754cfba5a9a93ec01d39ad75b4f873a816 Mon Sep 17 00:00:00 2001 From: zero323 Date: Fri, 24 Mar 2017 00:57:41 +0100 Subject: [PATCH 17/22] Copy Scala docs and add doc entry --- python/docs/pyspark.ml.rst | 8 ++++++++ python/pyspark/ml/fpm.py | 13 ++++++++----- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/python/docs/pyspark.ml.rst b/python/docs/pyspark.ml.rst index 26f7415e1a42..a68183445d78 100644 --- a/python/docs/pyspark.ml.rst +++ b/python/docs/pyspark.ml.rst @@ -80,3 +80,11 @@ pyspark.ml.evaluation module :members: :undoc-members: :inherited-members: + +pyspark.ml.fpm module +---------------------------- + +.. 
automodule:: pyspark.ml.fpm + :members: + :undoc-members: + :inherited-members: diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index e21a27ed22a9..3207660f86d1 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -129,15 +129,18 @@ def associationRules(self): class FPGrowth(JavaEstimator, HasItemsCol, HasPredictionCol, HasSupport, HasConfidence, JavaMLWritable, JavaMLReadable): - """A parallel FP-growth algorithm to mine frequent itemsets - - * Li et al., PFP: Parallel FP-Growth for Query Recommendation [LI2008]_ - * Han et al., Mining frequent patterns without candidate generation [HAN2000]_ + """A parallel FP-growth algorithm to mine frequent itemsets. The algorithm is described in + Li et al., PFP: Parallel FP-Growth for Query Recommendation [LI2008]_. + PFP distributes computation in such a way that each worker executes an + independent group of mining tasks. The FP-Growth algorithm is described in + Han et al., Mining frequent patterns without candidate generation [HAN2000]_ .. [LI2008] http://dx.doi.org/10.1145/1454008.1454027 .. [HAN2000] http://dx.doi.org/10.1145/335191.335372 - .. note:: Internally `transform` `collects` and `broadcasts` association rules. + .. note:: Experimental + .. note:: null values in the feature column are ignored during fit(). + .. note:: Internally `transform` `collects` and `broadcasts` association rules. >>> from pyspark.sql.functions import split >>> data = (spark.read From 3521d405a251961b39b599038d26e8250543c528 Mon Sep 17 00:00:00 2001 From: zero323 Date: Sat, 25 Mar 2017 14:12:00 +0100 Subject: [PATCH 18/22] Add ml.fpm to sparktestsupport/modules --- dev/sparktestsupport/modules.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 10ad1fe3aa2c..eaf1f3a1db2f 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -423,15 +423,16 @@ def __hash__(self): "python/pyspark/ml/" ], python_test_goals=[ - "pyspark.ml.feature", "pyspark.ml.classification", "pyspark.ml.clustering", + "pyspark.ml.evaluation", + "pyspark.ml.feature", + "pyspark.ml.fpm", "pyspark.ml.linalg.__init__", "pyspark.ml.recommendation", "pyspark.ml.regression", "pyspark.ml.tuning", "pyspark.ml.tests", - "pyspark.ml.evaluation", ], blacklisted_python_implementations=[ "PyPy" # Skip these tests under PyPy since they require numpy and it isn't available there From bdea0ff5e126c34264e6a417cf4e7b3b0e470d10 Mon Sep 17 00:00:00 2001 From: zero323 Date: Sun, 26 Mar 2017 05:10:03 +0200 Subject: [PATCH 19/22] Drop range tests and clean docstrings --- python/pyspark/ml/fpm.py | 16 ++++++---------- 1 file changed, 6 insertions(+), 10 deletions(-) diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index 3207660f86d1..46febd744d8f 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -25,23 +25,21 @@ class HasSupport(Params): """ - Mixin for param support: [0.0, 1.0]. + Mixin for param support. """ minSupport = Param( Params._dummy(), "minSupport", - "Minimal support level of the frequent pattern. [0.0, 1.0]. Any pattern that appears more " - "than (minSupport * size-of-the-dataset) times will be output", + """Minimal support level of the frequent pattern. [0.0, 1.0]. + Any pattern that appears more than (minSupport * size-of-the-dataset) + times will be output""", typeConverter=TypeConverters.toFloat) def setMinSupport(self, value): """ Sets the value of :py:attr:`minSupport`. 
""" - if not (0 <= value <= 1): - raise ValueError("Support must be in range [0, 1]") - return self._set(minSupport=value) def getMinSupport(self): """ @@ -52,13 +50,13 @@ def getMinSupport(self): class HasConfidence(Params): """ - Mixin for param confidence: [0.0, 1.0]. + Mixin for param confidence. """ minConfidence = Param( Params._dummy(), "minConfidence", - """"Minimal confidence for generating Association Rule. [0.0, 1.0] + """Minimal confidence for generating Association Rule. [0.0, 1.0] Note that minConfidence has no effect during fitting.""", typeConverter=TypeConverters.toFloat) @@ -66,8 +64,6 @@ def setMinConfidence(self, value): """ Sets the value of :py:attr:`minConfidence`. """ - if not (0 <= value <= 1): - raise ValueError("Confidence must be in range [0, 1]") return self._set(minConfidence=value) def getMinConfidence(self): From deb2ce7f8586ce27aeefefa8b689e558d20f07de Mon Sep 17 00:00:00 2001 From: zero323 Date: Sun, 26 Mar 2017 05:18:05 +0200 Subject: [PATCH 20/22] Move experimental annotation to the top of the docstrings --- python/pyspark/ml/fpm.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index 46febd744d8f..4c2a456290ec 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -95,10 +95,11 @@ def getItemsCol(self): class FPGrowthModel(JavaModel, JavaMLWritable, JavaMLReadable): - """Model fitted by FPGrowth. - + """ .. note:: Experimental + Model fitted by FPGrowth. + .. versionadded:: 2.2.0 """ @property @@ -125,7 +126,10 @@ def associationRules(self): class FPGrowth(JavaEstimator, HasItemsCol, HasPredictionCol, HasSupport, HasConfidence, JavaMLWritable, JavaMLReadable): - """A parallel FP-growth algorithm to mine frequent itemsets. The algorithm is described in + """ + .. note:: Experimental + + A parallel FP-growth algorithm to mine frequent itemsets. The algorithm is described in Li et al., PFP: Parallel FP-Growth for Query Recommendation [LI2008]_. PFP distributes computation in such a way that each worker executes an independent group of mining tasks. The FP-Growth algorithm is described in @@ -134,7 +138,6 @@ class FPGrowth(JavaEstimator, HasItemsCol, HasPredictionCol, .. [LI2008] http://dx.doi.org/10.1145/1454008.1454027 .. [HAN2000] http://dx.doi.org/10.1145/335191.335372 - .. note:: Experimental .. note:: null values in the feature column are ignored during fit(). .. note:: Internally `transform` `collects` and `broadcasts` association rules. @@ -181,8 +184,6 @@ class FPGrowth(JavaEstimator, HasItemsCol, HasPredictionCol, >>> sorted(fpm.transform(new_data).first().prediction) ['x', 'y', 'z'] - .. note:: Experimental - .. versionadded:: 2.2.0 """ @keyword_only From bf0a2853939c391fb6c3f38661f3d7c8b43cb0d5 Mon Sep 17 00:00:00 2001 From: zero323 Date: Sun, 26 Mar 2017 14:01:59 +0200 Subject: [PATCH 21/22] Fix getMinSupport --- python/pyspark/ml/fpm.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index 4c2a456290ec..707d25685eae 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -40,6 +40,7 @@ def setMinSupport(self, value): """ Sets the value of :py:attr:`minSupport`. 
""" + return self._set(minSupport=value) def getMinSupport(self): """ From 66b85e5fc9c6a57978df0494c4a7174070534636 Mon Sep 17 00:00:00 2001 From: zero323 Date: Sun, 26 Mar 2017 14:02:48 +0200 Subject: [PATCH 22/22] Remove 'single item' note from consequent description --- python/pyspark/ml/fpm.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/ml/fpm.py b/python/pyspark/ml/fpm.py index 707d25685eae..b30d4edb1990 100644 --- a/python/pyspark/ml/fpm.py +++ b/python/pyspark/ml/fpm.py @@ -119,7 +119,7 @@ def associationRules(self): """ Data with three columns: * `antecedent` - Array of the same type as the input column. - * `consequent` - Single element array of the same type as the input column. + * `consequent` - Array of the same type as the input column. * `confidence` - Confidence for the rule (`DoubleType`). """ return self._call_java("associationRules")