5 changes: 4 additions & 1 deletion docs/structured-streaming-programming-guide.md
@@ -1830,7 +1830,10 @@ Here are the details of all the sinks in Spark.
<td><b>File Sink</b></td>
<td>Append</td>
<td>
<code>path</code>: path to the output directory, must be specified.
<code>path</code>: path to the output directory, must be specified.<br/>
<code>outputRetentionMs</code>: time to live (TTL), in milliseconds, for output files. Output files whose batches were
committed longer ago than the TTL will eventually be excluded from the metadata log. This means that reader queries
which read the sink's output directory may not process them.
<br/><br/>
For file-format-specific options, see the related methods in DataFrameWriter
(<a href="api/scala/index.html#org.apache.spark.sql.DataFrameWriter">Scala</a>/<a href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>/<a
@@ -133,8 +133,9 @@ class FileStreamSink(
FileStreamSink.checkEscapedMetadataPath(fs, metadataDir, sparkSession.sessionState.conf)
metadataDir
}
private val fileLog =
new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)
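// Optional TTL (milliseconds) for entries in the sink metadata log, read from the writer options.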
private val outputTimeToLive = options.get("outputRetentionMs").map(_.toLong)
private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession,
logPath.toString, outputTimeToLive)

private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = {
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
@@ -37,6 +37,7 @@ import org.apache.spark.sql.internal.SQLConf
* @param blockReplication the block replication.
* @param blockSize the block size.
* @param action the file action. Must be either "add" or "delete".
* @param commitTime the time at which the batch containing the file was committed
*/
case class SinkFileStatus(
path: String,
@@ -45,7 +46,20 @@ case class SinkFileStatus(
modificationTime: Long,
blockReplication: Int,
blockSize: Long,
action: String) {
action: String,
commitTime: Long) {

def this(
path: String,
size: Long,
isDir: Boolean,
modificationTime: Long,
blockReplication: Int,
blockSize: Long,
action: String) {
// Use Long.MaxValue when the exact commit time is unknown, which means the file will never be evicted.
this(path, size, isDir, modificationTime, blockReplication, blockSize, action, Long.MaxValue)
}

def toFileStatus: FileStatus = {
new FileStatus(
@@ -55,14 +69,14 @@

object SinkFileStatus {
def apply(f: FileStatus): SinkFileStatus = {
SinkFileStatus(
path = f.getPath.toUri.toString,
size = f.getLen,
isDir = f.isDirectory,
modificationTime = f.getModificationTime,
blockReplication = f.getReplication,
blockSize = f.getBlockSize,
action = FileStreamSinkLog.ADD_ACTION)
new SinkFileStatus(
f.getPath.toUri.toString,
f.getLen,
f.isDirectory,
f.getModificationTime,
f.getReplication,
f.getBlockSize,
FileStreamSinkLog.ADD_ACTION)
}
}

@@ -81,7 +95,8 @@ object SinkFileStatus
class FileStreamSinkLog(
metadataLogVersion: Int,
sparkSession: SparkSession,
path: String)
path: String,
outputTimeToLiveMs: Option[Long] = None)
extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {

private implicit val formats = Serialization.formats(NoTypeHints)
@@ -97,8 +112,13 @@ class FileStreamSinkLog(
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " +
"to a positive value.")

private val ttlMs = outputTimeToLiveMs.getOrElse(Long.MaxValue)

override def compactLogs(logs: Seq[SinkFileStatus]): Seq[SinkFileStatus] = {
val deletedFiles = logs.filter(_.action == FileStreamSinkLog.DELETE_ACTION).map(_.path).toSet
val curTime = System.currentTimeMillis()
val deletedFiles = logs.filter { log =>
log.action == FileStreamSinkLog.DELETE_ACTION || (curTime - log.commitTime) > ttlMs
}.map(_.path).toSet
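// For example, with outputRetentionMs = 60000 an entry committed 80000 ms before this compaction
// satisfies (curTime - commitTime) > ttlMs and is dropped; entries with an unknown commit time
// carry Long.MaxValue and are therefore never aged out.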
if (deletedFiles.isEmpty) {
logs
} else {
@@ -59,7 +59,10 @@ class ManifestFileCommitProtocol(jobId: String, path: String)

override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
require(fileLog != null, "setupManifestOptions must be called before this function")
val fileStatuses = taskCommits.flatMap(_.obj.asInstanceOf[Seq[SinkFileStatus]]).toArray
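// Stamp all files reported by the tasks with a single job-level commit timestamp; the sink log
// compares this commitTime against outputRetentionMs when compacting.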
val commitTimestamp = System.currentTimeMillis()
val fileStatuses = taskCommits.flatMap { taskCommit =>
taskCommit.obj.asInstanceOf[Seq[SinkFileStatus]].map(_.copy(commitTime = commitTimestamp))
}.toArray

if (fileLog.add(batchId, fileStatuses)) {
logInfo(s"Committed batch $batchId")
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.streaming
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets.UTF_8

import org.apache.hadoop.fs.FileSystem

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -55,29 +57,32 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
modificationTime = 1000L,
blockReplication = 1,
blockSize = 10000L,
action = FileStreamSinkLog.ADD_ACTION),
action = FileStreamSinkLog.ADD_ACTION,
commitTime = 1000L),
SinkFileStatus(
path = "/a/b/y",
size = 200L,
isDir = false,
modificationTime = 2000L,
blockReplication = 2,
blockSize = 20000L,
action = FileStreamSinkLog.DELETE_ACTION),
action = FileStreamSinkLog.DELETE_ACTION,
commitTime = 2000L),
SinkFileStatus(
path = "/a/b/z",
size = 300L,
isDir = false,
modificationTime = 3000L,
blockReplication = 3,
blockSize = 30000L,
action = FileStreamSinkLog.ADD_ACTION))
action = FileStreamSinkLog.ADD_ACTION,
commitTime = 3000L))

// scalastyle:off
val expected = s"""v$VERSION
|{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
|{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
|{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add","commitTime":1000}
|{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete","commitTime":2000}
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add","commitTime":3000}""".stripMargin
// scalastyle:on
val baos = new ByteArrayOutputStream()
sinkLog.serialize(logs, baos)
@@ -92,9 +97,9 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
withFileStreamSinkLog { sinkLog =>
// scalastyle:off
val logs = s"""v$VERSION
|{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add"}
|{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete"}
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add"}""".stripMargin
|{"path":"/a/b/x","size":100,"isDir":false,"modificationTime":1000,"blockReplication":1,"blockSize":10000,"action":"add","commitTime":1000}
|{"path":"/a/b/y","size":200,"isDir":false,"modificationTime":2000,"blockReplication":2,"blockSize":20000,"action":"delete","commitTime":2000}
|{"path":"/a/b/z","size":300,"isDir":false,"modificationTime":3000,"blockReplication":3,"blockSize":30000,"action":"add","commitTime":3000}""".stripMargin
// scalastyle:on

val expected = Seq(
@@ -105,23 +110,26 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
modificationTime = 1000L,
blockReplication = 1,
blockSize = 10000L,
action = FileStreamSinkLog.ADD_ACTION),
action = FileStreamSinkLog.ADD_ACTION,
commitTime = 1000L),
SinkFileStatus(
path = "/a/b/y",
size = 200L,
isDir = false,
modificationTime = 2000L,
blockReplication = 2,
blockSize = 20000L,
action = FileStreamSinkLog.DELETE_ACTION),
action = FileStreamSinkLog.DELETE_ACTION,
commitTime = 2000L),
SinkFileStatus(
path = "/a/b/z",
size = 300L,
isDir = false,
modificationTime = 3000L,
blockReplication = 3,
blockSize = 30000L,
action = FileStreamSinkLog.ADD_ACTION))
action = FileStreamSinkLog.ADD_ACTION,
commitTime = 3000L))

assert(expected === sinkLog.deserialize(new ByteArrayInputStream(logs.getBytes(UTF_8))))

@@ -149,6 +157,17 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
}
}

private def listBatchFiles(fs: FileSystem, sinkLog: FileStreamSinkLog): Set[String] = {
fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
try {
getBatchIdFromFileName(fileName)
true
} catch {
case _: NumberFormatException => false
}
}.toSet
}

test("delete expired file") {
// Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
// deterministically and one min batches to retain
@@ -158,18 +177,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
withFileStreamSinkLog { sinkLog =>
val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())

def listBatchFiles(): Set[String] = {
fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
try {
getBatchIdFromFileName(fileName)
true
} catch {
case _: NumberFormatException => false
}
}.toSet
}

def listBatchFiles(): Set[String] = this.listBatchFiles(fs, sinkLog)
sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0") === listBatchFiles())
sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
@@ -193,18 +201,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
withFileStreamSinkLog { sinkLog =>
val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())

def listBatchFiles(): Set[String] = {
fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
try {
getBatchIdFromFileName(fileName)
true
} catch {
case _: NumberFormatException => false
}
}.toSet
}

def listBatchFiles(): Set[String] = this.listBatchFiles(fs, sinkLog)
sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0") === listBatchFiles())
sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
@@ -225,39 +222,70 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
}
}

test("filter out outdated entries when compacting") {
val curTime = System.currentTimeMillis()
withFileStreamSinkLog(Some(60000), sinkLog => {
val logs = Seq(
newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION, curTime),
newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION, curTime),
newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION, curTime))
assert(logs === sinkLog.compactLogs(logs))

val logs2 = Seq(
newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION, curTime - 80000),
newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION, curTime - 120000))
assert(logs === sinkLog.compactLogs(logs ++ logs2))
})
}

test("read Spark 2.1.0 log format") {
val maxLong = Long.MaxValue
assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
// SinkFileStatus("/a/b/0", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION), -> deleted
SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION),
SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION)
SinkFileStatus("/a/b/1", 100, false, 100, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
SinkFileStatus("/a/b/2", 200, false, 200, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
SinkFileStatus("/a/b/3", 300, false, 300, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
SinkFileStatus("/a/b/4", 400, false, 400, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
SinkFileStatus("/a/b/5", 500, false, 500, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
SinkFileStatus("/a/b/6", 600, false, 600, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
SinkFileStatus("/a/b/7", 700, false, 700, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
SinkFileStatus("/a/b/8", 800, false, 800, 1, 100, FileStreamSinkLog.ADD_ACTION, maxLong),
SinkFileStatus("/a/b/9", 900, false, 900, 3, 200, FileStreamSinkLog.ADD_ACTION, maxLong)
))
}

/**
* Create a fake SinkFileStatus using path and action. Most tests don't care about the other fields
* in SinkFileStatus.
*/
private def newFakeSinkFileStatus(path: String, action: String): SinkFileStatus = {
private def newFakeSinkFileStatus(path: String, action: String): SinkFileStatus =
newFakeSinkFileStatus(path, action, Long.MaxValue)

/**
* Create a fake SinkFileStatus using path, action, and commit time.
*/
private def newFakeSinkFileStatus(
path: String,
action: String,
commitTime: Long): SinkFileStatus = {
SinkFileStatus(
path = path,
size = 100L,
isDir = false,
modificationTime = 100L,
blockReplication = 1,
blockSize = 100L,
action = action)
action = action,
commitTime = commitTime)
}

private def withFileStreamSinkLog(f: FileStreamSinkLog => Unit): Unit = {
private def withFileStreamSinkLog(f: FileStreamSinkLog => Unit): Unit =
withFileStreamSinkLog(None, f)

private def withFileStreamSinkLog(ttl: Option[Long], f: FileStreamSinkLog => Unit): Unit = {
withTempDir { file =>
val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, file.getCanonicalPath)
val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, file.getCanonicalPath,
ttl)
f(sinkLog)
}
}