Address comments
dongjoon-hyun committed Jul 22, 2025
commit ef9b833c27b088fdb8a63bf15859877bd69a1610
@@ -321,13 +321,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   override def getLastUpdatedTime(): Long = lastScanTime.get()

   override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
+    val logPath = RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX +
+      Utils.nameForAppAndAttempt(appId, attemptId)
Member:
RollingEventLogFilesWriter.getAppEventLogDirPath also considers logBaseDir, don't we need to consider it?

Member (Author):

Yes, we don't need logBaseDir for this info.

Member:

Oh, I see. AttemptInfoWrapper.logPath doesn't contain the base dir. When FsHistoryProvider tries to read an attempt, it appends the base dir.

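As an aside, a minimal sketch of that resolution (not the actual FsHistoryProvider code; resolveAttemptPath is a hypothetical name): the listing stores only a relative name, and the provider joins it with the configured base directory when it actually opens the attempt.

import org.apache.hadoop.fs.Path

// The listing keeps a relative name such as "eventlog_v2_app-123"; the base
// dir (spark.history.fs.logDirectory) is prepended only at read time.
def resolveAttemptPath(logBaseDir: String, relativeLogPath: String): Path =
  new Path(logBaseDir, relativeLogPath)

// resolveAttemptPath("hdfs:///spark-logs", "eventlog_v2_app-123")
//   => hdfs:///spark-logs/eventlog_v2_app-123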
Member:

It seems better to call the existing method for preparing the log name. Otherwise, the two could unintentionally fall out of sync.

Suggested change:
-    val logPath = RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX +
-      Utils.nameForAppAndAttempt(appId, attemptId)
+    val logPath = RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX +
+      EventLogFileWriter.nameForAppAndAttempt(appId, attemptId)

Member (Author):

Thanks, I agree. For now it's a simple alias, but you're right that it matters for the future.

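For reference, a sketch of the alias relationship under discussion, assuming the writer-side helper simply delegates to the shared Utils implementation (the object name below is a stand-in, not the real class):

import org.apache.spark.util.Utils

object EventLogFileWriterSketch {
  // Hypothetical stand-in for EventLogFileWriter.nameForAppAndAttempt: today a
  // thin alias over Utils.nameForAppAndAttempt, but routing callers through it
  // keeps writer and reader naming in sync if the scheme ever changes.
  def nameForAppAndAttempt(appId: String, appAttemptId: Option[String]): String =
    Utils.nameForAppAndAttempt(appId, appAttemptId)
}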
     val app = try {
       load(appId)
     } catch {
-      case _: NoSuchElementException if this.conf.get(ON_DEMAND_ENABLED) =>
-        val name = Utils.nameForAppAndAttempt(appId, attemptId)
-        loadFromFallbackLocation(appId, attemptId,
-          RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX + name)
+      case _: NoSuchElementException if this.conf.get(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED) =>
Contributor:

I understand that we are trying to push RollingEventLogFilesWriter as the new default, but for users who have single event logs, won't this functionality break when they try to get the UI for an app, since EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED is true by default?

Member (Author):

Could you elaborate on what could break here, @thejdeep?

> will this functionality not break for them since EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED is true by default?

Member (Author):

The dummy metadata is added, and cleaned up immediately on FileNotFoundException in this function, as @mridulm requested. It works for both non-existent appIds and single-file logs.

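A worked example of the fallback location (IDs are hypothetical, and this assumes nameForAppAndAttempt joins the app and attempt IDs with an underscore and that the rolling prefix is "eventlog_v2_"):

val name = Utils.nameForAppAndAttempt("app-123", Some("1"))        // "app-123_1"
val logPath = RollingEventLogFilesWriter.EVENT_LOG_DIR_NAME_PREFIX + name
// => a directory name like "eventlog_v2_app-123_1", probed on demand
//    under the SHS log base dir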
+        loadFromFallbackLocation(appId, attemptId, logPath)
Member:

What if EVENT_LOG_ENABLE_ROLLING is disabled? Should we only do this if EVENT_LOG_ENABLE_ROLLING is enabled?

Member (Author), dongjoon-hyun, Jul 23, 2025:

No, EVENT_LOG_ENABLE_ROLLING is a per-application setting. This is the SHS, @viirya.

Member (Author), dongjoon-hyun, Jul 23, 2025:

Initially, I proposed the config name spark.history.fs.update.onDemandEnabled in the first commit, because this is an SHS setting. However, it was revised during review to be clearer in the context of rolling:

- spark.history.fs.update.onDemandEnabled
+ spark.history.fs.eventLog.rolling.onDemandLoadEnabled

Member:

Okay, I see. The setup is a bit confusing to me. EVENT_LOG_ENABLE_ROLLING takes effect at the EventLoggingListener in the SparkContext application. When FsHistoryProvider creates an EventLogFileReader, it doesn't consult this config; it decides whether the log is a single log or a rolling log based on the attempt's lastIndex.

Member (Author), dongjoon-hyun, Jul 23, 2025:

I agree that it sounds confusing. Basically, it's the same situation as with the event log compression codec.

Since a Spark job can choose spark.eventLog.compress and spark.eventLog.compression.codec arbitrarily, we need to infer them from the file name. It's inevitable because the writers (Spark jobs) and the reader (SHS) are independent.

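An illustrative sketch of that inference (not Spark's exact code): the reader recovers the codec from the file name, assuming a suffix convention like "app-123.zstd".

// Returns the codec short name if the file name carries a known suffix.
def inferCodecName(fileName: String): Option[String] = {
  val knownCodecs = Set("lz4", "lzf", "snappy", "zstd")
  fileName.split('.').lastOption.filter(knownCodecs.contains)
}

// inferCodecName("app-123.zstd") => Some("zstd")
// inferCodecName("app-123")      => None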
       case _: NoSuchElementException =>
         return None
     }
@@ -349,6 +349,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
         createInMemoryStore(attempt)
       }
     } catch {
+      case _: FileNotFoundException if this.conf.get(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED) =>
+        if (app.attempts.head.info.appSparkVersion == "unknown") {
+          listing.synchronized {
+            listing.delete(classOf[ApplicationInfoWrapper], appId)
+          }
+        }
+        return None
Member (on lines +352 to +358):

Hmm, I don't quite understand what this does. It seems loadFromFallbackLocation loads a dummy record from the log path into the listing?

But what does this FileNotFoundException catch, and why does it delete the dummy record immediately?

Member (Author):

Thank you for the review, @viirya.

Yes, this adds a dummy record (based on the user's request) so that loading the actual file can proceed. However, if the actual file doesn't exist, a FileNotFoundException is thrown, which means the request was a user mistake. In that case, since we no longer need the dummy record, we clean it up.

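A self-contained toy model of that lifecycle (all names hypothetical; the real logic lives in FsHistoryProvider): a dummy entry is registered so loading can proceed, and it is removed again if the backing log turns out not to exist.

import scala.collection.mutable

object OnDemandLoadSketch {
  private val listing = mutable.Map[String, String]() // appId -> relative log path
  private val filesOnDisk = Set("eventlog_v2_app-1")  // pretend file system contents

  def getAppUI(appId: String): Option[String] = {
    val logPath = "eventlog_v2_" + appId
    listing.getOrElseUpdate(appId, logPath)           // 1. dummy record added
    if (filesOnDisk.contains(logPath)) {
      Some(s"UI for $appId")                          // 2. real log found, UI loads
    } else {
      listing.remove(appId)                           // 3. "FileNotFound": drop the dummy
      None
    }
  }
}

// OnDemandLoadSketch.getAppUI("app-1")    => Some("UI for app-1"); "app-1" stays listed
// OnDemandLoadSketch.getAppUI("nonexist") => None; the listing stays clean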
       case _: FileNotFoundException =>
         return None
     }
@@ -54,12 +54,6 @@ private[spark] object History {
     .checkValue(v => v > 0, "The update batchSize should be a positive integer.")
     .createWithDefault(Int.MaxValue)

-  val ON_DEMAND_ENABLED = ConfigBuilder("spark.history.fs.update.onDemandEnabled")
-    .version("4.1.0")
-    .doc("Whether to look up rolling event log locations on demand manner before listing files.")
-    .booleanConf
-    .createWithDefault(true)
-
   val CLEANER_ENABLED = ConfigBuilder("spark.history.fs.cleaner.enabled")
     .version("1.4.0")
     .doc("Whether the History Server should periodically clean up event logs from storage")
@@ -165,6 +159,13 @@ private[spark] object History {
     .doubleConf
     .createWithDefault(0.7d)

+  val EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED =
+    ConfigBuilder("spark.history.fs.eventLog.rolling.onDemandLoadEnabled")
+      .doc("Whether to look up rolling event log locations on demand manner before listing files.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val DRIVER_LOG_CLEANER_ENABLED = ConfigBuilder("spark.history.fs.driverlog.cleaner.enabled")
     .version("3.0.0")
     .doc("Specifies whether the History Server should periodically clean up driver logs from " +
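A hypothetical opt-out example for the new key; equivalently, set it in spark-defaults.conf before launching the History Server.

import org.apache.spark.SparkConf

// Disable on-demand lookup so the SHS relies on periodic listing only.
val conf = new SparkConf()
  .set("spark.history.fs.eventLog.rolling.onDemandLoadEnabled", "false")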
@@ -1645,7 +1645,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P
     withTempDir { dir =>
       val conf = createTestConf(true)
       conf.set(HISTORY_LOG_DIR, dir.getAbsolutePath)
-      conf.set(ON_DEMAND_ENABLED, onDemandEnabled)
+      conf.set(EVENT_LOG_ROLLING_ON_DEMAND_LOAD_ENABLED, onDemandEnabled)
       val hadoopConf = SparkHadoopUtil.newConfiguration(conf)
       val provider = new FsHistoryProvider(conf)

@@ -1656,9 +1656,14 @@
         SparkListenerJobStart(1, 0, Seq.empty)), rollFile = false)
       writer1.stop()

-      assert(provider.getListing().length === 0)
-      assert(dir.listFiles().length === 1)
+      assert(provider.getListing().length === 0)
+      assert(provider.getAppUI("app1", None).isDefined == onDemandEnabled)
+      assert(provider.getListing().length === (if (onDemandEnabled) 1 else 0))
+
+      assert(dir.listFiles().length === 1)
+      assert(provider.getAppUI("nonexist", None).isEmpty)
+      assert(provider.getListing().length === (if (onDemandEnabled) 1 else 0))
Member (Author):

This new line verifies the cleanup, @mridulm.


       provider.stop()
     }