@icexelloss
Contributor

@icexelloss icexelloss commented Apr 16, 2018

What changes were proposed in this pull request?

This PR enables using grouped aggregate pandas UDFs as window functions. The semantics are the same as using SQL aggregate functions as window functions.

       >>> from pyspark.sql.functions import pandas_udf, PandasUDFType
       >>> from pyspark.sql import Window
       >>> df = spark.createDataFrame(
       ...     [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
       ...     ("id", "v"))
       >>> @pandas_udf("double", PandasUDFType.GROUPED_AGG)
       ... def mean_udf(v):
       ...     return v.mean()
       >>> w = Window.partitionBy('id')
       >>> df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
       +---+----+------+
       | id|   v|mean_v|
       +---+----+------+
       |  1| 1.0|   1.5|
       |  1| 2.0|   1.5|
       |  2| 3.0|   6.0|
       |  2| 5.0|   6.0|
       |  2|10.0|   6.0|
       +---+----+------+

The scope of this PR is limited in two ways:
(1) It only supports unbounded window frames, which act essentially as a group by.
(2) It only supports aggregation functions, not "transform"-like window functions (n -> n mapping).

Both are left as future work. In particular, (1) needs careful thought about how to pass rolling-window data to Python efficiently. (2) is a bit easier but requires more changes, so I think it's better to leave it for a separate PR.
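
To make the "acts essentially as group by" point concrete, here is a minimal plain-Python sketch of the semantics. It is not the PR's implementation and does not use Spark; the helper name `unbounded_window_agg` and the `agg_result` column are made up for illustration. It aggregates each partition once and attaches the result to every row of that partition, using the same data as the example above.

```python
from collections import defaultdict
from statistics import mean


def unbounded_window_agg(rows, key, value, agg):
    """Model of an aggregate over an unbounded window frame.

    Hypothetical helper: aggregate `value` once per `key`, then attach the
    per-partition result to every row (n rows in, n rows out).
    """
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row[value])
    # One aggregate value per partition, exactly as a group by would compute.
    results = {k: agg(values) for k, values in groups.items()}
    # Unlike group by, every input row is kept and annotated with the result.
    return [dict(row, agg_result=results[row[key]]) for row in rows]


rows = [
    {"id": 1, "v": 1.0}, {"id": 1, "v": 2.0},
    {"id": 2, "v": 3.0}, {"id": 2, "v": 5.0}, {"id": 2, "v": 10.0},
]
out = unbounded_window_agg(rows, "id", "v", mean)
for row in out:
    print(row)
```

The only difference from a plain group by is the final step: the row count is preserved instead of collapsing to one row per group.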

How was this patch tested?

WindowPandasUDFTests

@icexelloss icexelloss force-pushed the SPARK-22239-window-udf branch from 6a964d4 to 9fdcfe6 Compare April 16, 2018 22:09
@SparkQA

SparkQA commented Apr 16, 2018

Test build #89416 has finished for PR 21082 at commit 9fdcfe6.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class WindowInPandasExec(

@icexelloss icexelloss changed the title [SPARK-22239][SQL][Python][WIP] Enable grouped aggregate pandas UDFs as window functions [SPARK-22239][SQL][Python][WIP] Enable grouped aggregate pandas UDFs as window functions with unbounded window frames Apr 17, 2018
@SparkQA

SparkQA commented Apr 17, 2018

Test build #89462 has finished for PR 21082 at commit dfdb03f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 19, 2018

Test build #89584 has finished for PR 21082 at commit a2825bf.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 19, 2018

Test build #89586 has finished for PR 21082 at commit 67b3dd0.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@icexelloss icexelloss changed the title [SPARK-22239][SQL][Python][WIP] Enable grouped aggregate pandas UDFs as window functions with unbounded window frames [SPARK-22239][SQL][Python] Enable grouped aggregate pandas UDFs as window functions with unbounded window frames Apr 19, 2018
@icexelloss
Contributor Author

cc @BryanCutler @ueshin @HyukjinKwon @viirya

cc @yhuai because of window related changes.

This PR is ready for review now

@SparkQA

SparkQA commented Apr 20, 2018

Test build #89595 has finished for PR 21082 at commit 85159b8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 20, 2018

Test build #89597 has finished for PR 21082 at commit 29b0395.

  • This patch fails from timeout after a configured wait of `300m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@HyukjinKwon
Member

HyukjinKwon commented Apr 20, 2018

From a very quick look, the flakiness looks global.

@SparkQA

SparkQA commented Apr 20, 2018

Test build #89612 has finished for PR 21082 at commit 29b0395.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Member

viirya commented Apr 20, 2018

retest this please.

@SparkQA

SparkQA commented Apr 20, 2018

Test build #89616 has finished for PR 21082 at commit 29b0395.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor Author

This is because an early analysis exception is thrown by rule ExtractWindowExpressions

Contributor Author

@icexelloss icexelloss Apr 21, 2018

This is unrelated, but I figured it shouldn't hurt to add an array test in GroupedAggPandasUDFTests.

@SparkQA

SparkQA commented Apr 22, 2018

Test build #89682 has finished for PR 21082 at commit 657a6a5.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Apr 23, 2018

Test build #89693 has finished for PR 21082 at commit 27158d9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@icexelloss
Contributor Author

Hey @HyukjinKwon @ueshin @BryanCutler I've fixed the tests and I think the PR is in good shape for review now. Could you please take a look when you have time? Thanks!

Member

:class:`pyspark.sql.Window`?

Member

typo: shows.

@HyukjinKwon
Member

Will take a close look this weekend as well.

Member

So we don't have PandasUDFType.WINDOW_AGG, and a pandas UDF defined as PandasUDFType.GROUPED_AGG can be used with both groupby and Window?

Contributor Author

Yes, exactly. The idea is that the producer of the UDF can write a grouped agg UDF, such as a weighted mean, and the consumer can use that UDF in both groupby and window, similar to how SQL aggregate functions work.
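
As a rough illustration of that producer/consumer split, the following plain-Python sketch (no Spark involved) defines one n -> 1 aggregate and consumes it in two ways. The names `weighted_mean`, `group_by_agg`, `window_agg`, and the weight column `w` are hypothetical and only stand in for a GROUPED_AGG pandas UDF, `groupby().agg(...)`, and `.over(Window.partitionBy(...))` respectively.

```python
from collections import defaultdict


def weighted_mean(values, weights):
    """Producer side: an n -> 1 aggregate, written once."""
    return sum(v * w for v, w in zip(values, weights)) / sum(weights)


def _partition(rows, key):
    groups = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)
    return groups


def group_by_agg(rows, key):
    """Consumer 1 (group by): one result per group."""
    return {
        k: weighted_mean([r["v"] for r in g], [r["w"] for r in g])
        for k, g in _partition(rows, key).items()
    }


def window_agg(rows, key):
    """Consumer 2 (unbounded window): the same result attached to every row."""
    per_group = group_by_agg(rows, key)
    return [dict(r, wm=per_group[r[key]]) for r in rows]


rows = [
    {"id": 1, "v": 1.0, "w": 1.0}, {"id": 1, "v": 2.0, "w": 3.0},
    {"id": 2, "v": 4.0, "w": 1.0}, {"id": 2, "v": 8.0, "w": 1.0},
]
grouped = group_by_agg(rows, "id")
windowed = window_agg(rows, "id")
print(grouped)
print(windowed)
```

The aggregate itself never needs to know which of the two contexts it is running in, which is the argument for not adding a separate WINDOW_AGG type.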

Member

nit: indent style.

Member

Should we do this analysis check in Analyzer?

Contributor Author

If we want to do this in the Analyzer, then we would have to carry the WindowFunctionType in the logical plan.

I did it this way to avoid changing the logical node. I am open to adding WindowFunctionType to the logical plan, though. What do other people think?

Member

(BTW:

.reduceLeft {
  ...
}

)

Member

@HyukjinKwon HyukjinKwon Apr 29, 2018

@icexelloss, should we actually keep this note? I think it duplicates https://spark.apache.org/docs/latest/sql-programming-guide.html#supported-sql-types, where we already documented this, and SQLConf.

Just leaving a link would probably be fine. Removing it is okay with me too. Adding a single note that covers all the pandas UDFs would also work.

Contributor Author

I am leaning towards keeping this in the API doc and maybe making the sql-programming-guide link to it.

I think most users would look at the API docs first rather than the sql-programming-guide, so it's probably a bit more convenient to have it here?

Member

Yup, I think that works too. I only left a comment because this API doc and the SQL programming guide looked mismatched.

Contributor Author

Member

indentation :-)

Member

nit: indent

Member

nit: indent

Member

nit: I would do

else {

}

Member

nit: inlined

Member

p{ -> p {

Member

@HyukjinKwon HyukjinKwon left a comment

LGTM

But just for clarification, @icexelloss, do you have WIP work or a plan to support bounded ones too?

@SparkQA

SparkQA commented Jun 6, 2018

Test build #91499 has finished for PR 21082 at commit 1c6b5d5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@icexelloss
Contributor Author

@HyukjinKwon Thanks for the review! I will address the comments shortly.

And yes, I will work on bounded windows on top of this PR.

@icexelloss
Contributor Author

Thanks @HyukjinKwon for your review! @ueshin Do you want to take another look too?

@HyukjinKwon
Member

Yea, let's leave this open for a few more days in case someone has more comments.

@SparkQA

SparkQA commented Jun 8, 2018

Test build #91574 has finished for PR 21082 at commit 6350408.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@ueshin
Member

ueshin commented Jun 8, 2018

LGTM.

@HyukjinKwon
Member

@icexelloss, mind resolving the conflict?

@icexelloss icexelloss force-pushed the SPARK-22239-window-udf branch from 6350408 to 328b2c4 Compare June 12, 2018 17:41
@SparkQA

SparkQA commented Jun 12, 2018

Test build #91718 has finished for PR 21082 at commit 328b2c4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Merged to master.

@asfgit asfgit closed this in 9786ce6 Jun 13, 2018
@icexelloss
Contributor Author

Thanks everyone for the review!

@adsk2050

Hello! This is great work, thank you for contributing. This change makes it possible to run pd.Series -> Any functions over a window.

I am wondering whether GROUPED_MAP pandas UDFs as window functions are also in the pipeline?
(Basically pd.Series -> pd.Series over a Window.)
For example:

import random

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import *

def doCoolStuff(df: pd.DataFrame) -> pd.DataFrame:
  # Sort inside the UDF: the row order set by an earlier orderBy may not
  # survive the shuffle that groupby performs.
  df = df.sort_values("epoch_timestamp").reset_index(drop=True)
  count = 1
  sets = []
  for event in df["event"]:
    # Label the row with the current counter, then bump it after each "buy".
    sets.append(str(count))
    if event == "buy":
      count += 1
  df["coolStuff"] = sets
  return df

# One user, a random source per row, increasing timestamps, random event probability.
df = spark.createDataFrame(pd.DataFrame(
  [[1, random.choice(list(range(10))), i, random.random()] for i in range(100)],
  columns=["user_id", "source_id", "epoch_timestamp", "event_prob"]))\
.withColumn("event", F.when(F.col("event_prob") > F.lit(0.9), "buy").otherwise("view"))\
.withColumn("coolStuff", F.lit(""))\
.persist()

doCoolStuffPDUDF = F.pandas_udf(
  f=doCoolStuff,
  returnType=df.schema,
  functionType=F.PandasUDFType.GROUPED_MAP)

# display() is a Databricks notebook helper; use show() elsewhere.
df\
.orderBy(F.col("epoch_timestamp"))\
.groupby("user_id", "source_id")\
.apply(doCoolStuffPDUDF)\
.orderBy(F.col("user_id"), F.col("source_id"), F.col("epoch_timestamp").desc())\
.display()

This could be simplified to:

import random

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Proposed usage, not supported by this PR: a pd.Series -> pd.Series UDF applied over a window.
def doCoolStuff(events: pd.Series) -> pd.Series:
  count = 1
  sets = []
  for event in events:
    sets.append(str(count))
    if event == "buy":
      count += 1
  return pd.Series(data=sets)

doCoolStuffPDUDF = F.pandas_udf(
  f=doCoolStuff,
  returnType=StringType(),
  functionType=F.PandasUDFType.GROUPED_MAP)

w = Window.partitionBy("user_id", "source_id").orderBy(F.col("epoch_timestamp"))

df = spark.createDataFrame(pd.DataFrame(
  [[1, random.choice(list(range(10))), i, random.random()] for i in range(100)],
  columns=["user_id", "source_id", "epoch_timestamp", "event_prob"]))\
.withColumn("event", F.when(F.col("event_prob") > F.lit(0.9), "buy").otherwise("view"))\
.withColumn("coolStuff", doCoolStuffPDUDF(F.col("event")).over(w))\
.orderBy(F.col("user_id"), F.col("source_id"), F.col("epoch_timestamp").desc())\
.persist()

df.display()
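
For readers following along without a Spark session, the per-partition logic in doCoolStuff can be sketched in plain Python. This is only an illustration of the n -> n mapping being asked about: `number_sessions` and `label_partition` are hypothetical names, and the sample records are invented. It also shows why the ordering within a partition matters, since the counter depends on the order in which events are seen.

```python
def number_sessions(events):
    """n -> n mapping: label each event with a counter that advances after every "buy"."""
    count = 1
    labels = []
    for event in events:
        labels.append(str(count))
        if event == "buy":
            count += 1
    return labels


def label_partition(records):
    """Order one partition by timestamp, then label it (models partitionBy + orderBy)."""
    ordered = sorted(records, key=lambda rec: rec[0])
    labels = number_sessions([event for _, event in ordered])
    return [(ts, event, label) for (ts, event), label in zip(ordered, labels)]


# (epoch_timestamp, event) pairs for a single partition, deliberately out of order.
records = [(3, "view"), (1, "view"), (2, "buy"), (4, "buy"), (5, "view")]
labelled = label_partition(records)
for row in labelled:
    print(row)
```

Each input row produces exactly one output row, which is what distinguishes this from the n -> 1 aggregates covered by the PR.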
