[SPARK-17165][SQL] FileStreamSource should not track the list of seen files indefinitely
petermaxlee committed Aug 20, 2016
commit ce1dd9c4d6a880751e7b3692f9db8597a88f2d05
FileStreamOptions.scala (new file)
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import scala.util.Try

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.datasources.CaseInsensitiveMap
import org.apache.spark.util.Utils

/**
* User specified options for file streams.
*/
class FileStreamOptions(@transient private val parameters: Map[String, String])
Contributor Author: This is a similar setup to CSVOptions and JSONOptions. I felt it would be easier to track the list of options read by the source here.

Member: You can remove Serializable and @transient from this class. It's not used on the executor side.
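// An aside, not part of this diff: a sketch of what the reviewer is suggesting. Since this
// options object is only ever used on the driver, the declaration could be simplified to
//   class FileStreamOptions(private val parameters: Map[String, String]) extends Logging
// dropping the Serializable mix-in and the @transient annotation. The original declaration
// from the diff continues below.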

extends Logging with Serializable {

val maxFilesPerTrigger: Option[Int] = parameters.get("maxFilesPerTrigger").map { str =>
Try(str.toInt).toOption.filter(_ > 0).getOrElse {
throw new IllegalArgumentException(
s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
}
}

/** Maximum age of a file that can be found in this directory, before it is deleted. */
val maxFileAgeMs: Long =
Utils.timeStringAsMs(parameters.getOrElse("maxFileAge", "24h"))
Contributor @rxin, Aug 20, 2016: 24 hours seems too short. Maybe a month or a week?

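For context, this is roughly how a user would override that default from the stream reader. A sketch only: `spark` is assumed to be a SparkSession in scope and the input path is hypothetical; the option name `maxFileAge` comes from this patch.

val stream = spark.readStream
  .format("text")
  .option("maxFileAge", "7d")   // parsed by Utils.timeStringAsMs, so "1h", "7d", "30d" all work
  .load("/data/incoming")       // hypothetical input directory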

/** Options as specified by the user, in a case-insensitive map, without "path" set. */
val optionMapWithoutPath: Map[String, String] =
new CaseInsensitiveMap(parameters).filterKeys(_ != "path")
}


object FileStreamOptions {

def apply(): FileStreamOptions = new FileStreamOptions(Map.empty)

def apply(paramName: String, paramValue: String): FileStreamOptions = {
new FileStreamOptions(Map(paramName -> paramValue))
}
Contributor: Looks like these two apply methods are not used; it would be better to remove them.

}
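A quick illustration of how the options class above parses its two settings. This is a sketch with made-up values, assuming the Spark internals used by the class are on the classpath:

val opts = new FileStreamOptions(Map("maxFilesPerTrigger" -> "100", "maxFileAge" -> "12h"))
assert(opts.maxFilesPerTrigger == Some(100))
assert(opts.maxFileAgeMs == 12 * 60 * 60 * 1000L)   // Utils.timeStringAsMs("12h")
// A non-positive or non-numeric "maxFilesPerTrigger" value throws IllegalArgumentException.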
FileStreamSource.scala
@@ -17,21 +17,18 @@

package org.apache.spark.sql.execution.streaming

import scala.util.Try
import scala.collection.JavaConverters._

import org.apache.hadoop.fs.Path

import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, DataSource, ListingFileCatalog, LogicalRelation}
import org.apache.spark.sql.execution.datasources.{DataSource, ListingFileCatalog, LogicalRelation}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.collection.OpenHashSet

/**
* A very simple source that reads text files from the given directory as they appear.
*
* TODO Clean up the metadata files periodically
* A very simple source that reads files from the given directory as they appear.
*/
class FileStreamSource(
sparkSession: SparkSession,
Contributor: This TODO still applies, right?

Contributor Author: Put it back. Also updated the file with a new test case to make the seen map more robust.

@@ -41,36 +38,59 @@ class FileStreamSource(
metadataPath: String,
options: Map[String, String]) extends Source with Logging {

import FileStreamSource._

private val sourceOptions = new FileStreamOptions(options)

private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contain glob patterns
private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath)

private val metadataLog = new HDFSMetadataLog[Seq[FileEntry]](sparkSession, metadataPath)

Contributor: Remove unnecessary blank lines; that will be more organized.

Contributor Author: There is only one blank line. It's actually a good idea to have blank lines separating variable definitions, as documented in a lot of different coding style guides. Excessive blank lines are bad, though.

Member: I know it's bike-shedding, but my personal taste is to keep declarations of member fields together in a block, or, if there are many, perhaps logically group them. So it really depends on whether you mean metadataLog to stand a bit separately from the other fields.

Contributor Author: metadataLog is not logically related to qualifiedBasePath.

private var maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)

/** Maximum number of new files to be considered in each batch */
private val maxFilesPerBatch = getMaxFilesPerBatch()
private val maxFilesPerBatch = sourceOptions.maxFilesPerTrigger

/** A mapping from a file that we have processed to some timestamp it was last modified. */
// Visible for testing.
val seenFiles = new SeenFilesMap(sourceOptions.maxFileAgeMs)
Contributor: Actually, you could use scalatest's PrivateMethodTester to test a private member; it is not necessary to expose it only for testing.

Contributor Author: This is actually useful in general for debugging. I'm going to update the comment.

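For reference, a minimal, self-contained sketch of the scalatest PrivateMethodTester approach the reviewer mentions. The Counter class and its bump method are hypothetical, introduced only to keep the example runnable on its own:

import org.scalatest.FunSuite
import org.scalatest.PrivateMethodTester

class Counter {
  private var n = 0
  private def bump(delta: Int): Int = { n += delta; n }
}

class CounterSuite extends FunSuite with PrivateMethodTester {
  test("invoke a private method via reflection") {
    // PrivateMethod takes the method name as a Symbol and the return type as a type parameter.
    val bump = PrivateMethod[Int]('bump)
    val counter = new Counter
    assert((counter invokePrivate bump(3)) === 3)
    assert((counter invokePrivate bump(2)) === 5)
  }
}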

private val seenFiles = new OpenHashSet[String]
metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, files) =>
files.foreach(seenFiles.add)
metadataLog.get(None, Some(maxBatchId)).foreach { case (batchId, entry) =>
entry.foreach(seenFiles.add)
seenFiles.purge()
Contributor: From my understanding, this purge() can be moved out of this loop and done only once, since it is time-consuming.

Contributor Author: This is not safe until we truncate the logs.

Contributor Author: Good idea, let me add it to the option class.

}

logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAge = ${sourceOptions.maxFileAgeMs}")

/**
* Returns the maximum offset that can be retrieved from the source.
*
* `synchronized` on this method is for solving race conditions in tests. In the normal usage,
* there is no race here, so the cost of `synchronized` should be rare.
*/
private def fetchMaxOffset(): LongOffset = synchronized {
val newFiles = fetchAllFiles().filter(!seenFiles.contains(_))
// All the new files found - ignore aged files and files that we have seen.
val newFiles = fetchAllFiles().filter(seenFiles.isNewFile)

// Obey user's setting to limit the number of files in this batch trigger.
val batchFiles =
if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles

batchFiles.foreach { file =>
seenFiles.add(file)
logDebug(s"New file: $file")
}
logTrace(s"Number of new files = ${newFiles.size})")
logTrace(s"Number of files selected for batch = ${batchFiles.size}")
logTrace(s"Number of seen files = ${seenFiles.size}")
val numPurged = seenFiles.purge()

logTrace(
s"""
|Number of new files = ${newFiles.size}
|Number of files selected for batch = ${batchFiles.size}
|Number of seen files = ${seenFiles.size}
|Number of files purged from tracking map = $numPurged
""".stripMargin)

if (batchFiles.nonEmpty) {
maxBatchId += 1
metadataLog.add(maxBatchId, batchFiles)
@@ -104,22 +124,26 @@ class FileStreamSource(
val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
logTrace(s"Files are:\n\t" + files.mkString("\n\t"))
val newOptions = new CaseInsensitiveMap(options).filterKeys(_ != "path")
val newDataSource =
DataSource(
sparkSession,
paths = files,
paths = files.map(_.path),
userSpecifiedSchema = Some(schema),
className = fileFormatClassName,
options = newOptions)
options = sourceOptions.optionMapWithoutPath)
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
}

private def fetchAllFiles(): Seq[String] = {
/**
* Returns a list of files found, sorted by their timestamp.
*/
private def fetchAllFiles(): Seq[FileEntry] = {
val startTime = System.nanoTime
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
val catalog = new ListingFileCatalog(sparkSession, globbedPaths, options, Some(new StructType))
val files = catalog.allFiles().sortBy(_.getModificationTime).map(_.getPath.toUri.toString)
val files = catalog.allFiles().sortBy(_.getModificationTime).map { status =>
FileEntry(status.getPath.toUri.toString, status.getModificationTime)
}
val endTime = System.nanoTime
val listingTimeMs = (endTime.toDouble - startTime) / 1000000
if (listingTimeMs > 2000) {
@@ -132,20 +156,71 @@ class FileStreamSource(
files
}

private def getMaxFilesPerBatch(): Option[Int] = {
new CaseInsensitiveMap(options)
.get("maxFilesPerTrigger")
.map { str =>
Try(str.toInt).toOption.filter(_ > 0).getOrElse {
throw new IllegalArgumentException(
s"Invalid value '$str' for option 'maxFilesPerTrigger', must be a positive integer")
}
}
}

override def getOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)

override def toString: String = s"FileStreamSource[$qualifiedBasePath]"

override def stop() {}
}


object FileStreamSource {

/** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
type Timestamp = Long

case class FileEntry(path: String, timestamp: Timestamp) extends Serializable

/**
* A custom hash map used to track the list of files seen. This map is not thread-safe.
*
* To prevent the hash map from growing indefinitely, a purge function is available to
* remove files "maxAgeMs" older than the latest file.
*/
class SeenFilesMap(maxAgeMs: Long) {
require(maxAgeMs >= 0)

/** Mapping from file to its timestamp. */
private val map = new java.util.HashMap[String, Timestamp]

private var lastTimestamp: Timestamp = 0L

private def ageThreshold: Timestamp = lastTimestamp - maxAgeMs

/** Add a new file to the map. */
def add(file: FileEntry): Unit = {
map.put(file.path, file.timestamp)
if (file.timestamp > lastTimestamp) {
lastTimestamp = file.timestamp
}
}

/**
* Returns true if we should consider this file a new file. The file is only considered "new"
* if it is new enough that we are still tracking, and we have not seen it before.
*/
def isNewFile(file: FileEntry): Boolean = {
file.timestamp > ageThreshold && !map.containsKey(file.path)
}

/** Removes aged entries and returns the number of files removed. */
def purge(): Int = {
val iter = map.entrySet().iterator()
var count = 0
while (iter.hasNext) {
val entry = iter.next()
if (entry.getValue < lastTimestamp - maxAgeMs) {
count += 1
iter.remove()
}
}
count
}

def size: Int = map.size()

def allEntries: Seq[FileEntry] = {
map.entrySet().asScala.map(entry => FileEntry(entry.getKey, entry.getValue)).toSeq
}
}
}
FileStreamSourceSuite.scala (new file)
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import org.apache.spark.SparkFunSuite

class FileStreamSourceSuite extends SparkFunSuite {

import FileStreamSource._

test("SeenFilesMap") {
val map = new SeenFilesMap(maxAgeMs = 10)

map.add(FileEntry("a", 5))
assert(map.size == 1)
map.purge()
assert(map.size == 1)

// Add a new entry and purge should be no-op, since the gap is exactly 10 ms.
map.add(FileEntry("b", 15))
assert(map.size == 2)
map.purge()
assert(map.size == 2)

// Add a new entry that's more than 10 ms newer than the first entry. We should be able to purge now.
map.add(FileEntry("c", 16))
assert(map.size == 3)
map.purge()
assert(map.size == 2)

// Overriding an existing entry shouldn't change the size
map.add(FileEntry("c", 25))
assert(map.size == 2)

// Not a new file because we have seen c before
assert(!map.isNewFile(FileEntry("c", 20)))

// Not a new file because timestamp is too old
assert(!map.isNewFile(FileEntry("d", 5)))

// Finally a new file: never seen and not too old
assert(map.isNewFile(FileEntry("e", 20)))
}

}
FileStreamSourceSuite.scala (existing suite)
@@ -104,12 +104,13 @@ class FileStreamSourceTest extends StreamTest with SharedSQLContext
def createFileStream(
format: String,
path: String,
schema: Option[StructType] = None): DataFrame = {
schema: Option[StructType] = None,
options: Map[String, String] = Map.empty): DataFrame = {
val reader =
if (schema.isDefined) {
spark.readStream.format(format).schema(schema.get)
spark.readStream.format(format).schema(schema.get).options(options)
} else {
spark.readStream.format(format)
spark.readStream.format(format).options(options)
}
reader.load(path)
}
@@ -331,6 +332,39 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}

test("SPARK-17165 should not track the list of seen files indefinitely") {
// This test works by:
// 1. Create a file
// 2. Get it processed
// 3. Sleep for a very short amount of time (longer than maxFileAge)
// 4. Add another file (at this point the original file should have been purged)
// 5. Test the size of the seenFiles internal data structure

// Note that if we change maxFileAge to a very large number, the last step should fail.
withTempDirs { case (src, tmp) =>
val textStream: DataFrame =
createFileStream("text", src.getCanonicalPath, options = Map("maxFileAge" -> "5ms"))

testStream(textStream)(
AddTextFileData("a\nb", src, tmp),
CheckAnswer("a", "b"),

// Sleep longer than 5 ms (maxFileAge)
AssertOnQuery { _ => Thread.sleep(10); true },

AddTextFileData("c\nd", src, tmp),
CheckAnswer("a", "b", "c", "d"),

AssertOnQuery("seen files should contain only one entry") { streamExecution =>
val source = streamExecution.logicalPlan.collect { case e: StreamingExecutionRelation =>
e.source.asInstanceOf[FileStreamSource]
}.head
source.seenFiles.size == 1
}
)
}
}

// =============== JSON file stream tests ================

test("read from json files") {