11 changes: 11 additions & 0 deletions docs/structured-streaming-programming-guide.md
@@ -536,6 +536,11 @@ Here are the details of all the sources in Spark.
href="api/R/read.stream.html">R</a>).
E.g. for "parquet" format options see <code>DataStreamReader.parquet()</code>.
<br/><br/>
<code>ignoreFileStreamSinkMetadata</code>: whether to ignore the metadata left by a file stream sink, which causes the source to always use an in-memory file index. (default: false)
Contributor Author:
I couldn't find a place to document this for the (batch) file source. Please let me know if there is a proper place to put it. At least we document the need to enable this option when the "retainOnlyLastBatchInMetadata" option is enabled, so it might be safe for the batch case as well.
<br/>
This option is useful when the metadata grows too big and reading it becomes even slower than listing the files from the filesystem.<br/>
NOTE: This option must be set to "true" if the file source reads output files written by a file stream sink that has the "retainOnlyLastBatchInMetadata" option set to "true".
<br/><br/>
In addition, there are session configurations that affect certain file-formats. See the <a href="sql-programming-guide.html">SQL Programming Guide</a> for more details. E.g., for "parquet", see <a href="sql-data-sources-parquet.html#configuration">Parquet configuration</a> section.
</td>
<td>Yes</td>
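
A minimal usage sketch for the new source option (the format, schema name, and path below are placeholders, not taken from this patch): a streaming read of a directory written by a file stream sink that ignores the sink's _spark_metadata log, so files are always listed through an in-memory file index.

val streamingDf = spark.readStream
  .format("parquet")
  .schema(outputSchema)                            // file sources need an explicit schema; placeholder name
  .option("ignoreFileStreamSinkMetadata", "true")  // option introduced in this change
  .load("/path/to/other-query-output")             // placeholder path
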
@@ -1812,6 +1817,12 @@ Here are the details of all the sinks in Spark.
(<a href="api/scala/index.html#org.apache.spark.sql.DataFrameWriter">Scala</a>/<a href="api/java/org/apache/spark/sql/DataFrameWriter.html">Java</a>/<a href="api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter">Python</a>/<a
href="api/R/write.stream.html">R</a>).
E.g. for "parquet" format options see <code>DataFrameWriter.parquet()</code>
<br/>
<code>retainOnlyLastBatchInMetadata</code>: whether to retain metadata information only for the last succeeded batch.
<br/><br/>
This option greatly reduces the overhead of compacting metadata files, which can be non-trivial when the query processes lots of files in each batch.<br/>
NOTE: As only the last batch is retained in the metadata, the metadata is not readable by the file source: you must set the "ignoreFileStreamSinkMetadata" option
Contributor Author (@HeartSaVioR, Feb 20, 2019):
I feel this is not ideal, but given that the file stream sink itself also leverages the file log, it cannot be made entirely optional.

If we want to avoid leaving a file log in this case, we may need another kind of metadata (storing minimized information such as the last succeeded batch id) and write that instead when the option is turned on. It should be stored in another directory (like _spark_metadata), and file listing should ignore that directory too.
to "true" when reading sink's output files from another query, regardless of batch and streaming source.
</td>
<td>Yes (exactly-once)</td>
<td>Supports writes to partitioned tables. Partitioning by time may be useful.</td>
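
A minimal sketch of the matching sink side (checkpoint and output paths are placeholders): retaining only the last batch bounds the metadata compaction cost, and as noted above any query reading this output must then set "ignoreFileStreamSinkMetadata" to "true".

val query = streamingDf.writeStream
  .format("parquet")
  .option("checkpointLocation", "/tmp/q1-checkpoint")  // placeholder path
  .option("retainOnlyLastBatchInMetadata", "true")     // option introduced in this change
  .start("/tmp/q1-output")                             // placeholder path
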
@@ -344,8 +344,9 @@ case class DataSource(
// We are reading from the results of a streaming query. Load files from the metadata log
// instead of listing them using HDFS APIs.
case (format: FileFormat, _)
if FileStreamSink.hasMetadata(
caseInsensitiveOptions.get("path").toSeq ++ paths,
if !caseInsensitiveOptions.getOrElse(
"ignoreFileStreamSinkMetadata", "false").toBoolean &&
FileStreamSink.hasMetadata(caseInsensitiveOptions.get("path").toSeq ++ paths,
sparkSession.sessionState.newHadoopConf()) =>
val basePath = new Path((caseInsensitiveOptions.get("path").toSeq ++ paths).head)
val fileCatalog = new MetadataLogFileIndex(sparkSession, basePath, userSpecifiedSchema)
@@ -74,6 +74,12 @@ class FileStreamOptions(parameters: CaseInsensitiveMap[String]) extends Logging
*/
val fileNameOnly: Boolean = withBooleanParameter("fileNameOnly", false)

/**
* Whether to ignore FileStreamSink metadata in the source, which leads to using an in-memory file index.
*/
val ignoreFileStreamSinkMetadata: Boolean = withBooleanParameter("ignoreFileStreamSinkMetadata",
default = false)

private def withBooleanParameter(name: String, default: Boolean) = {
parameters.get(name).map { str =>
try {
@@ -97,6 +97,8 @@ class FileStreamSink(
private val fileLog =
new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, logPath.toUri.toString)
private val hadoopConf = sparkSession.sessionState.newHadoopConf()
private val retainOnlyLastBatchInMetadata: Boolean =
options.getOrElse("retainOnlyLastBatchInMetadata", "false").toBoolean

private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = {
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
@@ -114,7 +116,7 @@

committer match {
case manifestCommitter: ManifestFileCommitProtocol =>
manifestCommitter.setupManifestOptions(fileLog, batchId)
manifestCommitter.setupManifestOptions(fileLog, batchId, retainOnlyLastBatchInMetadata)
case _ => // Do nothing
}

@@ -184,8 +184,14 @@ class FileStreamSource(
* Some(true) means we know for sure the source DOES have metadata
* Some(false) means we know for sure the source DOES NOT have metadata
*/
@volatile private[sql] var sourceHasMetadata: Option[Boolean] =
if (SparkHadoopUtil.get.isGlobPath(new Path(path))) Some(false) else None
@volatile private[sql] var sourceHasMetadata: Option[Boolean] = {
if (sourceOptions.ignoreFileStreamSinkMetadata ||
SparkHadoopUtil.get.isGlobPath(new Path(path))) {
Some(false)
} else {
None
}
}

private def allFilesUsingInMemoryFileIndex() = {
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(fs, qualifiedBasePath)
@@ -42,14 +42,19 @@ class ManifestFileCommitProtocol(jobId: String, path: String)

@transient private var fileLog: FileStreamSinkLog = _
private var batchId: Long = _
private var retainOnlyLastBatch: Boolean = _

/**
* Sets up the manifest log output and the batch id for this job.
* Must be called before any other function.
*/
def setupManifestOptions(fileLog: FileStreamSinkLog, batchId: Long): Unit = {
def setupManifestOptions(
fileLog: FileStreamSinkLog,
batchId: Long,
retainOnlyLastBatch: Boolean): Unit = {
this.fileLog = fileLog
this.batchId = batchId
this.retainOnlyLastBatch = retainOnlyLastBatch
}

override def setupJob(jobContext: JobContext): Unit = {
@@ -63,6 +68,10 @@

if (fileLog.add(batchId, fileStatuses)) {
logInfo(s"Committed batch $batchId")
if (retainOnlyLastBatch) {
// purge entries older than batchId, which keeps only one batch in the file log
fileLog.purge(batchId)
}
} else {
throw new IllegalStateException(s"Race while writing batch $batchId")
}
@@ -190,6 +190,75 @@ class FileStreamSinkSuite extends StreamTest {
}
}

test("SPARK-24295 retain only last batch for file log metadata") {
val inputData = MemoryStream[Long]
val inputDF = inputData.toDF.toDF("time")
val outputDf = inputDF
.selectExpr("CAST(time AS timestamp) AS timestamp")

val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath

var query: StreamingQuery = null

try {
query =
outputDf.writeStream
.option("checkpointLocation", checkpointDir)
.option("retainOnlyLastBatchInMetadata", true)
.format("parquet")
.start(outputDir)

def addTimestamp(timestampInSecs: Int*): Unit = {
inputData.addData(timestampInSecs.map(_ * 1L): _*)
failAfter(streamingTimeout) {
query.processAllAvailable()
}
}

def check(expectedResult: Long*): Unit = {
val outputDf = spark.read
// This option must be provided when we enable 'retainOnlyLastBatchInMetadata'
// to purge metadata from FileStreamSink, otherwise the query will fail while loading
// due to incomplete metadata.
.option("ignoreFileStreamSinkMetadata", "true")
.parquet(outputDir)
.selectExpr("timestamp")
.sort("timestamp")
checkDataset(outputDf.as[Long], expectedResult: _*)
}

val logPath = new Path(outputDir, FileStreamSink.metadataDir)
val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, spark, logPath.toUri.toString)

addTimestamp(100)
check(100)

// only the latest batch is retained, hence length should be 1
assert(fileLog.get(None, None).length == 1)
assert(fileLog.get(None, None).head._1 === 0)

addTimestamp(104, 123)
check(100, 104, 123)

// only the latest batch is retained, hence length should be 1
assert(fileLog.get(None, None).length === 1)
assert(fileLog.get(None, None).head._1 === 1)

addTimestamp(140)
check(100, 104, 123, 140)

// only the latest batch is retained, hence length should be 1
assert(fileLog.get(None, None).length === 1)
assert(fileLog.get(None, None).head._1 === 2)

} finally {
if (query != null) {
query.stop()
}
}
}

test("partitioned writing and batch reading with 'basePath'") {
withTempDir { outputDir =>
withTempDir { checkpointDir =>
@@ -908,6 +908,56 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

test("ignore metadata when reading data from outputs of another streaming query") {
withTempDirs { case (outputDir, checkpointDir) =>
// q1 is a streaming query that reads from memory and writes to text files
val q1Source = MemoryStream[String]
val q1 =
q1Source
.toDF()
.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.format("text")
.start(outputDir.getCanonicalPath)

// q2 is a streaming query that reads q1's text outputs
// even though q1 stores metadata in its output location, we intend to ignore it
val q2 =
createFileStream("text", outputDir.getCanonicalPath,
options = Map("ignoreFileStreamSinkMetadata" -> "true"))
.filter($"value" contains "keep")

def q1AddData(data: String*): StreamAction =
Execute { _ =>
q1Source.addData(data)
q1.processAllAvailable()
}
def q2ProcessAllAvailable(): StreamAction = Execute { q2 => q2.processAllAvailable() }

testStream(q2)(
// batch 0
q1AddData("drop1", "keep2"),
q2ProcessAllAvailable(),
CheckAnswer("keep2"),

// batch 1
Assert {
// create a text file that won't be in q1's sink log
// given we are ignoring sink metadata, the content should appear in q2's answer
val shouldNotKeep = new File(outputDir, "keep.txt")
stringToFile(shouldNotKeep, "keep")
shouldNotKeep.exists()
},
q1AddData("keep3"),
q2ProcessAllAvailable(),
// here we should see "keep"; with the metadata file index it would not appear
CheckAnswer("keep", "keep2", "keep3"),

Execute { _ => q1.stop() }
)
}
}

test("start before another streaming query, and read its output") {
withTempDirs { case (outputDir, checkpointDir) =>
// q1 is a streaming query that reads from memory and writes to text files