21 changes: 21 additions & 0 deletions python/pyspark/sql/group.py
@@ -233,6 +233,27 @@ def apply(self, udf):
| 2| 1.1094003924504583|
+---+-------------------+

Notes on grouping columns:
Contributor Author

This explains the general idea. I plan to improve the doc if people think this change is good.

Contributor

It looks more reasonable to me to pass the grouping columns to the UDF and let the UDF decide whether it wants to include them or not.

Member

+1 for ^

Contributor Author

@cloud-fan That's what I thought initially too. Let's consider this use case:

import pandas as pd
import statsmodels.api as sm
# df has four columns: id, y, x1, x2

group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(group_column, *x_columns).schema

@pandas_udf(schema, PandasUDFType.GROUP_MAP)
# Input/output are both a pandas.DataFrame
def ols(pdf):
    group_key = pdf[group_column].iloc[0]
    y = pdf[y_column]
    X = pdf[x_columns]
    X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()

    return pd.DataFrame([[group_key] + [model.params[i] for i in x_columns]],
                        columns=[group_column] + x_columns)

beta = df.groupby(group_column).apply(ols)

This is a simple pandas UDF that does a linear regression. The issue is that although the UDF (the linear regression) has nothing to do with the grouping column, the user still needs to deal with the grouping column inside the UDF. In other words, the UDF is coupled to the grouping column.

If we make it such that grouping columns are prepended to the UDF result, the user can write something like this:

import pandas as pd
import statsmodels.api as sm
# df has four columns: id, y, x1, x2

group_column = 'id'
y_column = 'y'
x_columns = ['x1', 'x2']
schema = df.select(*x_columns).schema

@pandas_udf(schema, PandasUDFType.GROUP_MAP)
# Input/output are both a pandas.DataFrame
def ols(pdf):
    y = pdf[y_column]
    X = pdf[x_columns]
    X = sm.add_constant(X)
    model = sm.OLS(y, X).fit()

    return pd.DataFrame([[model.params[i] for i in x_columns]], columns=x_columns)

beta = df.groupby(group_column).apply(ols)

Now the UDF is cleaner because it only deals with the columns that are relevant to the regression. It also makes the UDF more reusable, as the user can now do something like:

beta1 = df.groupby('a').apply(ols)
beta2 = df.groupby('a', 'b').apply(ols)

Because the UDF is now decoupled from the grouping columns, the user can reuse the same UDF with different groupings, which is not possible with the current API.
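
Under that behavior, the grouping columns would simply be prepended to each result. A sketch of the resulting columns, assuming hypothetical grouping columns a and b:

beta1 = df.groupby('a').apply(ols)        # columns: a, x1, x2
beta2 = df.groupby('a', 'b').apply(ols)   # columns: a, b, x1, x2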

@cloud-fan @HyukjinKwon What do you think?

Member

Yup, I saw this use case described in the JIRA, and I get that this specific case can be simplified; however, I am not sure it's straightforward for end users.

For example, if I use pandas_udf I would simply expect the returned schema to match what is described in returnType. pandas_udf already requires some background, and I think we should keep it as simple as possible.

Guaranteeing the grouping columns might be convenient in some cases, but it can also feel like magic happening inside.

I would prefer to let the UDF specify the grouping columns, to make this more straightforward.

Member

@HyukjinKwon's proposal sounds good to me too.

Member

I'd like to confirm what the final result schema will look like.
If users want to include the keys, the UDF should include the keys in its output, and then the keys will not be prepended automatically?

Contributor

yup

Member

I see, sounds good. Thanks!

Contributor Author

Thanks all for the discussion. I will update the Jira and open a new PR.


Depending on whether the UDF returns grouping columns as part of its return type, this
function may or may not prepend grouping columns to the result. This is explained as
follows:

1. UDF returns all grouping columns:

This function will not prepend any grouping columns to the result.

2. UDF returns some grouping columns:

This function will prepend grouping columns that are not returned by the UDF.

3. UDF returns no grouping columns:

This function will prepend all grouping columns.

In all cases, if a grouping column and a UDF output column conflict, the value in the UDF
output will override the original value of the grouping column.
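
A minimal sketch of the three cases, assuming a hypothetical df with columns id (long), g (long) and v (double):

# Case 1: the UDF returns all grouping columns, so nothing is prepended.
@pandas_udf('id long, g long, v double', PandasUDFType.GROUP_MAP)
def demean_with_keys(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

df.groupby('id', 'g').apply(demean_with_keys)   # result schema: id, g, v

# Case 2: the UDF returns only 'g', so the missing grouping column 'id' is prepended.
@pandas_udf('g long, v double', PandasUDFType.GROUP_MAP)
def demean_keep_g(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())[['g', 'v']]

df.groupby('id', 'g').apply(demean_keep_g)      # result schema: id, g, v

# Case 3: the UDF returns no grouping columns, so both 'id' and 'g' are prepended.
@pandas_udf('v double', PandasUDFType.GROUP_MAP)
def demean(pdf):
    return pdf[['v']] - pdf.v.mean()

df.groupby('id', 'g').apply(demean)             # result schema: id, g, v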

.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`

"""
46 changes: 36 additions & 10 deletions python/pyspark/sql/tests.py
@@ -3995,23 +3995,49 @@ def test_coerce(self):
self.assertFramesEqual(expected, result)

def test_complex_groupby(self):
import pandas as pd
from pyspark.sql.functions import pandas_udf, col, PandasUDFType
df = self.data
pdf = df.toPandas()

@pandas_udf(
'id long, v int, norm double',
'v int, v2 double',
PandasUDFType.GROUP_MAP
)
def normalize(pdf):
def foo(pdf):
v = pdf.v
return pdf.assign(norm=(v - v.mean()) / v.std())

result = df.groupby(col('id') % 2 == 0).apply(normalize).sort('id', 'v').toPandas()
pdf = df.toPandas()
expected = pdf.groupby(pdf['id'] % 2 == 0).apply(normalize.func)
expected = expected.sort_values(['id', 'v']).reset_index(drop=True)
expected = expected.assign(norm=expected.norm.astype('float64'))
self.assertFramesEqual(expected, result)
return pd.DataFrame({'v': v + 1, 'v2': v - v.mean()})[:]
Member

Why should we copy here by the way?

Contributor Author

This is just to simplify the test - pandas has very complicated behavior regarding the index of the return value when using groupby apply.

If interested, take a look at http://nbviewer.jupyter.org/gist/mbirdi/05f8a83d340476e5f03a


# Use expression in groupby. The grouping expression should be prepended to the result.
result1 = df.groupby(col('id') % 2 == 0).apply(foo).sort('((id % 2) = 0)', 'v').toPandas()
expected1 = pdf.groupby(pdf['id'] % 2 == 0).apply(foo.func)
expected1.index.names = ['((id % 2) = 0)', None]
expected1 = expected1.reset_index(level=0).sort_values(['((id % 2) = 0)', 'v'])\
.reset_index(drop=True)

# Grouping column is not returned by the udf. The grouping column should be prepended.
result2 = df.groupby('id').apply(foo).sort('id', 'v').toPandas()
expected2 = pdf.groupby('id').apply(foo.func).reset_index(level=0) \
.sort_values(['id', 'v'])

# Only one of the grouping columns is returned by the udf. In this case, the grouping column
# that is not returned by the udf should be prepended.
result3 = df.groupby('id', 'v').apply(foo).sort('id', 'v').toPandas()
expected3 = pdf.groupby(['id', 'v']).apply(foo.func).reset_index(level=0) \
.reset_index(drop=True).sort_values(['id', 'v'])

# Mix expression and column
result4 = df.groupby(col('id') % 2 == 0, 'v').apply(foo).sort('((id % 2) = 0)', 'v')\
.toPandas()
expected4 = pdf.groupby([pdf['id'] % 2 == 0, 'v']).apply(foo.func)
expected4.index.names = ['((id % 2) = 0)', 'v', None]
expected4 = expected4.reset_index(level=0).sort_values(['((id % 2) = 0)', 'v'])\
.reset_index(drop=True)

self.assertFramesEqual(expected1, result1)
self.assertFramesEqual(expected2, result2)
self.assertFramesEqual(expected3, result3)
self.assertFramesEqual(expected4, result4)

def test_empty_groupby(self):
from pyspark.sql.functions import pandas_udf, col, PandasUDFType
@@ -452,7 +452,8 @@ object ColumnPruning extends Rule[LogicalPlan] {
// Prunes the unused columns from child of Aggregate/Expand/Generate
case a @ Aggregate(_, _, child) if (child.outputSet -- a.references).nonEmpty =>
a.copy(child = prunedChild(child, a.references))
case f @ FlatMapGroupsInPandas(_, _, _, child) if (child.outputSet -- f.references).nonEmpty =>
case f @ FlatMapGroupsInPandas(_, _, _, _, child)
if (child.outputSet -- f.references).nonEmpty =>
f.copy(child = prunedChild(child, f.references))
case e @ Expand(_, _, child) if (child.outputSet -- e.references).nonEmpty =>
e.copy(child = prunedChild(child, e.references))
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, Expre
*/
case class FlatMapGroupsInPandas(
groupingAttributes: Seq[Attribute],
additionalGroupingAttributes: Seq[Attribute],
functionExpr: Expression,
output: Seq[Attribute],
child: LogicalPlan) extends UnaryNode {
@@ -20,6 +20,7 @@ package org.apache.spark.sql
import java.util.Locale

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.language.implicitConversions

import org.apache.spark.annotation.InterfaceStability
@@ -457,13 +458,26 @@ class RelationalGroupedDataset protected[sql](

val groupingNamedExpressions = groupingExprs.map {
case ne: NamedExpression => ne
case other => Alias(other, other.toString)()
case other => Alias(other, toPrettySQL(other))()
}
val groupingAttributes = groupingNamedExpressions.map(_.toAttribute)
val child = df.logicalPlan
val project = Project(groupingNamedExpressions ++ child.output, child)
val output = expr.dataType.asInstanceOf[StructType].toAttributes
val plan = FlatMapGroupsInPandas(groupingAttributes, expr, output, project)
val udfOutput: Seq[Attribute] = expr.dataType.asInstanceOf[StructType].toAttributes
val additionalGroupingAttributes = mutable.ArrayBuffer[Attribute]()

for (attribute <- groupingAttributes) {
if (!udfOutput.map(_.name).contains(attribute.name)) {
Member

I'm wondering whether we should decide the additional grouping attributes based only on their names?

For example, from the tests:

result3 = df.groupby('id', 'v').apply(foo).sort('id', 'v').toPandas()

The column v in result3 does not hold the actual grouping value; it is overwritten by the value returned from the UDF, because the UDF output contains a column with the same name. I'm not sure this is the desired behavior.

Contributor Author (@icexelloss, Jan 10, 2018)

@ueshin You brought up a very good point about an issue I struggled with a bit - conflicting column names between the grouping columns and the UDF output.

When this happens, we have a few choices:

  1. Keep both columns and rename one of them
    The benefit of this approach is that it gives the user the most information, but it might result in arbitrary column names such as v_. Another downside is that if the UDF just adds or replaces columns, this will result in duplicate columns.
  2. Keep both columns and don't rename
    This is consistent with groupby agg behavior, so probably better than (1), but it will still result in duplicate columns if the UDF only adds or replaces columns of its input.
  3. Drop conflicting grouping columns (see the sketch after this list)
    This is the approach implemented in this PR. I chose it because I think it's rare for the user to want to change a grouping column and, at the same time, keep the original grouping value. Therefore, I think it makes the most sense to ask the user to do a bit of extra work - explicitly create another column rather than override the grouping column.
  4. Drop conflicting UDF columns
    I don't think dropping the UDF output is reasonable behavior.
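
To make option 3 concrete, a small sketch using the foo UDF from the tests in this PR, with hypothetical values:

result3 = df.groupby('id', 'v').apply(foo)
# For a group with id=1, v=10, foo returns v = 11 (v + 1) and v2 = 0.0,
# so result3 contains the row (id=1, v=11, v2=0.0): the UDF's 'v' replaces
# the original grouping value 10, which is dropped.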

@ueshin which one do you prefer?

Member

Maybe this relates to the discussion above (#20211 (comment)).
Let's wait and see for now.

Contributor Author

Yeah let's do that.

additionalGroupingAttributes += attribute
}
}

val output = additionalGroupingAttributes ++ udfOutput

val plan = FlatMapGroupsInPandas(
groupingAttributes,
additionalGroupingAttributes,
expr, output, project)

Dataset.ofRows(df.sparkSession, plan)
}
@@ -452,8 +452,9 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.FlatMapGroupsInR(f, p, b, is, os, key, value, grouping, data, objAttr, child) =>
execution.FlatMapGroupsInRExec(f, p, b, is, os, key, value, grouping,
data, objAttr, planLater(child)) :: Nil
case logical.FlatMapGroupsInPandas(grouping, func, output, child) =>
execution.python.FlatMapGroupsInPandasExec(grouping, func, output, planLater(child)) :: Nil
case logical.FlatMapGroupsInPandas(grouping, additionalGrouping, func, output, child) =>
execution.python.FlatMapGroupsInPandasExec(
grouping, additionalGrouping, func, output, planLater(child)) :: Nil
case logical.MapElements(f, _, _, objAttr, child) =>
execution.MapElementsExec(f, objAttr, planLater(child)) :: Nil
case logical.AppendColumns(f, _, _, in, out, child) =>
@@ -17,16 +17,19 @@

package org.apache.spark.sql.execution.python

import java.io.File

import scala.collection.JavaConverters._

import org.apache.spark.TaskContext
import org.apache.spark.{SparkEnv, TaskContext}
import org.apache.spark.api.python.{ChainedPythonFunctions, PythonEvalType}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, Distribution, Partitioning}
import org.apache.spark.sql.execution.{GroupedIterator, SparkPlan, UnaryExecNode}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

/**
* Physical node for [[org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsInPandas]]
@@ -47,6 +50,7 @@ import org.apache.spark.sql.types.StructType
*/
case class FlatMapGroupsInPandasExec(
groupingAttributes: Seq[Attribute],
additionalGroupingAttributes: Seq[Attribute],
func: Expression,
output: Seq[Attribute],
child: SparkPlan)
@@ -80,27 +84,77 @@ case class FlatMapGroupsInPandasExec(
val sessionLocalTimeZone = conf.sessionLocalTimeZone
val pandasRespectSessionTimeZone = conf.pandasRespectSessionTimeZone

inputRDD.mapPartitionsInternal { iter =>
val grouped = if (groupingAttributes.isEmpty) {
Iterator(iter)
} else {
if (additionalGroupingAttributes.isEmpty) {
// Fast path if additionalGroupingAttributes is empty

inputRDD.mapPartitionsInternal { iter =>
val grouped = if (groupingAttributes.isEmpty) {
Iterator(iter)
} else {
val groupedIter = GroupedIterator(iter, groupingAttributes, child.output)
val dropGrouping =
UnsafeProjection.create(child.output.drop(groupingAttributes.length), child.output)
groupedIter.map {
case (_, groupedRowIter) => groupedRowIter.map(dropGrouping)
}
}

val context = TaskContext.get()

val columnarBatchIter = new ArrowPythonRunner(
chainedFunc, bufferSize, reuseWorker,
PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF, argOffsets, schema,
sessionLocalTimeZone, pandasRespectSessionTimeZone)
.compute(grouped, context.partitionId(), context)

columnarBatchIter
.flatMap(_.rowIterator.asScala)
.map(UnsafeProjection.create(output, output))
}
} else {
// If additionalGroupingAttributes is not empty, join the additional grouping attributes
// with the UDF output to get the final result

inputRDD.mapPartitionsInternal { iter =>
assert(groupingAttributes.nonEmpty)

val groupedIter = GroupedIterator(iter, groupingAttributes, child.output)

val context = TaskContext.get()

val queue = HybridRowQueue(context.taskMemoryManager(),
new File(Utils.getLocalDir(SparkEnv.get.conf)), additionalGroupingAttributes.length)
context.addTaskCompletionListener { _ =>
queue.close()
}
val additionalGroupingProj = UnsafeProjection.create(
additionalGroupingAttributes, groupingAttributes)
val dropGrouping =
UnsafeProjection.create(child.output.drop(groupingAttributes.length), child.output)
groupedIter.map {
case (_, groupedRowIter) => groupedRowIter.map(dropGrouping)
val grouped = groupedIter.map {
case (k, groupedRowIter) =>
val additionalGrouping = additionalGroupingProj(k)
queue.add(additionalGrouping)
(additionalGrouping, groupedRowIter.map(dropGrouping))
Member

We can return only groupedRowIter.map(dropGrouping).

}
}

val context = TaskContext.get()
val columnarBatchIter = new ArrowPythonRunner(
chainedFunc, bufferSize, reuseWorker,
PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF, argOffsets, schema,
sessionLocalTimeZone, pandasRespectSessionTimeZone)
.compute(grouped.map(_._2), context.partitionId(), context)

val columnarBatchIter = new ArrowPythonRunner(
chainedFunc, bufferSize, reuseWorker,
PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF, argOffsets, schema,
sessionLocalTimeZone, pandasRespectSessionTimeZone)
.compute(grouped, context.partitionId(), context)
val joinedRow = new JoinedRow
val outputProj = UnsafeProjection.create(output, output)

columnarBatchIter.flatMap(_.rowIterator.asScala).map(UnsafeProjection.create(output, output))
columnarBatchIter
.flatMap{ batchIter =>
val additionalGrouping = queue.remove()
batchIter.rowIterator().asScala.map { row =>
outputProj(joinedRow(additionalGrouping, row))
}
}
}
}
}
}