@@ -40,13 +40,15 @@ private[spark] object PythonEvalType {
val SQL_SCALAR_PANDAS_UDF = 200
val SQL_GROUPED_MAP_PANDAS_UDF = 201
val SQL_GROUPED_AGG_PANDAS_UDF = 202
val SQL_WINDOW_AGG_PANDAS_UDF = 203

def toString(pythonEvalType: Int): String = pythonEvalType match {
case NON_UDF => "NON_UDF"
case SQL_BATCHED_UDF => "SQL_BATCHED_UDF"
case SQL_SCALAR_PANDAS_UDF => "SQL_SCALAR_PANDAS_UDF"
case SQL_GROUPED_MAP_PANDAS_UDF => "SQL_GROUPED_MAP_PANDAS_UDF"
case SQL_GROUPED_AGG_PANDAS_UDF => "SQL_GROUPED_AGG_PANDAS_UDF"
case SQL_WINDOW_AGG_PANDAS_UDF => "SQL_WINDOW_AGG_PANDAS_UDF"
}
}

1 change: 1 addition & 0 deletions python/pyspark/rdd.py
@@ -74,6 +74,7 @@ class PythonEvalType(object):
SQL_SCALAR_PANDAS_UDF = 200
SQL_GROUPED_MAP_PANDAS_UDF = 201
SQL_GROUPED_AGG_PANDAS_UDF = 202
SQL_WINDOW_AGG_PANDAS_UDF = 203


def portable_hash(x):
34 changes: 30 additions & 4 deletions python/pyspark/sql/functions.py
@@ -2580,10 +2580,12 @@ def pandas_udf(f=None, returnType=None, functionType=None):
The returned scalar can be either a python primitive type, e.g., `int` or `float`
or a numpy data type, e.g., `numpy.int64` or `numpy.float64`.

- :class:`ArrayType`, :class:`MapType` and :class:`StructType` are currently not supported as
- output types.
+ :class:`MapType` and :class:`StructType` are currently not supported as output types.
HyukjinKwon (Member), Apr 29, 2018:
@icexelloss, actually, should we keep this note? I think this matches https://spark.apache.org/docs/latest/sql-programming-guide.html#supported-sql-types, which we documented there and in SQLConf. Probably just leaving a link would be fine; removing it is okay with me too, and just adding a note for all the pandas UDFs would work as well.

icexelloss (Contributor, Author):
I am leaning towards keeping this in the API doc and maybe making sql-programming-guide link to it. I think most users would look at the API docs first rather than sql-programming-guide, so it's probably a bit more convenient to have it here.

HyukjinKwon (Member):
Yup, I think that works too. I left the comment only because the API doc and the SQL programming guide looked mismatched.

- Group aggregate UDFs are used with :meth:`pyspark.sql.GroupedData.agg`
+ Group aggregate UDFs are used with :meth:`pyspark.sql.GroupedData.agg` and
+ :class:`pyspark.sql.Window`

This example shows using grouped aggregated UDFs with groupby:

>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df = spark.createDataFrame(
@@ -2600,7 +2602,31 @@ def pandas_udf(f=None, returnType=None, functionType=None):
| 2| 6.0|
+---+-----------+

- .. seealso:: :meth:`pyspark.sql.GroupedData.agg`
This example shows using grouped aggregated UDFs as window functions. Note that only
unbounded window frame is supported at the moment:

>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> from pyspark.sql import Window
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))
>>> @pandas_udf("double", PandasUDFType.GROUPED_AGG) # doctest: +SKIP
Member:
So we don't have PandasUDFType.WINDOW_AGG, and a pandas udf defined as PandasUDFType.GROUPED_AGG can be used with both groupby and Window?

icexelloss (Contributor, Author):
Yes, exactly. The idea is that the producer can write a grouped agg UDF, such as a weighted mean, and the consumer can use that UDF in both groupby and window, similar to how SQL aggregation functions work. (An editorial sketch of this pattern follows the doctest below.)

... def mean_udf(v):
... return v.mean()
>>> w = Window.partitionBy('id') \\
... .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
>>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show() # doctest: +SKIP
+---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.5|
| 1| 2.0| 1.5|
| 2| 3.0| 6.0|
| 2| 5.0| 6.0|
| 2|10.0| 6.0|
+---+----+------+

+ .. seealso:: :meth:`pyspark.sql.GroupedData.agg` and :class:`pyspark.sql.Window`
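[Editor's note: a minimal standalone sketch of the producer/consumer pattern described in the review thread above. The weighted_mean UDF, the toy data, and the session setup are illustrative assumptions, not part of this PR; it assumes a Spark build that includes this change, with pandas and pyarrow installed.]

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, 1.0, 1.0), (1, 2.0, 3.0), (2, 3.0, 1.0), (2, 5.0, 2.0)],
    ("id", "v", "w"))

# Producer: one grouped agg UDF, a weighted mean over two input columns.
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def weighted_mean(v, w):
    import numpy as np
    return np.average(v, weights=w)

# Consumer: the same UDF works in a groupby aggregation ...
df.groupby("id").agg(weighted_mean(df["v"], df["w"]).alias("wm")).show()

# ... and as a window function over an unbounded frame.
win = (Window.partitionBy("id")
       .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
df.withColumn("wm", weighted_mean(df["v"], df["w"]).over(win)).show()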

.. note:: The user-defined functions are considered deterministic by default. Due to
optimization, duplicate invocations may be eliminated or the function may even be invoked
238 changes: 238 additions & 0 deletions python/pyspark/sql/tests.py
@@ -5454,6 +5454,15 @@ def test_retain_group_columns(self):
expected1 = df.groupby(df.id).agg(sum(df.v))
self.assertPandasEqual(expected1.toPandas(), result1.toPandas())

def test_array_type(self):
icexelloss (Contributor, Author), Apr 21, 2018:
This is unrelated, but I figured it shouldn't hurt to add an array test in GroupedAggPandasUDFTests.

from pyspark.sql.functions import pandas_udf, PandasUDFType

df = self.data

array_udf = pandas_udf(lambda x: [1.0, 2.0], 'array<double>', PandasUDFType.GROUPED_AGG)
result1 = df.groupby('id').agg(array_udf(df['v']).alias('v2'))
self.assertEquals(result1.first()['v2'], [1.0, 2.0])

def test_invalid_args(self):
from pyspark.sql.functions import mean

@@ -5479,6 +5488,235 @@ def test_invalid_args(self):
'mixture.*aggregate function.*group aggregate pandas UDF'):
df.groupby(df.id).agg(mean_udf(df.v), mean(df.v)).collect()


@unittest.skipIf(
not _have_pandas or not _have_pyarrow,
_pandas_requirement_message or _pyarrow_requirement_message)
class WindowPandasUDFTests(ReusedSQLTestCase):
@property
def data(self):
from pyspark.sql.functions import array, explode, col, lit
return self.spark.range(10).toDF('id') \
.withColumn("vs", array([lit(i * 1.0) + col('id') for i in range(20, 30)])) \
.withColumn("v", explode(col('vs'))) \
.drop('vs') \
.withColumn('w', lit(1.0))

@property
def python_plus_one(self):
ueshin (Member), May 18, 2018:
Shall we move the pandas_udfs for tests to a common place?

icexelloss (Contributor, Author):
I agree those should be shared, but let's maybe do it in a separate PR? The refactor will probably touch many test cases, and this PR is quite large already.

ueshin (Member):
Sure.

from pyspark.sql.functions import udf
return udf(lambda v: v + 1, 'double')

@property
def pandas_scalar_time_two(self):
from pyspark.sql.functions import pandas_udf, PandasUDFType
return pandas_udf(lambda v: v * 2, 'double')

@property
def pandas_agg_mean_udf(self):
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def avg(v):
return v.mean()
return avg

@property
def pandas_agg_max_udf(self):
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def max(v):
return v.max()
return max

@property
def pandas_agg_min_udf(self):
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('double', PandasUDFType.GROUPED_AGG)
def min(v):
return v.min()
return min

@property
def unbounded_window(self):
return Window.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

@property
def ordered_window(self):
return Window.partitionBy('id').orderBy('v')

@property
def unpartitioned_window(self):
return Window.partitionBy()
Member:
Shall we test Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) too?

icexelloss (Contributor, Author), Jun 8, 2018:
I think we can rely on Window.partitionBy() being the same as Window.partitionBy().rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing) here; otherwise there might be too many combinations to test. But I am okay with adding tests for that form in addition to the existing ones. WDYT? (A sketch of the equivalence follows below.)
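[Editor's note: a small check of the equivalence being relied on above — `df` stands for any DataFrame such as self.data; this is an editorial sketch, not part of the PR.]

from pyspark.sql import Window
from pyspark.sql.functions import mean

# With no orderBy, the default frame is already the entire partition, so
# these two windows should produce identical aggregates.
w_default = Window.partitionBy()
w_explicit = Window.partitionBy().rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing)

r1 = df.withColumn("m", mean(df["v"]).over(w_default)).toPandas()
r2 = df.withColumn("m", mean(df["v"]).over(w_explicit)).toPandas()
assert r1.equals(r2)  # mirrors assertPandasEqual in these tests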


def test_simple(self):
from pyspark.sql.functions import pandas_udf, PandasUDFType, percent_rank, mean, max

df = self.data
w = self.unbounded_window

mean_udf = self.pandas_agg_mean_udf

result1 = df.withColumn('mean_v', mean_udf(df['v']).over(w))
expected1 = df.withColumn('mean_v', mean(df['v']).over(w))

result2 = df.select(mean_udf(df['v']).over(w))
expected2 = df.select(mean(df['v']).over(w))

self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
self.assertPandasEqual(expected2.toPandas(), result2.toPandas())

def test_multiple_udfs(self):
from pyspark.sql.functions import max, min, mean

df = self.data
w = self.unbounded_window

result1 = df.withColumn('mean_v', self.pandas_agg_mean_udf(df['v']).over(w)) \
.withColumn('max_v', self.pandas_agg_max_udf(df['v']).over(w)) \
.withColumn('min_w', self.pandas_agg_min_udf(df['w']).over(w))

expected1 = df.withColumn('mean_v', mean(df['v']).over(w)) \
.withColumn('max_v', max(df['v']).over(w)) \
.withColumn('min_w', min(df['w']).over(w))

self.assertPandasEqual(expected1.toPandas(), result1.toPandas())

def test_replace_existing(self):
from pyspark.sql.functions import mean

df = self.data
w = self.unbounded_window

result1 = df.withColumn('v', self.pandas_agg_mean_udf(df['v']).over(w))
expected1 = df.withColumn('v', mean(df['v']).over(w))

self.assertPandasEqual(expected1.toPandas(), result1.toPandas())

def test_mixed_sql(self):
from pyspark.sql.functions import mean

df = self.data
w = self.unbounded_window
mean_udf = self.pandas_agg_mean_udf

result1 = df.withColumn('v', mean_udf(df['v'] * 2).over(w) + 1)
expected1 = df.withColumn('v', mean(df['v'] * 2).over(w) + 1)

self.assertPandasEqual(expected1.toPandas(), result1.toPandas())

def test_mixed_udf(self):
from pyspark.sql.functions import mean

df = self.data
w = self.unbounded_window

plus_one = self.python_plus_one
time_two = self.pandas_scalar_time_two
mean_udf = self.pandas_agg_mean_udf

result1 = df.withColumn(
'v2',
plus_one(mean_udf(plus_one(df['v'])).over(w)))
expected1 = df.withColumn(
'v2',
plus_one(mean(plus_one(df['v'])).over(w)))

result2 = df.withColumn(
'v2',
time_two(mean_udf(time_two(df['v'])).over(w)))
expected2 = df.withColumn(
'v2',
time_two(mean(time_two(df['v'])).over(w)))

self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
self.assertPandasEqual(expected2.toPandas(), result2.toPandas())

def test_without_partitionBy(self):
from pyspark.sql.functions import mean

df = self.data
w = self.unpartitioned_window
mean_udf = self.pandas_agg_mean_udf

result1 = df.withColumn('v2', mean_udf(df['v']).over(w))
expected1 = df.withColumn('v2', mean(df['v']).over(w))

result2 = df.select(mean_udf(df['v']).over(w))
expected2 = df.select(mean(df['v']).over(w))

self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
self.assertPandasEqual(expected2.toPandas(), result2.toPandas())

def test_mixed_sql_and_udf(self):
from pyspark.sql.functions import max, min, rank, col

df = self.data
w = self.unbounded_window
ow = self.ordered_window
max_udf = self.pandas_agg_max_udf
min_udf = self.pandas_agg_min_udf

result1 = df.withColumn('v_diff', max_udf(df['v']).over(w) - min_udf(df['v']).over(w))
expected1 = df.withColumn('v_diff', max(df['v']).over(w) - min(df['v']).over(w))

# Test mixing sql window function and window udf in the same expression
result2 = df.withColumn('v_diff', max_udf(df['v']).over(w) - min(df['v']).over(w))
expected2 = expected1

# Test chaining sql aggregate function and udf
result3 = df.withColumn('max_v', max_udf(df['v']).over(w)) \
.withColumn('min_v', min(df['v']).over(w)) \
.withColumn('v_diff', col('max_v') - col('min_v')) \
.drop('max_v', 'min_v')
expected3 = expected1

# Test mixing sql window function and udf
result4 = df.withColumn('max_v', max_udf(df['v']).over(w)) \
.withColumn('rank', rank().over(ow))
expected4 = df.withColumn('max_v', max(df['v']).over(w)) \
.withColumn('rank', rank().over(ow))

self.assertPandasEqual(expected1.toPandas(), result1.toPandas())
self.assertPandasEqual(expected2.toPandas(), result2.toPandas())
self.assertPandasEqual(expected3.toPandas(), result3.toPandas())
self.assertPandasEqual(expected4.toPandas(), result4.toPandas())

def test_array_type(self):
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = self.data
w = self.unbounded_window

array_udf = pandas_udf(lambda x: [1.0, 2.0], 'array<double>', PandasUDFType.GROUPED_AGG)
result1 = df.withColumn('v2', array_udf(df['v']).over(w))
self.assertEquals(result1.first()['v2'], [1.0, 2.0])

def test_invalid_args(self):
from pyspark.sql.functions import mean, pandas_udf, PandasUDFType

df = self.data
w = self.unbounded_window
ow = self.ordered_window
mean_udf = self.pandas_agg_mean_udf

with QuietTest(self.sc):
with self.assertRaisesRegexp(
AnalysisException,
'.*not supported within a window function'):
foo_udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUPED_MAP)
df.withColumn('v2', foo_udf(df['v']).over(w))

with QuietTest(self.sc):
with self.assertRaisesRegexp(
AnalysisException,
'.*Only unbounded window frame is supported.*'):
df.withColumn('mean_v', mean_udf(df['v']).over(ow))


if __name__ == "__main__":
from pyspark.sql.tests import *
if xmlrunner:
20 changes: 19 additions & 1 deletion python/pyspark/worker.py
@@ -128,6 +128,21 @@ def wrapped(*series):
return lambda *a: (wrapped(*a), arrow_return_type)


def wrap_window_agg_pandas_udf(f, return_type):
# This is similar to grouped_agg_pandas_udf, the only difference
# is that window_agg_pandas_udf needs to repeat the return value
# to match window length, where grouped_agg_pandas_udf just returns
# the scalar value.
arrow_return_type = to_arrow_type(return_type)

def wrapped(*series):
import pandas as pd
result = f(*series)
return pd.Series([result]).repeat(len(series[0]))
Member:
Just wondering why this needs to be repeated to the length of the series while grouped agg doesn't?

icexelloss (Contributor, Author):
Window aggregation results are broadcast to each input row, and therefore we repeat the value here to match the input rows.

HyukjinKwon (Member), May 31, 2018:
Let's leave a short comment while we are here.

icexelloss (Contributor, Author):
Added comments to describe the function.

HyukjinKwon (Member), May 31, 2018:
So this is the only place where it diverges (by repeating?), and therefore it needs a window-specific attribute to distinguish grouped agg from window agg?

icexelloss (Contributor, Author):
Yes - I tried to do this on the Java side, but merging the input rows with the output of the udf is tricky and complicated when they are not a 1-1 mapping, so I ended up doing this. (A small illustration of the repeat follows the function below.)

return lambda *a: (wrapped(*a), arrow_return_type)
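[Editor's note: a tiny pure-pandas illustration, not part of the diff, of what `wrapped` does — the aggregate yields one scalar, which is repeated to the window length so it can be broadcast back onto every input row.]

import pandas as pd

v = pd.Series([1.0, 2.0, 3.0])  # input column for one window frame
result = v.mean()               # the UDF returns a single scalar, 2.0

# Repeat the scalar to match the number of input rows, exactly as
# wrap_window_agg_pandas_udf does above.
out = pd.Series([result]).repeat(len(v))
print(out.tolist())             # [2.0, 2.0, 2.0]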


def read_single_udf(pickleSer, infile, eval_type):
num_arg = read_int(infile)
arg_offsets = [read_int(infile) for i in range(num_arg)]
Expand All @@ -151,6 +166,8 @@ def read_single_udf(pickleSer, infile, eval_type):
return arg_offsets, wrap_grouped_map_pandas_udf(func, return_type, argspec)
elif eval_type == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
return arg_offsets, wrap_grouped_agg_pandas_udf(func, return_type)
elif eval_type == PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF:
return arg_offsets, wrap_window_agg_pandas_udf(func, return_type)
elif eval_type == PythonEvalType.SQL_BATCHED_UDF:
return arg_offsets, wrap_udf(func, return_type)
else:
@@ -195,7 +212,8 @@ def read_udfs(pickleSer, infile, eval_type):

if eval_type in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
- PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF):
+ PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
+ PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF):
timezone = utf8_deserializer.loads(infile)
ser = ArrowStreamPandasSerializer(timezone)
else:
@@ -1739,9 +1739,10 @@ class Analyzer(
* 1. For a list of [[Expression]]s (a projectList or an aggregateExpressions), partitions
* it two lists of [[Expression]]s, one for all [[WindowExpression]]s and another for
* all regular expressions.
- * 2. For all [[WindowExpression]]s, groups them based on their [[WindowSpecDefinition]]s.
- * 3. For every distinct [[WindowSpecDefinition]], creates a [[Window]] operator and inserts
- *    it into the plan tree.
+ * 2. For all [[WindowExpression]]s, groups them based on their [[WindowSpecDefinition]]s
+ *    and [[WindowFunctionType]]s.
+ * 3. For every distinct [[WindowSpecDefinition]] and [[WindowFunctionType]], creates a
+ *    [[Window]] operator and inserts it into the plan tree.
*/
object ExtractWindowExpressions extends Rule[LogicalPlan] {
private def hasWindowFunction(exprs: Seq[Expression]): Boolean =
@@ -1901,15 +1902,15 @@ class Analyzer(
s"Please file a bug report with this error message, stack trace, and the query.")
} else {
val spec = distinctWindowSpec.head
- (spec.partitionSpec, spec.orderSpec)
+ (spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr))
}
}.toSeq

// Third, we aggregate them by adding each Window operator for each Window Spec and then
// setting this to the child of the next Window operator.
val windowOps =
groupedWindowExpressions.foldLeft(child) {
- case (last, ((partitionSpec, orderSpec), windowExpressions)) =>
+ case (last, ((partitionSpec, orderSpec, _), windowExpressions)) =>
Window(windowExpressions, partitionSpec, orderSpec, last)
}
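[Editor's note: a hedged sketch of the effect of this analyzer change, reusing `df` and `mean_udf` in the style of the tests above. The two-operator claim follows from the scaladoc; the exact plan text may vary by version.]

from pyspark.sql import Window
from pyspark.sql.functions import max as sql_max

w = (Window.partitionBy("id")
     .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))

# Same WindowSpecDefinition, but mean_udf is a pandas window function while
# max is a SQL window function; with this change the analyzer also groups
# window expressions by WindowFunctionType, inserting two Window operators
# into the plan.
result = (df.withColumn("mean_v", mean_udf(df["v"]).over(w))
            .withColumn("max_v", sql_max(df["v"]).over(w)))
result.explain()  # expect two separate Window nodes in the physical plan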
