docs/structured-streaming-programming-guide.md (5 additions, 1 deletion)
@@ -1870,7 +1870,11 @@ Here are the details of all the sinks in Spark.
<td><b>File Sink</b></td>
<td>Append</td>
<td>
<code>path</code>: path to the output directory, must be specified.
<code>path</code>: path to the output directory, must be specified.<br/>
    <code>retention</code>: time to live (TTL) for output files. Output files whose batches were
    committed longer ago than the TTL will eventually be excluded from the metadata log, which means that
    reader queries reading the sink's output directory may not process them. The value is given as a
    duration string (e.g. "12h", "7d"). Disabled by default.
<br/><br/>
For file-format-specific options, see the related methods in DataFrameWriter
(<a href="api/scala/org/apache/spark/sql/DataFrameWriter.html">Scala</a>/<a href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>/<a
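
A minimal usage sketch of the new option from the user side, assuming a streaming DataFrame named `events` and placeholder paths (not part of this diff):

```scala
// Sketch only: wiring the `retention` option into a file sink query.
// `events`, the output path and the checkpoint path are placeholders.
val query = events.writeStream
  .format("parquet")
  .option("path", "/tmp/file-sink-output")
  .option("checkpointLocation", "/tmp/file-sink-checkpoint")
  .option("retention", "7d") // metadata-log entries for batches older than 7 days are eventually dropped
  .start()
```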
@@ -112,7 +112,7 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
* Default implementation retains all log entries. Implementations should override the method
* to change the behavior.
*/
def shouldRetain(log: T): Boolean = true
def shouldRetain(log: T, currentTime: Long): Boolean = true

override def batchIdToPath(batchId: Long): Path = {
if (isCompactionBatch(batchId, compactInterval)) {
@@ -218,8 +218,9 @@ abstract class CompactibleFileStreamLog[T <: AnyRef : ClassTag](
* corresponding `batchId` file. It will delete expired files as well if enabled.
*/
private def compact(batchId: Long, logs: Array[T]): Boolean = {
val curTime = System.currentTimeMillis()
def writeEntry(entry: T, output: OutputStream): Unit = {
if (shouldRetain(entry)) {
if (shouldRetain(entry, curTime)) {
output.write('\n')
serializeEntry(entry, output)
}
@@ -258,7 +259,8 @@
try {
val logs =
getAllValidBatches(latestId, compactInterval).flatMap { id =>
filterInBatch(id)(shouldRetain).getOrElse {
val curTime = System.currentTimeMillis()
Review comment (project member) on the line above: nit: we can move this out of the flatMap function.
filterInBatch(id)(shouldRetain(_, curTime)).getOrElse {
throw new IllegalStateException(
s"${batchIdToPath(id)} doesn't exist " +
s"(latestId: $latestId, compactInterval: $compactInterval)")
@@ -29,7 +29,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, FileFormat, FileFormatWriter}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.SerializableConfiguration
import org.apache.spark.util.{SerializableConfiguration, Utils}

object FileStreamSink extends Logging {
// The name of the subdirectory that is used to store metadata about which files are valid.
@@ -136,8 +136,9 @@ class FileStreamSink(
private val basePath = new Path(path)
private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath,
sparkSession.sessionState.conf)
private val fileLog =
new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)
private val retention = options.get("retention").map(Utils.timeStringAsMs)
private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession,
logPath.toString, retention)

private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = {
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
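
For reference, `Utils.timeStringAsMs` is what converts the user-facing `retention` string into milliseconds. A small sketch of the expected conversions (runnable only inside the Spark codebase, since `Utils` is package-private to Spark):

```scala
import org.apache.spark.util.Utils

// Duration strings accepted by the `retention` option and their millisecond values.
assert(Utils.timeStringAsMs("12h") == 12L * 60 * 60 * 1000) // 43200000
assert(Utils.timeStringAsMs("7d") == 7L * 24 * 60 * 60 * 1000) // 604800000
```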
@@ -81,7 +81,8 @@ object SinkFileStatus {
class FileStreamSinkLog(
metadataLogVersion: Int,
sparkSession: SparkSession,
path: String)
path: String,
_retentionMs: Option[Long] = None)
extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {

private implicit val formats = Serialization.formats(NoTypeHints)
@@ -96,6 +97,28 @@ class FileStreamSinkLog(
require(defaultCompactInterval > 0,
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " +
"to a positive value.")

val retentionMs: Long = _retentionMs match {
case Some(retention) =>
logInfo(s"Retention is set to $retention ms")
retention

case _ => Long.MaxValue
}

override def shouldRetain(log: SinkFileStatus, currentTime: Long): Boolean = {
if (retentionMs < Long.MaxValue) {
if (currentTime - log.modificationTime > retentionMs) {
logDebug(s"${log.path} excluded by retention - current time: $currentTime / " +
s"modification time: ${log.modificationTime} / retention: $retentionMs ms.")
false
} else {
true
}
} else {
true
}
}
}

object FileStreamSinkLog {
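
To make the retention arithmetic concrete, here is a standalone mirror of the predicate in `shouldRetain` above; the helper name is purely illustrative and not part of the PR:

```scala
// Mirrors the check in FileStreamSinkLog.shouldRetain: an entry is dropped once
// its modification time is more than `retentionMs` behind the current time.
def retained(modificationTime: Long, currentTime: Long, retentionMs: Long): Boolean =
  retentionMs == Long.MaxValue || currentTime - modificationTime <= retentionMs

val now = System.currentTimeMillis()
val twelveHours = 12L * 60 * 60 * 1000
assert(retained(now - 1000, now, twelveHours))              // fresh entry kept
assert(!retained(now - 2 * twelveHours, now, twelveHours))  // day-old entry excluded
assert(retained(now - 2 * twelveHours, now, Long.MaxValue)) // retention disabled: everything kept
```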
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicLong

import scala.util.Random

import org.apache.hadoop.fs.{FSDataInputStream, Path, RawLocalFileSystem}
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path, RawLocalFileSystem}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.internal.SQLConf
@@ -40,7 +40,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
test("shouldRetain") {
withFileStreamSinkLog { sinkLog =>
val log = newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION)
assert(sinkLog.shouldRetain(log))
assert(sinkLog.shouldRetain(log, System.currentTimeMillis()))
}
}

@@ -130,6 +130,17 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
}
}

private def listBatchFiles(fs: FileSystem, sinkLog: FileStreamSinkLog): Set[String] = {
fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
try {
getBatchIdFromFileName(fileName)
true
} catch {
case _: NumberFormatException => false
}
}.toSet
}

test("delete expired file") {
// Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
// deterministically and one min batches to retain
@@ -139,18 +150,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
withFileStreamSinkLog { sinkLog =>
val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())

def listBatchFiles(): Set[String] = {
fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
try {
getBatchIdFromFileName(fileName)
true
} catch {
case _: NumberFormatException => false
}
}.toSet
}

def listBatchFiles(): Set[String] = this.listBatchFiles(fs, sinkLog)
sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0") === listBatchFiles())
sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
@@ -174,18 +174,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
withFileStreamSinkLog { sinkLog =>
val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())

def listBatchFiles(): Set[String] = {
fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
try {
getBatchIdFromFileName(fileName)
true
} catch {
case _: NumberFormatException => false
}
}.toSet
}

def listBatchFiles(): Set[String] = this.listBatchFiles(fs, sinkLog)
sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0") === listBatchFiles())
sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
@@ -206,6 +195,24 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
}
}

test("filter out outdated entries when compacting") {
val curTime = System.currentTimeMillis()
withFileStreamSinkLog(sinkLog => {
val logs = Seq(
newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION, curTime),
newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION, curTime),
newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION, curTime))
logs.foreach { log => assert(sinkLog.shouldRetain(log, curTime)) }

val logs2 = Seq(
newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION, curTime - 80000),
newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION, curTime - 120000))
logs2.foreach { log =>
assert(!sinkLog.shouldRetain(log, curTime))
}
}, Some(60000))
}

test("read Spark 2.1.0 log format") {
assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
SinkFileStatus("/a/b/0", 1, false, 1, 1, 100, FileStreamSinkLog.ADD_ACTION),
@@ -260,23 +267,29 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
}

/**
* Create a fake SinkFileStatus using path and action. Most of tests don't care about other fields
* in SinkFileStatus.
* Create a fake SinkFileStatus using path and action, and optionally modification time.
* Most tests don't care about other fields in SinkFileStatus.
*/
private def newFakeSinkFileStatus(path: String, action: String): SinkFileStatus = {
private def newFakeSinkFileStatus(
path: String,
action: String,
modificationTime: Long = Long.MaxValue): SinkFileStatus = {
SinkFileStatus(
path = path,
size = 100L,
isDir = false,
modificationTime = 100L,
modificationTime = modificationTime,
blockReplication = 1,
blockSize = 100L,
action = action)
}

private def withFileStreamSinkLog(f: FileStreamSinkLog => Unit): Unit = {
private def withFileStreamSinkLog(
f: FileStreamSinkLog => Unit,
ttl: Option[Long] = None): Unit = {
withTempDir { file =>
val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, file.getCanonicalPath)
val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, file.getCanonicalPath,
ttl)
f(sinkLog)
}
}