docs/structured-streaming-programming-guide.md (4 additions, 1 deletion)
@@ -1870,7 +1870,10 @@ Here are the details of all the sinks in Spark.
<td><b>File Sink</b></td>
<td>Append</td>
<td>
<code>path</code>: path to the output directory, must be specified.
<code>path</code>: path to the output directory, must be specified.<br/>
<code>outputRetentionMs</code>: time to live (TTL) for output files. Output files whose batches were
Contributor
There are 2 time fields in SinkFileStatus, modificationTime and commitTime. Maybe worth mentioning the exact field that is used for the comparison, to make it 100% clear.

Contributor Author
I guess we avoid exposing implementation details in the docs. E.g. if I'm not mistaken, there's no explanation of the format of the metadata, hence it would be confusing which field is being used, because end users don't even know what these fields are.

Contributor
You're right, and we're not explaining metadata details to users. What users would like to understand, though, is what the reference TTL is bound to. As half developer and half user, I was a bit confused about which field of SinkFileStatus we refer to. Since we've removed the (in my view) duplicate field, I'm fine here.

committed longer ago than the TTL will eventually be excluded from the metadata log. This means reader queries which read
the sink's output directory may not process them. By default it's disabled.
<br/><br/>
For file-format-specific options, see the related methods in DataFrameWriter
(<a href="api/scala/org/apache/spark/sql/DataFrameWriter.html">Scala</a>/<a href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>/<a
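For context, a minimal usage sketch of the option described above (the option name follows this revision; reviewers below suggest renaming it and accepting duration strings, and the streaming DataFrame and paths here are hypothetical):

// Assumed example, not part of the patch: a file sink with a 7-day output retention.
val query = events.writeStream            // `events` is a hypothetical streaming DataFrame
  .format("parquet")
  .option("path", "/warehouse/events")
  .option("checkpointLocation", "/warehouse/_checkpoints/events")
  .option("outputRetentionMs", (7L * 24 * 60 * 60 * 1000).toString)  // 7 days in milliseconds
  .start()

Per the shouldRetain change below, entries older than the TTL are dropped when the sink's metadata log is compacted, so downstream readers that rely on the metadata log stop seeing those files.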
@@ -136,8 +136,9 @@ class FileStreamSink(
private val basePath = new Path(path)
private val logPath = getMetadataLogPath(basePath.getFileSystem(hadoopConf), basePath,
sparkSession.sessionState.conf)
private val fileLog =
new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toString)
private val outputTimeToLive = options.get("outputRetentionMs").map(_.toLong)
Member
Can we use Utils.timeStringAsMs to parse this? Users likely set this to multiple days and asking them to calculate milliseconds is not user friendly.

Nit: regarding the option name, can we call it retention? It's obvious that the query is outputting files, so output sounds redundant to me.
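A rough sketch of what the two suggestions combined could look like (assumed, not the code in this revision; `options` is taken to be the sink's Map[String, String] of options, as in the surrounding class):

import org.apache.spark.util.Utils

// Parse a human-readable duration such as "7d" or "12h" into milliseconds,
// and use the shorter option name suggested above.
private val retentionMs: Option[Long] = options.get("retention").map(Utils.timeStringAsMs)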

Member
Nit: any reason to use a different name, outputTimeToLive? Using the same name as the option would help other people read the code.

Member
Nit: it would be great to output an info log for this value if it's set. It might be useful when debugging data issues caused by the retention.
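For example (wording assumed, not from this revision; assuming the surrounding class mixes in Logging so logInfo is available):

// Log the configured retention once at sink construction time.
outputTimeToLive.foreach { ms => logInfo(s"Output file retention is enabled, TTL: $ms ms") }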

Contributor Author
Good suggestions! I'll apply all of the inputs.

private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession,
logPath.toString, outputTimeToLive)

private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = {
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
@@ -81,7 +81,8 @@ object SinkFileStatus {
class FileStreamSinkLog(
metadataLogVersion: Int,
sparkSession: SparkSession,
path: String)
path: String,
outputTimeToLiveMs: Option[Long] = None)
extends CompactibleFileStreamLog[SinkFileStatus](metadataLogVersion, sparkSession, path) {

private implicit val formats = Serialization.formats(NoTypeHints)
@@ -96,6 +97,19 @@ class FileStreamSinkLog(
require(defaultCompactInterval > 0,
s"Please set ${SQLConf.FILE_SINK_LOG_COMPACT_INTERVAL.key} (was $defaultCompactInterval) " +
"to a positive value.")

private val ttlMs = outputTimeToLiveMs.getOrElse(Long.MaxValue)

override def shouldRetain(log: SinkFileStatus): Boolean = {
val curTime = System.currentTimeMillis()
Member
It would be great to avoid calling System.currentTimeMillis() if the option is not set, considering we need to call this method once (a JNI call) for each log entry.

Contributor Author (@HeartSaVioR, Nov 29, 2020)
Probably we could change the method signature a bit to provide a "context" which would be the same for the same compact batch. We changed the method shouldRetain in SPARK-30462, which is not yet released (3.1.0), hence making this change shouldn't break backward compatibility. (We decided to break the interface, but we break it only once for these changes.)

Once we call System.currentTimeMillis() only once per compact batch, the overhead should be negligible.
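A sketch of one possible shape of that change (assumed; the merged signature may differ):

// The caller (the compaction loop) evaluates System.currentTimeMillis() once per compact
// batch and passes it in, so no per-entry JNI call is needed.
def shouldRetain(log: SinkFileStatus, currentTime: Long): Boolean =
  outputTimeToLiveMs.forall(ttl => currentTime - log.modificationTime <= ttl)

Option.forall returns true when the option is empty, so entries are always retained if no TTL is configured.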

Contributor Author
Ah also CompactibleFileStreamLog is not a public API (its package is org.apache.spark.sql.execution.streaming), so it shouldn't matter much.

if (curTime - log.modificationTime > ttlMs) {
logDebug(s"${log.path} excluded by retention - current time: $curTime / " +
s"modification time: ${log.modificationTime} / TTL: $ttlMs.")
false
} else {
true
}
}
}

object FileStreamSinkLog {
@@ -26,7 +26,7 @@ import java.util.concurrent.atomic.AtomicLong

import scala.util.Random

import org.apache.hadoop.fs.{FSDataInputStream, Path, RawLocalFileSystem}
import org.apache.hadoop.fs.{FileSystem, FSDataInputStream, Path, RawLocalFileSystem}

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.internal.SQLConf
@@ -130,6 +130,17 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
}
}

private def listBatchFiles(fs: FileSystem, sinkLog: FileStreamSinkLog): Set[String] = {
fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
try {
getBatchIdFromFileName(fileName)
true
} catch {
case _: NumberFormatException => false
}
}.toSet
}

test("delete expired file") {
// Set FILE_SINK_LOG_CLEANUP_DELAY to 0 so that we can detect the deleting behaviour
// deterministically and one min batches to retain
@@ -139,18 +150,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
SQLConf.MIN_BATCHES_TO_RETAIN.key -> "1") {
withFileStreamSinkLog { sinkLog =>
val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())

def listBatchFiles(): Set[String] = {
fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
try {
getBatchIdFromFileName(fileName)
true
} catch {
case _: NumberFormatException => false
}
}.toSet
}

def listBatchFiles(): Set[String] = this.listBatchFiles(fs, sinkLog)
sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0") === listBatchFiles())
sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
@@ -174,18 +174,7 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
SQLConf.MIN_BATCHES_TO_RETAIN.key -> "2") {
withFileStreamSinkLog { sinkLog =>
val fs = sinkLog.metadataPath.getFileSystem(spark.sessionState.newHadoopConf())

def listBatchFiles(): Set[String] = {
fs.listStatus(sinkLog.metadataPath).map(_.getPath.getName).filter { fileName =>
try {
getBatchIdFromFileName(fileName)
true
} catch {
case _: NumberFormatException => false
}
}.toSet
}

def listBatchFiles(): Set[String] = this.listBatchFiles(fs, sinkLog)
sinkLog.add(0, Array(newFakeSinkFileStatus("/a/b/0", FileStreamSinkLog.ADD_ACTION)))
assert(Set("0") === listBatchFiles())
sinkLog.add(1, Array(newFakeSinkFileStatus("/a/b/1", FileStreamSinkLog.ADD_ACTION)))
@@ -206,6 +195,22 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
}
}

test("filter out outdated entries when compacting") {
val curTime = System.currentTimeMillis()
withFileStreamSinkLog(sinkLog => {
val logs = Seq(
newFakeSinkFileStatus("/a/b/x", FileStreamSinkLog.ADD_ACTION, curTime),
newFakeSinkFileStatus("/a/b/y", FileStreamSinkLog.ADD_ACTION, curTime),
newFakeSinkFileStatus("/a/b/z", FileStreamSinkLog.ADD_ACTION, curTime))
logs.foreach { log => assert(sinkLog.shouldRetain(log)) }

val logs2 = Seq(
newFakeSinkFileStatus("/a/b/m", FileStreamSinkLog.ADD_ACTION, curTime - 80000),
newFakeSinkFileStatus("/a/b/n", FileStreamSinkLog.ADD_ACTION, curTime - 120000))
logs2.foreach { log => assert(!sinkLog.shouldRetain(log)) }
}, Some(60000))
}

test("read Spark 2.1.0 log format") {
assert(readFromResource("file-sink-log-version-2.1.0") === Seq(
SinkFileStatus("/a/b/0", 1, false, 1, 1, 100, FileStreamSinkLog.ADD_ACTION),
@@ -260,23 +265,29 @@ class FileStreamSinkLogSuite extends SparkFunSuite with SharedSparkSession {
}

/**
* Create a fake SinkFileStatus using path and action. Most of tests don't care about other fields
* in SinkFileStatus.
* Create a fake SinkFileStatus using path and action, and optionally a modification time.
* Most tests don't care about the other fields in SinkFileStatus.
*/
private def newFakeSinkFileStatus(path: String, action: String): SinkFileStatus = {
private def newFakeSinkFileStatus(
path: String,
action: String,
modificationTime: Long = Long.MaxValue): SinkFileStatus = {
SinkFileStatus(
path = path,
size = 100L,
isDir = false,
modificationTime = 100L,
modificationTime = modificationTime,
blockReplication = 1,
blockSize = 100L,
action = action)
}

private def withFileStreamSinkLog(f: FileStreamSinkLog => Unit): Unit = {
private def withFileStreamSinkLog(
f: FileStreamSinkLog => Unit,
ttl: Option[Long] = None): Unit = {
withTempDir { file =>
val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, file.getCanonicalPath)
val sinkLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, file.getCanonicalPath,
ttl)
f(sinkLog)
}
}