diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 70864d590988..e2f3314bc859 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.history import java.io.{File, FileNotFoundException, IOException} +import java.lang.{Long => JLong} import java.nio.file.Files import java.util.{Date, ServiceLoader} import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit} @@ -30,6 +31,7 @@ import scala.io.Source import scala.xml.Node import com.fasterxml.jackson.annotation.JsonIgnore +import com.fasterxml.jackson.databind.annotation.JsonDeserialize import com.google.common.util.concurrent.MoreExecutors import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.hdfs.DistributedFileSystem @@ -1167,6 +1169,7 @@ private[history] case class LogInfo( appId: Option[String], attemptId: Option[String], fileSize: Long, + @JsonDeserialize(contentAs = classOf[JLong]) lastIndex: Option[Long], isComplete: Boolean) @@ -1174,6 +1177,7 @@ private[history] class AttemptInfoWrapper( val info: ApplicationAttemptInfo, val logPath: String, val fileSize: Long, + @JsonDeserialize(contentAs = classOf[JLong]) val lastIndex: Option[Long], val adminAcls: Option[String], val viewAcls: Option[String], diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 281e6935de37..ed195dd44e91 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -1283,6 +1283,56 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with 
Logging { assert(deserializedOldObj.isComplete === false) } + test("SPARK-29755 LogInfo should be serialized/deserialized by jackson properly") { + def assertSerDe(serializer: KVStoreScalaSerializer, info: LogInfo): Unit = { + val infoAfterSerDe = serializer.deserialize(serializer.serialize(info), classOf[LogInfo]) + assert(infoAfterSerDe === info) + assertOptionAfterSerde(infoAfterSerDe.lastIndex, info.lastIndex) + } + + val serializer = new KVStoreScalaSerializer() + val logInfoWithIndexAsNone = LogInfo("dummy", 0, LogType.EventLogs, Some("appId"), + Some("attemptId"), 100, None, false) + assertSerDe(serializer, logInfoWithIndexAsNone) + + val logInfoWithIndex = LogInfo("dummy", 0, LogType.EventLogs, Some("appId"), + Some("attemptId"), 100, Some(3), false) + assertSerDe(serializer, logInfoWithIndex) + } + + test("SPARK-29755 AttemptInfoWrapper should be serialized/deserialized by jackson properly") { + def assertSerDe(serializer: KVStoreScalaSerializer, attempt: AttemptInfoWrapper): Unit = { + val attemptAfterSerDe = serializer.deserialize(serializer.serialize(attempt), + classOf[AttemptInfoWrapper]) + assert(attemptAfterSerDe.info === attempt.info) + // skip comparing the other fields, as they do not trigger SPARK-29755 + assertOptionAfterSerde(attemptAfterSerDe.lastIndex, attempt.lastIndex) + } + + val serializer = new KVStoreScalaSerializer() + val appInfo = new ApplicationAttemptInfo(None, new Date(1), new Date(1), new Date(1), + 10, "spark", false, "dummy") + val attemptInfoWithIndexAsNone = new AttemptInfoWrapper(appInfo, "dummyPath", 10, None, + None, None, None, None) + assertSerDe(serializer, attemptInfoWithIndexAsNone) + + val attemptInfoWithIndex = new AttemptInfoWrapper(appInfo, "dummyPath", 10, Some(1), + None, None, None, None) + assertSerDe(serializer, attemptInfoWithIndex) + } + + private def assertOptionAfterSerde(opt: Option[Long], expected: Option[Long]): Unit = { + if (expected.isEmpty) { + assert(opt.isEmpty) + } else { + // The issue happens 
only when the value in Option is being unboxed. Here we ensure unboxing + // to Long succeeds: even though IDE suggests `.toLong` is redundant, direct comparison + // doesn't trigger unboxing and passes even without SPARK-29755, so don't remove + // `.toLong` below. Please refer to SPARK-29755 for more details. + assert(opt.get.toLong === expected.get.toLong) + } + } + /** * Asks the provider to check for logs and calls a function to perform checks on the updated * app list. Example: