Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fixed udf and its test
  • Loading branch information
e-dorigatti committed May 24, 2018
commit b0af18e400c01095dd87589260ce80e9712a9f07
2 changes: 1 addition & 1 deletion python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ def foo(x):
raise StopIteration()

with self.assertRaises(Py4JJavaError):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we check for the error message here?

self.spark.range(0, 1000).withColumn('v', udf(foo)).show()
self.spark.range(0, 1000).withColumn('v', udf(foo)('id')).show()

def test_validate_column_types(self):
from pyspark.sql.functions import udf, to_json
Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/sql/udf.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql.types import StringType, DataType, StructType, _parse_datatype_string,\
to_arrow_type, to_arrow_schema
from pyspark.util import _get_argspec
from pyspark.util import _get_argspec, fail_on_stopiteration

__all__ = ["UDFRegistration"]

Expand Down Expand Up @@ -92,7 +92,7 @@ def __init__(self, func,
raise TypeError(
"Invalid evalType: evalType should be an int but is {}".format(evalType))

self.func = func
self.func = fail_on_stopiteration(func)
Copy link
Member

@HyukjinKwon HyukjinKwon May 24, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a very quick look, this seems to be the cause: it looks like `__doc__` is missing on the wrapped function. You should wrap it with `@functools.wraps`.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

self._returnType = returnType
# Stores UserDefinedPythonFunctions jobj, once initialized
self._returnType_placeholder = None
Expand Down
4 changes: 2 additions & 2 deletions python/pyspark/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ def majorMinorVersion(sparkVersion):

def fail_on_stopiteration(f):
"""
Wraps the input function to fail on 'StopIteration' by raising a 'RuntimeError'
prevents silent loss of data when 'f' is used in a for loop
Wraps the input function to fail on 'StopIteration' by raising a 'RuntimeError'
prevents silent loss of data when 'f' is used in a for loop
"""
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    """
    Wraps the input function to fail on 'StopIteration' by raising a 'RuntimeError'
    prevents silent loss of data when 'f' is used in a for loop
    """

per PEP 8

def wrapper(*args, **kwargs):
try:
Expand Down