Add the ability to remove the old MetadataLog in FileStreamSource

jerryshao committed Sep 18, 2016
commit 6cc43a3526295438a0b8b3b810f77bced3dd18dc
FileStreamSource.scala
@@ -17,20 +17,36 @@

package org.apache.spark.sql.execution.streaming

<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1
import scala.collection.JavaConverters._
=======
import java.util.UUID

import org.apache.hadoop.fs.Path
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal
>>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource

import org.apache.hadoop.fs.{Path, PathFilter}

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1
import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation}
=======
import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, DataSource, ListingFileCatalog, LogicalRelation}
import org.apache.spark.sql.internal.SQLConf
>>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource
import org.apache.spark.sql.types.StructType

/**
<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1
* A very simple source that reads files from the given directory as they appear.
*
* TODO: Clean up the metadata log files periodically.
=======
* A very simple source that reads text files from the given directory as they appear.
>>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource
*/
class FileStreamSource(
sparkSession: SparkSession,
@@ -40,6 +56,7 @@ class FileStreamSource(
metadataPath: String,
options: Map[String, String]) extends Source with Logging {

<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1
import FileStreamSource._

private val sourceOptions = new FileStreamOptions(options)
@@ -51,6 +68,11 @@

private val metadataLog = new HDFSMetadataLog[Array[FileEntry]](sparkSession, metadataPath)

=======
private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contain glob patterns
private val metadataLog = new FileStreamSourceLog(sparkSession, metadataPath)
>>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource
private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)

/** Maximum number of new files to be considered in each batch */
@@ -234,3 +256,86 @@ object FileStreamSource {
}
}
}

class FileStreamSourceLog(sparkSession: SparkSession, path: String)
extends HDFSMetadataLog[Seq[String]](sparkSession, path) {

// Configurations about metadata compaction
private val compactInterval = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL)
require(compactInterval > 0,
s"Please set ${SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key} (was $compactInterval) to a " +
Contributor: I'd move (was $compactInterval) to the end of the message.

s"positive value.")

private val fileCleanupDelayMs = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY)

private val isDeletingExpiredLog = sparkSession.conf.get(SQLConf.FILE_SOURCE_LOG_DELETION)

private var compactBatchId: Long = -1L

private def isCompactionBatch(batchId: Long, compactInterval: Long): Boolean = {
batchId % compactInterval == 0
}

override def add(batchId: Long, metadata: Seq[String]): Boolean = {
if (isCompactionBatch(batchId, compactInterval)) {
compactMetadataLog(batchId - 1)
}

super.add(batchId, metadata)
}
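
  // For example, with the default compactInterval of 10, batches 0, 10, 20, ...
  // are compaction batches: add(10) first calls compactMetadataLog(9), folding
  // everything since the last compaction point into the batch-9 file, and only
  // then writes batch 10 as a regular entry. add(0) compacts nothing because
  // there is no earlier metadata to merge.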

private def compactMetadataLog(batchId: Long): Unit = {
// read out compact metadata and merge with new metadata.
val batches = super.get(Some(compactBatchId), Some(batchId))
val totalMetadata = batches.flatMap(_._2)
if (totalMetadata.isEmpty) {
return
}

// Remove old compact metadata file and rewrite.
val renamedPath = new Path(path, s".${batchId.toString}-${UUID.randomUUID.toString}.tmp")
fileManager.rename(batchIdToPath(batchId), renamedPath)

var isSuccess = false
try {
isSuccess = super.add(batchId, totalMetadata)
} catch {
case NonFatal(e) => isSuccess = false
Contributor: Why are you setting isSuccess to false since it's false already?

Contributor Author: Yeah, it is not necessary; I will remove it.
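
A sketch of the simplified clause (illustrative): since isSuccess is initialized to false and is only assigned inside the try block, the handler can simply swallow the error:

  case NonFatal(_) => // isSuccess remains false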

} finally {
if (!isSuccess) {
// Rollback to the previous status if compaction is failed.
Contributor: s/status/state?

Contributor: This rollback code will not execute if the process exits during a compaction operation. You will need cleanup code in the class constructor to handle that case.

fileManager.delete(batchIdToPath(batchId))
fileManager.rename(renamedPath, batchIdToPath(batchId))
return
} else {
fileManager.delete(renamedPath)
}
}

compactBatchId = batchId

// Remove expired metadata log
if (isDeletingExpiredLog) {
removeOlderThan(compactBatchId)
}
}
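
  // Following up on the reviewer's crash-recovery point above, a rough sketch of
  // constructor-time cleanup (illustrative only, not part of this patch; the
  // method name and details are assumptions). It restores or removes any
  // ".{batchId}-{uuid}.tmp" file left behind if the process died between the
  // rename and the rewrite in compactMetadataLog.
  private def recoverFromFailedCompaction(): Unit = {
    // Batch ids that already have a (re)written metadata file.
    val writtenIds = fileManager.list(metadataPath, new PathFilter {
      override def accept(p: Path): Boolean = {
        try { pathToBatchId(p); true } catch { case _: NumberFormatException => false }
      }
    }).map(f => pathToBatchId(f.getPath)).toSet

    fileManager.list(metadataPath, new PathFilter {
      override def accept(p: Path): Boolean = p.getName.endsWith(".tmp")
    }).foreach { f =>
      // Tmp files are named ".{batchId}-{uuid}.tmp" by compactMetadataLog.
      val batchId = f.getPath.getName.stripPrefix(".").takeWhile(_ != '-').toLong
      if (writtenIds.contains(batchId)) {
        fileManager.delete(f.getPath) // the rewrite completed; the tmp copy is stale
      } else {
        fileManager.rename(f.getPath, batchIdToPath(batchId)) // roll back
      }
    }
  }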

private def removeOlderThan(batchId: Long): Unit = {
val expiredTime = System.currentTimeMillis() - fileCleanupDelayMs
fileManager.list(metadataPath, new PathFilter {
override def accept(path: Path): Boolean = {
try {
val id = pathToBatchId(path)
id < batchId
} catch {
case _: NumberFormatException =>
false
}
}
}).foreach { f =>
if (f.getModificationTime <= expiredTime) {
fileManager.delete(f.getPath)
}
}
}
}
SQLConf.scala
@@ -544,7 +544,28 @@ object SQLConf {
.internal()
.doc("How long that a file is guaranteed to be visible for all readers.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(60 * 1000L) // 10 minutes
.createWithDefault(60 * 10 * 1000L) // 10 minutes

val FILE_SOURCE_LOG_DELETION = SQLConfigBuilder("spark.sql.streaming.fileSource.log.deletion")
.internal()
.doc("Whether to delete the expired log files in file stream source.")
.booleanConf
.createWithDefault(true)

val FILE_SOURCE_LOG_COMPACT_INTERVAL =
SQLConfigBuilder("spark.sql.streaming.fileSource.log.compactInterval")
.internal()
.doc("Number of log files after which all the previous files " +
"are compacted into the next log file.")
.intConf
.createWithDefault(10)

val FILE_SOURCE_LOG_CLEANUP_DELAY =
SQLConfigBuilder("spark.sql.streaming.fileSource.log.cleanupDelay")
.internal()
.doc("How long in milliseconds a file is guaranteed to be visible for all readers.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(60 * 10 * 1000L) // 10 minutes
Contributor: A nitpick, but I think it'd be easier to "decode" as 10 * 60 * 1000L.
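
A sketch of the default written as the reviewer suggests (same value, arguably easier to scan):

  .createWithDefault(10 * 60 * 1000L) // 10 minutes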


val STREAMING_SCHEMA_INFERENCE =
SQLConfigBuilder("spark.sql.streaming.schemaInference")
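
The new fileSource.log settings added above are internal configs. A rough usage sketch (the values are illustrative, and spark is assumed to be an active SparkSession):

  spark.conf.set("spark.sql.streaming.fileSource.log.deletion", "true")
  spark.conf.set("spark.sql.streaming.fileSource.log.compactInterval", "5")
  spark.conf.set("spark.sql.streaming.fileSource.log.cleanupDelay", "10m")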
FileStreamSourceSuite.scala
@@ -17,10 +17,16 @@

package org.apache.spark.sql.streaming

<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1
import java.io.File

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._
=======
import java.io.{File, FilenameFilter}

import org.scalatest.PrivateMethodTester
>>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource

import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
@@ -30,7 +36,7 @@ import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

class FileStreamSourceTest extends StreamTest with SharedSQLContext {
class FileStreamSourceTest extends StreamTest with SharedSQLContext with PrivateMethodTester {

import testImplicits._

@@ -623,6 +629,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

<<<<<<< 92ce8d4849a0341c4636e70821b7be57ad3055b1
test("max files per trigger") {
withTempDir { case src =>
var lastFileModTime: Option[Long] = None
@@ -801,6 +808,63 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
)
}
}
=======
test("clean obsolete metadata log") {
val _sources = PrivateMethod[Seq[Source]]('sources)
val _metadataLog = PrivateMethod[FileStreamSourceLog]('metadataLog)

def verify(execution: StreamExecution)
(batchId: Long, expectedBatches: Int, expectedFileNames: Array[String]): Boolean = {
val fileSource = (execution invokePrivate _sources()).head.asInstanceOf[FileStreamSource]
val metadataLog = fileSource invokePrivate _metadataLog()
val files = new File(metadataLog.metadataPath.toUri.toString).listFiles(
new FilenameFilter {
override def accept(dir: File, name: String): Boolean = {
try {
name.toLong
true
} catch {
case _: NumberFormatException => false
}
}
}).map(_.getName)

metadataLog.get(None, Some(batchId)).flatMap(_._2).size === expectedBatches &&
files === expectedFileNames
}

withTempDirs { case (src, tmp) =>
withSQLConf(
SQLConf.FILE_SOURCE_LOG_COMPACT_INTERVAL.key -> "2",
SQLConf.FILE_SOURCE_LOG_CLEANUP_DELAY.key -> "0ms"
) {
val fileStream = createFileStream("text", src.getCanonicalPath)
val filtered = fileStream.filter($"value" contains "keep")

testStream(filtered)(
AddTextFileData("drop1\nkeep2\nkeep3", src, tmp),
CheckAnswer("keep2", "keep3"),
AssertOnQuery(verify(_)(0L, 1, Array("0"))),
AddTextFileData("drop4\nkeep5\nkeep6", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6"),
AssertOnQuery(verify(_)(1L, 2, Array("0", "1"))),
AddTextFileData("drop7\nkeep8\nkeep9", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9"),
AssertOnQuery(verify(_)(2L, 3, Array("1", "2"))),
StopStream,
StartStream(),
AssertOnQuery(verify(_)(2L, 3, Array("1", "2"))),
AddTextFileData("drop10\nkeep11", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11"),
AssertOnQuery(verify(_)(3L, 4, Array("1", "2", "3"))),
AddTextFileData("drop12\nkeep13", src, tmp),
CheckAnswer("keep2", "keep3", "keep5", "keep6", "keep8", "keep9", "keep11", "keep13"),
AssertOnQuery(verify(_)(4L, 5, Array("3", "4")))
)
}
}
}
>>>>>>> Add the ability to remove the old MetadataLog in FileStreamSource
}

class FileStreamSourceStressTestSuite extends FileStreamSourceTest {