79 changes: 54 additions & 25 deletions core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -25,7 +25,7 @@ import org.apache.spark.{JobExecutionStatus, SparkConf}
 import org.apache.spark.status.api.v1
 import org.apache.spark.ui.scope._
 import org.apache.spark.util.{Distribution, Utils}
-import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
+import org.apache.spark.util.kvstore.{InMemoryStore, KVStore, LevelDB}

 /**
  * A wrapper around a KVStore that provides methods for accessing the API data stored within.
@@ -148,11 +148,20 @@ private[spark] class AppStatusStore(
     // cheaper for disk stores (avoids deserialization).
     val count = {
       Utils.tryWithResource(
-        store.view(classOf[TaskDataWrapper])
-          .parent(stageKey)
-          .index(TaskIndexNames.EXEC_RUN_TIME)
-          .first(0L)
-          .closeableIterator()
+        if (store.isInstanceOf[LevelDB]) {
+          store.view(classOf[TaskDataWrapper])
+            .parent(stageKey)
+            .index(TaskIndexNames.EXEC_RUN_TIME)
+            .first(0L)
+            .closeableIterator()
+        } else {
+          store.view(classOf[TaskDataWrapper])
+            .parent(stageKey)
+            .index(TaskIndexNames.STATUS)
+            .first("SUCCESS")
+            .last("SUCCESS")
+            .closeableIterator()
+        }
       ) { it =>
         var _count = 0L
         while (it.hasNext()) {

Review thread on `if (store.isInstanceOf[LevelDB])`:

Member: Does this code path need to be different for disk vs. memory stores? This part seemed like it could work efficiently either way.

@shahidki31 (author, Dec 3, 2018): Yes. For the disk-store case it now finds the total task count, while the in-memory case counts only the successful tasks. This count is used to find the quantile indices for all the task metrics:

  val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }

For example, assume 200 tasks, of which 100 succeeded and 100 failed:

  disk-store case => indices = [0, 50, 100, 150, 199], count = 200
  in-memory case  => indices = [0, 25, 50, 75, 99],    count = 100

Contributor: Could you invert the check so you don't need to import LevelDB? Just to avoid importing more implementation details of the kvstore module into this class...

@shahidki31 (author): Done. Thanks.
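For intuition, here is a standalone sketch (not part of the diff) of the index arithmetic described in the reply above; the quantile values assume the UI defaults of the 0th/25th/50th/75th/100th percentiles:

  object QuantileIndexExample extends App {
    // Assumed UI-default quantiles (the test suite calls these uiQuantiles).
    val quantiles = Array(0.0, 0.25, 0.5, 0.75, 1.0)

    // Same arithmetic as AppStatusStore: scale each quantile by the task
    // count and clamp to a valid index.
    def indicesFor(count: Long): Array[Long] =
      quantiles.map { q => math.min((q * count).toLong, count - 1) }

    // Reproduces the 200-task example (100 succeeded, 100 failed):
    println(indicesFor(200).mkString("[", ", ", "]")) // all tasks:      [0, 50, 100, 150, 199]
    println(indicesFor(100).mkString("[", ", ", "]")) // successes only: [0, 25, 50, 75, 99]
  }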
@@ -221,29 +230,49 @@ private[spark] class AppStatusStore(
     // stabilize once the stage finishes. It's also slow, especially with disk stores.
     val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }

+    // TODO: Summary metrics need to display all the successful tasks' metrics (SPARK-26119).
+    // For the in-memory case, it is efficient to find the quantiles with the code below. But
+    // for the disk-store case we need an efficient solution that avoids the deserialization
+    // overhead. For that, we would need to rework the way indexing works, so that we can index
+    // specific metrics for successful and failed tasks separately (would be tricky). It would
+    // also require changing the disk-store version (to invalidate old stores).
     def scanTasks(index: String)(fn: TaskDataWrapper => Long): IndexedSeq[Double] = {
-      Utils.tryWithResource(
-        store.view(classOf[TaskDataWrapper])
-          .parent(stageKey)
-          .index(index)
-          .first(0L)
-          .closeableIterator()
-      ) { it =>
-        var last = Double.NaN
-        var currentIdx = -1L
-        indices.map { idx =>
-          if (idx == currentIdx) {
-            last
-          } else {
-            val diff = idx - currentIdx
-            currentIdx = idx
-            if (it.skip(diff - 1)) {
-              last = fn(it.next()).toDouble
-              last
-            } else {
-              Double.NaN
-            }
-          }
-        }.toIndexedSeq
-      }
+      if (store.isInstanceOf[LevelDB]) {
+        Utils.tryWithResource(
+          store.view(classOf[TaskDataWrapper])
+            .parent(stageKey)
+            .index(index)
+            .first(0L)
+            .closeableIterator()
+        ) { it =>
+          var last = Double.NaN
+          var currentIdx = -1L
+          indices.map { idx =>
+            if (idx == currentIdx) {
+              last
+            } else {
+              val diff = idx - currentIdx
+              currentIdx = idx
+              if (it.skip(diff - 1)) {
+                last = fn(it.next()).toDouble
+                last
+              } else {
+                Double.NaN
+              }
+            }
+          }.toIndexedSeq
+        }
+      } else {
+        val quantileTasks = store.view(classOf[TaskDataWrapper])
+          .parent(stageKey)
+          .index(index)
+          .first(0L)
+          .asScala
+          .filter { _.status == "SUCCESS" } // Filter "SUCCESS" tasks
+          .toIndexedSeq
+
+        indices.map { index =>
+          fn(quantileTasks(index.toInt)).toDouble
+        }.toIndexedSeq
+      }
     }

Review thread on the TODO line:

Member: It's not ideal, but it's a reasonable solution. Are you OK with it, @vanzin?

Review thread on `val diff = idx - currentIdx`:

Member: diff could be negative here?

@shahidki31 (author): Hi @gengliangwang, I will update the PR. Actually, there are more changes required to fix this. Thanks.
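For intuition, a standalone sketch (not part of the diff) of the LevelDB-branch strategy: walk an iterator that is already sorted by the metric and read only the values at the quantile indices, skipping everything in between. A plain Iterator stands in for the kvstore's closeable iterator, and the while loop stands in for it.skip(diff - 1):

  def quantileScan(sortedValues: Iterator[Long], indices: Seq[Long]): IndexedSeq[Double] = {
    var last = Double.NaN
    var currentIdx = -1L
    indices.map { idx =>
      if (idx == currentIdx) {
        last // repeated index: reuse the value read last time
      } else {
        val diff = idx - currentIdx
        currentIdx = idx
        // drop the (diff - 1) elements between the previous index and this one
        var skipped = 0L
        while (skipped < diff - 1 && sortedValues.hasNext) { sortedValues.next(); skipped += 1 }
        if (sortedValues.hasNext) { last = sortedValues.next().toDouble; last } else Double.NaN
      }
    }.toIndexedSeq
  }

  // quantileScan((0L until 200L).iterator, Seq(0L, 50L, 100L, 150L, 199L))
  // => Vector(0.0, 50.0, 100.0, 150.0, 199.0)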
core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala

@@ -77,6 +77,34 @@ class AppStatusStoreSuite extends SparkFunSuite {
     assert(store.count(classOf[CachedQuantile]) === 2)
   }

+  test("only successful tasks have taskSummary") {
+    val store = new InMemoryStore()
+    (0 until 5).foreach { i => store.write(newTaskData(i, "FAILED")) }
+    val appStore = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles)
+    assert(appStore.size === 0)
+  }
+
+  test("summary should contain task metrics of only successful tasks") {
+    val store = new InMemoryStore()
+
+    for (i <- 0 to 5) {
+      if (i % 2 == 1) {
+        store.write(newTaskData(i, "FAILED"))
+      } else {
+        store.write(newTaskData(i))
+      }
+    }
+
+    val summary = new AppStatusStore(store).taskSummary(stageId, attemptId, uiQuantiles).get
+
+    val values = Array(0.0, 2.0, 4.0)
+
+    val dist = new Distribution(values, 0, values.length).getQuantiles(uiQuantiles.sorted)
+    dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
+      assert(expected === actual)
+    }
+  }

Review thread on `store.write(newTaskData(i, "FAILED"))`:

Contributor: nit: status = "FAILED" when the param has a default value.

@shahidki31 (author): Done.
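A standalone check (not part of the diff) of what the second test expects, assuming the suite's uiQuantiles are the UI defaults (0.0, 0.25, 0.5, 0.75, 1.0). Only the successful tasks' executorRunTime values (0.0, 2.0, 4.0) feed the summary, and nearest-rank selection over them yields the expected quantiles:

  val uiQuantiles = Array(0.0, 0.25, 0.5, 0.75, 1.0) // assumed suite defaults
  val values = Array(0.0, 2.0, 4.0)                  // successful tasks' run times
  val expected = uiQuantiles.map { q =>
    values(math.min((q * values.length).toInt, values.length - 1))
  }
  // expected: Array(0.0, 0.0, 2.0, 4.0, 4.0)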

   private def compareQuantiles(count: Int, quantiles: Array[Double]): Unit = {
     val store = new InMemoryStore()
     val values = (0 until count).map { i =>
@@ -93,12 +121,11 @@ class AppStatusStoreSuite extends SparkFunSuite {
     }
   }

-  private def newTaskData(i: Int): TaskDataWrapper = {
+  private def newTaskData(i: Int, status: String = "SUCCESS"): TaskDataWrapper = {
     new TaskDataWrapper(
-      i, i, i, i, i, i, i.toString, i.toString, i.toString, i.toString, false, Nil, None,
+      i, i, i, i, i, i, i.toString, i.toString, status, i.toString, false, Nil, None,
       i, i, i, i, i, i, i, i, i, i,
       i, i, i, i, i, i, i, i, i, i,
       i, i, i, i, stageId, attemptId)
   }

 }
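On the reviewer's nit above: with a defaulted parameter, a named argument makes intent explicit at the call site. A toy illustration (not the suite's newTaskData):

  def newTaskData(i: Int, status: String = "SUCCESS"): String = s"task-$i:$status"

  newTaskData(1)                    // default:    "task-1:SUCCESS"
  newTaskData(1, "FAILED")          // positional: works, but reads ambiguously
  newTaskData(1, status = "FAILED") // named:      intent is explicit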