Closed
Changes from 1 commit
Commits (31)
6c9acde
Initial version of changes to Source trait
frreiss Aug 9, 2016
dae72ff
Changes to files that depend on the Source trait
frreiss Aug 11, 2016
f78b4d5
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 11, 2016
cf426fa
Added method to garbage-collect the metadata log.
frreiss Aug 15, 2016
c028432
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 15, 2016
f92a9a7
Fixing problems with building from Maven.
frreiss Aug 16, 2016
4cd181d
Various bug fixes.
frreiss Aug 19, 2016
fcc90bd
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 19, 2016
35cdae9
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 22, 2016
9096c56
Merge branch 'master' of https://github.com/apache/spark into fred-16…
frreiss Aug 27, 2016
ecaf732
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 27, 2016
5638281
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 27, 2016
43ffbf3
Removed a few blank lines.
frreiss Aug 29, 2016
f5c15f8
Additional whitespace cleanup.
frreiss Aug 29, 2016
a79c557
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 30, 2016
7c6a30d
Narrowing the size of the diff by moving some changes out to future w…
frreiss Aug 31, 2016
5e340c2
Fixed a regression introduced in an earlier merge.
frreiss Sep 8, 2016
128f7fe
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Sep 28, 2016
6334a4b
Fixed compilation problem from merging someone else's PR.
frreiss Sep 28, 2016
09e4b8e
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Sep 29, 2016
aaf0307
Removed a safety check that was invalidated by SPARK-17643 and fixed …
frreiss Sep 29, 2016
947b510
Updating regression tests after merge.
frreiss Sep 30, 2016
ed887ca
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 15, 2016
ec67429
Changes to address review comments.
frreiss Oct 15, 2016
e7ef7ab
Fix compilation problems.
frreiss Oct 15, 2016
7d98c6b
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 19, 2016
c726549
Changes to address review comments.
frreiss Oct 21, 2016
47eee52
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 21, 2016
46f6411
Commit before merge.
frreiss Oct 26, 2016
d9eaf5a
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 26, 2016
0a56e4a
Addressing review comments.
frreiss Oct 26, 2016
Various bug fixes.
frreiss committed Aug 19, 2016
commit 4cd181dbb4f16fbb2a08b7405d4f9d10274529fc
@@ -42,7 +42,7 @@ import org.apache.spark.util.collection.OpenHashSet
* <li><b>deleteCommittedFiles</b>: If true, the source will delete old files and
* clean up associated internal metadata when Spark has completed processing
* the data in those files.
* Default: False.
* Default: true.
* </ul>
*/
class FileStreamSource(
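As a quick illustration of how this new option reaches the source, here is a sketch mirroring the "deleteCommittedFiles option" test added later in this commit; it assumes a SparkSession named `spark` (as in spark-shell), and the path is hypothetical:

// Sketch only: read a directory as a text stream with committed-file deletion
// disabled; with this patch the option defaults to "true".
val textStream = spark.readStream
  .format("text")
  .option("deleteCommittedFiles", "false")
  .load("/path/to/source/dir")   // hypothetical source directory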
@@ -85,10 +85,11 @@ class FileStreamSource(
private def initialize() = {
maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)

minBatchId = maxBatchId
metadataLog.get(None, Some(maxBatchId)).foreach {
case (batchId, files) =>
files.foreach(seenFiles.add)
minBatchId = math.max(minBatchId, batchId)
minBatchId = math.min(minBatchId, batchId)
}
}

@@ -127,6 +128,7 @@ class FileStreamSource(

override def getMaxOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)


override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L)
val endId = end.asInstanceOf[LongOffset].offset
@@ -135,6 +137,7 @@
val files = metadataLog.get(Some(startId + 1), Some(endId)).flatMap(_._2)
logInfo(s"Processing ${files.length} files from ${startId + 1}:$endId")
logTrace(s"Files are:\n\t" + files.mkString("\n\t"))

val newOptions = new CaseInsensitiveMap(options).filterKeys(_ != "path")
val newDataSource =
DataSource(
@@ -154,10 +157,10 @@
if (end.isInstanceOf[LongOffset]) {
val lastCommittedBatch = end.asInstanceOf[LongOffset].offset

// Build up a list of batches, then process them one by one
val firstCommittedBatch = math.max(minBatchId, 0)

if (deleteCommittedFiles) {
// Build up a list of batches, then process them one by one
val firstCommittedBatch = math.max(minBatchId, 0)

var batchId = 0L;
for (batchId <- firstCommittedBatch to lastCommittedBatch) {
val files = metadataLog.get(batchId)
@@ -169,14 +172,13 @@
// Examples of problems to catch: Spark does not have permission to delete
// the file; or the filesystem metadata is corrupt.
files.get.foreach(f => fs.delete(new Path(f), true))

// TODO: Add a "remove" method to HDFSMetadataLog, then add code here to
// remove the metadata for each completed batch. It's important that we
// remove the metadata only after the files are deleted; otherwise we
// may not be able to tell what files to delete after a crash.
}

}

// Clean up metadata for the files we've removed. It's important to do this
// AFTER deleting the files.
metadataLog.trim(minBatchId)
}

minBatchId = lastCommittedBatch
@@ -187,7 +189,9 @@
}
}

override def stop() {}
override def stop(): Unit = {
logTrace(s"Stopping $this")
}

// END methods from Source trait
////////////////////////////////
@@ -197,7 +201,13 @@ class FileStreamSource(
/**
* Scans the directory and creates batches from any new files found.
*
* Returns the maximum offset that can be retrieved from the source.
* Updates and returns the maximum offset that can be retrieved from the source.
* If `maxFilesPerBatch` is set, will consume the first (`maxFilesPerBatch`) files
* in alphabetical order.
*
* Note that this implementation relies on the fact that StreamExecution calls
* getMaxOffset() exactly once per clock tick. Otherwise the logic for
* `maxFilesPerBatch` will be incorrect.
*
* `synchronized` on this method is for solving race conditions in tests. In the normal usage,
* there is no race here, so the cost of `synchronized` should be rare.
@@ -207,36 +217,24 @@ class FileStreamSource(
logTrace(s"Number of new files = ${newFiles.size})")

// To make the source's behavior less nondeterministic, we process files in
// alphabetical order and ensure that every new file goes into a batch.
val remainingFiles = mutable.Queue[String]()
remainingFiles ++= newFiles.sorted

val batches = ListBuffer[Seq[String]]()
if (maxFilesPerBatch.nonEmpty) {
while (remainingFiles.size > 0) {
batches += remainingFiles.take(maxFilesPerBatch.get)
// alphabetical order.
val batchFiles =
if (maxFilesPerBatch.nonEmpty) {
newFiles.sorted.take(maxFilesPerBatch.get)
} else {
newFiles
}
} else {
batches += remainingFiles
}

newFiles.foreach { file =>
// Pretend we didn't see files that aren't in the batch.
batchFiles.foreach { file =>
seenFiles.add(file)
logDebug(s"New file: $file")
}

batches.foreach {
case batchFiles =>
maxBatchId += 1
metadataLog.add(maxBatchId, batchFiles)
logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
}

val batchFiles =
if (maxFilesPerBatch.nonEmpty) newFiles.take(maxFilesPerBatch.get) else newFiles
batchFiles.foreach { file =>
seenFiles.add(file)
logDebug(s"New file: $file")
if (batchFiles.size > 0) {
maxBatchId += 1
metadataLog.add(maxBatchId, batchFiles)
logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
}

new LongOffset(maxBatchId)
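A REPL-style sketch of the batching rule implemented in fetchMaxOffset above: when `maxFilesPerBatch` is set, only the alphabetically first N new files are consumed on a given clock tick. All values below are assumptions for illustration:

// Assumed inputs for illustration only.
val newFiles = Seq("c.txt", "a.txt", "b.txt")
val maxFilesPerBatch: Option[Int] = Some(2)

// Same selection logic as the patch: sort, then take the first maxFilesPerBatch files.
val batchFiles =
  if (maxFilesPerBatch.nonEmpty) newFiles.sorted.take(maxFilesPerBatch.get)
  else newFiles

assert(batchFiles == Seq("a.txt", "b.txt"))   // "c.txt" is picked up on a later tick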
@@ -260,7 +258,7 @@ class FileStreamSource(
}

private def getDeleteCommittedFiles(): Boolean = {
val str = options.getOrElse("deleteCommittedFiles", "false")
val str = options.getOrElse("deleteCommittedFiles", "true")
try {
str.toBoolean
} catch {
@@ -261,7 +261,7 @@ class StreamExecution(
val srcOffset = src.getMinOffset
if (srcOffset.isDefined && srcOffset.get > checkptOffset) {
logWarning(s"Source $src lost offsets between $checkptOffset " +
s"and $srcOffset when resuming. Skipping ahead to $srcOffset.")
s"and ${srcOffset.get} when resuming. Skipping ahead to ${srcOffset.get}.")
offsetChanges += (src -> srcOffset.get)
}
}
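The warning above only fires when a source has discarded data beyond the checkpointed offset; here is a small sketch with assumed values, using plain Longs to stand in for LongOffset:

// Assumed values: the checkpoint says to resume at offset 5, but the source's
// oldest retained offset is 8, so execution skips ahead to 8 and logs a warning.
val checkptOffset = 5L
val srcMinOffset: Option[Long] = Some(8L)
val resumeFrom =
  if (srcMinOffset.isDefined && srcMinOffset.get > checkptOffset) srcMinOffset.get
  else checkptOffset
assert(resumeFrom == 8L)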
@@ -61,6 +61,10 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
@GuardedBy("this")
protected var currentOffset: LongOffset = new LongOffset(-1)

/**
* Last offset that was discarded, or -1 if no commits have occurred. Note that the value
* -1 is used in calculations below and isn't just an arbitrary constant.
*/
@GuardedBy("this")
protected var lastCommittedOffset : LongOffset = new LongOffset(-1)

@@ -108,14 +112,15 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
}

override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
// Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
val startOrdinal =
start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1

// Internal buffer only holds the batches after lastCommittedOffset
// Internal buffer only holds the batches after lastCommittedOffset.
val newBlocks = synchronized {
val sliceStart = startOrdinal - lastCommittedOffset.offset.toInt
val sliceEnd = endOrdinal - lastCommittedOffset.offset.toInt
val sliceStart = startOrdinal - lastCommittedOffset.offset.toInt - 1
val sliceEnd = endOrdinal - lastCommittedOffset.offset.toInt - 1
batches.slice(sliceStart, sliceEnd)
}
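To see why the extra `- 1` is needed, here is the offset-to-index arithmetic with assumed values; the internal buffer's first element holds the batch for offset lastCommittedOffset + 1:

// Assumed values: lastCommittedOffset = 1, start = Some(LongOffset(2)), end = LongOffset(4),
// so the buffer holds the batches for offsets 2, 3, 4 at indices 0, 1, 2.
val lastCommitted = 1
val startOrdinal = 2 + 1                      // start.offset + 1
val endOrdinal = 4 + 1                        // end.offset + 1
val sliceStart = startOrdinal - lastCommitted - 1
val sliceEnd = endOrdinal - lastCommitted - 1
assert((sliceStart, sliceEnd) == (1, 3))      // slice(1, 3) -> the batches for offsets 3 and 4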

@@ -591,7 +591,12 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

/** Create a text file with a single data item */
def createFile(data: Int): File = {
val file = stringToFile(new File(src, s"$data.txt"), data.toString)
// Use 2 character file names padded with zeros so that alphabetical and
// numeric order are the same for the generated file names.
val file = stringToFile(new File(src, f"$data%02d.txt"), data.toString)

// File modification times aren't currently used to decide what goes into
// the next batch, but they may be used in the future.
if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
lastFileModTime = Some(file.lastModified)
file
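A one-line check of the zero-padded naming used above (illustration only):

// "07.txt" sorts before "12.txt", so lexicographic and numeric order agree
// for up to 100 generated files.
assert(Seq(12, 7).map(d => f"$d%02d.txt").sorted == Seq("07.txt", "12.txt"))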
@@ -727,6 +732,48 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
}
}
}


Contributor: Extra blank line

frreiss (author): This line no longer exists after merging the changes from [https://github.com//pull/14728] and replacing my former implementation of GC for FileStreamSource with the one in that merged PR.

test("deleteCommittedFiles option") {
withTempDirs { case (src, tmp) =>
val textStream = spark.readStream.format("text")
.option("deleteCommittedFiles", "false")
.load(src.getCanonicalPath)

testStream(textStream)(
AddTextFileData("kneel", src, tmp),
CheckAnswer("kneel"),
StopStream,
AddTextFileData("before", src, tmp),
StartStream(),
CheckAnswer("kneel", "before"),
AddTextFileData("zod", src, tmp),
CheckAnswer("kneel", "before", "zod")
)

assert(src.listFiles().size === 3)
}

withTempDirs { case (src, tmp) =>
val textStream = spark.readStream.format("text")
.option("deleteCommittedFiles", "true")
.load(src.getCanonicalPath)

testStream(textStream)(
AddTextFileData("it's only", src, tmp),
CheckAnswer("it's only"),
StopStream,
AddTextFileData("a", src, tmp),
StartStream(),
CheckAnswer("it's only", "a"),
AddTextFileData("model", src, tmp),
CheckAnswer("it's only", "a", "model")
)

assert(src.listFiles().size === 1)
}
}

}

class FileStreamSourceStressTestSuite extends FileStreamSourceTest {