FsHistoryProvider.scala:

@@ -18,6 +18,7 @@
package org.apache.spark.deploy.history
Contributor:

Nit: There are a couple of unused imports at the beginning of the file. It might be worth cleaning them up.
Contributor Author:

If these imports were introduced here, I'll remove them. If not, removing them might break other PRs, so I'd rather handle it carefully.

Contributor:

Since FsHistoryProvider.scala is one of the main targets of this development (the overall effort, not just this PR), it would be good to clean it up somewhere. Where do you think we should do it?

Contributor Author:

I guess it's OK to do in a minor PR, but a committer can ultimately judge whether it's worth it. (Treat my comment as two cents.)

Contributor:

I agree with this resolution; that way we separate the concerns. I've created #26436, and in case of disagreement it can simply be dropped.


import java.io.{File, FileNotFoundException, IOException}
import java.lang.{Long => JLong}
import java.nio.file.Files
import java.util.{Date, ServiceLoader}
import java.util.concurrent.{ConcurrentHashMap, ExecutorService, Future, TimeUnit}
@@ -30,6 +31,7 @@ import scala.io.Source
import scala.xml.Node

import com.fasterxml.jackson.annotation.JsonIgnore
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.google.common.util.concurrent.MoreExecutors
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.hdfs.DistributedFileSystem
@@ -1167,13 +1169,15 @@ private[history] case class LogInfo(
appId: Option[String],
attemptId: Option[String],
fileSize: Long,
@JsonDeserialize(contentAs = classOf[JLong])
lastIndex: Option[Long],
isComplete: Boolean)
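
The contentAs hint matters because Option[Long] erases its element type: without it, Jackson (via jackson-module-scala's DefaultScalaModule, which KVStoreScalaSerializer is understood to register) materializes a small JSON number as java.lang.Integer, and unboxing that element as a Long later throws ClassCastException. A minimal standalone sketch of the failure mode and the fix, using hypothetical class names that are not part of this PR:

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import scala.util.Try

// Hypothetical classes for illustration only.
case class Unannotated(lastIndex: Option[Long])
case class Annotated(
    @JsonDeserialize(contentAs = classOf[java.lang.Long])
    lastIndex: Option[Long])

object OptionLongRepro extends App {
  val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
  val json = """{"lastIndex":3}"""

  // Erasure leaves Jackson with Option[Object], so it picks Integer for a
  // small number; unboxing the element as a Long throws ClassCastException.
  val bad = mapper.readValue(json, classOf[Unannotated])
  assert(Try(bad.lastIndex.get + 1L).isFailure)

  // With the contentAs hint the element is a java.lang.Long and unboxes fine.
  val good = mapper.readValue(json, classOf[Annotated])
  assert(good.lastIndex.get + 1L == 4L)
}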

private[history] class AttemptInfoWrapper(
val info: ApplicationAttemptInfo,
val logPath: String,
val fileSize: Long,
@JsonDeserialize(contentAs = classOf[JLong])
val lastIndex: Option[Long],
val adminAcls: Option[String],
val viewAcls: Option[String],
FsHistoryProviderSuite.scala:

@@ -1283,6 +1283,56 @@ class FsHistoryProviderSuite extends SparkFunSuite with Matchers with Logging {
assert(deserializedOldObj.isComplete === false)
}

test("SPARK-29755 LogInfo should be serialized/deserialized by jackson properly") {
def assertSerDe(serializer: KVStoreScalaSerializer, info: LogInfo): Unit = {
val infoAfterSerDe = serializer.deserialize(serializer.serialize(info), classOf[LogInfo])
assert(infoAfterSerDe === info)
assertOptionAfterSerde(infoAfterSerDe.lastIndex, info.lastIndex)
}

val serializer = new KVStoreScalaSerializer()
val logInfoWithIndexAsNone = LogInfo("dummy", 0, LogType.EventLogs, Some("appId"),
Some("attemptId"), 100, None, false)
assertSerDe(serializer, logInfoWithIndexAsNone)

val logInfoWithIndex = LogInfo("dummy", 0, LogType.EventLogs, Some("appId"),
Some("attemptId"), 100, Some(3), false)
assertSerDe(serializer, logInfoWithIndex)
}

test("SPARK-29755 AttemptInfoWrapper should be serialized/deserialized by jackson properly") {
def assertSerDe(serializer: KVStoreScalaSerializer, attempt: AttemptInfoWrapper): Unit = {
val attemptAfterSerDe = serializer.deserialize(serializer.serialize(attempt),
classOf[AttemptInfoWrapper])
assert(attemptAfterSerDe.info === attempt.info)
// Skip comparing some fields, as they were not involved in triggering SPARK-29755.
assertOptionAfterSerde(attemptAfterSerDe.lastIndex, attempt.lastIndex)
}

val serializer = new KVStoreScalaSerializer()
val appInfo = new ApplicationAttemptInfo(None, new Date(1), new Date(1), new Date(1),
10, "spark", false, "dummy")
val attemptInfoWithIndexAsNone = new AttemptInfoWrapper(appInfo, "dummyPath", 10, None,
None, None, None, None)
assertSerDe(serializer, attemptInfoWithIndexAsNone)

val attemptInfoWithIndex = new AttemptInfoWrapper(appInfo, "dummyPath", 10, Some(1),
None, None, None, None)
assertSerDe(serializer, attemptInfoWithIndex)
}

private def assertOptionAfterSerde(opt: Option[Long], expected: Option[Long]): Unit = {
if (expected.isEmpty) {
assert(opt.isEmpty)
} else {
// The issue happens only when the value inside the Option is unboxed. Here we ensure
// unboxing to Long succeeds: even though the IDE suggests `.toLong` is redundant, a
// direct comparison doesn't trigger unboxing and passes even without SPARK-29755, so
// don't remove `.toLong` below. Please refer to SPARK-29755 for more details.
assert(opt.get.toLong === expected.get.toLong)
}
}
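
A side note on why the comment above holds (my reasoning, not from the PR): Scala's == on two boxed numbers goes through BoxesRunTime.equals, which compares java.lang.Integer(3) and java.lang.Long(3) numerically, so a plain comparison never unboxes and cannot expose the wrong boxed element type; only forced unboxing does. A tiny sketch:

// Sketch: why a direct comparison passes even when Jackson produced an Integer.
val fromJackson: Any = java.lang.Integer.valueOf(3) // element type lost to erasure
val expected: Any = java.lang.Long.valueOf(3L)

// Scala's == compares boxed numbers numerically, so this succeeds:
assert(fromJackson == expected)

// Unboxing to a primitive Long is what actually fails (SPARK-29755):
// fromJackson.asInstanceOf[Long] // would throw ClassCastException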

/**
* Asks the provider to check for logs and calls a function to perform checks on the updated
* app list. Example: