Closed
code review comments
d80tb7 committed Oct 30, 2019
commit f7b9b801506295e19507b0ffd9e79ebe8151f037
10 changes: 5 additions & 5 deletions docs/sql-pyspark-pandas-with-arrow.md
@@ -180,13 +180,13 @@ For detailed usage, please see [`pyspark.sql.functions.pandas_udf`](api/python/p

### Cogrouped Map

-CoGrouped map Pandas UDFs allow two DataFrames to be cogrouped a by a common key and then a python function applied to
+Cogrouped map Pandas UDFs allow two DataFrames to be cogrouped by a common key and then a python function applied to
each cogroup. They are used with `groupBy().cogroup().apply()` which consists of the following steps:

* Shuffle the data such that the groups of each dataframe which share a key are cogrouped together.
-* Apply a function to each cogroup. The input of of the function is two `pandas.DataFrame` (with an optional Tuple
+* Apply a function to each cogroup. The input of the function is two `pandas.DataFrame` (with an optional Tuple
representing the key). The output of the function is a `pandas.DataFrame`.
-* Combine the results into a new `DataFrame`.
+* Combine the pandas.DataFrames from all groups into a new `DataFrame`.

To use `groupBy().cogroup().apply()`, the user needs to define the following:
* A Python function that defines the computation for each cogroup.
@@ -198,8 +198,8 @@ strings, e.g. integer indices. See [pandas.DataFrame](https://pandas.pydata.org/
on how to label columns when constructing a `pandas.DataFrame`.
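
The column-labelling rule referred to here (string labels are matched to schema field names, non-string labels such as integer indices are matched by position) can be illustrated with plain pandas. This is only a sketch: `align_to_schema` and `schema_fields` are hypothetical names made up for the illustration, and the helper is a simplified model of the rule, not Spark's actual implementation.

```python
import pandas as pd


def align_to_schema(pdf, field_names):
    """Hypothetical helper that mimics the documented labelling rule.

    String column labels are matched to the schema by name;
    any other labels (e.g. integer indices) are matched by position.
    """
    if all(isinstance(label, str) for label in pdf.columns):
        # Labels are strings: select the columns in schema order, by name.
        return pdf[list(field_names)]
    # Labels are not strings: keep the order, assign schema names by position.
    renamed = pdf.copy()
    renamed.columns = list(field_names)
    return renamed


# Assumed schema for the illustration: two fields, "id" then "total".
schema_fields = ["id", "total"]

# String labels in a different order than the schema: matched by name.
by_name = pd.DataFrame({"total": [3.0], "id": [1]})

# No labels given, so pandas assigns integer labels 0 and 1: matched by position.
by_position = pd.DataFrame([[1, 3.0]])

aligned_by_name = align_to_schema(by_name, schema_fields)
aligned_by_position = align_to_schema(by_position, schema_fields)
```

Both results end up with the columns `id` and `total` in schema order, which is the shape a returned `pandas.DataFrame` needs to have for the declared schema.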

Note that all data for a cogroup will be loaded into memory before the function is applied. This can lead to out of
-memory exceptions, especially if the group sizes are skewed. The configuration for[maxRecordsPerBatch](#setting-arrow-batch-size)
-is not applied and it is up to the userto ensure that the cogrouped data will fit into the available memory.
+memory exceptions, especially if the group sizes are skewed. The configuration for [maxRecordsPerBatch](#setting-arrow-batch-size)
+is not applied and it is up to the user to ensure that the cogrouped data will fit into the available memory.

The following example shows how to use `groupby().cogroup().apply()` to perform an asof join between two datasets.
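
The documentation's own example lies outside this hunk. As a rough, pandas-only sketch of the per-cogroup function such an example would need, the code below defines a function taking two `pandas.DataFrame` inputs and returning one, then simulates the shuffle, apply, and combine steps locally. The column names (`time`, `id`, `v1`, `v2`) and the sample data are assumptions for illustration, and the loop stands in for what Spark would do across executors.

```python
import pandas as pd


def asof_join(left, right):
    # Per-cogroup function: two pandas.DataFrames in, one pandas.DataFrame out.
    # merge_asof requires both inputs to be sorted by the "on" column.
    return pd.merge_asof(
        left.sort_values("time"), right.sort_values("time"), on="time", by="id"
    )


# Assumed sample data for the illustration.
df1 = pd.DataFrame(
    {
        "time": [20000101, 20000101, 20000102, 20000102],
        "id": [1, 2, 1, 2],
        "v1": [1.0, 2.0, 3.0, 4.0],
    }
)
df2 = pd.DataFrame({"time": [20000101, 20000101], "id": [1, 2], "v2": ["x", "y"]})

# Step 1 (stand-in for the shuffle): bring together the groups that share a key.
# Simplification: this sketch assumes every key appears in both frames.
right_groups = {key: group for key, group in df2.groupby("id")}

# Step 2: apply the function to each cogroup.
pieces = [
    asof_join(left_group, right_groups[key]) for key, left_group in df1.groupby("id")
]

# Step 3: combine the per-cogroup results into one DataFrame.
result = pd.concat(pieces, ignore_index=True)
```

Each call to `asof_join` receives every row for one key at once, which is why the memory note above matters: a skewed key means one very large pair of inputs.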

12 changes: 6 additions & 6 deletions python/pyspark/sql/functions.py
@@ -3235,12 +3235,12 @@ def pandas_udf(f=None, returnType=None, functionType=None):

6. COGROUPED_MAP

-A cogrouped map UDF defines transformation: two `pandas.DataFrame` -> `pandas.DataFrame`
-The returnType should be a :class:`StructType` describing the schema of the returned
-`pandas.DataFrame`. The column labels of the returned `pandas.DataFrame` must either match
-the field names in the defined returnType schema if specified as strings, or match the
-field data types by position if not strings, e.g. integer indices.
-The length of the returned `pandas.DataFrame` can be arbitrary.
+A cogrouped map UDF defines transformation: (`pandas.DataFrame`, `pandas.DataFrame`) ->
+`pandas.DataFrame`. The `returnType` should be a :class:`StructType` describing the schema
+of the returned `pandas.DataFrame`. The column labels of the returned `pandas.DataFrame`
+must either match the field names in the defined `returnType` schema if specified as strings,
+or match the field data types by position if not strings, e.g. integer indices. The length
+of the returned `pandas.DataFrame` can be arbitrary.

CoGrouped map UDFs are used with :meth:`pyspark.sql.CoGroupedData.apply`.
