diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index a371f4f50f9f..0c923908bba3 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1870,7 +1870,11 @@ Here are the details of all the sinks in Spark.
File Sink |
Append |
- path: path to the output directory, must be specified.
+ path: path to the output directory, must be specified.
+ retention: time to live (TTL) for output files. Output files whose batches were
+ committed longer ago than the TTL will eventually be excluded from the metadata log. This means reader queries that read
+ the sink's output directory may not process them. You can provide the value as a time string (e.g. "12h", "7d").
+ Disabled by default.
For file-format-specific options, see the related methods in DataFrameWriter
(Scala/Java/Python/
- filterInBatch(id)(shouldRetain).getOrElse {
+ filterInBatch(id)(shouldRetain(_, curTime)).getOrElse {
throw new IllegalStateException(
s"${batchIdToPath(id)} doesn't exist " +
s"(latestId: $latestId, compactInterval: $compactInterval)")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index ecaf4f8160a0..e1c9b82ec2ac 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormat, FileFormatWriter}
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.util.SerializableConfiguration
+import org.apache.spark.util.{SerializableConfiguration, Utils}
object FileStreamSink extends Logging {
// The name of the subdirectory that is used to store metadata about which files are valid.
@@ -136,8 +136,9 @@ class FileStreamSink(
private val basePath = new Path(path)
private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath,
sparkSession.sessionState.conf)
- private val fileLog =
- new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)
+ private val retention = options.get("retention").map(Utils.timeStringAsMs)
+ private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession,
+ logPath.toString, retention)
private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = {
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
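
The sink above parses the `retention` option with Spark's internal `Utils.timeStringAsMs`. As a rough, standalone illustration of how strings such as "12h" or "7d" translate to milliseconds (this is not the actual parser, and the set of suffixes handled here is an assumption):

```scala
// Rough standalone sketch, not Spark's parser: maps "12h"-style strings to milliseconds.
import java.util.concurrent.TimeUnit

def approxTimeStringToMs(s: String): Long = {
  val t = s.trim.toLowerCase
  if (t.endsWith("ms")) t.dropRight(2).trim.toLong
  else if (t.endsWith("min")) TimeUnit.MINUTES.toMillis(t.dropRight(3).trim.toLong)
  else if (t.endsWith("d")) TimeUnit.DAYS.toMillis(t.dropRight(1).trim.toLong)
  else if (t.endsWith("h")) TimeUnit.HOURS.toMillis(t.dropRight(1).trim.toLong)
  else if (t.endsWith("s")) TimeUnit.SECONDS.toMillis(t.dropRight(1).trim.toLong)
  else t.toLong // assume plain milliseconds when no unit suffix is given
}

assert(approxTimeStringToMs("12h") == 12L * 60 * 60 * 1000)
assert(approxTimeStringToMs("7d") == 7L * 24 * 60 * 60 * 1000)
```
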
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
index 5cb68e1ae956..2d70d95c6850 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLog.scala
@@ -81,7 +81,8 @@ object SinkFileStatus {
class FileStreamSinkLog(
metadataLogVersion: Int,
sparkSession: SparkSession,
- path: String)
+ path: String,
+ _retentionMs: Option[Long] = None)
extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {
private implicit val formats = Serialization.formats(NoTypeHints)
@@ -96,6 +97,28 @@ class FileStreamSinkLog(
require(defaultCompactInterval > 0,
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " +
"to a positive value.")
+
+ val retentionMs: Long = _retentionMs match {
+ case Some(retention) =>
+ logInfo(s"Retention is set to $retention ms")
+ retention
+
+ case _ => Long.MaxValue
+ }
+
+ override def shouldRetain(log: SinkFileStatus, currentTime: Long): Boolean = {
+ if (retentionMs < Long.MaxValue) {
+ if (currentTime - log.modificationTime > retentionMs) {
+ logDebug(s"${log.path} excluded by retention - current time: $currentTime / " +
+ s"modification time: ${log.modificationTime} / retention: $retentionMs ms.")
+ false
+ } else {
+ true
+ }
+ } else {
+ true
+ }
+ }
}
object FileStreamSinkLog {
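
As a minimal, self-contained sketch of the rule the `shouldRetain` override above encodes at compaction time (the `Entry` case class and `retainForCompaction` helper are hypothetical names used for illustration, not Spark classes):

```scala
// Hypothetical sketch: entries whose modification time falls outside the retention
// window are filtered out during compaction, so readers no longer see them.
final case class Entry(path: String, modificationTime: Long)

def retainForCompaction(entries: Seq[Entry], currentTime: Long, retentionMs: Long): Seq[Entry] =
  entries.filter(e => currentTime - e.modificationTime <= retentionMs)

val now = System.currentTimeMillis()
val sevenDaysMs = 7L * 24 * 60 * 60 * 1000
val compacted = retainForCompaction(
  Seq(Entry("/out/part-0", now), Entry("/out/part-1", now - 8L * 24 * 60 * 60 * 1000)),
  now, sevenDaysMs)
// compacted keeps only "/out/part-0"; the 8-day-old entry is dropped.
```
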
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
index c53617b40e09..bdd6bec8e782 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/FileStreamSinkLogSuite.scala
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicLong
import scala.util.Random
-import org.apache.hadoop.fs.{FSDataInputStream, Path, RawLocalFileSystem}
+import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path, RawLocalFileSystem}
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.internal.SQLConf
@@ -40,7 +40,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
test("shouldRetain") {
withFileStreamSinkLog { sinkLog =>
val log = newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION)
- assert(sinkLog.shouldRetain(log))
+ assert(sinkLog.shouldRetain(log, System.currentTimeMillis()))
}
}
@@ -130,6 +130,17 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
}
}
+ private def listBatchFiles(fs: FileSystem, sinkLog: FileStreamSinkLog): Set[String] = {
+ fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
+ try {
+ getBatchIdFromFileName(fileName)
+ true
+ } catch {
+ case _: NumberFormatException => false
+ }
+ }.toSet
+ }
+
test("delete expired file") {
// Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
// deterministically and one min batches to retain
@@ -139,18 +150,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
withFileStreamSinkLog { sinkLog =>
val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
-
- def listBatchFiles(): Set[String] = {
- fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
- try {
- getBatchIdFromFileName(fileName)
- true
- } catch {
- case _: NumberFormatException => false
- }
- }.toSet
- }
-
+ def listBatchFiles(): Set[String] = this.listBatchFiles(fs, sinkLog)
sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0") === listBatchFiles())
sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
@@ -174,18 +174,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
withFileStreamSinkLog { sinkLog =>
val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())
-
- def listBatchFiles(): Set[String] = {
- fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
- try {
- getBatchIdFromFileName(fileName)
- true
- } catch {
- case _: NumberFormatException => false
- }
- }.toSet
- }
-
+ def listBatchFiles(): Set[String] = this.listBatchFiles(fs, sinkLog)
sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0") === listBatchFiles())
sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
@@ -206,6 +195,24 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
}
}
+ test("filter out outdated entries when compacting") {
+ val curTime = System.currentTimeMillis()
+ withFileStreamSinkLog(sinkLog => {
+ val logs = Seq(
+ newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION, curTime),
+ newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION, curTime),
+ newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION, curTime))
+ logs.foreach { log => assert(sinkLog.shouldRetain(log, curTime)) }
+
+ val logs2 = Seq(
+ newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION, curTime - 80000),
+ newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION, curTime - 120000))
+ logs2.foreach { log =>
+ assert(!sinkLog.shouldRetain(log, curTime))
+ }
+ }, Some(60000))
+ }
+
test("read Spark 2.1.0 log format") {
assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
SinkFileStatus("/a/b/0", 1, false, 1, 1, 100, FileStreamSinkLog.ADD_ACTION),
@@ -260,23 +267,29 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
}
/**
- * Create a fake SinkFileStatus using path and action. Most of tests don't care about other fields
- * in SinkFileStatus.
+ * Create a fake SinkFileStatus using path and action, and optionally a modification time.
+ * Most tests don't care about the other fields in SinkFileStatus.
*/
- private def newFakeSinkFileStatus(path: String, action: String): SinkFileStatus = {
+ private def newFakeSinkFileStatus(
+ path: String,
+ action: String,
+ modificationTime: Long = Long.MaxValue): SinkFileStatus = {
SinkFileStatus(
path = path,
size = 100L,
isDir = false,
- modificationTime = 100L,
+ modificationTime = modificationTime,
blockReplication = 1,
blockSize = 100L,
action = action)
}
- private def withFileStreamSinkLog(f: FileStreamSinkLog => Unit): Unit = {
+ private def withFileStreamSinkLog(
+ f: FileStreamSinkLog => Unit,
+ ttl: Option[Long] = None): Unit = {
withTempDir { file =>
- val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, file.getCanonicalPath)
+ val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, file.getCanonicalPath,
+ ttl)
f(sinkLog)
}
}
|