Conversation

@viirya
Member

@viirya viirya commented Oct 27, 2017

What changes were proposed in this pull request?

Under the current execution mode of Python UDFs, we don't properly support Python UDFs as branch values or the else value in a CaseWhen expression. Batch Python UDF execution evaluates the UDFs in an operator against all rows. This breaks the semantics of conditional expressions and causes failures in some cases:

from pyspark.sql import Row
from pyspark.sql.functions import col, udf, when
from pyspark.sql.types import IntegerType

df = sc.parallelize([Row(x=5), Row(x=0)]).toDF()
f = udf(lambda value: 10 // int(value), IntegerType())
whenExpr1 = when((col('x') > 0), f(col('x')))
df.select(whenExpr1).collect()  # Raises a division by zero error

Even from a performance perspective, evaluating all Python UDFs used in conditional expressions can waste computation if only a small portion of rows satisfies the conditions.

The patch fixes the issue by adding an extra argument to Python UDFs used with conditional expressions. The argument carries the evaluated value of the condition. On the Python side, we can then conditionally run the UDFs based on the condition value.
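A minimal sketch of the idea in plain Python (the wrapper and argument names here are illustrative assumptions, not the actual patch code): the wrapped function receives the evaluated condition as a trailing argument and only calls the user function when the condition holds.

```python
# Hypothetical sketch of the patch's idea, not the actual implementation:
# the JVM side appends the evaluated condition value as an extra argument,
# and the Python side skips the user function when the condition is false.
def wrap_conditional_udf(user_func):
    def wrapped(*args_and_cond):
        *args, cond = args_and_cond   # last argument is the condition value
        if cond:
            return user_func(*args)   # branch taken: evaluate the UDF
        return None                   # branch not taken: skip evaluation
    return wrapped

f = wrap_conditional_udf(lambda value: 10 // int(value))
print(f(5, True))    # 2
print(f(0, False))   # None -- the division never runs
```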

Question: How about vectorized Python UDFs?

It doesn't seem to make much sense to do something similar for vectorized UDFs. Vectorized UDFs process input as a batch of rows instead of a single row at a time, so we can't simply run them only on valid rows. But since a pandas Series is more resilient to such errors and evaluates to inf in the case shown above, the problem is less serious than with batch UDFs. As vectorized Python UDFs are not in any release yet, maybe we can consider disabling their use with conditional expressions without worrying about breaking compatibility.
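The resilience point can be seen with a plain pandas Series (assuming pandas is available): elementwise float division by zero yields inf rather than raising, unlike the ZeroDivisionError from the batch UDF example above.

```python
import pandas as pd

# Elementwise true division on a float Series: division by zero
# produces inf instead of raising an exception.
s = pd.Series([5.0, 0.0])
result = 10 / s
print(result.tolist())   # [2.0, inf]
```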

How was this patch tested?

Added Python tests.

@SparkQA

SparkQA commented Oct 27, 2017

Test build #83139 has finished for PR 19592 at commit 0515435.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class BatchEvalPythonExecBase(
  • case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
  • case class BatchOptEvalPythonExec(

@viirya viirya force-pushed the SPARK-22347 branch 2 times, most recently from 1c523d1 to 9744e77 Compare October 28, 2017 00:22
@viirya
Member Author

viirya commented Oct 28, 2017

@SparkQA

SparkQA commented Oct 28, 2017

Test build #83140 has finished for PR 19592 at commit 1c523d1.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class BatchEvalPythonExecBase(
  • case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
  • case class BatchOptEvalPythonExec(

@SparkQA

SparkQA commented Oct 28, 2017

Test build #83141 has finished for PR 19592 at commit 9744e77.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class BatchEvalPythonExecBase(
  • case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
  • case class BatchOptEvalPythonExec(

@viirya
Member Author

viirya commented Oct 28, 2017

Seems it fails on Python 3.4; let me check it locally.

Note: The failure is due to using `/` under Python 3, which returns a float that doesn't match the schema.
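For reference, the difference between the two operators under Python 3: true division always produces a float, which violates an IntegerType return schema, while floor division keeps an int result for int operands.

```python
# Python 3 semantics: `/` is true division (float result),
# `//` is floor division (int result for int operands).
print(10 / 5, type(10 / 5).__name__)    # 2.0 float
print(10 // 5, type(10 // 5).__name__)  # 2 int
```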

@rxin
Contributor

rxin commented Oct 28, 2017

Is this complexity worth it? Can we just document it as a behavior and users need to be careful with it?

@viirya
Member Author

viirya commented Oct 28, 2017

Yeah, it is also an option. It is relatively easy to incorporate the conditional logic into Python UDFs on the user side.
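A sketch of that user-side workaround (function name hypothetical): guard the computation inside the UDF body before wrapping it with udf(..., IntegerType()), so the division never runs for rows that fail the condition.

```python
# Hypothetical user-side workaround: replicate the when(col('x') > 0, ...)
# condition inside the function itself before wrapping it as a UDF.
def safe_divide(value):
    if value is None or int(value) <= 0:
        return None              # rows failing the condition are skipped
    return 10 // int(value)

print(safe_divide(5))   # 2
print(safe_divide(0))   # None -- no ZeroDivisionError
```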

@viirya
Member Author

viirya commented Oct 28, 2017

One concern is that this behavior isn't very intuitive for end users without knowledge of Python UDF internals. It might seem weird that Python UDFs in conditional expressions are not really conditional. So I'm just not sure that leaving it as is, even with documentation, is the best option.

@viirya
Member Author

viirya commented Oct 28, 2017

A large part of the change is refactoring. IMHO, if possible, it is better to allow Python UDFs to run normally with conditional expressions. Thanks.

@SparkQA

SparkQA commented Oct 28, 2017

Test build #83150 has finished for PR 19592 at commit 3a5c4c8.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class BatchEvalPythonExecBase(
  • case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
  • case class BatchOptEvalPythonExec(

@SparkQA

SparkQA commented Oct 28, 2017

Test build #83152 has finished for PR 19592 at commit 138a366.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class BatchEvalPythonExecBase(
  • case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
  • case class BatchOptEvalPythonExec(

@viirya
Member Author

viirya commented Oct 28, 2017

retest this please.

@HyukjinKwon
Member

For now, I slightly prefer documenting this limitation, given the complexity vs. the gain. But I want to know what others think.

@viirya
Member Author

viirya commented Oct 28, 2017

@HyukjinKwon Thanks for the comment. Yeah, I'd like to know if we have consensus on just documenting it.

@SparkQA

SparkQA commented Oct 28, 2017

Test build #83156 has finished for PR 19592 at commit 138a366.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class BatchEvalPythonExecBase(
  • case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
  • case class BatchOptEvalPythonExec(

@viirya
Member Author

viirya commented Oct 28, 2017

retest this please.

@SparkQA

SparkQA commented Oct 28, 2017

Test build #83167 has finished for PR 19592 at commit 138a366.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • abstract class BatchEvalPythonExecBase(
  • case class BatchEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], child: SparkPlan)
  • case class BatchOptEvalPythonExec(

@viirya
Member Author

viirya commented Oct 29, 2017

ping @ueshin @BryanCutler @cloud-fan Would you mind providing some insights? In your opinion, should we just document this or fix it? Thanks.

Member

@BryanCutler BryanCutler left a comment


Given that this adds another Python UDF eval type just for this special case, in addition to the added complexity, I am also slightly leaning toward just documenting this for now.

else:
elif eval_type == PythonEvalType.SQL_BATCHED_UDF:
return arg_offsets, wrap_udf(row_func, return_type)
elif eval_type == PythonEvalType.SQL_BATCHED_OPT_UDF:
Member


Would it be possible to do this type of wrapping in BatchEvalPython and remove the need to add another eval_type? If so, you could just use the true/false result as is and not have to add anything in Python. I think that would reduce the scope of this and simplify things a bit.

Member Author


Because the Python functions are serialized and possibly broadcast further, I couldn't figure out a way to do this wrapping in BatchEvalPython on the Scala side.

Member Author


One possibility is to do the wrapping when creating UDFs on the Python side. Even for UDFs not used in conditional expressions, we would still add an extra boolean argument to the end of the argument list. With this approach we wouldn't need another eval_type.

But currently documenting it seems to be the more acceptable fix for others.

@viirya
Member Author

viirya commented Oct 31, 2017

Based on the opinions collected so far, the consensus is to just document this. I will close this for now and submit a simple PR to document it later.

@viirya viirya closed this Oct 31, 2017
ghost pushed a commit to dbtsai/spark that referenced this pull request Nov 1, 2017
…fs with conditional expressions

## What changes were proposed in this pull request?

Under the current execution mode of Python UDFs, we don't properly support Python UDFs as branch values or the else value in a CaseWhen expression.

Since fixing it might require a non-trivial change (e.g., apache#19592) and this issue has a simpler workaround, we should just note this for users in the documentation.

## How was this patch tested?

Only document change.

Author: Liang-Chi Hsieh <[email protected]>

Closes apache#19617 from viirya/SPARK-22347-3.
@viirya viirya deleted the SPARK-22347 branch December 27, 2023 18:21