Closed
updated user guide ofr cogroup pandas udf
d80tb7 committed Sep 29, 2019
commit 695e0ece9bd681c1788390c20a27d7b565b23916
35 changes: 35 additions & 0 deletions docs/sql-pyspark-pandas-with-arrow.md
@@ -178,6 +178,41 @@ For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/p
[`pyspark.sql.DataFrame.mapsInPandas`](api/python/pyspark.sql.html#pyspark.sql.DataFrame.mapInPandas).


### Cogrouped Map

CoGrouped map Pandas UDFs allow two DataFrames to be cogrouped a by a common key and then a python function applied to
Review comment (Member): Is it CoGrouped or Cogrouped :-)?

Review comment (Member): cogrouped a by -> cogrouped by

each cogroup. They are used with `groupBy().cogroup().apply()` which consists of the following steps:

* Shuffle the data such that the groups of each dataframe which share a key are cogrouped together.
* Apply a function to each cogroup. The input of of the function is two `pandas.DataFrame` (with an optional Tuple
Review comment (Member): duplicate "of" in "input of of"

representing the key). The output of the function is a `pandas.DataFrame`.
* Combine the results into a new `DataFrame`.
Review comment (Member): Maybe elaborate to explain results are pandas.DataFrames from all groups that are combined in a new pyspark.DataFrame
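
The three steps above can be sketched locally with plain pandas. This is only an illustration of the cogroup-then-apply idea, not how Spark implements it: the helper `cogroup_apply` is a hypothetical name introduced here, and the data simply mirrors the example used elsewhere in this change.

```python
import pandas as pd


def cogroup_apply(left, right, key, func):
    """Hypothetical local emulation of cogroup + apply (illustration only)."""
    # Step 1: bring together the groups of each frame that share a key.
    left_groups = {k: g for k, g in left.groupby(key)}
    right_groups = {k: g for k, g in right.groupby(key)}

    results = []
    for k in sorted(set(left_groups) | set(right_groups)):
        # A key present on only one side gets an empty frame on the other side.
        l = left_groups.get(k, left.iloc[0:0])
        r = right_groups.get(k, right.iloc[0:0])
        # Step 2: apply the function to each cogroup.
        results.append(func(l, r))

    # Step 3: combine the per-cogroup results into one frame.
    return pd.concat(results, ignore_index=True)


df1 = pd.DataFrame({
    "time": [20000101, 20000101, 20000102, 20000102],
    "id": [1, 2, 1, 2],
    "v1": [1.0, 2.0, 3.0, 4.0],
})
df2 = pd.DataFrame({"time": [20000101, 20000101], "id": [1, 2], "v2": ["x", "y"]})


def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")


result = cogroup_apply(df1, df2, "id", asof_join)
print(result)
```

In Spark the per-cogroup results are `pandas.DataFrame`s that end up in a single Spark `DataFrame`; here `pd.concat` plays that role.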


To use `groupBy().cogroup().apply()`, the user needs to define the following:
* A Python function that defines the computation for each cogroup.
* A `StructType` object or a string that defines the schema of the output `DataFrame`.

The column labels of the returned `pandas.DataFrame` must either match the field names in the
defined output schema if specified as strings, or match the field data types by position if not
strings, e.g. integer indices. See [pandas.DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html#pandas.DataFrame)
on how to label columns when constructing a `pandas.DataFrame`.
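
As a small sketch of the two labelling styles described above, the following builds the same row twice with plain pandas: once with string column labels and once with the default integer labels. The column names are borrowed from the example schema in this change; how Spark then matches them is as described in the paragraph above, not something this snippet checks.

```python
import pandas as pd

# String labels: these would be matched against the schema's field names.
by_name = pd.DataFrame({"time": [20000101], "id": [1], "v1": [1.0], "v2": ["x"]})

# No labels given: pandas assigns integer labels, so matching would be by position.
by_position = pd.DataFrame([[20000101, 1, 1.0, "x"]])

print(list(by_name.columns))
print(list(by_position.columns))
```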

Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of
memory exceptions, especially if the group sizes are skewed. The configuration for[maxRecordsPerBatch](#setting-arrow-batch-size)
Review comment (Member): typo: for[maxRecordsPerBatch] -> for [maxRecordsPerBatch]

is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory.
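
One way to get a feel for skew before applying a cogrouped function is to count rows per key on both sides. The sketch below does this with plain pandas on small made-up frames; `cogroup_sizes` is a hypothetical helper, and on real Spark DataFrames a `groupBy(...).count()` on each side would presumably serve the same purpose.

```python
import pandas as pd


def cogroup_sizes(left, right, key):
    """Hypothetical helper: rows per key on each side, largest cogroup first."""
    sizes = pd.concat(
        [
            left.groupby(key).size().rename("left_rows"),
            right.groupby(key).size().rename("right_rows"),
        ],
        axis=1,
    ).fillna(0).astype(int)  # keys missing on one side count as 0 rows
    sizes["total_rows"] = sizes["left_rows"] + sizes["right_rows"]
    return sizes.sort_values("total_rows", ascending=False)


# Made-up data: key 1 is heavier than key 2.
left = pd.DataFrame({"id": [1, 1, 1, 2], "v1": [1.0, 2.0, 3.0, 4.0]})
right = pd.DataFrame({"id": [1, 2], "v2": ["x", "y"]})

sizes = cogroup_sizes(left, right, "id")
print(sizes)
```

The largest `total_rows` value gives a rough idea of how many rows a single function call would have to hold in memory.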

The following example shows how to use `groupby().cogroup().apply()` to perform an asof join between two datasets.

<div class="codetabs">
<div data-lang="python" markdown="1">
{% include_example cogrouped_map_pandas_udf python/sql/arrow.py %}
</div>
</div>

For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf) and
[`pyspark.sql.CoGroupedData.apply`](api/python/pyspark.sql.html#pyspark.sql.CoGroupedData.apply).


## Usage Notes

### Supported SQL Types
31 changes: 31 additions & 0 deletions examples/src/main/python/sql/arrow.py
@@ -257,6 +257,35 @@ def filter_func(batch_iter):
# +---+---+
# $example off:map_iter_pandas_udf$

def cogrouped_map_pandas_udf_example(spark):
    # $example on:cogrouped_map_pandas_udf$
    import pandas as pd

    from pyspark.sql.functions import pandas_udf, PandasUDFType

    df1 = spark.createDataFrame(
        [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
        ("time", "id", "v1"))

    df2 = spark.createDataFrame(
        [(20000101, 1, "x"), (20000101, 2, "y")],
        ("time", "id", "v2"))

    @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
    def asof_join(l, r):
        return pd.merge_asof(l, r, on="time", by="id")

    df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()
    # +--------+---+---+---+
    # |    time| id| v1| v2|
    # +--------+---+---+---+
    # |20000101|  1|1.0|  x|
    # |20000102|  1|3.0|  x|
    # |20000101|  2|2.0|  y|
    # |20000102|  2|4.0|  y|
    # +--------+---+---+---+
    # $example off:cogrouped_map_pandas_udf$


if __name__ == "__main__":
    spark = SparkSession \
@@ -276,5 +305,7 @@ def filter_func(batch_iter):
    grouped_agg_pandas_udf_example(spark)
    print("Running pandas_udf map iterator example")
    map_iter_pandas_udf_example(spark)
    print("Running pandas_udf cogrouped map example")
    cogrouped_map_pandas_udf_example(spark)

    spark.stop()
4 changes: 2 additions & 2 deletions python/pyspark/sql/cogroup.py
@@ -64,8 +64,8 @@ def apply(self, udf):
... [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
... ("time", "id", "v1"))
>>> df2 = spark.createDataFrame(
... [(20000101, 1, "x"), (20000101, 2, "y")],
... ("time", "id", "v2"))
... [(20000101, 1, "x"), (20000101, 2, "y")],
... ("time", "id", "v2"))
>>> @pandas_udf("time int, id int, v1 double, v2 string", PandasUDFType.COGROUPED_MAP)
... def asof_join(l, r):
... return pd.merge_asof(l, r, on="time", by="id")
52 changes: 52 additions & 0 deletions python/pyspark/sql/functions.py
@@ -3233,6 +3233,58 @@ def pandas_udf(f=None, returnType=None, functionType=None):
| 1| 21|
+---+---+

6. COGROUPED_MAP

A cogrouped map UDF defines transformation: two `pandas.DataFrame` -> `pandas.DataFrame`
Review comment (Member): I think instead of "two pandas.DataFrame", better to show "(pandas.DataFrame, pandas.DataFrame) -> pandas.DataFrame"

The returnType should be a :class:`StructType` describing the schema of the returned
Review comment (Member): returnType -> returnType

`pandas.DataFrame`. The column labels of the returned `pandas.DataFrame` must either match
the field names in the defined returnType schema if specified as strings, or match the
field data types by position if not strings, e.g. integer indices.
The length of the returned `pandas.DataFrame` can be arbitrary.

CoGrouped map UDFs are used with :meth:`pyspark.sql.CoGroupedData.apply`.

>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df1 = spark.createDataFrame(
...     [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
...     ("time", "id", "v1"))
>>> df2 = spark.createDataFrame(
...     [(20000101, 1, "x"), (20000101, 2, "y")],
...     ("time", "id", "v2"))
>>> @pandas_udf("time int, id int, v1 double, v2 string",
...             PandasUDFType.COGROUPED_MAP)  # doctest: +SKIP
... def asof_join(l, r):
...     return pd.merge_asof(l, r, on="time", by="id")
>>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()  # doctest: +SKIP
+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
|20000101|  2|2.0|  y|
|20000102|  2|4.0|  y|
+--------+---+---+---+

Alternatively, the user can define a function that takes three arguments. In this case,
the grouping key(s) will be passed as the first argument and the data will be passed as the
second and third arguments. The grouping key(s) will be passed as a tuple of numpy data
types, e.g., `numpy.int32` and `numpy.float64`. The data will still be passed in as two
`pandas.DataFrame` containing all columns from the original Spark DataFrames.
>>> @pandas_udf("time int, id int, v1 double, v2 string",
...             PandasUDFType.COGROUPED_MAP)  # doctest: +SKIP
... def asof_join(k, l, r):
...     if k == (1,):
...         return pd.merge_asof(l, r, on="time", by="id")
...     else:
...         return pd.DataFrame(columns=['time', 'id', 'v1', 'v2'])
>>> df1.groupby("id").cogroup(df2.groupby("id")).apply(asof_join).show()  # doctest: +SKIP
+--------+---+---+---+
|    time| id| v1| v2|
+--------+---+---+---+
|20000101|  1|1.0|  x|
|20000102|  1|3.0|  x|
+--------+---+---+---+

.. note:: The user-defined functions are considered deterministic by default. Due to
optimization, duplicate invocations may be eliminated or the function may even be invoked
more times than it is present in the query. If your function is not deterministic, call