Commit 4c388bc

HeartSaVioR authored and HyukjinKwon committed
[SPARK-24485][SS] Measure and log elapsed time for filesystem operations in HDFSBackedStateStoreProvider
## What changes were proposed in this pull request?

This patch measures and logs the elapsed time of each operation that communicates with the file system (mostly a remote HDFS in production) in HDFSBackedStateStoreProvider, to help investigate latency issues.

## How was this patch tested?

Manually tested.

Author: Jungtaek Lim <kabhwan@gmail.com>

Closes apache#21506 from HeartSaVioR/SPARK-24485.
1 parent 3352d6f commit 4c388bc
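
In essence, the patch introduces a single reusable helper, `Utils.timeTakenMs`, and wraps each filesystem-facing call with it. A minimal sketch of the resulting pattern, using the helper as defined in the diff below (`someFileSystemOperation` is a hypothetical stand-in for calls like `readSnapshotFile` or `fetchFiles`):

    // Measure the body once, keep its result, and log the elapsed wall time.
    val (result, elapsedMs) = Utils.timeTakenMs {
      someFileSystemOperation()  // hypothetical; e.g. a snapshot read from HDFS
    }
    logDebug(s"someFileSystemOperation() took $elapsedMs ms.")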

File tree

3 files changed: +62 −41 lines changed

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 10 additions & 1 deletion
@@ -31,6 +31,7 @@ import java.nio.file.Files
 import java.security.SecureRandom
 import java.util.{Locale, Properties, Random, UUID}
 import java.util.concurrent._
+import java.util.concurrent.TimeUnit.NANOSECONDS
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.zip.GZIPInputStream

@@ -434,7 +435,7 @@ private[spark] object Utils extends Logging {
     new URI("file:///" + rawFileName).getPath.substring(1)
   }

- /**
+  /**
   * Download a file or directory to target directory. Supports fetching the file in a variety of
   * ways, including HTTP, Hadoop-compatible filesystems, and files on a standard filesystem, based
   * on the URL parameter. Fetching directories is only supported from Hadoop-compatible

@@ -507,6 +508,14 @@ private[spark] object Utils extends Logging {
     targetFile
   }

+  /** Records the duration of running `body`. */
+  def timeTakenMs[T](body: => T): (T, Long) = {
+    val startTime = System.nanoTime()
+    val result = body
+    val endTime = System.nanoTime()
+    (result, math.max(NANOSECONDS.toMillis(endTime - startTime), 0))
+  }
+
   /**
    * Download `in` to `tempFile`, then move it to `destFile`.
    *
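
The helper returns both the body's result and the elapsed milliseconds, which is what lets call sites keep the computed value (e.g. a fetched file list) while logging the timing; the `math.max(..., 0)` clamp simply guards against a negative reading. A standalone usage sketch (the file listing here is illustrative, not from the patch):

    import org.apache.spark.util.Utils

    // Time an arbitrary block; the Long is the wall-clock duration in ms.
    val (files, elapsedMs) = Utils.timeTakenMs {
      new java.io.File("/tmp").listFiles()
    }
    println(s"listing took $elapsedMs ms")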

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 50 additions & 33 deletions
@@ -18,12 +18,10 @@
 package org.apache.spark.sql.execution.streaming.state

 import java.io._
-import java.nio.channels.ClosedChannelException
 import java.util.Locale

 import scala.collection.JavaConverters._
 import scala.collection.mutable
-import scala.util.Random
 import scala.util.control.NonFatal

 import com.google.common.io.ByteStreams

@@ -280,38 +278,49 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
     if (loadedCurrentVersionMap.isDefined) {
       return loadedCurrentVersionMap.get
     }
-    val snapshotCurrentVersionMap = readSnapshotFile(version)
-    if (snapshotCurrentVersionMap.isDefined) {
-      synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
-      return snapshotCurrentVersionMap.get
-    }

-    // Find the most recent map before this version that we can.
-    // [SPARK-22305] This must be done iteratively to avoid stack overflow.
-    var lastAvailableVersion = version
-    var lastAvailableMap: Option[MapType] = None
-    while (lastAvailableMap.isEmpty) {
-      lastAvailableVersion -= 1
+    logWarning(s"The state for version $version doesn't exist in loadedMaps. " +
+      "Reading snapshot file and delta files if needed..." +
+      "Note that this is normal for the first batch of starting query.")

-      if (lastAvailableVersion <= 0) {
-        // Use an empty map for versions 0 or less.
-        lastAvailableMap = Some(new MapType)
-      } else {
-        lastAvailableMap =
-          synchronized { loadedMaps.get(lastAvailableVersion) }
-            .orElse(readSnapshotFile(lastAvailableVersion))
+    val (result, elapsedMs) = Utils.timeTakenMs {
+      val snapshotCurrentVersionMap = readSnapshotFile(version)
+      if (snapshotCurrentVersionMap.isDefined) {
+        synchronized { loadedMaps.put(version, snapshotCurrentVersionMap.get) }
+        return snapshotCurrentVersionMap.get
+      }
+
+      // Find the most recent map before this version that we can.
+      // [SPARK-22305] This must be done iteratively to avoid stack overflow.
+      var lastAvailableVersion = version
+      var lastAvailableMap: Option[MapType] = None
+      while (lastAvailableMap.isEmpty) {
+        lastAvailableVersion -= 1
+
+        if (lastAvailableVersion <= 0) {
+          // Use an empty map for versions 0 or less.
+          lastAvailableMap = Some(new MapType)
+        } else {
+          lastAvailableMap =
+            synchronized { loadedMaps.get(lastAvailableVersion) }
+              .orElse(readSnapshotFile(lastAvailableVersion))
+        }
+      }
+
+      // Load all the deltas from the version after the last available one up to the target version.
+      // The last available version is the one with a full snapshot, so it doesn't need deltas.
+      val resultMap = new MapType(lastAvailableMap.get)
+      for (deltaVersion <- lastAvailableVersion + 1 to version) {
+        updateFromDeltaFile(deltaVersion, resultMap)
       }
-    }

-    // Load all the deltas from the version after the last available one up to the target version.
-    // The last available version is the one with a full snapshot, so it doesn't need deltas.
-    val resultMap = new MapType(lastAvailableMap.get)
-    for (deltaVersion <- lastAvailableVersion + 1 to version) {
-      updateFromDeltaFile(deltaVersion, resultMap)
+      synchronized { loadedMaps.put(version, resultMap) }
+      resultMap
     }

-    synchronized { loadedMaps.put(version, resultMap) }
-    resultMap
+    logDebug(s"Loading state for $version takes $elapsedMs ms.")
+
+    result
   }

   private def writeUpdateToDeltaFile(

@@ -490,15 +499,18 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   /** Perform a snapshot of the store to allow delta files to be consolidated */
   private def doSnapshot(): Unit = {
     try {
-      val files = fetchFiles()
+      val (files, e1) = Utils.timeTakenMs(fetchFiles())
+      logDebug(s"fetchFiles() took $e1 ms.")
+
       if (files.nonEmpty) {
         val lastVersion = files.last.version
         val deltaFilesForLastVersion =
           filesForVersion(files, lastVersion).filter(_.isSnapshot == false)
         synchronized { loadedMaps.get(lastVersion) } match {
           case Some(map) =>
             if (deltaFilesForLastVersion.size > storeConf.minDeltasForSnapshot) {
-              writeSnapshotFile(lastVersion, map)
+              val (_, e2) = Utils.timeTakenMs(writeSnapshotFile(lastVersion, map))
+              logDebug(s"writeSnapshotFile() took $e2 ms.")
             }
           case None =>
             // The last map is not loaded, probably some other instance is in charge

@@ -517,7 +529,9 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
    */
   private[state] def cleanup(): Unit = {
     try {
-      val files = fetchFiles()
+      val (files, e1) = Utils.timeTakenMs(fetchFiles())
+      logDebug(s"fetchFiles() took $e1 ms.")
+
       if (files.nonEmpty) {
         val earliestVersionToRetain = files.last.version - storeConf.minVersionsToRetain
         if (earliestVersionToRetain > 0) {

@@ -527,9 +541,12 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
           mapsToRemove.foreach(loadedMaps.remove)
         }
         val filesToDelete = files.filter(_.version < earliestFileToRetain.version)
-        filesToDelete.foreach { f =>
-          fm.delete(f.path)
+        val (_, e2) = Utils.timeTakenMs {
+          filesToDelete.foreach { f =>
+            fm.delete(f.path)
+          }
         }
+        logDebug(s"deleting files took $e2 ms.")
         logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this: " +
           filesToDelete.mkString(", "))
       }
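
Note that all of the new timings in this file are emitted via `logDebug` (the load-path warning aside), so they stay hidden at Spark's default INFO log level. One way to surface them, assuming the log4j 1.x backend that Spark ships with at this point (a sketch, not part of the patch; the equivalent line in conf/log4j.properties works too):

    import org.apache.log4j.{Level, Logger}

    // Raise verbosity only for the state store package, leaving the
    // rest of the application at its configured level.
    Logger.getLogger("org.apache.spark.sql.execution.streaming.state")
      .setLevel(Level.DEBUG)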

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala

Lines changed: 2 additions & 7 deletions
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.streaming.state._
 import org.apache.spark.sql.streaming.{OutputMode, StateOperatorProgress}
 import org.apache.spark.sql.types._
-import org.apache.spark.util.{CompletionIterator, NextIterator}
+import org.apache.spark.util.{CompletionIterator, NextIterator, Utils}


 /** Used to identify the state store for a given operator. */

@@ -97,12 +97,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
   }

   /** Records the duration of running `body` for the next query progress update. */
-  protected def timeTakenMs(body: => Unit): Long = {
-    val startTime = System.nanoTime()
-    val result = body
-    val endTime = System.nanoTime()
-    math.max(NANOSECONDS.toMillis(endTime - startTime), 0)
-  }
+  protected def timeTakenMs(body: => Unit): Long = Utils.timeTakenMs(body)._2

   /**
    * Set the SQL metrics related to the state store.
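
Since the trait method keeps its original signature, existing call sites are untouched; consumption still looks roughly like this (a sketch mirroring the stateful operators' usage; `commitTimeMs` assumes the SQL metric of that name defined on StateStoreWriter):

    // Accumulate the duration of a state store commit into a SQL metric;
    // this timeTakenMs discards the body's result and returns only the millis.
    val commitTimeMs = longMetric("commitTimeMs")
    commitTimeMs += timeTakenMs {
      store.commit()
    }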
