Closed
Changes from 1 commit
Commits
145 commits
48eab1d
SPARK-23429: Add executor memory metrics to heartbeat and expose in e…
edwinalu Mar 9, 2018
b348901
[SPARK-23808][SQL] Set default Spark session in test-only spark sessi…
jose-torres Mar 30, 2018
df05fb6
[SPARK-23743][SQL] Changed a comparison logic from containing 'slf4j'…
jongyoul Mar 30, 2018
b02e76c
[SPARK-23727][SQL] Support for pushing down filters for DateType in p…
yucai Mar 30, 2018
5b5a36e
Roll forward "[SPARK-23096][SS] Migrate rate source to V2"
jose-torres Mar 30, 2018
bc8d093
[SPARK-23500][SQL][FOLLOWUP] Fix complex type simplification rules to…
gatorsmile Mar 30, 2018
ae91720
[SPARK-23640][CORE] Fix hadoop config may override spark config
wangyum Mar 30, 2018
15298b9
[SPARK-23827][SS] StreamingJoinExec should ensure that input data is …
tdas Mar 30, 2018
529f847
[SPARK-23040][CORE][FOLLOW-UP] Avoid double wrap result Iterator.
jiangxb1987 Mar 31, 2018
5e24fc6
modify MimaExcludes.scala to filter changes to SparkListenerExecutorM…
edwinalu Apr 2, 2018
44a9f8e
[SPARK-15009][PYTHON][FOLLOWUP] Add default param checks for CountVec…
BryanCutler Apr 2, 2018
6151f29
[SPARK-23825][K8S] Requesting memory + memory overhead for pod memory
dvogelbacher Apr 2, 2018
fe2b7a4
[SPARK-23285][K8S] Add a config property for specifying physical exec…
liyinan926 Apr 2, 2018
a7c19d9
[SPARK-23713][SQL] Cleanup UnsafeWriter and BufferHolder classes
kiszk Apr 2, 2018
28ea4e3
[SPARK-23834][TEST] Wait for connection before disconnect in Launcher…
Apr 2, 2018
a135182
[SPARK-23690][ML] Add handleinvalid to VectorAssembler
Apr 2, 2018
441d0d0
[SPARK-19964][CORE] Avoid reading from remote repos in SparkSubmitSuite.
Apr 3, 2018
8020f66
[MINOR][DOC] Fix a few markdown typos
Apr 3, 2018
7cf9fab
[MINOR][CORE] Show block manager id when remove RDD/Broadcast fails.
jiangxb1987 Apr 3, 2018
66a3a5a
[SPARK-23099][SS] Migrate foreach sink to DataSourceV2
jose-torres Apr 3, 2018
1035aaa
[SPARK-23587][SQL] Add interpreted execution for MapObjects expression
viirya Apr 3, 2018
359375e
[SPARK-23809][SQL] Active SparkSession should be set by getOrCreate
ericl Apr 4, 2018
5cfd5fa
[SPARK-23802][SQL] PropagateEmptyRelation can leave query plan in unr…
Apr 4, 2018
16ef6ba
[SPARK-23826][TEST] TestHiveSparkSession should set default session
gatorsmile Apr 4, 2018
5197562
[SPARK-21351][SQL] Update nullability based on children's output
maropu Apr 4, 2018
a355236
[SPARK-23583][SQL] Invoke should support interpreted execution
kiszk Apr 4, 2018
cccaaa1
[SPARK-23668][K8S] Add config option for passing through k8s Pod.spec…
Apr 4, 2018
d8379e5
[SPARK-23838][WEBUI] Running SQL query is displayed as "completed" in…
gengliangwang Apr 4, 2018
d3bd043
[SPARK-23637][YARN] Yarn might allocate more resource if a same execu…
Apr 4, 2018
c5c8b54
[SPARK-23593][SQL] Add interpreted execution for InitializeJavaBean e…
viirya Apr 5, 2018
1822ecd
[SPARK-23582][SQL] StaticInvoke should support interpreted execution
kiszk Apr 5, 2018
b2329fb
Revert "[SPARK-23593][SQL] Add interpreted execution for InitializeJa…
hvanhovell Apr 5, 2018
d9ca1c9
[SPARK-23593][SQL] Add interpreted execution for InitializeJavaBean e…
viirya Apr 5, 2018
4807d38
[SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks to choose se…
kiszk Apr 6, 2018
f2ac087
[SPARK-23870][ML] Forward RFormula handleInvalid Param to VectorAssem…
Apr 6, 2018
d65e531
[SPARK-23823][SQL] Keep origin in transformExpression
Apr 6, 2018
249007e
[SPARK-19724][SQL] create a managed table with an existed default tab…
gengliangwang Apr 6, 2018
6ade5cb
[MINOR][DOC] Fix some typos and grammar issues
dsakuma Apr 6, 2018
9452401
[SPARK-23822][SQL] Improve error message for Parquet schema mismatches
yuchenhuo Apr 6, 2018
d766ea2
[SPARK-23861][SQL][DOC] Clarify default window frame with and without…
icexelloss Apr 6, 2018
c926acf
[SPARK-23882][CORE] UTF8StringSuite.writeToOutputStreamUnderflow() is…
kiszk Apr 6, 2018
d23a805
[SPARK-23859][ML] Initial PR for Instrumentation improvements: UUID a…
MrBago Apr 6, 2018
b6935ff
[SPARK-10399][SPARK-23879][HOTFIX] Fix Java lint errors
kiszk Apr 6, 2018
e998250
[SPARK-23828][ML][PYTHON] PySpark StringIndexerModel should have cons…
huaxingao Apr 6, 2018
6ab134c
[SPARK-21898][ML][FOLLOWUP] Fix Scala 2.12 build.
ueshin Apr 6, 2018
2c1fe64
[SPARK-23847][PYTHON][SQL] Add asc_nulls_first, asc_nulls_last to PyS…
huaxingao Apr 8, 2018
6a73457
[SPARK-23849][SQL] Tests for the samplingRatio option of JSON datasource
MaxGekk Apr 8, 2018
710a68c
[SPARK-23892][TEST] Improve converge and fix lint error in UTF8String…
kiszk Apr 8, 2018
8d40a79
[SPARK-23893][CORE][SQL] Avoid possible integer overflow in multiplic…
kiszk Apr 8, 2018
32471ba
Fix typo in Python docstring kinesis example
Apr 9, 2018
d81f29e
[SPARK-23881][CORE][TEST] Fix flaky test JobCancellationSuite."interr…
jiangxb1987 Apr 9, 2018
10f45bb
[SPARK-23816][CORE] Killed tasks should ignore FetchFailures.
squito Apr 9, 2018
7c1654e
[SPARK-22856][SQL] Add wrappers for codegen output and nullability
viirya Apr 9, 2018
252468a
[SPARK-14681][ML] Provide label/impurity stats for spark.ml decision …
WeichenXu123 Apr 9, 2018
61b7247
[INFRA] Close stale PRs.
Apr 9, 2018
f94f362
[SPARK-23947][SQL] Add hashUTF8String convenience method to hasher cl…
rednaxelafx Apr 10, 2018
6498884
[SPARK-23898][SQL] Simplify add & subtract code generation
hvanhovell Apr 10, 2018
95034af
[SPARK-23841][ML] NodeIdCache should unpersist the last cached nodeId…
zhengruifeng Apr 10, 2018
3323b15
[SPARK-23864][SQL] Add unsafe object writing to UnsafeWriter
hvanhovell Apr 10, 2018
e179658
[SPARK-19724][SQL][FOLLOW-UP] Check location of managed table when ig…
gengliangwang Apr 10, 2018
adb222b
[SPARK-23751][ML][PYSPARK] Kolmogorov-Smirnoff test Python API in pys…
WeichenXu123 Apr 10, 2018
4f1e8b9
[SPARK-23871][ML][PYTHON] add python api for VectorAssembler handleIn…
huaxingao Apr 10, 2018
7c7570d
[SPARK-23944][ML] Add the set method for the two LSHModel
lu-wang-dl Apr 11, 2018
c7622be
[SPARK-23847][FOLLOWUP][PYTHON][SQL] Actually test [desc|acs]_nulls_[…
HyukjinKwon Apr 11, 2018
87611bb
[MINOR][DOCS] Fix R documentation generation instruction for roxygen2
HyukjinKwon Apr 11, 2018
c604d65
[SPARK-23951][SQL] Use actual java class instead of string representa…
hvanhovell Apr 11, 2018
271c891
[SPARK-23960][SQL][MINOR] Mark HashAggregateExec.bufVars as transient
rednaxelafx Apr 11, 2018
653fe02
[SPARK-6951][CORE] Speed up parsing of event logs during listing.
Apr 11, 2018
3cb8204
[SPARK-22941][CORE] Do not exit JVM when submit fails with in-process…
Apr 11, 2018
75a1830
[SPARK-22883] ML test for StructuredStreaming: spark.ml.feature, I-M
jkbradley Apr 11, 2018
9d960de
typo rawPredicition changed to rawPrediction
JBauerKogentix Apr 11, 2018
e904dfa
Revert "[SPARK-23960][SQL][MINOR] Mark HashAggregateExec.bufVars as t…
gatorsmile Apr 12, 2018
6a2289e
[SPARK-23962][SQL][TEST] Fix race in currentExecutionIds().
squito Apr 12, 2018
0b19122
[SPARK-23762][SQL] UTF8StringBuffer uses MemoryBlock
kiszk Apr 12, 2018
0f93b91
[SPARK-23751][FOLLOW-UP] fix build for scala-2.12
WeichenXu123 Apr 12, 2018
682002b
[SPARK-23867][SCHEDULER] use droppedCount in logWarning
Apr 13, 2018
14291b0
[SPARK-23748][SS] Fix SS continuous process doesn't support SubqueryA…
jerryshao Apr 13, 2018
ab7b961
[SPARK-23942][PYTHON][SQL] Makes collect in PySpark as action for a q…
HyukjinKwon Apr 13, 2018
1018be4
[SPARK-23971] Should not leak Spark sessions across test suites
ericl Apr 13, 2018
4b07036
[SPARK-23815][CORE] Spark writer dynamic partition overwrite mode may…
Apr 13, 2018
0323e61
[SPARK-23905][SQL] Add UDF weekday
yucai Apr 13, 2018
a83ae0d
[SPARK-22839][K8S] Refactor to unify driver and executor pod builder …
mccheah Apr 13, 2018
4dfd746
[SPARK-23896][SQL] Improve PartitioningAwareFileIndex
gengliangwang Apr 13, 2018
25892f3
[SPARK-23375][SQL] Eliminate unneeded Sort in Optimizer
mgaido91 Apr 13, 2018
558f31b
[SPARK-23963][SQL] Properly handle large number of columns in query o…
bersprockets Apr 13, 2018
cbb41a0
[SPARK-23966][SS] Refactoring all checkpoint file writing logic in a …
tdas Apr 13, 2018
73f2853
[SPARK-23979][SQL] MultiAlias should not be a CodegenFallback
viirya Apr 14, 2018
c096493
[SPARK-23956][YARN] Use effective RPC port in AM registration
gerashegalov Apr 16, 2018
6931022
[SPARK-23917][SQL] Add array_max function
mgaido91 Apr 16, 2018
083cf22
[SPARK-21033][CORE][FOLLOW-UP] Update Spillable
wangyum Apr 16, 2018
5003736
[SPARK-9312][ML] Add RawPrediction, numClasses, and numFeatures for O…
lu-wang-dl Apr 16, 2018
0461482
[SPARK-21088][ML] CrossValidator, TrainValidationSplit support collec…
WeichenXu123 Apr 16, 2018
fd990a9
[SPARK-23873][SQL] Use accessors in interpreted LambdaVariable
viirya Apr 16, 2018
14844a6
[SPARK-23918][SQL] Add array_min function
mgaido91 Apr 17, 2018
1cc66a0
[SPARK-23687][SS] Add a memory source for continuous processing.
jose-torres Apr 17, 2018
05ae747
[SPARK-23747][STRUCTURED STREAMING] Add EpochCoordinator unit tests
Apr 17, 2018
30ffb53
[SPARK-23875][SQL] Add IndexedSeq wrapper for ArrayData
viirya Apr 17, 2018
0a9172a
[SPARK-23835][SQL] Add not-null check to Tuples' arguments deserializ…
mgaido91 Apr 17, 2018
ed4101d
[SPARK-22676] Avoid iterating all partition paths when spark.sql.hive…
Apr 17, 2018
3990daa
[SPARK-23948] Trigger mapstage's job listener in submitMissingTasks
Apr 17, 2018
f39e82c
[SPARK-23986][SQL] freshName can generate non-unique names
mgaido91 Apr 17, 2018
1ca3c50
[SPARK-21741][ML][PYSPARK] Python API for DataFrame-based multivariat…
WeichenXu123 Apr 17, 2018
5fccdae
[SPARK-22968][DSTREAM] Throw an exception on partition revoking issue
jerryshao Apr 18, 2018
1e3b876
[SPARK-21479][SQL] Outer join filter pushdown in null supplying table…
maryannxue Apr 18, 2018
310a8cd
[SPARK-23341][SQL] define some standard options for data source v2
cloud-fan Apr 18, 2018
cce4694
[SPARK-24002][SQL] Task not serializable caused by org.apache.parquet…
gatorsmile Apr 18, 2018
f81fa47
[SPARK-23926][SQL] Extending reverse function to support ArrayType ar…
Apr 18, 2018
f09a9e9
[SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might g…
ueshin Apr 18, 2018
a906647
[SPARK-23875][SQL][FOLLOWUP] Add IndexedSeq wrapper for ArrayData
viirya Apr 18, 2018
0c94e48
[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky
gaborgsomogyi Apr 18, 2018
8bb0df2
[SPARK-24014][PYSPARK] Add onStreamingStarted method to StreamingList…
viirya Apr 19, 2018
d5bec48
[SPARK-23919][SQL] Add array_position function
kiszk Apr 19, 2018
46bb2b5
[SPARK-23924][SQL] Add element_at function
kiszk Apr 19, 2018
1b08c43
[SPARK-23584][SQL] NewInstance should support interpreted execution
maropu Apr 19, 2018
e134165
[SPARK-23588][SQL] CatalystToExternalMap should support interpreted e…
maropu Apr 19, 2018
9e10f69
[SPARK-22676][FOLLOW-UP] fix code style for test.
Apr 19, 2018
d96c3e3
[SPARK-21811][SQL] Fix the inconsistency behavior when finding the wi…
jiangxb1987 Apr 19, 2018
0deaa52
[SPARK-24021][CORE] fix bug in BlacklistTracker's updateBlacklistForF…
Ngone51 Apr 19, 2018
6e19f76
[SPARK-23989][SQL] exchange should copy data before non-serialized sh…
cloud-fan Apr 19, 2018
a471880
[SPARK-24026][ML] Add Power Iteration Clustering to spark.ml
wangmiao1981 Apr 19, 2018
9ea8d3d
[SPARK-22362][SQL] Add unit test for Window Aggregate Functions
attilapiros Apr 19, 2018
e55953b
[SPARK-24022][TEST] Make SparkContextSuite not flaky
gaborgsomogyi Apr 19, 2018
b3fde5a
[SPARK-23877][SQL] Use filter predicates to prune partitions in metad…
rdblue Apr 20, 2018
e6b4660
[SPARK-23736][SQL] Extending the concat function to support array col…
Apr 20, 2018
074a7f9
[SPARK-23588][SQL][FOLLOW-UP] Resolve a map builder method per execut…
maropu Apr 20, 2018
0dd97f6
[SPARK-23595][SQL] ValidateExternalType should support interpreted ex…
maropu Apr 20, 2018
1d758dc
Revert "[SPARK-23775][TEST] Make DataFrameRangeSuite not flaky"
Apr 20, 2018
32b4bcd
[SPARK-24029][CORE] Set SO_REUSEADDR on listen sockets.
Apr 21, 2018
7bc853d
[SPARK-24033][SQL] Fix Mismatched of Window Frame specifiedwindowfram…
gatorsmile Apr 21, 2018
ae8a388
Address code review comments, change event logging to stage end.
edwinalu Apr 22, 2018
c48085a
[SPARK-23799][SQL] FilterEstimation.evaluateInSet produces devision b…
Apr 22, 2018
c3a86fa
[SPARK-10399][SPARK-23879][FOLLOWUP][CORE] Free unused off-heap memor…
kiszk Apr 23, 2018
f70f46d
[SPARK-23877][SQL][FOLLOWUP] use PhysicalOperation to simplify the ha…
cloud-fan Apr 23, 2018
d87d30e
[SPARK-23564][SQL] infer additional filters from constraints for join…
cloud-fan Apr 23, 2018
afbdf42
[SPARK-23589][SQL] ExternalMapToCatalyst should support interpreted e…
maropu Apr 23, 2018
293a0f2
[Spark-24024][ML] Fix poisson deviance calculations in GLM to handle …
tengpeng Apr 23, 2018
448d248
[SPARK-21168] KafkaRDD should always set kafka clientId.
liu-zhaokun Apr 23, 2018
770add8
[SPARK-23004][SS] Ensure StateStore.commit is called only once in a s…
tdas Apr 23, 2018
e82cb68
[SPARK-11237][ML] Add pmml export for k-means in Spark ML
holdenk Apr 23, 2018
c8f3ac6
[SPARK-23888][CORE] correct the comment of hasAttemptOnHost()
Ngone51 Apr 23, 2018
efcfc64
SPARK-23429: Add executor memory metrics to heartbeat and expose in e…
edwinalu Mar 9, 2018
b24f041
modify MimaExcludes.scala to filter changes to SparkListenerExecutorM…
edwinalu Apr 2, 2018
9d9c248
Address code review comments, change event logging to stage end.
edwinalu Apr 22, 2018
bbe1a82
Merge branch 'SPARK-23429' of https://github.com/edwinalu/spark into …
edwinalu Apr 23, 2018
8ae0126
fix MimaExcludes.scala
edwinalu Apr 23, 2018
[SPARK-6951][CORE] Speed up parsing of event logs during listing.
This change introduces two optimizations to help speed up generation
of listing data when parsing event logs.

The first one allows the parser to be stopped when enough data to
create the listing entry has been read. This is currently the start
event plus environment info, to capture UI ACLs. If the end event is
needed, the code will skip to the end of the log to try to find that
information, instead of parsing the whole log file.

Unfortunately, this works much better with uncompressed logs: skipping
bytes in compressed logs only saves the work of parsing lines and some
events, so the gains there are modest.
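
To make the end-event search concrete, here is a self-contained sketch of the
skip-to-the-end step (simplified names and a plain file stream; the real
implementation goes through EventLoggingListener.openEventLog and the replay
bus, as the diff below shows):

```scala
// Simplified sketch: jump to the last `chunkSize` bytes of an event log and scan
// only those lines for the application end event, instead of parsing the whole file.
import java.io.{FileInputStream, InputStream}

import scala.io.Source

def findEndEventInTail(path: String, fileLen: Long, chunkSize: Long): Option[String] = {
  val in: InputStream = new FileInputStream(path)
  try {
    val target = fileLen - chunkSize
    if (target > 0) {
      // InputStream.skip() may skip fewer bytes than asked for, so loop until done.
      var skipped = 0L
      while (skipped < target) {
        skipped += in.skip(target - skipped)
      }
    }
    val lines = Source.fromInputStream(in).getLines()
    // Skipping may land in the middle of a line; drop the (possibly partial) first line.
    if (target > 0 && lines.hasNext) {
      lines.next()
    }
    // Event logs are JSON, one event per line; look for the end event in the tail.
    lines.find(_.contains("\"Event\":\"SparkListenerApplicationEnd\""))
  } finally {
    in.close()
  }
}
```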

The second optimization deals with in-progress logs. It works in two
ways: first, it completely avoids parsing the rest of the log for
these apps when enough listing data is read. This, unlike the above,
also speeds things up for compressed logs, since only the very beginning
of the log has to be read.

On top of that, the code that decides whether to re-parse logs to get
updated listing data will ignore in-progress applications until they've
completed.
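
Condensed, the re-parse decision for a log that is already in the listing looks
like the sketch below; it follows the logic in the checkForLogs() hunk further
down, and the LogInfo here is only a stand-in for the real class:

```scala
// Stand-in for the listing entry stored per log file (the real LogInfo has more fields).
case class LogInfo(appId: Option[String], fileSize: Long)

// Decide whether a previously seen log file needs to be re-parsed on this scan.
def shouldReparse(info: LogInfo, currentLen: Long, fastInProgressParsing: Boolean): Boolean = {
  if (info.fileSize < currentLen) {
    if (info.appId.isDefined && fastInProgressParsing) {
      // The log grew, but a listing entry already exists: invalidate any live UI for the
      // attempt (done elsewhere) and skip the expensive re-parse.
      false
    } else {
      // No app ID recorded yet, or the optimization is disabled: re-parse the log.
      true
    }
  } else {
    // Nothing was appended since the last scan.
    false
  }
}
```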

Both optimizations can be disabled but are enabled by default.
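
The toggles are two new config entries; the sketch below shows how they might be
defined. The exact keys and defaults are assumptions inferred from the constant
names FAST_IN_PROGRESS_PARSING and END_EVENT_REPARSE_CHUNK_SIZE used in the diff,
which does not itself include the config definitions:

```scala
// Hypothetical definitions of the two toggles (keys and defaults are assumed, not
// taken from this diff), written against Spark's internal ConfigBuilder API.
package org.apache.spark.deploy.history

import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit

private[spark] object HistoryParsingConfig {

  // When enabled, in-progress logs are not fully re-parsed once a listing entry exists.
  val FAST_IN_PROGRESS_PARSING = ConfigBuilder("spark.history.fs.inProgressOptimization.enabled")
    .booleanConf
    .createWithDefault(true)

  // Number of bytes at the end of a log to re-parse when looking for the end event;
  // 0 disables the skip-to-end optimization.
  val END_EVENT_REPARSE_CHUNK_SIZE = ConfigBuilder("spark.history.fs.endEventReparseChunkSize")
    .bytesConf(ByteUnit.BYTE)
    .createWithDefaultString("1m")
}
```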

I tested this on some fake event logs to see the effect. I created
500 logs of about 60M each (so ~30G uncompressed; each log was 1.7M
when compressed with zstd). Below, C = completed, IP = in-progress,
and the size is the amount of data re-parsed at the end of the logs
when necessary.

```
            none/C   none/IP   zstd/C   zstd/IP
On / 16k      2s       2s       22s       2s
On / 1m       3s       2s       24s       2s
Off          1.1m     1.1m      26s      24s
```

This was with 4 threads on a single local SSD. As expected from the
previous explanations, there are considerable gains for in-progress
logs, and for uncompressed logs, but not so much when looking at the
full compressed log.

As a side note, I removed the custom code to get the scan time by
creating a file on HDFS; since file mod times are not used to detect
changed logs anymore, local time is enough for the current use of
the SHS.

Author: Marcelo Vanzin <[email protected]>

Closes #20952 from vanzin/SPARK-6951.
Marcelo Vanzin authored and squito committed Apr 11, 2018
commit 653fe02415a537299e15f92b56045569864b6183
@@ -18,12 +18,13 @@
package org.apache.spark.deploy.history

import java.io.{File, FileNotFoundException, IOException}
import java.util.{Date, ServiceLoader, UUID}
import java.util.{Date, ServiceLoader}
import java.util.concurrent.{ExecutorService, TimeUnit}
import java.util.zip.{ZipEntry, ZipOutputStream}

import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.io.Source
import scala.util.Try
import scala.xml.Node

@@ -58,10 +59,10 @@ import org.apache.spark.util.kvstore._
*
* == How new and updated attempts are detected ==
*
* - New attempts are detected in [[checkForLogs]]: the log dir is scanned, and any
* entries in the log dir whose modification time is greater than the last scan time
* are considered new or updated. These are replayed to create a new attempt info entry
* and update or create a matching application info element in the list of applications.
* - New attempts are detected in [[checkForLogs]]: the log dir is scanned, and any entries in the
* log dir whose size changed since the last scan time are considered new or updated. These are
* replayed to create a new attempt info entry and update or create a matching application info
* element in the list of applications.
* - Updated attempts are also found in [[checkForLogs]] -- if the attempt's log file has grown, the
* attempt is replaced by another one with a larger log size.
*
@@ -125,6 +126,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)

private val storePath = conf.get(LOCAL_STORE_DIR).map(new File(_))
private val fastInProgressParsing = conf.get(FAST_IN_PROGRESS_PARSING)

// Visible for testing.
private[history] val listing: KVStore = storePath.map { path =>
@@ -402,13 +404,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
*/
private[history] def checkForLogs(): Unit = {
try {
val newLastScanTime = getNewLastScanTime()
val newLastScanTime = clock.getTimeMillis()
logDebug(s"Scanning $logDir with lastScanTime==$lastScanTime")

val updated = Option(fs.listStatus(new Path(logDir))).map(_.toSeq).getOrElse(Nil)
.filter { entry =>
!entry.isDirectory() &&
// FsHistoryProvider generates a hidden file which can't be read. Accidentally
// FsHistoryProvider used to generate a hidden file which can't be read. Accidentally
// reading a garbage file is safe, but we would log an error which can be scary to
// the end-user.
!entry.getPath().getName().startsWith(".") &&
@@ -417,15 +419,24 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
.filter { entry =>
try {
val info = listing.read(classOf[LogInfo], entry.getPath().toString())
if (info.fileSize < entry.getLen()) {
// Log size has changed, it should be parsed.
true
} else {

if (info.appId.isDefined) {
// If the SHS view has a valid application, update the time the file was last seen so
// that the entry is not deleted from the SHS listing.
if (info.appId.isDefined) {
listing.write(info.copy(lastProcessed = newLastScanTime))
// that the entry is not deleted from the SHS listing. Also update the file size, in
// case the code below decides we don't need to parse the log.
listing.write(info.copy(lastProcessed = newLastScanTime, fileSize = entry.getLen()))
}

if (info.fileSize < entry.getLen()) {
if (info.appId.isDefined && fastInProgressParsing) {
// When fast in-progress parsing is on, we don't need to re-parse when the
// size changes, but we do need to invalidate any existing UIs.
invalidateUI(info.appId.get, info.attemptId)
false
} else {
true
}
} else {
false
}
} catch {
@@ -449,7 +460,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val tasks = updated.map { entry =>
try {
replayExecutor.submit(new Runnable {
override def run(): Unit = mergeApplicationListing(entry, newLastScanTime)
override def run(): Unit = mergeApplicationListing(entry, newLastScanTime, true)
})
} catch {
// let the iteration over the updated entries break, since an exception on
@@ -542,25 +553,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

private[history] def getNewLastScanTime(): Long = {
val fileName = "." + UUID.randomUUID().toString
val path = new Path(logDir, fileName)
val fos = fs.create(path)

try {
fos.close()
fs.getFileStatus(path).getModificationTime
} catch {
case e: Exception =>
logError("Exception encountered when attempting to update last scan time", e)
lastScanTime.get()
} finally {
if (!fs.delete(path, true)) {
logWarning(s"Error deleting ${path}")
}
}
}

override def writeEventLogs(
appId: String,
attemptId: Option[String],
@@ -607,7 +599,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
/**
* Replay the given log file, saving the application in the listing db.
*/
protected def mergeApplicationListing(fileStatus: FileStatus, scanTime: Long): Unit = {
protected def mergeApplicationListing(
fileStatus: FileStatus,
scanTime: Long,
enableOptimizations: Boolean): Unit = {
val eventsFilter: ReplayEventsFilter = { eventString =>
eventString.startsWith(APPL_START_EVENT_PREFIX) ||
eventString.startsWith(APPL_END_EVENT_PREFIX) ||
@@ -616,32 +611,118 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

val logPath = fileStatus.getPath()
val appCompleted = isCompleted(logPath.getName())
val reparseChunkSize = conf.get(END_EVENT_REPARSE_CHUNK_SIZE)

// Enable halt support in listener if:
// - app in progress && fast parsing enabled
// - skipping to end event is enabled (regardless of in-progress state)
val shouldHalt = enableOptimizations &&
((!appCompleted && fastInProgressParsing) || reparseChunkSize > 0)

val bus = new ReplayListenerBus()
val listener = new AppListingListener(fileStatus, clock)
val listener = new AppListingListener(fileStatus, clock, shouldHalt)
bus.addListener(listener)
replay(fileStatus, bus, eventsFilter = eventsFilter)

val (appId, attemptId) = listener.applicationInfo match {
case Some(app) =>
// Invalidate the existing UI for the reloaded app attempt, if any. See LoadedAppUI for a
// discussion on the UI lifecycle.
synchronized {
activeUIs.get((app.info.id, app.attempts.head.info.attemptId)).foreach { ui =>
ui.invalidate()
ui.ui.store.close()

logInfo(s"Parsing $logPath for listing data...")
Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
bus.replay(in, logPath.toString, !appCompleted, eventsFilter)
}

// If enabled above, the listing listener will halt parsing when there's enough information to
// create a listing entry. When the app is completed, or fast parsing is disabled, we still need
// to replay until the end of the log file to try to find the app end event. Instead of reading
// and parsing line by line, this code skips bytes from the underlying stream so that it is
// positioned somewhere close to the end of the log file.
//
// Because the application end event is written while some Spark subsystems such as the
// scheduler are still active, there is no guarantee that the end event will be the last
// in the log. So, to be safe, the code uses a configurable chunk to be re-parsed at
// the end of the file, and retries parsing the whole log later if the needed data is
// still not found.
//
// Note that skipping bytes in compressed files is still not cheap, but there are still some
// minor gains over the normal log parsing done by the replay bus.
//
// This code re-opens the file so that it knows where it's skipping to. This isn't as cheap as
// just skipping from the current position, but there isn't a good way to detect what the
// current position is, since the replay listener bus buffers data internally.
val lookForEndEvent = shouldHalt && (appCompleted || !fastInProgressParsing)
if (lookForEndEvent && listener.applicationInfo.isDefined) {
Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
val target = fileStatus.getLen() - reparseChunkSize
if (target > 0) {
logInfo(s"Looking for end event; skipping $target bytes from $logPath...")
var skipped = 0L
while (skipped < target) {
skipped += in.skip(target - skipped)
}
}

val source = Source.fromInputStream(in).getLines()

// Because skipping may leave the stream in the middle of a line, read the next line
// before replaying.
if (target > 0) {
source.next()
}

bus.replay(source, logPath.toString, !appCompleted, eventsFilter)
}
}

logInfo(s"Finished parsing $logPath")

listener.applicationInfo match {
case Some(app) if !lookForEndEvent || app.attempts.head.info.completed =>
// In this case, we either didn't care about the end event, or we found it. So the
// listing data is good.
invalidateUI(app.info.id, app.attempts.head.info.attemptId)
addListing(app)
(Some(app.info.id), app.attempts.head.info.attemptId)
listing.write(LogInfo(logPath.toString(), scanTime, Some(app.info.id),
app.attempts.head.info.attemptId, fileStatus.getLen()))

// For a finished log, remove the corresponding "in progress" entry from the listing DB if
// the file is really gone.
if (appCompleted) {
val inProgressLog = logPath.toString() + EventLoggingListener.IN_PROGRESS
try {
// Fetch the entry first to avoid an RPC when it's already removed.
listing.read(classOf[LogInfo], inProgressLog)
if (!fs.isFile(new Path(inProgressLog))) {
listing.delete(classOf[LogInfo], inProgressLog)
}
} catch {
case _: NoSuchElementException =>
}
}

case Some(_) =>
// In this case, the attempt is still not marked as finished but was expected to. This can
// mean the end event is before the configured threshold, so call the method again to
// re-parse the whole log.
logInfo(s"Reparsing $logPath since end event was not found.")
mergeApplicationListing(fileStatus, scanTime, false)

case _ =>
// If the app hasn't written down its app ID to the logs, still record the entry in the
// listing db, with an empty ID. This will make the log eligible for deletion if the app
// does not make progress after the configured max log age.
(None, None)
listing.write(LogInfo(logPath.toString(), scanTime, None, None, fileStatus.getLen()))
}
}

/**
* Invalidate an existing UI for a given app attempt. See LoadedAppUI for a discussion on the
* UI lifecycle.
*/
private def invalidateUI(appId: String, attemptId: Option[String]): Unit = {
synchronized {
activeUIs.get((appId, attemptId)).foreach { ui =>
ui.invalidate()
ui.ui.store.close()
}
}
listing.write(LogInfo(logPath.toString(), scanTime, appId, attemptId, fileStatus.getLen()))
}

/**
@@ -696,29 +777,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

/**
* Replays the events in the specified log file on the supplied `ReplayListenerBus`.
* `ReplayEventsFilter` determines what events are replayed.
*/
private def replay(
eventLog: FileStatus,
bus: ReplayListenerBus,
eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
val logPath = eventLog.getPath()
val isCompleted = !logPath.getName().endsWith(EventLoggingListener.IN_PROGRESS)
logInfo(s"Replaying log path: $logPath")
// Note that the eventLog may have *increased* in size since when we grabbed the filestatus,
// and when we read the file here. That is OK -- it may result in an unnecessary refresh
// when there is no update, but will not result in missing an update. We *must* prevent
// an error the other way -- if we report a size bigger (ie later) than the file that is
// actually read, we may never refresh the app. FileStatus is guaranteed to be static
// after it's created, so we get a file size that is no bigger than what is actually read.
Utils.tryWithResource(EventLoggingListener.openEventLog(logPath, fs)) { in =>
bus.replay(in, logPath.toString, !isCompleted, eventsFilter)
logInfo(s"Finished parsing $logPath")
}
}

/**
* Rebuilds the application state store from its event log.
*/
@@ -741,8 +799,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
replayBus.addListener(listener)

try {
replay(eventLog, replayBus)
val path = eventLog.getPath()
logInfo(s"Parsing $path to re-build UI...")
Utils.tryWithResource(EventLoggingListener.openEventLog(path, fs)) { in =>
replayBus.replay(in, path.toString(), maybeTruncated = !isCompleted(path.toString()))
}
trackingStore.close(false)
logInfo(s"Finished parsing $path")
} catch {
case e: Exception =>
Utils.tryLogNonFatalError {
@@ -881,6 +944,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

private def isCompleted(name: String): Boolean = {
!name.endsWith(EventLoggingListener.IN_PROGRESS)
}

}

private[history] object FsHistoryProvider {
@@ -945,11 +1012,17 @@ private[history] class ApplicationInfoWrapper(

}

private[history] class AppListingListener(log: FileStatus, clock: Clock) extends SparkListener {
private[history] class AppListingListener(
log: FileStatus,
clock: Clock,
haltEnabled: Boolean) extends SparkListener {

private val app = new MutableApplicationInfo()
private val attempt = new MutableAttemptInfo(log.getPath().getName(), log.getLen())

private var gotEnvUpdate = false
private var halted = false

override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
app.id = event.appId.orNull
app.name = event.appName
@@ -958,6 +1031,8 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends
attempt.startTime = new Date(event.time)
attempt.lastUpdated = new Date(clock.getTimeMillis())
attempt.sparkUser = event.sparkUser

checkProgress()
}

override def onApplicationEnd(event: SparkListenerApplicationEnd): Unit = {
@@ -968,11 +1043,18 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends
}

override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
val allProperties = event.environmentDetails("Spark Properties").toMap
attempt.viewAcls = allProperties.get("spark.ui.view.acls")
attempt.adminAcls = allProperties.get("spark.admin.acls")
attempt.viewAclsGroups = allProperties.get("spark.ui.view.acls.groups")
attempt.adminAclsGroups = allProperties.get("spark.admin.acls.groups")
// Only parse the first env update, since any future changes don't have any effect on
// the ACLs set for the UI.
if (!gotEnvUpdate) {
val allProperties = event.environmentDetails("Spark Properties").toMap
attempt.viewAcls = allProperties.get("spark.ui.view.acls")
attempt.adminAcls = allProperties.get("spark.admin.acls")
attempt.viewAclsGroups = allProperties.get("spark.ui.view.acls.groups")
attempt.adminAclsGroups = allProperties.get("spark.admin.acls.groups")

gotEnvUpdate = true
checkProgress()
}
}

override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
@@ -989,6 +1071,17 @@ private[history] class AppListingListener(log: FileStatus, clock: Clock) extends
}
}

/**
* Throws a halt exception to stop replay if enough data to create the app listing has been
* read.
*/
private def checkProgress(): Unit = {
if (haltEnabled && !halted && app.id != null && gotEnvUpdate) {
halted = true
throw new HaltReplayException()
}
}

private class MutableApplicationInfo {
var id: String = null
var name: String = null