Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions python/pyspark/sql/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -2007,19 +2007,20 @@ def toPandas(self):
"toPandas attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true; however, "
"failed by the reason below:\n %s\n"
"Attempts non-optimization as "
"Attempting non-optimization as "
"'spark.sql.execution.arrow.fallback.enabled' is set to "
"true." % _exception_message(e))
warnings.warn(msg)
use_arrow = False
else:
msg = (
"toPandas attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true; however, "
"failed by the reason below:\n %s\n"
"For fallback to non-optimization automatically, please set true to "
"'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e))
raise RuntimeError(msg)
"'spark.sql.execution.arrow.enabled' is set to true, but has reached "
"the error below and will not continue because automatic fallback "
"with 'spark.sql.execution.arrow.fallback.enabled' has been set to "
"false.\n %s" % _exception_message(e))
warnings.warn(msg)
raise

# Try to use Arrow optimization when the schema is supported and the required version
# of PyArrow is found, if 'spark.sql.execution.arrow.enabled' is enabled.
Expand All @@ -2042,12 +2043,12 @@ def toPandas(self):
# be executed. So, simply fail in this case for now.
msg = (
"toPandas attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true; however, "
"failed unexpectedly:\n %s\n"
"Note that 'spark.sql.execution.arrow.fallback.enabled' does "
"not have an effect in such failure in the middle of "
"computation." % _exception_message(e))
raise RuntimeError(msg)
"'spark.sql.execution.arrow.enabled' is set to true, but has reached "
"the error below and can not continue. Note that "
"'spark.sql.execution.arrow.fallback.enabled' does not have an effect "
"on failures in the middle of computation.\n %s" % _exception_message(e))
warnings.warn(msg)
raise

# Below is toPandas without Arrow optimization.
pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
Expand Down
13 changes: 7 additions & 6 deletions python/pyspark/sql/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,18 +674,19 @@ def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=Tr
"createDataFrame attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true; however, "
"failed by the reason below:\n %s\n"
"Attempts non-optimization as "
"Attempting non-optimization as "
"'spark.sql.execution.arrow.fallback.enabled' is set to "
"true." % _exception_message(e))
warnings.warn(msg)
else:
msg = (
"createDataFrame attempted Arrow optimization because "
"'spark.sql.execution.arrow.enabled' is set to true; however, "
"failed by the reason below:\n %s\n"
"For fallback to non-optimization automatically, please set true to "
"'spark.sql.execution.arrow.fallback.enabled'." % _exception_message(e))
raise RuntimeError(msg)
"'spark.sql.execution.arrow.enabled' is set to true, but has reached "
"the error below and will not continue because automatic fallback "
"with 'spark.sql.execution.arrow.fallback.enabled' has been set to "
"false.\n %s" % _exception_message(e))
warnings.warn(msg)
raise
data = self._convert_from_pandas(data, schema, timezone)

if isinstance(schema, StructType):
Expand Down
10 changes: 5 additions & 5 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -3533,7 +3533,7 @@ def test_toPandas_fallback_enabled(self):
warn.message for warn in warns if isinstance(warn.message, UserWarning)]
self.assertTrue(len(user_warns) > 0)
self.assertTrue(
"Attempts non-optimization" in _exception_message(user_warns[-1]))
"Attempting non-optimization" in _exception_message(user_warns[-1]))
self.assertPandasEqual(pdf, pd.DataFrame({u'map': [{u'a': 1}]}))

def test_toPandas_fallback_disabled(self):
Expand Down Expand Up @@ -3661,7 +3661,7 @@ def test_createDataFrame_with_incorrect_schema(self):
pdf = self.create_pandas_data_frame()
wrong_schema = StructType(list(reversed(self.schema)))
with QuietTest(self.sc):
with self.assertRaisesRegexp(RuntimeError, ".*No cast.*string.*timestamp.*"):
with self.assertRaisesRegexp(Exception, ".*No cast.*string.*timestamp.*"):
self.spark.createDataFrame(pdf, schema=wrong_schema)

def test_createDataFrame_with_names(self):
Expand All @@ -3686,7 +3686,7 @@ def test_createDataFrame_column_name_encoding(self):
def test_createDataFrame_with_single_data_type(self):
import pandas as pd
with QuietTest(self.sc):
with self.assertRaisesRegexp(RuntimeError, ".*IntegerType.*not supported.*"):
with self.assertRaisesRegexp(ValueError, ".*IntegerType.*not supported.*"):
self.spark.createDataFrame(pd.DataFrame({"a": [1]}), schema="int")

def test_createDataFrame_does_not_modify_input(self):
Expand Down Expand Up @@ -3754,14 +3754,14 @@ def test_createDataFrame_fallback_enabled(self):
warn.message for warn in warns if isinstance(warn.message, UserWarning)]
self.assertTrue(len(user_warns) > 0)
self.assertTrue(
"Attempts non-optimization" in _exception_message(user_warns[-1]))
"Attempting non-optimization" in _exception_message(user_warns[-1]))
self.assertEqual(df.collect(), [Row(a={u'a': 1})])

def test_createDataFrame_fallback_disabled(self):
import pandas as pd

with QuietTest(self.sc):
with self.assertRaisesRegexp(Exception, 'Unsupported type'):
with self.assertRaisesRegexp(TypeError, 'Unsupported type'):
self.spark.createDataFrame(
pd.DataFrame([[{u'a': 1}]]), "a: map<string, int>")

Expand Down
6 changes: 6 additions & 0 deletions python/pyspark/sql/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,10 @@ def require_minimum_pandas_version():
from distutils.version import LooseVersion
try:
import pandas
have_pandas = True
except ImportError:
have_pandas = False
if not have_pandas:
Copy link
Member

@HyukjinKwon HyukjinKwon Mar 22, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, this is for making the error message clean when it's reraised in Python 3? I think it's fine to leave it as was. I believe the downside of it is to lose the information where exactly the error was thrown in Python 3.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think having the traceback to the raise ImportError below is all the information needed. If that happens, then the only possible cause is that the import failed from here. The problem with how it was before is that for Python 3, it will print out During handling of the above exception, another exception occurred: which makes it seem like it is not being handled correctly, since it's really just a failed import.

raise ImportError("Pandas >= %s must be installed; however, "
"it was not found." % minimum_pandas_version)
if LooseVersion(pandas.__version__) < LooseVersion(minimum_pandas_version):
Expand All @@ -138,7 +141,10 @@ def require_minimum_pyarrow_version():
from distutils.version import LooseVersion
try:
import pyarrow
have_arrow = True
except ImportError:
have_arrow = False
if not have_arrow:
raise ImportError("PyArrow >= %s must be installed; however, "
"it was not found." % minimum_pyarrow_version)
if LooseVersion(pyarrow.__version__) < LooseVersion(minimum_pyarrow_version):
Expand Down