From 94c44ba368acb3c7fa648ad66cfd3cac352af911 Mon Sep 17 00:00:00 2001
From: DjvuLee
Date: Mon, 16 Jan 2017 16:43:34 +0800
Subject: [PATCH 1/5] [SPARK-19239][PySpark] Check whether lowerBound and upperBound are None in the jdbc API

The ``jdbc`` API does not check ``lowerBound`` and ``upperBound`` when
``column`` is specified, and just throws the following exception:

```int() argument must be a string or a number, not 'NoneType'```

If we check the parameters up front, we can give a friendlier error message.
---
 python/pyspark/sql/readwriter.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index b0c51b1e9992..4a2f5285838a 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -399,7 +399,8 @@ def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPar
         accessible via JDBC URL ``url`` and connection ``properties``.
 
         Partitions of the table will be retrieved in parallel if either ``column`` or
-        ``predicates`` is specified.
+        ``predicates`` is specified. ``lowerBound`` and ``upperBound`` are needed when ``column``
+        is specified.
 
         If both ``column`` and ``predicates`` are specified, ``column`` will be used.
 
@@ -431,6 +432,8 @@ def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPar
         if column is not None:
             if numPartitions is None:
                 numPartitions = self._spark._sc.defaultParallelism
+            assert lowerBound != None, "lowerBound can not be None when ``column`` is specified"
+            assert upperBound != None, "upperBound can not be None when ``column`` is specified"
             return self._df(self._jreader.jdbc(url, table, column, int(lowerBound),
                                                int(upperBound), int(numPartitions), jprop))
         if predicates is not None:

From 43602b56d6099213a103a0c0389ac37ebb2c326b Mon Sep 17 00:00:00 2001
From: DjvuLee
Date: Tue, 17 Jan 2017 13:42:21 +0800
Subject: [PATCH 2/5] Update the None comparison to use `is not None`

---
 python/pyspark/sql/readwriter.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 4a2f5285838a..20fcf5ef8ef2 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -432,8 +432,8 @@ def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPar
         if column is not None:
             if numPartitions is None:
                 numPartitions = self._spark._sc.defaultParallelism
-            assert lowerBound != None, "lowerBound can not be None when ``column`` is specified"
-            assert upperBound != None, "upperBound can not be None when ``column`` is specified"
+            assert lowerBound is not None, "lowerBound can not be None when ``column`` is specified"
+            assert upperBound is not None, "upperBound can not be None when ``column`` is specified"
             return self._df(self._jreader.jdbc(url, table, column, int(lowerBound),
                                                int(upperBound), int(numPartitions), jprop))
         if predicates is not None:
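A minimal sketch of the failure mode the two patches above address. The JDBC URL, table name, and credentials are hypothetical placeholders, and the database need not even be reachable, since the new assertion fires before any connection is attempted:

```python
# Sketch: calling ``jdbc`` with ``column`` but without bounds.
# Before this series the call died inside ``int(lowerBound)`` with
#   TypeError: int() argument must be a string or a number, not 'NoneType'
# With patches 1-2 applied it fails fast with a descriptive message.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc-bounds-sketch").getOrCreate()

try:
    df = spark.read.jdbc(
        "jdbc:postgresql://db-host:5432/mydb",  # hypothetical URL
        "my_table",                             # hypothetical table
        column="id",                            # partition column, but no bounds
        properties={"user": "u", "password": "p"},
    )
except AssertionError as e:
    print(e)  # lowerBound can not be None when ``column`` is specified
```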
From f2dad9cdf77cab31152aab0f16df86cf004e41fe Mon Sep 17 00:00:00 2001
From: DjvuLee
Date: Tue, 17 Jan 2017 16:14:11 +0800
Subject: [PATCH 3/5] Update the numPartitions check to keep the API consistent

In the Scala API, `numPartitions` is required when `column` is
specified, so we remove the default-parallelism fallback to keep the two
APIs consistent.
---
 python/pyspark/sql/readwriter.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 20fcf5ef8ef2..66c2248cc0be 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -430,10 +430,10 @@ def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPar
         for k in properties:
             jprop.setProperty(k, properties[k])
         if column is not None:
-            if numPartitions is None:
-                numPartitions = self._spark._sc.defaultParallelism
             assert lowerBound is not None, "lowerBound can not be None when ``column`` is specified"
             assert upperBound is not None, "upperBound can not be None when ``column`` is specified"
+            assert numPartitions is not None, "numPartitions can not be None " \
+                "when ``column`` is specified"
             return self._df(self._jreader.jdbc(url, table, column, int(lowerBound),
                                                int(upperBound), int(numPartitions), jprop))
         if predicates is not None:

From e9a1bcad452dc231d185d1f4150f2a7a907f7ab7 Mon Sep 17 00:00:00 2001
From: DjvuLee
Date: Tue, 17 Jan 2017 16:19:11 +0800
Subject: [PATCH 4/5] Update the documentation for numPartitions

---
 python/pyspark/sql/readwriter.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 66c2248cc0be..f362a65fd936 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -399,8 +399,8 @@ def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPar
         accessible via JDBC URL ``url`` and connection ``properties``.
 
         Partitions of the table will be retrieved in parallel if either ``column`` or
-        ``predicates`` is specified. ``lowerBound`` and ``upperBound`` are needed when ``column``
-        is specified.
+        ``predicates`` is specified. ``lowerBound``, ``upperBound`` and ``numPartitions``
+        are needed when ``column`` is specified.
 
         If both ``column`` and ``predicates`` are specified, ``column`` will be used.
 

From d8513820d990d4235069a88249056b76bd35f60b Mon Sep 17 00:00:00 2001
From: DjvuLee
Date: Tue, 17 Jan 2017 16:43:06 +0800
Subject: [PATCH 5/5] Fix the Python style

---
 python/pyspark/sql/readwriter.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index f362a65fd936..d31f3fb8f604 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -432,8 +432,8 @@ def jdbc(self, url, table, column=None, lowerBound=None, upperBound=None, numPar
         if column is not None:
             assert lowerBound is not None, "lowerBound can not be None when ``column`` is specified"
             assert upperBound is not None, "upperBound can not be None when ``column`` is specified"
-            assert numPartitions is not None, "numPartitions can not be None " \
-                "when ``column`` is specified"
+            assert numPartitions is not None, \
+                "numPartitions can not be None when ``column`` is specified"
             return self._df(self._jreader.jdbc(url, table, column, int(lowerBound),
                                                int(upperBound), int(numPartitions), jprop))
         if predicates is not None:
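A sketch of the resulting contract once the whole series is applied: when ``column`` is given, ``lowerBound``, ``upperBound`` and ``numPartitions`` must all be supplied as well. The connection details below are hypothetical placeholders and assume a reachable database:

```python
# Sketch of the post-series contract: all four partitioning parameters
# go together.  URL, table, and credentials are hypothetical placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc-partitioned-read").getOrCreate()

df = spark.read.jdbc(
    "jdbc:postgresql://db-host:5432/mydb",
    "my_table",
    column="id",          # integral column used to split the read
    lowerBound=1,         # together with upperBound, decides the stride
    upperBound=1000000,   # rows outside the bounds are still included
    numPartitions=8,      # now required; no defaultParallelism fallback
    properties={"user": "u", "password": "p"},
)
print(df.rdd.getNumPartitions())  # 8, one JDBC query per partition
```

Note that the bounds only determine the partition stride, not a filter: rows below ``lowerBound`` or above ``upperBound`` are still read, into the first and last partitions respectively.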