
Conversation

@beliefer
Contributor

@beliefer beliefer commented Nov 3, 2021

What changes were proposed in this pull request?

Currently,

```
val namedObservation = Observation("named")

val df = spark.range(100)
val observed_df = df.observe(
   namedObservation, percentile_approx($"id", lit(0.5), lit(100)).as("percentile_approx_val"))

observed_df.collect()
namedObservation.get
```

throws an exception as follows:

```
15:16:27.994 ERROR org.apache.spark.util.Utils: Exception encountered
java.io.NotSerializableException: org.apache.spark.sql.catalyst.expressions.aggregate.ApproximatePercentile$PercentileDigest
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2(TaskResult.scala:55)
	at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$2$adapted(TaskResult.scala:55)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.scheduler.DirectTaskResult.$anonfun$writeExternal$1(TaskResult.scala:55)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1434)
	at org.apache.spark.scheduler.DirectTaskResult.writeExternal(TaskResult.scala:51)
	at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1459)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1430)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:44)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:616)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
```

This PR fixes the issue. After the change,
`assert(namedObservation.get === Map("percentile_approx_val" -> 49))`
passes and `java.io.NotSerializableException` no longer occurs.

Why are the changes needed?

Fix `NotSerializableException` when calling `observe` with a `TypedImperativeAggregate`.

Does this PR introduce any user-facing change?

No. This PR changes the implementation of `AggregatingAccumulator` so that it now uses the `serialize` and `deserialize` methods of `TypedImperativeAggregate`.

How was this patch tested?

New tests.
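For context, a minimal sketch of the idea, reconstructed from the diff fragments quoted later in this thread (`withBufferSerialized`, `typedImperatives`, `buffer`, and `serializeAggregateBufferInPlace` appear in the patch; the exact body here is illustrative, not the merged code):

```
// Sketch: on an executor, before the accumulator is Java-serialized into the
// task result, replace each TypedImperativeAggregate's opaque buffer object
// with its Array[Byte] form so ObjectOutputStream never sees the raw object.
override def withBufferSerialized(): AggregatingAccumulator = {
  assert(!isAtDriverSide)
  var i = 0
  while (i < typedImperatives.length) {
    // Writes the serialized bytes back into the buffer in place.
    typedImperatives(i).serializeAggregateBufferInPlace(buffer)
    i += 1
  }
  this
}
```

At the driver, the accumulator then merges the serialized buffers with `merge` (which deserializes them) instead of `mergeBuffersObjects`, as suggested in the review below.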

@github-actions github-actions bot added the SQL label Nov 3, 2021
@SparkQA

SparkQA commented Nov 3, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49343/

@SparkQA

SparkQA commented Nov 3, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49343/

@SparkQA

SparkQA commented Nov 3, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49345/

@SparkQA

SparkQA commented Nov 3, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49345/

@SparkQA

SparkQA commented Nov 3, 2021

Test build #144873 has finished for PR 34474 at commit 1eacf82.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

 * @param summaries underlying probabilistic data structure [[QuantileSummaries]].
 */
-class PercentileDigest(private var summaries: QuantileSummaries) {
+class PercentileDigest(private var summaries: QuantileSummaries) extends Serializable {
Member

hmm, there are special methods for (de-)serialization of PercentileDigest:

override def serialize(obj: PercentileDigest): Array[Byte] = {
  ApproximatePercentile.serializer.serialize(obj)
}
override def deserialize(bytes: Array[Byte]): PercentileDigest = {
  ApproximatePercentile.serializer.deserialize(bytes)
}

Do you know why these methods are used instead of extending Serializable?

cc @cloud-fan

Contributor

Can we just add the proper logic to observe? Adding serialization will fix this particular problem, but it won't work for other aggregates.

Member

I agree if that's possible - I couldn't tell if this was something observe() should never have to pull back and does accidentally, or whether it is necessary to return metrics about percentile_approx somehow

Contributor

I probably forgot to add it...

Contributor Author

@beliefer beliefer Nov 4, 2021

@MaxGekk @hvanhovell @srowen I updated the code; it now fixes the issue for an AggregatingAccumulator that takes any TypedImperativeAggregate.

@SparkQA

SparkQA commented Nov 3, 2021

Test build #144875 has finished for PR 34474 at commit a2f1fbb.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Nov 3, 2021

Looking at the implementation, it's never meant to be serialized by Java or Kryo, but is serialized 'manually' and internally by Spark. I think it must be related to observe() here, that somehow this object gets caught up in what is returned? I don't know a lot about observe().

It's possible the answer is that observe() should do something different, but not sure.

We can make this Serializable, but then need to implement methods for the Java serializer that would reuse the existing serializer code. That seems plausible.
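For illustration only, a hedged sketch of that option: Java serialization hooks on `PercentileDigest` that delegate to the existing `ApproximatePercentile.serializer` (names follow the snippets quoted above; this is not the approach that was ultimately merged):

```
class PercentileDigest(private var summaries: QuantileSummaries) extends Serializable {
  // Delegate Java serialization to Spark's existing compact binary format
  // rather than letting ObjectOutputStream walk QuantileSummaries itself.
  private def writeObject(out: java.io.ObjectOutputStream): Unit = {
    val bytes = ApproximatePercentile.serializer.serialize(this)
    out.writeInt(bytes.length)
    out.write(bytes)
  }

  private def readObject(in: java.io.ObjectInputStream): Unit = {
    val bytes = new Array[Byte](in.readInt())
    in.readFully(bytes)
    // Class-private access lets us copy the field from the decoded instance.
    this.summaries = ApproximatePercentile.serializer.deserialize(bytes).summaries
  }
}
```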

@github-actions github-actions bot added the CORE label Nov 4, 2021

override def withBufferSerialized(): AggregatingAccumulator = {
  if (!isAtDriverSide) {
    val input = getOrCreateTempBuffer()
Contributor

why do we need to create a temp buffer here?

@SparkQA

SparkQA commented Nov 4, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49368/

*/
def value: OUT

// We assume that serialization of AccumulatorV2 runs on executor is not necessary.
Contributor

// Serialize the buffer of this accumulator before sending back this accumulator to the driver.
// By default this method does nothing.

}
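Presumably the suggested comment documents a new hook on `AccumulatorV2` itself; a sketch of what such a default might look like (the visibility modifier and exact signature are assumptions inferred from the overrides quoted in this thread):

```
// Serialize the buffer of this accumulator before sending back this
// accumulator to the driver. By default this method does nothing.
private[spark] def withBufferSerialized(): AccumulatorV2[IN, OUT] = this
```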

override def withBufferSerialized(): AggregatingAccumulator = {
  if (!isAtDriverSide) {
Contributor

I think we can do assert(!isAtDriverSide)


/**
* In-place replaces SparkSQL internally supported underlying storage format (BinaryType),
* with the aggregation buffer object stored at buffer's index `mutableAggBufferOffset`.
Contributor

`stored at buffer's index mutableAggBufferOffset` describes the binary, so it should be put before `with the aggregation buffer object ...`

* with the aggregation buffer object stored at buffer's index `mutableAggBufferOffset`.
*
* This is only called when AggregatingAccumulator running on driver, after the framework
* shuffle in aggregate buffers.
Contributor

This has nothing to do with shuffle.

val otherBuffer = agg.buffer
// If AggregatingAccumulator runs on driver,
// we should deserialize all TypedImperativeAggregate.
if (isAtDriverSide) {
Contributor

We don't need to add new code. We can just change the existing code

while (i < typedImperatives.length) {
  typedImperatives(i).mergeBuffersObjects(buffer, otherBuffer)
  i += 1
}

to

if (atDriverSide) {
  while (i < typedImperatives.length) {
    // The input buffer stores serialized data
    typedImperatives(i).merge(buffer, otherBuffer)
    i += 1
  }
} else {
  while (i < typedImperatives.length) {
    // The input buffer stores deserialized object
    typedImperatives(i).mergeBuffersObjects(buffer, otherBuffer)
    i += 1
  }
}

@SparkQA

SparkQA commented Nov 4, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49368/

@SparkQA

SparkQA commented Nov 4, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49369/

@SparkQA

SparkQA commented Nov 4, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49369/

@SparkQA

SparkQA commented Nov 4, 2021

Test build #144898 has finished for PR 34474 at commit 6c2f71c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@beliefer beliefer changed the title [SPARK-37203][SQL] Fix NotSerializableException when observe with percentile_approx [SPARK-37203][SQL] Fix NotSerializableException when observe with TypedImperativeAggregate Nov 4, 2021
val namedObservation = Observation("named")

val df = spark.range(100)
val observed_df = df.observe(
Contributor

can we test a DataFrame with no data?

Contributor Author

OK
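For reference, a sketch of such a test (the observation name is illustrative, and the expected metric for `percentile_approx` over zero rows is assumed to be null):

```
val noDataObservation = Observation("no_data")
val emptyDf = spark.range(0)  // a Dataset with no rows
val observedEmpty = emptyDf.observe(
  noDataObservation, percentile_approx($"id", lit(0.5), lit(100)).as("percentile_approx_val"))

observedEmpty.collect()
// With no input rows the aggregation buffer stays empty, so the observed
// metric should come back as null rather than throwing.
assert(noDataObservation.get === Map("percentile_approx_val" -> null))
```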

@SparkQA

SparkQA commented Nov 4, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49373/

@SparkQA

SparkQA commented Nov 4, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49373/

@SparkQA

SparkQA commented Nov 4, 2021

Test build #144899 has finished for PR 34474 at commit 84f9657.

  • This patch fails from timeout after a configured wait of 500m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 4, 2021

Test build #144903 has finished for PR 34474 at commit 5e7afac.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 5, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49384/

@SparkQA

SparkQA commented Nov 5, 2021

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/49384/

@SparkQA

SparkQA commented Nov 5, 2021

Test build #144913 has finished for PR 34474 at commit acddbf3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

@beliefer can you update the PR description? "Yes. PercentileDigest extends Serializable." is wrong now.

@beliefer
Contributor Author

beliefer commented Nov 5, 2021

> @beliefer can you update the PR description? "Yes. PercentileDigest extends Serializable." is wrong now.

OK

@cloud-fan
Contributor

> Yes. This PR let AggregatingAccumulator uses serialize and deserialize of TypedImperativeAggregate.

Is it a user-facing change?

@beliefer
Contributor Author

beliefer commented Nov 5, 2021

> Yes. This PR let AggregatingAccumulator uses serialize and deserialize of TypedImperativeAggregate.
>
> Is it a user-facing change?

No

@cloud-fan cloud-fan closed this in 3f3201a Nov 5, 2021
@beliefer
Contributor Author

beliefer commented Nov 5, 2021

@cloud-fan Thanks for your hard work! @srowen @MaxGekk @hvanhovell, thank you for the review.

cloud-fan pushed a commit that referenced this pull request Nov 5, 2021
[SPARK-37203][SQL] Fix NotSerializableException when observe with TypedImperativeAggregate

Closes #34474 from beliefer/SPARK-37203.

Authored-by: Jiaan Geng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 3f3201a)
Signed-off-by: Wenchen Fan <[email protected]>
cloud-fan pushed a commit that referenced this pull request Nov 5, 2021
[SPARK-37203][SQL] Fix NotSerializableException when observe with TypedImperativeAggregate
@cloud-fan
Contributor

thanks, merging to master/3.2/3.1!

sunchao pushed a commit to sunchao/spark that referenced this pull request Dec 8, 2021
[SPARK-37203][SQL] Fix NotSerializableException when observe with TypedImperativeAggregate
fishcus pushed a commit to fishcus/spark that referenced this pull request Jan 12, 2022
[SPARK-37203][SQL] Fix NotSerializableException when observe with TypedImperativeAggregate
catalinii pushed a commit to lyft/spark that referenced this pull request Feb 22, 2022
[SPARK-37203][SQL] Fix NotSerializableException when observe with TypedImperativeAggregate
catalinii pushed a commit to lyft/spark that referenced this pull request Mar 4, 2022
[SPARK-37203][SQL] Fix NotSerializableException when observe with TypedImperativeAggregate
var i = 0
// AggregatingAccumulator runs on executor, we should serialize all TypedImperativeAggregate.
while (i < typedImperatives.length) {
  typedImperatives(i).serializeAggregateBufferInPlace(buffer)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have an example in which we get an NPE here because buffer is null; see SPARK-40535.

Contributor Author

I will investigate it.
