Skip to content
Prev Previous commit
Next Next commit
Review feedback.
Use monotonic time, plus other stylistic things.
  • Loading branch information
Marcelo Vanzin committed Jun 2, 2014
commit 0bb7b57ead82d109b74cbb40bed698f0b1507014
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
private val fs = Utils.getHadoopFileSystem(logDir)

// A timestamp of when the disk was last accessed to check for log updates
private var lastLogCheckTime = -1L
private var lastLogCheckTimeMs = -1L

// List of applications, in order from newest to oldest.
private val appList = new AtomicReference[Seq[ApplicationHistoryInfo]](Nil)
Expand All @@ -55,13 +55,13 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
private val logCheckingThread = new Thread("LogCheckingThread") {
override def run() = Utils.logUncaughtExceptions {
while (true) {
val now = System.currentTimeMillis
if (now - lastLogCheckTime > UPDATE_INTERVAL_MS) {
val now = getMonotonicTime()
if (now - lastLogCheckTimeMs > UPDATE_INTERVAL_MS) {
Thread.sleep(UPDATE_INTERVAL_MS)
} else {
// If the user has manually checked for logs recently, wait until
// UPDATE_INTERVAL_MS after the last check time
Thread.sleep(lastLogCheckTime + UPDATE_INTERVAL_MS - now)
Thread.sleep(lastLogCheckTimeMs + UPDATE_INTERVAL_MS - now)
}
checkForLogs()
}
Expand Down Expand Up @@ -108,13 +108,12 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
* applications that hasn't been updated since last time the logs were checked.
*/
def checkForLogs() = synchronized {
lastLogCheckTime = System.currentTimeMillis
logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTime))
lastLogCheckTimeMs = getMonotonicTime()
logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
try {
val logStatus = fs.listStatus(new Path(logDir))
val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
val logInfos = logDirs
.sortBy { dir => getModificationTime(dir) }
.filter {
dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))
}
Expand All @@ -125,7 +124,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
// For any application that either (i) is not listed or (ii) has changed since the last time
// the listing was created (defined by the log dir's modification time), load the app's info.
// Otherwise just reuse what's already in memory.
val newApps = new mutable.ListBuffer[ApplicationHistoryInfo]
val newApps = new mutable.ArrayBuffer[ApplicationHistoryInfo]
for (dir <- logInfos) {
val curr = currentApps.getOrElse(dir.getPath().getName(), null)
if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
Expand Down Expand Up @@ -198,4 +197,7 @@ class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
}
}

/** Returns the system's mononotically increasing time. */
private def getMonotonicTime() = System.nanoTime() / (1000 * 1000)

}
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,13 @@ class HistoryServer(
private val appLoader = new CacheLoader[String, SparkUI] {
override def load(key: String): SparkUI = {
val info = provider.getAppInfo(key)
if (info != null) {
info.ui.getSecurityManager.setUIAcls(uiAclsEnabled)
info.ui.getSecurityManager.setViewAcls(info.sparkUser, info.viewAcls)
attachSparkUI(info.ui)
info.ui
} else {
if (info == null) {
throw new NoSuchElementException()
}
info.ui.getSecurityManager.setUIAcls(uiAclsEnabled)
info.ui.getSecurityManager.setViewAcls(info.sparkUser, info.viewAcls)
attachSparkUI(info.ui)
info.ui
}
}

Expand Down