Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 13 additions & 14 deletions python/pyspark/sql/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2000,26 +2000,24 @@ def toPandas(self):
require_minimum_pyarrow_version()
to_arrow_schema(self.schema)
except Exception as e:
msg = (
"toPandas attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true; however, "
"failed by the reason below:\n %s\n" % _exception_message(e))

if self.sql_ctx.getConf("spark.sql.execution.arrow.fallback.enabled", "true") \
.lower() == "true":
msg = (
"toPandas attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true; however, "
"failed by the reason below:\n %s\n"
"Attempts non-optimization as "
"'spark.sql.execution.arrow.fallback.enabled' is set to "
"true." % _exception_message(e))
msg += (
"Attempting non-optimization as "
"'spark.sql.execution.arrow.fallback.enabled' is set to true.")
warnings.warn(msg)
use_arrow = False
else:
msg = (
"toPandas attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true; however, "
"failed by the reason below:\n %s\n"
msg += (
"For fallback to non-optimization automatically, please set true to "
"'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e))
raise RuntimeError(msg)
"'spark.sql.execution.arrow.fallback.enabled'.")
warnings.warn(msg)
raise

# Try to use Arrow optimization when the schema is supported and the required version
# of PyArrow is found, if 'spark.sql.execution.arrow.enabled' is enabled.
Expand Down Expand Up @@ -2047,7 +2045,8 @@ def toPandas(self):
"Note that 'spark.sql.execution.arrow.fallback.enabled' does "
"not have an effect in such failure in the middle of "
"computation." % _exception_message(e))
raise RuntimeError(msg)
warnings.warn(msg)
raise

# Below is toPandas without Arrow optimization.
pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
Expand Down
27 changes: 13 additions & 14 deletions python/pyspark/sql/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -668,24 +668,23 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
except Exception as e:
from pyspark.util import _exception_message

msg = (
"createDataFrame attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true; however, "
"failed by the reason below:\n %s\n" % _exception_message(e))

if self.conf.get("spark.sql.execution.arrow.fallback.enabled", "true") \
.lower() == "true":
msg = (
"createDataFrame attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true; however, "
"failed by the reason below:\n %s\n"
"Attempts non-optimization as "
"'spark.sql.execution.arrow.fallback.enabled' is set to "
"true." % _exception_message(e))
msg += (
"Attempting non-optimization as "
"'spark.sql.execution.arrow.fallback.enabled' is set to true.")
warnings.warn(msg)
else:
msg = (
"createDataFrame attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true; however, "
"failed by the reason below:\n %s\n"
"For fallback to non-optimization automatically, please set true to "
"'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e))
raise RuntimeError(msg)
msg += (
"For fallback to non-optimization automatically, please set "
"true to 'spark.sql.execution.arrow.fallback.enabled'.")
warnings.warn(msg)
raise
data = self._convert_from_pandas(data, schema, timezone)

if isinstance(schema, StructType):
Expand Down
10 changes: 5 additions & 5 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3533,7 +3533,7 @@ def test_toPandas_fallback_enabled(self):
warn.message for warn in warns if isinstance(warn.message, UserWarning)]
self.assertTrue(len(user_warns) > 0)
self.assertTrue(
"Attempts non-optimization" in _exception_message(user_warns[-1]))
"Attempting non-optimization" in _exception_message(user_warns[-1]))
self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]}))

def test_toPandas_fallback_disabled(self):
Expand Down Expand Up @@ -3661,7 +3661,7 @@ def test_createDataFrame_with_incorrect_schema(self):
pdf = self.create_pandas_data_frame()
wrong_schema = StructType(list(reversed(self.schema)))
with QuietTest(self.sc):
with self.assertRaisesRegexp(RuntimeError, ".*No cast.*string.*timestamp.*"):
with self.assertRaisesRegexp(Exception, ".*No cast.*string.*timestamp.*"):
self.spark.createDataFrame(pdf, schema=wrong_schema)

def test_createDataFrame_with_names(self):
Expand All @@ -3686,7 +3686,7 @@ def test_createDataFrame_column_name_encoding(self):
def test_createDataFrame_with_single_data_type(self):
import pandas as pd
with QuietTest(self.sc):
with self.assertRaisesRegexp(RuntimeError, ".*IntegerType.*not supported.*"):
with self.assertRaisesRegexp(ValueError, ".*IntegerType.*not supported.*"):
self.spark.createDataFrame(pd.DataFrame({"a": [1]}), schema="int")

def test_createDataFrame_does_not_modify_input(self):
Expand Down Expand Up @@ -3754,14 +3754,14 @@ def test_createDataFrame_fallback_enabled(self):
warn.message for warn in warns if isinstance(warn.message, UserWarning)]
self.assertTrue(len(user_warns) > 0)
self.assertTrue(
"Attempts non-optimization" in _exception_message(user_warns[-1]))
"Attempting non-optimization" in _exception_message(user_warns[-1]))
self.assertEqual(df.collect(), [Row(a={u'a': 1})])

def test_createDataFrame_fallback_disabled(self):
import pandas as pd

with QuietTest(self.sc):
with self.assertRaisesRegexp(Exception, 'Unsupported type'):
with self.assertRaisesRegexp(TypeError, 'Unsupported type'):
self.spark.createDataFrame(
pd.DataFrame([[{u'a': 1}]]), "a: map<string, int>")

Expand Down
6 changes: 6 additions & 0 deletions python/pyspark/sql/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ def require_minimum_pandas_version():
from distutils.version import LooseVersion
try:
import pandas
have_pandas = True
except ImportError:
have_pandas = False
if not have_pandas:
Copy link
Member

@HyukjinKwon HyukjinKwon Mar 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this is for making the error message clean when it's reraised in Python 3? I think it's fine to leave it as it was. I believe the downside of it is that we lose the information about where exactly the error was thrown in Python 3.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think having the traceback up to the `raise ImportError` below is all the information needed. If that happens, then the only possible cause is that the import failed here. The problem with how it was before is that, in Python 3, it will print out "During handling of the above exception, another exception occurred:", which makes it seem like the error is not being handled correctly, since it's really just a failed import.

raise ImportError("Pandas >= %s must be installed; however, "
"it was not found." % minimum_pandas_version)
if LooseVersion(pandas.__version__) < LooseVersion(minimum_pandas_version):
Expand All @@ -138,7 +141,10 @@ def require_minimum_pyarrow_version():
from distutils.version import LooseVersion
try:
import pyarrow
have_arrow = True
except ImportError:
have_arrow = False
if not have_arrow:
raise ImportError("PyArrow >= %s must be installed; however, "
"it was not found." % minimum_pyarrow_version)
if LooseVersion(pyarrow.__version__) < LooseVersion(minimum_pyarrow_version):
Expand Down