
Conversation

@zhengruifeng (Contributor) commented Sep 17, 2022

What changes were proposed in this pull request?

Implement a new expression, CollectTopK, which uses an Array instead of a BoundedPriorityQueue in ser/deser.
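
To illustrate the idea, here is a minimal, hypothetical sketch only (the real CollectTopK is a Catalyst aggregate expression, not an Aggregator, and the class and field names below are invented): an aggregation whose shuffled partial state is just an Array of the current top-k pairs, rather than a serialized BoundedPriorityQueue object.

import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Hypothetical sketch only: a typed top-k aggregator whose buffer is a plain Array,
// so the partial state exchanged during the shuffle is just the k (id, score) pairs.
class TopKByScore(k: Int) extends Aggregator[(Int, Float), Array[(Int, Float)], Array[(Int, Float)]] {
  def zero: Array[(Int, Float)] = Array.empty[(Int, Float)]

  def reduce(buf: Array[(Int, Float)], v: (Int, Float)): Array[(Int, Float)] =
    (buf :+ v).sortBy(_._2)(Ordering[Float].reverse).take(k)  // keep only the k highest scores (not tuned for speed)

  def merge(a: Array[(Int, Float)], b: Array[(Int, Float)]): Array[(Int, Float)] =
    (a ++ b).sortBy(_._2)(Ordering[Float].reverse).take(k)

  def finish(buf: Array[(Int, Float)]): Array[(Int, Float)] = buf

  def bufferEncoder: Encoder[Array[(Int, Float)]] = Encoders.kryo[Array[(Int, Float)]]
  def outputEncoder: Encoder[Array[(Int, Float)]] = Encoders.kryo[Array[(Int, Float)]]
}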

Why are the changes needed?

Reduce the shuffle size of ALS in prediction

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing test suites.

@zhengruifeng (Contributor, Author)

Take ALSExample as an example:

import org.apache.spark.ml.recommendation._

case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)

def parseRating(str: String): Rating = {
    val fields = str.split("::")
    assert(fields.size == 4)
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}

val ratings = spark.read.textFile("data/mllib/als/sample_movielens_ratings.txt").map(parseRating).toDF()

val als = new ALS().setMaxIter(1).setRegParam(0.01).setUserCol("userId").setItemCol("movieId").setRatingCol("rating")

val model = als.fit(ratings)

// top-10 user recommendations for every item; the per-item top-k aggregation here is what drives the shuffle
model.recommendForAllItems(10).collect()

before: [screenshot of the job's shuffle metrics]

after: [screenshot of the job's shuffle metrics]

The shuffle size in this case was reduced from 298.4 KiB to 130.3 KiB.

@zhengruifeng zhengruifeng changed the title [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS.recommend Sep 17, 2022
@zhengruifeng zhengruifeng changed the title [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS.recommend [SPARK-40476][ML][SQL] Reduce the shuffle size of ALS Sep 17, 2022
@dongjoon-hyun (Member) left a comment

Hi, @zhengruifeng.

  • If you don't mind, could you make an independent PR moving TopByKeyAggregator to CollectTopK, since that is orthogonal to "Reduce the shuffle size of ALS"?
  • In addition, we need test coverage for CollectTopK because we are removing TopByKeyAggregatorSuite.

@zhengruifeng (Contributor, Author)

@dongjoon-hyun

could you make an independent PR moving TopByKeyAggregator to CollectTopK, since that is orthogonal to "Reduce the shuffle size of ALS"?

It is precisely the move from TopByKeyAggregator to CollectTopK that reduces the shuffle size, since the ser/deser is optimized in CollectTopK. Let me update the PR description.

In addition, we need test coverage for CollectTopK because we are removing TopByKeyAggregatorSuite.

Sure, will update soon.

@dongjoon-hyun (Member)

Thanks. If the PR title is clear, +1 for that.

@zhengruifeng (Contributor, Author)

cc @srowen @WeichenXu123

Contributor

The naming is not clear. Why not call it collect_top_k?

And we can add it to spark.sql.functions as collect_top_k.

Comment on lines 504 to 505

@WeichenXu123 (Contributor) Sep 19, 2022

I think we can define a Spark SQL function and wrap this part within it, like:

def collect_top_k(ratingColumn: Column, outputColumn: Column, num: Int) =
  CollectOrdered(struct(ratingColumn, outputColumn).expr, num, true).toAggregateExpression(false)

Contributor Author

Sure, I think we don't want to expose it, so let's mark it private[spark].

*/
def collect_set(columnName: String): Column = collect_set(Column(columnName))

private[spark] def collect_top_k(e: Column, num: Int, reverse: Boolean): Column =
Contributor

nit: shall we make it public? It might be a useful function.

We don't need to do it in this PR.

Contributor Author

I'm not sure; I also think it's useful and may further use it in the pandas API on Spark.
But I don't know whether it is suitable to make public. @cloud-fan @HyukjinKwon

Member

Let's keep it private for now.
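
For context, a self-contained sketch of the semantics involved, built only from public functions (the data and column names are illustrative, not the ALS code path); the private collect_top_k expression is intended to produce the same kind of per-group top-k array while keeping at most k elements of partial state, which is what shrinks the shuffle:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, slice, sort_array, struct}

// Hypothetical, public-API illustration of per-group top-k (not the ALS internals):
// for each srcId, keep the k largest (rating, dstId) structs.
object TopKSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("topk-sketch").getOrCreate()
    import spark.implicits._

    val k = 2
    val ratings = Seq((1, 10, 0.9f), (1, 11, 0.7f), (1, 12, 0.8f), (2, 10, 0.5f), (2, 13, 0.6f))
      .toDF("srcId", "dstId", "rating")

    val topK = ratings
      .groupBy($"srcId")
      .agg(slice(sort_array(collect_list(struct($"rating", $"dstId")), asc = false), 1, k)
        .as("recommendations"))

    topK.show(truncate = false)
    spark.stop()
  }
}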

@srowen (Member) commented Sep 22, 2022

Merged to master

@srowen srowen closed this in 0867845 Sep 22, 2022
@zhengruifeng zhengruifeng deleted the sql_collect_topk branch September 23, 2022 00:04
@zhengruifeng (Contributor, Author)

Thanks for the reviews!

@WeichenXu123 (Contributor)

Thanks! :)

srowen pushed a commit that referenced this pull request Oct 11, 2022
### What changes were proposed in this pull request?
Reduce the shuffle size of ALS by using `Array[V]` instead of `BoundedPriorityQueue[V]` in ser/deser
This is the corresponding change to #37918 on the `.mllib` side.

### Why are the changes needed?
Reduce the shuffle size of ALS

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests.

Closes #38203 from zhengruifeng/ml_topbykey.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
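
One hypothetical way to realize the "Array instead of BoundedPriorityQueue in ser/deser" idea on the Java-serialization path (a sketch with invented names, not the actual `.mllib` patch): write only the queue's elements during serialization and rebuild the heap on read, so the shuffled bytes are essentially just an array of the kept values.

import java.io.{ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable

// Hypothetical illustration (names invented for this sketch, not the actual patch):
// keep the maxSize largest elements; on Java serialization, write only the elements,
// not the queue's internal structure, and rebuild the heap on deserialization.
class BoundedTopK[T](maxSize: Int)(implicit ord: Ordering[T]) extends Serializable {
  // the head of this queue is the smallest kept element under `ord`
  @transient private var heap = mutable.PriorityQueue.empty[T](ord.reverse)

  def add(elem: T): this.type = {
    if (heap.size < maxSize) heap += elem
    else if (ord.gt(elem, heap.head)) { heap.dequeue(); heap += elem }
    this
  }

  def iterator: Iterator[T] = heap.iterator

  // Write only the payload; assumes T and `ord` are themselves serializable.
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    out.writeInt(heap.size)
    heap.foreach(e => out.writeObject(e.asInstanceOf[AnyRef]))
  }

  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    heap = mutable.PriorityQueue.empty[T](ord.reverse)
    var n = in.readInt()
    while (n > 0) { heap += in.readObject().asInstanceOf[T]; n -= 1 }
  }
}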