From f505521b6691f0e9dc51c1291903864dfce9ca33 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Tue, 5 Nov 2019 18:20:25 +0900 Subject: [PATCH 1/5] [SPARK-29755][CORE] Provide @JsonDeserialize for Option[Long] in LogInfo & AttemptInfoWrapper --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 70864d590988..394407ce1db6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.history import java.io.{File, FileNotFoundException, IOException} +import java.lang.{Long => JLong} import java.nio.file.Files import java.util.{Date, ServiceLoader} import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit} @@ -30,6 +31,7 @@ import scala.io.Source import scala.xml.Node import com.fasterxml.jackson.annotation.JsonIgnore +import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.google.common.util.concurrent.MoreExecutors import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.hdfs.DistributedFileSystem @@ -1167,14 +1169,14 @@ private[history] case class LogInfo( appId: Option[String], attemptId: Option[String], fileSize: Long, - lastIndex: Option[Long], + @JsonDeserialize(contentAs = classOf[JLong]) lastIndex: Option[Long], isComplete: Boolean) private[history] class AttemptInfoWrapper( val info: ApplicationAttemptInfo, val logPath: String, val fileSize: Long, - val lastIndex: Option[Long], + @JsonDeserialize(contentAs = classOf[JLong]) val lastIndex: Option[Long], val adminAcls: Option[String], val viewAcls: Option[String], val adminAclsGroups: Option[String], From ae2e43c9061982a02176d12087110eed933633a9 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Wed, 6 Nov 2019 12:59:57 +0900 Subject: [PATCH 2/5] Add UT which fails on master branch and passes with the patch --- .../history/FsHistoryProviderSuite.scala | 52 +++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 281e6935de37..9dd78e66d066 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -25,6 +25,7 @@ import java.util.zip.{ZipInputStream, ZipOutputStream} import scala.collection.JavaConverters._ import scala.concurrent.duration._ +import scala.runtime.BoxesRunTime import com.google.common.io.{ByteStreams, Files} import org.apache.commons.io.FileUtils @@ -52,6 +53,7 @@ import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} import org.apache.spark.util.logging.DriverLogger + class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { private var testDir: File = null @@ -1283,6 +1285,56 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { assert(deserializedOldObj.isComplete === false) } + test("SPARK-29755 LogInfo should be serialized/deserialized by jackson properly") { + def assertSerDe(serializer: KVStoreScalaSerializer, info: LogInfo): Unit = { + val infoAfterSerDe = serializer.deserialize( + serializer.serialize(info), classOf[LogInfo]) + assert(infoAfterSerDe === info) + assertOptionAfterSerde(infoAfterSerDe.lastIndex, info.lastIndex) + } + + val serializer = new KVStoreScalaSerializer() + val logInfoWithIndexAsNone = LogInfo("dummy", 0, LogType.EventLogs, Some("appId"), + Some("attemptId"), 100, None, false) + assertSerDe(serializer, logInfoWithIndexAsNone) + + val logInfoWithIndex = LogInfo("dummy", 0, LogType.EventLogs, Some("appId"), + Some("attemptId"), 100, Some(3), false) + assertSerDe(serializer, logInfoWithIndex) + } + + test("SPARK-29755 AttemptInfoWrapper should be serialized/deserialized by jackson properly") { + def assertSerDe(serializer: KVStoreScalaSerializer, attempt: AttemptInfoWrapper): Unit = { + val attemptAfterSerDe = serializer.deserialize(serializer.serialize(attempt), + classOf[AttemptInfoWrapper]) + assert(attemptAfterSerDe.info === attempt.info) + // skip comparing some fields, as they've not triggered SPARK-29755 + assertOptionAfterSerde(attemptAfterSerDe.lastIndex, attempt.lastIndex) + } + + val serializer = new KVStoreScalaSerializer() + val appInfo = new ApplicationAttemptInfo(None, new Date(1), new Date(1), new Date(1), + 10, "spark", false, "dummy") + val attemptInfoWithIndexAsNone = new AttemptInfoWrapper(appInfo, "dummyPath", 10, None, + None, None, None, None) + assertSerDe(serializer, attemptInfoWithIndexAsNone) + + val attemptInfoWithIndex = new AttemptInfoWrapper(appInfo, "dummyPath", 10, Some(1), + None, None, None, None) + assertSerDe(serializer, attemptInfoWithIndex) + } + + private def assertOptionAfterSerde(opt: Option[Long], expected: Option[Long]): Unit = { + if (expected.isEmpty) { + assert(opt.isEmpty) + } else { + // The issue happens only the value in Option is being unboxed. Simple comparison sometimes + // doesn't go though unboxing the value, hence ClassCastException is not occurred. + // Here we ensure unboxing to Long succeeds. Please refer SPARK-29755 for more details. + assert(BoxesRunTime.unboxToLong(opt.get) === expected.get) + } + } + /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. Example: From 8ba70141d94c2bcd4b6ae39b345e05952b5b3f5e Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Thu, 7 Nov 2019 06:19:21 +0900 Subject: [PATCH 3/5] Reflect review comment --- .../spark/deploy/history/FsHistoryProviderSuite.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 9dd78e66d066..eb9232e09a6f 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -1328,10 +1328,11 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { if (expected.isEmpty) { assert(opt.isEmpty) } else { - // The issue happens only the value in Option is being unboxed. Simple comparison sometimes - // doesn't go though unboxing the value, hence ClassCastException is not occurred. - // Here we ensure unboxing to Long succeeds. Please refer SPARK-29755 for more details. - assert(BoxesRunTime.unboxToLong(opt.get) === expected.get) + // The issue happens only the value in Option is being unboxed. Here we ensure unboxing + // to Long succeeds: even though IDE suggests `.toLong` is redundant, direct comparison + // doesn't trigger unboxing and passes even without SPARK-29755, so don't remove + // `.toLong` below. Please refer SPARK-29755 for more details. + assert(opt.get.toLong === expected.get.toLong) } } From c665983204f591bcf5a343b4afd611db37011854 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Fri, 8 Nov 2019 10:10:01 +0900 Subject: [PATCH 4/5] Reflect review comments --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 6 ++++-- .../spark/deploy/history/FsHistoryProviderSuite.scala | 5 +---- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 394407ce1db6..e2f3314bc859 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -1169,14 +1169,16 @@ private[history] case class LogInfo( appId: Option[String], attemptId: Option[String], fileSize: Long, - @JsonDeserialize(contentAs = classOf[JLong]) lastIndex: Option[Long], + @JsonDeserialize(contentAs = classOf[JLong]) + lastIndex: Option[Long], isComplete: Boolean) private[history] class AttemptInfoWrapper( val info: ApplicationAttemptInfo, val logPath: String, val fileSize: Long, - @JsonDeserialize(contentAs = classOf[JLong]) val lastIndex: Option[Long], + @JsonDeserialize(contentAs = classOf[JLong]) + val lastIndex: Option[Long], val adminAcls: Option[String], val viewAcls: Option[String], val adminAclsGroups: Option[String], diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index eb9232e09a6f..9cedf49b8c0c 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -25,7 +25,6 @@ import java.util.zip.{ZipInputStream, ZipOutputStream} import scala.collection.JavaConverters._ import scala.concurrent.duration._ -import scala.runtime.BoxesRunTime import com.google.common.io.{ByteStreams, Files} import org.apache.commons.io.FileUtils @@ -53,7 +52,6 @@ import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo} import org.apache.spark.util.{Clock, JsonProtocol, ManualClock, Utils} import org.apache.spark.util.logging.DriverLogger - class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { private var testDir: File = null @@ -1287,8 +1285,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { test("SPARK-29755 LogInfo should be serialized/deserialized by jackson properly") { def assertSerDe(serializer: KVStoreScalaSerializer, info: LogInfo): Unit = { - val infoAfterSerDe = serializer.deserialize( - serializer.serialize(info), classOf[LogInfo]) + val infoAfterSerDe = serializer.deserialize(serializer.serialize(info), classOf[LogInfo]) assert(infoAfterSerDe === info) assertOptionAfterSerde(infoAfterSerDe.lastIndex, info.lastIndex) } From 698a6571e6f798b74d8469be32b41999c5397d60 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Sat, 9 Nov 2019 07:42:45 +0900 Subject: [PATCH 5/5] Reflect review comment --- .../apache/spark/deploy/history/FsHistoryProviderSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 9cedf49b8c0c..ed195dd44e91 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -1325,7 +1325,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging { if (expected.isEmpty) { assert(opt.isEmpty) } else { - // The issue happens only the value in Option is being unboxed. Here we ensure unboxing + // The issue happens only when the value in Option is being unboxed. Here we ensure unboxing // to Long succeeds: even though IDE suggests `.toLong` is redundant, direct comparison // doesn't trigger unboxing and passes even without SPARK-29755, so don't remove // `.toLong` below. Please refer SPARK-29755 for more details.