@@ -111,22 +111,52 @@ class FileStreamSource(

logInfo(s"maxFilesPerBatch = $maxFilesPerBatch, maxFileAgeMs = $maxFileAgeMs")

private var unreadFiles: Seq[(String, Long)] = _

/**
* Returns the maximum offset that can be retrieved from the source.
*
* `synchronized` on this method is for solving race conditions in tests. In normal usage,
* there is no race here, so the cost of `synchronized` should be negligible.
*/
private def fetchMaxOffset(limit: ReadLimit): FileStreamSourceOffset = synchronized {
// All the new files found - ignore aged files and files that we have seen.
val newFiles = fetchAllFiles().filter {
case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
val newFiles = if (unreadFiles != null) {
logDebug(s"Reading from unread files - ${unreadFiles.size} files are available.")
unreadFiles
} else {
// All the new files found - ignore aged files and files that we have seen.
fetchAllFiles().filter {
case (path, timestamp) => seenFiles.isNewFile(path, timestamp)
}
}

// Obey user's setting to limit the number of files in this batch trigger.
val batchFiles = limit match {
case files: ReadMaxFiles => newFiles.take(files.maxFiles())
case _: ReadAllAvailable => newFiles
val (batchFiles, unselectedFiles) = limit match {
case files: ReadMaxFiles if !sourceOptions.latestFirst =>
// We can cache and reuse the remaining fetched files in further batches.
val (bFiles, usFiles) = newFiles.splitAt(files.maxFiles())
if (usFiles.size < files.maxFiles() * DISCARD_UNSEEN_FILES_RATIO) {
// Discard the unselected files if their number is smaller than the threshold.
// This avoids the case where the next batch would have too few files to read
// even though new files are available.
(bFiles, null)
} else {
(bFiles, usFiles)
}

case files: ReadMaxFiles =>
// implies "sourceOptions.latestFirst = true" which we want to refresh the list per batch
(newFiles.take(files.maxFiles()), null)

case _: ReadAllAvailable => (newFiles, null)
}

if (unselectedFiles != null && unselectedFiles.nonEmpty) {
unreadFiles = unselectedFiles
logTrace(s"${unreadFiles.size} unread files are available for further batches.")
} else {
unreadFiles = null
logTrace(s"No unread file is available for further batches.")
}

batchFiles.foreach { file =>
@@ -139,6 +169,7 @@ class FileStreamSource(
s"""
|Number of new files = ${newFiles.size}
|Number of files selected for batch = ${batchFiles.size}
|Number of unread files = ${Option(unreadFiles).map(_.size).getOrElse(0)}
|Number of seen files = ${seenFiles.size}
|Number of files purged from tracking map = $numPurged
""".stripMargin)
@@ -311,6 +342,8 @@ object FileStreamSource {
/** Timestamp for file modification time, in ms since January 1, 1970 UTC. */
type Timestamp = Long

val DISCARD_UNSEEN_FILES_RATIO = 0.2

case class FileEntry(path: String, timestamp: Timestamp, batchId: Long) extends Serializable

/**
@@ -19,6 +19,7 @@ package org.apache.spark.sql.streaming

import java.io.File
import java.net.URI
import java.util.concurrent.atomic.AtomicLong

import scala.collection.mutable
import scala.util.Random
@@ -32,11 +33,11 @@ import org.scalatest.time.SpanSugar._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.connector.read.streaming.ReadLimit
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap, SourceFileArchiver}
import org.apache.spark.sql.execution.streaming.sources.MemorySink
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
@@ -997,15 +998,6 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}

test("when schema inference is turned on, should read partition data") {
def createFile(content: String, src: File, tmp: File): Unit = {
val tempFile = Utils.tempFileWith(new File(tmp, "text"))
val finalFile = new File(src, tempFile.getName)
require(!src.exists(), s"$src exists, dir: ${src.isDirectory}, file: ${src.isFile}")
require(src.mkdirs(), s"Cannot create $src")
require(src.isDirectory(), s"$src is not a directory")
require(stringToFile(tempFile, content).renameTo(finalFile))
}

withSQLConf(SQLConf.STREAMING_SCHEMA_INFERENCE.key -> "true") {
withTempDirs { case (dir, tmp) =>
val partitionFooSubDir = new File(dir, "partition=foo")
@@ -1602,6 +1594,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}

test("do not recheck that files exist during getBatch") {
val scheme = ExistsThrowsExceptionFileSystem.scheme
withTempDir { temp =>
spark.conf.set(
s"fs.$scheme.impl",
@@ -1935,6 +1928,129 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
assert(expectedDir.exists())
assert(expectedDir.list().exists(_.startsWith(filePrefix)))
}

private def withCountListingLocalFileSystemAsLocalFileSystem(body: => Unit): Unit = {
val optionKey = s"fs.${CountListingLocalFileSystem.scheme}.impl"
val originClassForLocalFileSystem = spark.conf.getOption(optionKey)
try {
spark.conf.set(optionKey, classOf[CountListingLocalFileSystem].getName)
body
} finally {
originClassForLocalFileSystem match {
case Some(fsClazz) => spark.conf.set(optionKey, fsClazz)
case _ => spark.conf.unset(optionKey)
}
}
}
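For reference, this helper relies on Hadoop resolving the FileSystem implementation for a scheme from the fs.<scheme>.impl configuration key; session conf entries are propagated into the Hadoop conf the source uses. A minimal sketch of that resolution (a rough illustration only, assuming the suite's existing Hadoop imports and a Hadoop Configuration derived from the session conf via spark.sessionState.newHadoopConf()):

// Pointing "fs.<scheme>.impl" at CountListingLocalFileSystem makes every
// listStatus call under that scheme observable via pathToNumListStatusCalled.
val hadoopConf = spark.sessionState.newHadoopConf()
val fs = new Path(s"${CountListingLocalFileSystem.scheme}:///tmp").getFileSystem(hadoopConf)
assert(fs.isInstanceOf[CountListingLocalFileSystem])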

test("Caches and leverages unread files") {
withCountListingLocalFileSystemAsLocalFileSystem {
withThreeTempDirs { case (src, meta, tmp) =>
val options = Map("latestFirst" -> "false", "maxFilesPerTrigger" -> "10")
val scheme = CountListingLocalFileSystem.scheme
val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
StructType(Nil), Seq.empty, meta.getCanonicalPath, options)
val _metadataLog = PrivateMethod[FileStreamSourceLog](Symbol("metadataLog"))
val metadataLog = source invokePrivate _metadataLog()

def verifyBatch(
offset: FileStreamSourceOffset,
expectedBatchId: Long,
inputFiles: Seq[File],
expectedListingCount: Int): Unit = {
val batchId = offset.logOffset
assert(batchId === expectedBatchId)

val files = metadataLog.get(batchId).getOrElse(Array.empty[FileEntry])
assert(files.forall(_.batchId == batchId))

val actualInputFiles = files.map { p => new Path(p.path).toUri.getPath }
val expectedInputFiles = inputFiles.slice(batchId.toInt * 10, batchId.toInt * 10 + 10)
.map(_.getCanonicalPath)
assert(actualInputFiles === expectedInputFiles)

assert(expectedListingCount === CountListingLocalFileSystem.pathToNumListStatusCalled
.get(src.getCanonicalPath).map(_.get()).getOrElse(0))
}

CountListingLocalFileSystem.resetCount()

// provide 41 files in src, with sequential "last modified" to guarantee ordering
val inputFiles = (0 to 40).map { idx =>
val f = createFile(idx.toString, new File(src, idx.toString), tmp)
f.setLastModified(idx * 10000)
f
}

// the first 4 batches will consume 40 of the 41 input files
(0 to 3).foreach { batchId =>
val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
.asInstanceOf[FileStreamSourceOffset]
verifyBatch(offsetBatch, expectedBatchId = batchId, inputFiles, expectedListingCount = 1)
}

// The 5th batch (batchId 4) will trigger a list operation even though the previous listing
// left 1 unseen file: 1 is smaller than the threshold (see
// FileStreamSource.DISCARD_UNSEEN_FILES_RATIO), hence the unseen file was discarded.
val offsetBatch = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
.asInstanceOf[FileStreamSourceOffset]
assert(4 === offsetBatch.logOffset)
assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled
.get(src.getCanonicalPath).map(_.get()).getOrElse(0))

val offsetBatch2 = source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(10))
.asInstanceOf[FileStreamSourceOffset]
// latestOffset returns the offset of the previous batch, which means no new batch is available
assert(4 === offsetBatch2.logOffset)
// listing should be performed once the cached list of unread files is exhausted
assert(3 === CountListingLocalFileSystem.pathToNumListStatusCalled
.get(src.getCanonicalPath).map(_.get()).getOrElse(0))
}
}
}

test("Don't cache unread files when latestFirst is true") {
withCountListingLocalFileSystemAsLocalFileSystem {
withThreeTempDirs { case (src, meta, tmp) =>
val options = Map("latestFirst" -> "true", "maxFilesPerTrigger" -> "5")
val scheme = CountListingLocalFileSystem.scheme
val source = new FileStreamSource(spark, s"$scheme:///${src.getCanonicalPath}/*/*", "text",
StructType(Nil), Seq.empty, meta.getCanonicalPath, options)

CountListingLocalFileSystem.resetCount()

// provide 20 files in src, with sequential "last modified" to guarantee ordering
(0 to 19).map { idx =>
val f = createFile(idx.toString, new File(src, idx.toString), tmp)
f.setLastModified(idx * 10000)
f
}

source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
.asInstanceOf[FileStreamSourceOffset]
assert(1 === CountListingLocalFileSystem.pathToNumListStatusCalled
Contributor:

Maybe it's worth checking that nothing irrelevant is inside. This probably indicates the need for some reset functionality for pathToNumListStatusCalled...

Contributor:

What I meant here is that the test should fail if some nasty code puts irrelevant data into the map. For example, when I put the following (just for the sake of illustration):

        CountListingLocalFileSystem.resetCount()
        CountListingLocalFileSystem.pathToNumListStatusCalled.put("foo", new AtomicLong(1))

it would be good for the test to fail.

Contributor Author:

Your example now fails because I added a check counting the entries of pathToNumListStatusCalled. Does that address your comment?
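For context, one possible shape of such a check - purely illustrative, and later reverted as the thread below explains - would be to assert that the counting file system never listed anything outside the source directory tree, which the injected "foo" entry above would violate:

// Hypothetical stricter check inside the test body:
val listedPaths = CountListingLocalFileSystem.pathToNumListStatusCalled.keySet
assert(listedPaths.forall(_.startsWith(src.getCanonicalPath)))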

Contributor Author:

Sigh, I realized I didn't push the change. Sorry about that. Will push.

Contributor Author:

Sorry, I have to revert it. My bad. I remembered why I only checked the directory: this would require verifying all input files, which is actually redundant, as we already verify that behavior in the UT "Caches and leverages unread files".

Contributor:

Even if it's checked in the positive case, it still holds value in this negative case, unless we find a pretty good reason why it's not possible. Negative-case code paths can list unnecessary dirs/files.

Contributor Author (@HeartSaVioR, Apr 21, 2020):

I'm not sure we want to verify the whole behavior of the file stream source in this PR. This test only makes sure the listing calls against the input directory (and the input files as well) are as expected; other checks are redundant and error-prone. E.g., suppose the file stream source changes something on its read side; then this test would fail unintentionally.

EDIT: it might be true for input files as well, but that may be one of the important things we want to watch. (And we check it in the other test I've added.) Other paths are relatively less important.

Contributor:

Re: "This test only makes sure the listing calls against the input directory (and the input files as well) are as expected"

Making sure that the modified code doesn't introduce further unintended directory listings is also important, but I agree it's not worth the price of test failures whenever somebody modifies the stream source code. All in all, I agree not to add it, since we've double-checked that no further unintended directory listing is introduced.

.get(src.getCanonicalPath).map(_.get()).getOrElse(0))

// Even though the first batch doesn't read all available files, since latestFirst is true,
// the file stream source will not leverage unread files - the next batch will also trigger
// a file listing
source.latestOffset(FileStreamSourceOffset(-1L), ReadLimit.maxFiles(5))
.asInstanceOf[FileStreamSourceOffset]
assert(2 === CountListingLocalFileSystem.pathToNumListStatusCalled
.get(src.getCanonicalPath).map(_.get()).getOrElse(0))
}
}
}

private def createFile(content: String, src: File, tmp: File): File = {
val tempFile = Utils.tempFileWith(new File(tmp, "text"))
val finalFile = new File(src, tempFile.getName)
require(!src.exists(), s"$src exists, dir: ${src.isDirectory}, file: ${src.isFile}")
require(src.mkdirs(), s"Cannot create $src")
require(src.isDirectory(), s"$src is not a directory")
require(stringToFile(tempFile, content).renameTo(finalFile))
finalFile
}
}

class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
@@ -1961,6 +2077,8 @@ class FileStreamSourceStressTestSuite extends FileStreamSourceTest {
* `DataSource.resolveRelation`.
*/
class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
import ExistsThrowsExceptionFileSystem._

override def getUri: URI = {
URI.create(s"$scheme:///")
}
@@ -1980,3 +2098,24 @@ class ExistsThrowsExceptionFileSystem extends RawLocalFileSystem {
object ExistsThrowsExceptionFileSystem {
val scheme = s"FileStreamSourceSuite${math.abs(Random.nextInt)}fs"
}

class CountListingLocalFileSystem extends RawLocalFileSystem {
import CountListingLocalFileSystem._

override def getUri: URI = {
URI.create(s"$scheme:///")
}

override def listStatus(f: Path): Array[FileStatus] = {
val curVal = pathToNumListStatusCalled.getOrElseUpdate(f.toUri.getPath, new AtomicLong(0))
curVal.incrementAndGet()
super.listStatus(f)
}
}

object CountListingLocalFileSystem {
val scheme = s"CountListingLocalFileSystem${math.abs(Random.nextInt)}fs"
val pathToNumListStatusCalled = new mutable.HashMap[String, AtomicLong]

def resetCount(): Unit = pathToNumListStatusCalled.clear()
}