File tree Expand file tree Collapse file tree 2 files changed +23
-2
lines changed
main/scala/org/apache/spark/util
test/scala/org/apache/spark/util Expand file tree Collapse file tree 2 files changed +23
-2
lines changed Original file line number Diff line number Diff line change @@ -486,7 +486,9 @@ class LegacyAccumulatorWrapper[R, T](
486486 param : org.apache.spark.AccumulableParam [R , T ]) extends AccumulatorV2 [T , R ] {
487487 private [spark] var _value = initialValue // Current value on driver
488488
489- override def isZero : Boolean = _value == param.zero(initialValue)
489+ @ transient private lazy val _zero = param.zero(initialValue)
490+
491+ override def isZero : Boolean = _value.asInstanceOf [AnyRef ].eq(_zero.asInstanceOf [AnyRef ])
490492
491493 override def copy (): LegacyAccumulatorWrapper [R , T ] = {
492494 val acc = new LegacyAccumulatorWrapper (initialValue, param)
@@ -495,7 +497,7 @@ class LegacyAccumulatorWrapper[R, T](
495497 }
496498
497499 override def reset (): Unit = {
498- _value = param.zero(initialValue)
500+ _value = _zero
499501 }
500502
501503 override def add (v : T ): Unit = _value = param.addAccumulator(_value, v)
Original file line number Diff line number Diff line change 1818package org .apache .spark .util
1919
2020import org .apache .spark ._
21+ import org .apache .spark .serializer .JavaSerializer
2122
2223class AccumulatorV2Suite extends SparkFunSuite {
2324
@@ -162,4 +163,22 @@ class AccumulatorV2Suite extends SparkFunSuite {
162163 assert(acc3.isZero)
163164 assert(acc3.value === " " )
164165 }
166+
167+ test(" LegacyAccumulatorWrapper with AccumulatorParam that has no equals/hashCode" ) {
168+ class MyData (val i : Int ) extends Serializable
169+ val param = new AccumulatorParam [MyData ] {
170+ override def zero (initialValue : MyData ): MyData = new MyData (0 )
171+ override def addInPlace (r1 : MyData , r2 : MyData ): MyData = new MyData (r1.i + r2.i)
172+ }
173+
174+ val acc = new LegacyAccumulatorWrapper (new MyData (0 ), param)
175+ acc.metadata = AccumulatorMetadata (
176+ AccumulatorContext .newId(),
177+ Some (" test" ),
178+ countFailedValues = false )
179+ AccumulatorContext .register(acc)
180+
181+ val ser = new JavaSerializer (new SparkConf ).newInstance()
182+ ser.serialize(acc)
183+ }
165184}
You can’t perform that action at this time.
0 commit comments