Commit 97bcf5b

Updated StreamExecution and StreamingQueryManager to utilize CheckpointFileManager
1 parent 39c1127

2 files changed: 11 additions & 9 deletions

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 8 additions & 7 deletions
@@ -96,7 +96,8 @@ abstract class StreamExecution(
 
   val resolvedCheckpointRoot = {
     val checkpointPath = new Path(checkpointRoot)
-    val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+    val checkpointFileManager = CheckpointFileManager.create(checkpointPath,
+      sparkSession.sessionState.newHadoopConf())
     if (sparkSession.conf.get(SQLConf.STREAMING_CHECKPOINT_ESCAPED_PATH_CHECK_ENABLED)
       && StreamExecution.containsSpecialCharsInPath(checkpointPath)) {
       // In Spark 2.4 and earlier, the checkpoint path is escaped 3 times (3 `Path.toUri.toString`
@@ -106,7 +107,7 @@ abstract class StreamExecution(
         new Path(new Path(checkpointPath.toUri.toString).toUri.toString).toUri.toString
       val legacyCheckpointDirExists =
         try {
-          fs.exists(new Path(legacyCheckpointDir))
+          checkpointFileManager.exists(new Path(legacyCheckpointDir))
         } catch {
           case NonFatal(e) =>
             // We may not have access to this directory. Don't fail the query if that happens.
@@ -133,9 +134,8 @@ abstract class StreamExecution(
             .stripMargin)
       }
     }
-    val checkpointDir = checkpointPath.makeQualified(fs.getUri, fs.getWorkingDirectory)
-    fs.mkdirs(checkpointDir)
-    checkpointDir.toString
+    checkpointFileManager.mkdirs(checkpointPath)
+    checkpointPath.toString
   }
   logInfo(s"Checkpoint root $checkpointRoot resolved to $resolvedCheckpointRoot.")
 
@@ -388,8 +388,9 @@ abstract class StreamExecution(
       val checkpointPath = new Path(resolvedCheckpointRoot)
       try {
         logInfo(s"Deleting checkpoint $checkpointPath.")
-        val fs = checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
-        fs.delete(checkpointPath, true)
+        val manager = CheckpointFileManager.create(checkpointPath,
+          sparkSession.sessionState.newHadoopConf())
+        manager.delete(checkpointPath)
       } catch {
         case NonFatal(e) =>
           // Deleting temp checkpoint folder is best effort, don't throw non fatal exceptions
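
For context, the pattern the StreamExecution diff adopts is: build a CheckpointFileManager for the checkpoint path once, then use it for directory creation and best-effort deletion instead of going through a raw Hadoop FileSystem. The sketch below is illustrative only, not part of the commit: the object and method names are made up, and it takes a plain Hadoop Configuration rather than the internal sparkSession.sessionState.newHadoopConf() used in the diff.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.execution.streaming.CheckpointFileManager

// Hypothetical helper object; names are not from the Spark code base.
object CheckpointDirSketch {

  /** Resolve the checkpoint root, create it if needed, and return its string form. */
  def ensureCheckpointDir(checkpointRoot: String, hadoopConf: Configuration): String = {
    val checkpointPath = new Path(checkpointRoot)
    // create() picks a concrete manager implementation based on the Hadoop configuration.
    val fm = CheckpointFileManager.create(checkpointPath, hadoopConf)
    fm.mkdirs(checkpointPath)   // create the checkpoint root directory (and parents)
    checkpointPath.toString
  }

  /** Best-effort deletion of a (temporary) checkpoint directory, mirroring the diff's cleanup path. */
  def deleteCheckpointDir(checkpointRoot: String, hadoopConf: Configuration): Unit = {
    val checkpointPath = new Path(checkpointRoot)
    val fm = CheckpointFileManager.create(checkpointPath, hadoopConf)
    fm.delete(checkpointPath)   // remove the checkpoint directory tree
  }
}

One behavioral detail visible in the diff: the old code qualified the path against the FileSystem (makeQualified) before returning it, while the new code returns checkpointPath.toString directly.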

sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala

Lines changed: 3 additions & 2 deletions
@@ -242,8 +242,9 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
       // If offsets have already been created, we trying to resume a query.
       if (!recoverFromCheckpointLocation) {
         val checkpointPath = new Path(checkpointLocation, "offsets")
-        val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
-        if (fs.exists(checkpointPath)) {
+        val checkpointFileManager = CheckpointFileManager.create(checkpointPath,
+          df.sparkSession.sessionState.newHadoopConf())
+        if (checkpointFileManager.exists(checkpointPath)) {
           throw new AnalysisException(
             s"This query does not support recovering from checkpoint location. " +
             s"Delete $checkpointPath to start over.")
