Skip to content
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -413,8 +413,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)

/**
* Comparison function that defines the sort order for application attempts within the same
* application. Order is: running attempts before complete attempts, running attempts sorted
* by start time, completed attempts sorted by end time.
* application. Order is: completed attempts before running attempts, running attempts sorted
* by start time showing whichever started first,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wording is weird. What does "showing" mean?

I'd say: "sorted by descending start time", "sorted by descending end time".

Note, also, that your current code sorts in ascending order, while I think it should be the opposite.

* completed attempts sorted by end time showing whichever ended first.
*
* Normally applications should have a single running attempt; but failure to call sc.stop()
* may cause multiple running attempts to show up.
Expand All @@ -425,9 +426,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
a1: FsApplicationAttemptInfo,
a2: FsApplicationAttemptInfo): Boolean = {
if (a1.completed == a2.completed) {
if (a1.completed) a1.endTime >= a2.endTime else a1.startTime >= a2.startTime
if (a1.completed) a1.endTime <= a2.endTime else a1.startTime <= a2.startTime
} else {
!a1.completed
a1.completed
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,12 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
appListAfterRename.size should be (1)
}

test("apps with multiple attempts") {
test("apps with multiple attempts with order") {
val provider = new FsHistoryProvider(createTestConf())

val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = false)
val attempt1 = newLogFile("app1", Some("attempt1"), inProgress = true)
writeFile(attempt1, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1")),
SparkListenerApplicationEnd(2L)
SparkListenerApplicationStart("app1", Some("app1"), 1L, "test", Some("attempt1"))
)

updateAndCheck(provider) { list =>
Expand All @@ -267,27 +266,26 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc

val attempt2 = newLogFile("app1", Some("attempt2"), inProgress = true)
writeFile(attempt2, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2"))
SparkListenerApplicationStart("app1", Some("app1"), 2L, "test", Some("attempt2"))
)

updateAndCheck(provider) { list =>
list.size should be (1)
list.head.attempts.size should be (2)
list.head.attempts.head.attemptId should be (Some("attempt2"))
list.head.attempts.head.attemptId should be (Some("attempt1"))
}

val completedAttempt2 = newLogFile("app1", Some("attempt2"), inProgress = false)
attempt2.delete()
writeFile(attempt2, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt2")),
val attempt3 = newLogFile("app1", Some("attempt3"), inProgress = false)
writeFile(attempt3, true, None,
SparkListenerApplicationStart("app1", Some("app1"), 3L, "test", Some("attempt3")),
SparkListenerApplicationEnd(4L)
)

updateAndCheck(provider) { list =>
list should not be (null)
list.size should be (1)
list.head.attempts.size should be (2)
list.head.attempts.head.attemptId should be (Some("attempt2"))
list.head.attempts.size should be (3)
list.head.attempts.head.attemptId should be (Some("attempt3"))
}

val app2Attempt1 = newLogFile("app2", Some("attempt1"), inProgress = false)
Expand All @@ -299,7 +297,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
updateAndCheck(provider) { list =>
list.size should be (2)
list.head.attempts.size should be (1)
list.last.attempts.size should be (2)
list.last.attempts.size should be (3)
list.head.attempts.head.attemptId should be (Some("attempt1"))

list.foreach { case app =>
Expand Down Expand Up @@ -343,7 +341,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
updateAndCheck(provider) { list =>
list.size should be (1)
list.head.attempts.size should be (1)
list.head.attempts.head.attemptId should be (Some("attempt2"))
list.head.attempts.head.attemptId should be (Some("attempt1"))
}
assert(!log1.exists())

Expand Down