[SPARK-23380][PYTHON] Make toPandas fallback to non-Arrow optimization if possible #20567
Conversation
```python
dtype = {}
pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
```
The actual diff here is just the `else:`. It was removed, which fixes the indentation.
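Roughly, the change being described is removing the `else:` and dedenting the non-Arrow branch so it becomes the shared fallback path - a simplified sketch, not the exact diff (`use_arrow` is just a stand-in for the conf check):

```python
# Before: the non-Arrow conversion only ran when Arrow was disabled.
if use_arrow:
    ...  # Arrow-based conversion
else:
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)

# After: the Arrow branch returns early on success, so the else: is dropped
# and the dedented non-Arrow conversion doubles as the fallback path.
if use_arrow:
    ...  # Arrow-based conversion; returns on success
pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
```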
python/pyspark/sql/dataframe.py
Outdated
```python
timezone = None

if self.sql_ctx.getConf("spark.sql.execution.arrow.enabled", "false").lower() == "true":
    should_fall_back = False
```
Here is the main change.
|
Seems it happened to fix this case too:

```python
spark.conf.set("spark.sql.execution.arrow.enabled", "false")
df = spark.createDataFrame([[bytearray("a")]])
df.toPandas()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df.toPandas()
```

(Before/After output screenshots.)
|
cc @ueshin, @BryanCutler and @icexelloss, could you take a look please when you have some time? |
|
Test build #87284 has finished for PR 20567 at commit
|
```python
    # Check if its schema is convertible in Arrow format.
    to_arrow_schema(self.schema)
except Exception as e:
    # Fallback to convert to Pandas DataFrame without arrow if raise some exception
```
Does this PR fall back to the original path if any exception occurs? E.g. when an ImportError happens, whereas the current code throws an exception with a message?
Would it be good to note this change in the description, too?
Yup. It does fall back for an unsupported schema, a PyArrow version mismatch and missing PyArrow. Will add a note in the PR description.
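For instance, a sketch of the unsupported-schema case (assuming `MapType` is one of the types the Arrow conversion rejects at this point, so `to_arrow_schema` raises and the non-Arrow path is taken instead):

```python
from pyspark.sql.types import StructType, StructField, MapType, StringType

# Assumed to be an Arrow-unsupported schema at the time of this PR, so with
# the fallback in place toPandas() should use the plain collect() path
# instead of raising.
schema = StructType([StructField("m", MapType(StringType(), StringType()))])
df = spark.createDataFrame([({"a": "b"},)], schema=schema)

spark.conf.set("spark.sql.execution.arrow.enabled", "true")
pdf = df.toPandas()  # falls back to the non-Arrow conversion
```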
|
Since this PR is not a bug fix, we will not merge it into 2.3. How about submitting another PR to throw a better error message in the to-be-released 2.3?
|
The #20567 (comment) case is actually closer to a bug, as the outputs with and without Arrow are different and inconsistent. The problem is that we already allow the inconsistent conversion in `createDataFrame`. In addition, I believe it is good to match the behaviour between `createDataFrame` and `toPandas`.

The change is kind of safe. The actual change is basically from:

```python
if # 'spark.sql.execution.arrow.enabled' true?
    require_minimum_pyarrow_version()
    # return the one with Arrow
else:
    # return the one without Arrow
```

to:

```python
if # 'spark.sql.execution.arrow.enabled' true?
    should_fall_back = False
    try:
        require_minimum_pyarrow_version()
        to_arrow_schema(self.schema)
    except Exception as e:
        should_fall_back = True
    if not should_fall_back:
        # return the one with Arrow
# return the one without Arrow
```

The error message already looks okay for now. If you feel strongly about this, I am fine with going ahead with this only into master.
ueshin
left a comment
I'm wondering whether we can do "return the one with Arrow" in the try block? I mean:

```python
if # 'spark.sql.execution.arrow.enabled' true?
    try:
        require_minimum_pyarrow_version()
        # return the one with Arrow
    except Exception as e:
        # warn
        # return the one without Arrow
```

```python
else:
    import unittest

from pyspark.util import _exception_message
```
nit: add an empty line between this import and _pandas_requirement_message line.
|
@ueshin, yup, I initially thought so too, but realised that it might collect twice.
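A rough illustration of the concern (a hypothetical fragment of the `toPandas` body, not the code in this PR): if the Arrow collect itself sits inside the try block, a failure during or after it would make the fallback fetch the same data from the cluster a second time.

```python
try:
    require_minimum_pyarrow_version()
    tables = self._collectAsArrow()   # first round trip to the executors
    # ... convert the Arrow tables to pandas; an error raised here ...
except Exception:
    # ... lands in the fallback, which collects everything again.
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
```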
|
Test build #87324 has finished for PR 20567 at commit
|
|
retest this please |
|
Test build #87327 has finished for PR 20567 at commit
|
|
LGTM. I'd leave it to @HyukjinKwon and @gatorsmile whether we should merge this into branch-2.3 or not. |
|
retest this please |
|
Test build #87332 has finished for PR 20567 at commit
|
|
Sorry I am late to the party. #20567 (comment) does look like a bug to me. However, I am a bit concerned that such magic behavior would not be ideal for some users. At least among Python users at Two Sigma, most of them would prefer a "fail fast" exception rather than a fallback to the non-Arrow path, because the non-Arrow path can often take a long time to complete, or worse, "fail slow". Implementing this behavior could be problematic for users that transfer non-trivial amounts of data from Spark to Pandas.
|
This is kind of like what we did for whole-stage codegen; we have a conf there to control whether it falls back.
|
Regarding the error message, this is a good example of how to provide a user-friendly message. Most external end users do not care about the internal implementation. They might not be aware that Apache Arrow is being used; they might not even know what Apache Arrow is. The conf might be set by the system admin or others. Thus, this error message is confusing to them. Ideally, we could let users know how to bypass the issue, for example by telling them to disable the conf `spark.sql.execution.arrow.enabled`.
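For example, the message could name the conf and how to bypass the issue. An illustrative sketch of what such a message might look like inside the except handler (`e` being the caught exception; this is not wording from any actual PR):

```python
raise Exception(
    "toPandas attempted Arrow optimization because "
    "'spark.sql.execution.arrow.enabled' is set to true; however, it "
    "failed with: %s\n"
    "You can disable 'spark.sql.execution.arrow.enabled' to fall back to "
    "the non-optimized conversion." % str(e))
```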
|
A quick nit: "fallback" is a single word.
BryanCutler
left a comment
I agree that the behavior should match `createDataFrame` and also fall back, but a big +1 on adding a conf to allow disabling the fallback. I can see how some users might want this, and it would make development easier too: if something Arrow-related is failing, tests won't pass just because of the fallback.
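A minimal sketch of how such a conf could gate the fallback inside `toPandas` (the conf name `spark.sql.execution.arrow.fallback.enabled` and the surrounding structure are placeholders here, not anything merged):

```python
import warnings

fallback_enabled = self.sql_ctx.getConf(
    "spark.sql.execution.arrow.fallback.enabled", "true").lower() == "true"
try:
    require_minimum_pyarrow_version()
    to_arrow_schema(self.schema)  # is the schema Arrow-convertible?
    use_arrow = True
except Exception as e:
    if not fallback_enabled:
        raise  # fail fast when the user opted out of falling back
    warnings.warn("Falling back to non-Arrow conversion: %s" % str(e))
    use_arrow = False
```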
```python
from pyspark.sql.types import _check_dataframe_convert_date, \
    _check_dataframe_localize_timestamps

tables = self._collectAsArrow()
```
shouldn't this be in the try block?
Please see #20567 (comment). @ueshin raised a similar concern.
I see, we don't want to collect twice, and you manually run a schema conversion to decide whether to fall back in that case. I think there still might be some cases where the Arrow path could fail, like maybe incompatible Arrow versions (e.g. using a possible future version of PyArrow with Java still at 0.8), but this should cover the most common cases, so it seems fine to me.
|
Yup, I also agree with adding a configuration to control this. I will work on it for master only later.

For #20567 (comment), yup, I agree with that, but to do this we would do something like:

```python
if # 'spark.sql.execution.arrow.enabled' true?
    require_minimum_pyarrow_version()
    try:
        to_arrow_schema(self.schema)
        # return the one with Arrow
    except Exception as e:
        raise Exception("'spark.sql.execution.arrow.enabled' blah blah ...")
else:
    # return the one without Arrow
```

The diff and complexity are pretty similar to the fallback one:

```python
if # 'spark.sql.execution.arrow.enabled' true?
    should_fall_back = False
    try:
        require_minimum_pyarrow_version()
        to_arrow_schema(self.schema)
    except Exception as e:
        should_fall_back = True
    if not should_fall_back:
        # return the one with Arrow
# return the one without Arrow
```

Note that, in the case of `createDataFrame`, we already fall back. I have been thinking this feature is in transition and am trying to fix and match the behaviour first before the release.
|
I mean, I get that a nicer error message is useful of course, but wouldn't it be better to match the behaviour between `createDataFrame` and `toPandas`?
|
My proposal is to merge the fix after the 2.3 release. We can still backport it to SPARK 2.3, but it will be available in SPARK 2.3.1. |
python/pyspark/sql/dataframe.py
Outdated
```python
    to_arrow_schema(self.schema)
except Exception as e:
    # Fallback to convert to Pandas DataFrame without arrow if raise some exception
    should_fall_back = True
```
nit: should_fall_back -> should_fallback other places below too
Yup.
Mind if I ask you to elaborate why? I want to know why this one should be specially excluded from 2.3.0 alone although it can be backported to branch-2.3. I thought it's good to add it into 2.3.0 because it is kind of safe, fixes an actual bug and matches the behaviour with `createDataFrame`.
|
This issue does not cause a regression, since the Arrow optimization is a new feature in 2.3.
It doesn't block the release, but we can still backport it because it fixes an actual bug with a minimal change, whether 2.3.0 is released or not.
I thought this is another step. We need to make them consistent first.
Is there any specific worry about this change that might shake the 2.3.0 release specifically? In this way, we can't backport anything. I am surprised that this PR is considered to be excluded specifically from 2.3.0.
|
The feedback is partially from @rxin. Maybe he can provide more input later.
|
Test build #87355 has finished for PR 20567 at commit
|
Based on the comments from @icexelloss, I do not think we should blindly switch back to the original version. At least, provide an option to the end users.
Yeah. This PR is not ready to merge yet. |
|
^ I am not saying that we should merge it now. I can also do the opposite, if that's preferred.
|
RC3 is out. Just to avoid new regressions that might be introduced in the new PR. |
|
RC3 is out. This change could be in 2.3.1 if the vote passes, or in 2.3.0 if the vote fails. It sounds like we can't backport or change anything in the main code until the 2.3.0 release, for the reason above. So, you are worried about delaying the release further because it has already been delayed quite a bit? I understand this, but I would like to ask to get this in (whether it throws an exception for both `createDataFrame` and `toPandas`, or falls back for both).

@rxin, do you think we should take this out of 2.3.0 too? Was this your opinion (#20567 (comment))?
|
@gatorsmile and @rxin, the problem here is that `toPandas` and `createDataFrame` currently behave inconsistently when Arrow is enabled. This is the last one left (for now) about PySpark/Pandas interoperability which I found while testing, and I was thinking about targeting 2.3.0. So, for clarification, would you be uncomfortable with one of:

1. falling back to the non-Arrow path (what this PR currently does),
2. throwing a better error message, or
3. adding a configuration to control the fallback,

to target 2.3.0 (or 2.3.1 if the vote passes)? FYI, the current one in this PR is 1. If so, let me have two PRs, one for the error message for now to target 2.3.0 (or 2.3.1 if the vote passes), and one for adding a configuration to control the fallback to target master (and maybe 2.3.1). Does that make sense to both of you? cc @cloud-fan too.
|
We are unable to include option 3 in Spark 2.3.0. It is too big to merge at the current stage. We can still do it in 2.3.1. If needed, I am fine with throwing a better error message if the PR size is very small; otherwise, keep it unchanged in 2.3.0. Also cc @liancheng @yhuai.
|
Just FYI, except for option 3, the complexity of the other options and the PR size will all be similar - #20567 (comment) and #20567 (comment).
|
Then, let us wait for the release of Spark 2.3.0. Thanks! |
|
I mean the actual change here is small. The diff may look larger here because of the removed `else:` and the resulting re-indentation.
|
Regarding the behavior inconsistency between `createDataFrame` and `toPandas`: at the current stage, we are unable to merge the fix for these new features into the Spark 2.3 branch. Let us wait for the release of Spark 2.3.0.
|
There is one more thing - #20567 (comment). We haven't completed binary type support on the Python side yet, but there is a hole here.
|
What is the root cause? Do we have a trivial fix to resolve/block it? |
|
The root cause is that the Arrow conversion on the Python side interprets binaries differently from the non-Arrow path. This is the most trivial fix. I made the fix as safe and small as possible here. I can fix only the error message, but the size of the change and the diff would be virtually the same - #20567 (comment).
|
The binary type bug sounds like a blocker; can we just fix it surgically by checking the supported data types before going into the Arrow optimization path? For now we can stick with what the current behavior is, i.e. throw an exception. The inconsistent behavior between `createDataFrame` and `toPandas` can be fixed later.
That's basically (#20567 (comment)):

```python
if # 'spark.sql.execution.arrow.enabled' true?
    require_minimum_pyarrow_version()
    try:
        to_arrow_schema(self.schema)
        # return the one with Arrow
    except Exception as e:
        raise Exception("'spark.sql.execution.arrow.enabled' blah blah ...")
else:
    # return the one without Arrow
```

because `to_arrow_schema(self.schema)` already checks whether the schema is convertible to Arrow format.
|
^ this change LGTM. Can we make a PR for this change only and leave the fallback part for Spark 2.4? |
|
Sure. |
```python
    require_minimum_pyarrow_version()
    # Check if its schema is convertible in Arrow format.
    to_arrow_schema(self.schema)
except Exception as e:
```
Do we want to catch more specific exceptions here? i.e. TypeError and ImportError?
Hm, it might depend on which message we want to show. Will open another PR as discussed above.
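If we did want to narrow the catch, a sketch could look like the following, assuming `require_minimum_pyarrow_version` raises `ImportError` when PyArrow is missing or too old and `to_arrow_schema` raises `TypeError` for unsupported types:

```python
try:
    require_minimum_pyarrow_version()
    # Check if its schema is convertible in Arrow format.
    to_arrow_schema(self.schema)
except (TypeError, ImportError) as e:
    # Unsupported schema (TypeError) or missing/old PyArrow (ImportError):
    # fall back to the non-Arrow conversion instead of failing.
    should_fallback = True
```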
|
@HyukjinKwon Will you submit a fix for the binary type today? We are very close to RC4. This is kind of urgent if we still want to block it in the Spark 2.3.0 release. |
|
Yup, I will. Sorry for the delay. I was trying to make the fix as small as possible. Let me just open it in the simplest way.
|
I just opened #20625. I believe this is the smallest and simplest change. I will turn this PR into the one adding a configuration later for 2.4, as discussed.
|
Thanks! Happy Lunar New Year! |
|
I just opened another PR for adding a configuration - #20678. Let me close this one. |
What changes were proposed in this pull request?
This PR proposes to fall back to the non-Arrow optimization if possible - for an unsupported schema, a PyArrow version mismatch, or missing PyArrow.
For example, see the unsupported schema case below:
Before
After
Note that, in the case of `createDataFrame`, we already fall back to make this at least work even though the optimisation is disabled.

How was this patch tested?
Manually tested and unit tests were added in
python/pyspark/sql/tests.py.