31 commits
6c9acde
Initial version of changes to Source trait
frreiss Aug 9, 2016
dae72ff
Changes to files that depend on the Source trait
frreiss Aug 11, 2016
f78b4d5
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 11, 2016
cf426fa
Added method to garbage-collect the metadata log.
frreiss Aug 15, 2016
c028432
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 15, 2016
f92a9a7
Fixing problems with building from Maven.
frreiss Aug 16, 2016
4cd181d
Various bug fixes.
frreiss Aug 19, 2016
fcc90bd
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 19, 2016
35cdae9
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 22, 2016
9096c56
Merge branch 'master' of https://github.com/apache/spark into fred-16…
frreiss Aug 27, 2016
ecaf732
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 27, 2016
5638281
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 27, 2016
43ffbf3
Removed a few blank lines.
frreiss Aug 29, 2016
f5c15f8
Additional whitespace cleanup.
frreiss Aug 29, 2016
a79c557
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 30, 2016
7c6a30d
Narrowing the size of the diff by moving some changes out to future w…
frreiss Aug 31, 2016
5e340c2
Fixed a regression introduced in an earlier merge.
frreiss Sep 8, 2016
128f7fe
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Sep 28, 2016
6334a4b
Fixed compilation problem from merging someone else's PR.
frreiss Sep 28, 2016
09e4b8e
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Sep 29, 2016
aaf0307
Removed a safety check that was invalidated by SPARK-17643 and fixed …
frreiss Sep 29, 2016
947b510
Updating regression tests after merge.
frreiss Sep 30, 2016
ed887ca
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 15, 2016
ec67429
Changes to address review comments.
frreiss Oct 15, 2016
e7ef7ab
Fix compilation problems.
frreiss Oct 15, 2016
7d98c6b
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 19, 2016
c726549
Changes to address review comments.
frreiss Oct 21, 2016
47eee52
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 21, 2016
46f6411
Commit before merge.
frreiss Oct 26, 2016
d9eaf5a
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 26, 2016
0a56e4a
Addressing review comments.
frreiss Oct 26, 2016
Changes to files that depend on the Source trait
frreiss committed Aug 11, 2016
commit dae72ff923edbbeb0f0ac2fd251a25d8372ad5fa
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.streaming

import scala.collection.mutable
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.util.Try

import org.apache.hadoop.fs.Path
@@ -31,7 +33,17 @@ import org.apache.spark.util.collection.OpenHashSet
/**
* A very simple source that reads text files from the given directory as they appear.
*
* Special options that this source can take (via `options`):
* <ul>
* <li><b>maxFilesPerTrigger</b>: The maximum number of files to include in a
* microbatch. If more than this number of files appear at once, an arbitrary
* subset of the files of this size will be used for the next batch.
* Default: No limit.
* <li><b>deleteCommittedFiles</b>: If true, the source will delete old files and
* clean up associated internal metadata when Spark has completed processing
* the data in those files.
* Default: False.
* </ul>
*/
class FileStreamSource(
sparkSession: SparkSession,
@@ -44,42 +56,44 @@ class FileStreamSource(
private val fs = new Path(path).getFileSystem(sparkSession.sessionState.newHadoopConf())
private val qualifiedBasePath = fs.makeQualified(new Path(path)) // can contains glob patterns
private val metadataLog = new HDFSMetadataLog[Seq[String]](sparkSession, metadataPath)

/**
* ID of the last batch committed and cleaned up, or -1 if no files have been removed.
*/
private var minBatchId = -1L

/**
* ID of the most recent batch that has been entered into the metadata log, or -1 if
* no files have been processed at all.
*/
private var maxBatchId = -1L

/** Maximum number of new files to be considered in each batch */
private val maxFilesPerBatch = getMaxFilesPerBatch()

/**
* Should files whose data has been committed be removed from the directory, along with
* their metadata entries?
*/
private val deleteCommittedFiles = getDeleteCommittedFiles()

private val seenFiles = new OpenHashSet[String]

/**
* Initialize the state of this source from the on-disk checkpoint, if present.
*/
private def initialize() = {
maxBatchId = metadataLog.getLatest().map(_._1).getOrElse(-1L)

metadataLog.get(None, Some(maxBatchId)).foreach {
case (batchId, files) =>
files.foreach(seenFiles.add)
minBatchId = math.max(minBatchId, batchId)
}
}

initialize()

/**
* For test only. Run `func` with the internal lock to make sure when `func` is running,
* the current offset won't be changed and no new batch will be emitted.
@@ -93,9 +107,26 @@ class FileStreamSource(
new LongOffset(maxBatchId)
}

override def toString: String = s"FileStreamSource[$qualifiedBasePath]"

//////////////////////////////////
// BEGIN methods from Source trait

/**
* Returns the highest offset that this source has <b>removed</b> from its internal buffer
* in response to a call to `commit`.
* Returns `None` if this source has not removed any data.
*/
override def getMinOffset: Option[Offset] = {
if (-1L == minBatchId) {
None
} else {
Some(new LongOffset(minBatchId))
}
}

override def getMaxOffset: Option[Offset] = Some(fetchMaxOffset()).filterNot(_.offset == -1)

/**
* Returns the data that is between the offsets (`start`, `end`].
*/
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L)
val endId = end.asInstanceOf[LongOffset].offset
@@ -115,6 +146,113 @@
Dataset.ofRows(sparkSession, LogicalRelation(newDataSource.resolveRelation()))
}

/**
* Informs the source that Spark has completed processing all data for offsets less than or
* equal to `end` and will only request offsets greater than `end` in the future.
*/
override def commit(end: Offset): Unit = {
if (end.isInstanceOf[LongOffset]) {
val lastCommittedBatch = end.asInstanceOf[LongOffset].offset

// Build up a list of batches, then process them one by one
val firstCommittedBatch = math.max(minBatchId, 0)

if (deleteCommittedFiles) {
for (batchId <- firstCommittedBatch to lastCommittedBatch) {
val files = metadataLog.get(batchId)
if (files.isDefined) {

// Files may actually be directories, so use the recursive version
// of the delete method.
// TODO: This delete() should be wrapped in more error handling.
// Examples of problems to catch: Spark does not have permission to delete
// the file; or the filesystem metadata is corrupt.
files.get.foreach(f => fs.delete(new Path(f), true))

// TODO: Add a "remove" method to HDFSMetadataLog, then add code here to
// remove the metadata for each completed batch. It's important that we
// remove the metadata only after the files are deleted; otherwise we
// may not be able to tell what files to delete after a crash.
}

}
}

// Sanity check: commits must arrive in nondecreasing offset order.
if (lastCommittedBatch < minBatchId) {
sys.error(s"Offsets committed out of order: $minBatchId followed by $end")
}

minBatchId = lastCommittedBatch
} else {
sys.error(s"FileStreamSource.commit() received an offset ($end) that did not " +
s"originate with an instance of this class")
}
}

override def stop() {}

// END methods from Source trait
////////////////////////////////

// All methods that follow are internal to this class.

/**
* Scans the directory and creates batches from any new files found.
*
* Returns the maximum offset that can be retrieved from the source.
*
* `synchronized` on this method is for solving race conditions in tests. In the normal usage,
* there is no race here, so the cost of `synchronized` should be rare.
*/
private def fetchMaxOffset(): LongOffset = synchronized {
val newFiles = fetchAllFiles().filter(!seenFiles.contains(_))
logTrace(s"Number of new files = ${newFiles.size})")

// To make the source's behavior less nondeterministic, we process files in
// alphabetical order and ensure that every new file goes into a batch.
val remainingFiles = mutable.Queue[String]()
remainingFiles ++= newFiles.sorted

val batches = ListBuffer[Seq[String]]()
if (maxFilesPerBatch.nonEmpty) {
// Dequeue the files in chunks of at most maxFilesPerBatch; Queue.take() alone would
// not consume the queue, so the loop would never terminate.
while (remainingFiles.nonEmpty) {
val chunk = ArrayBuffer[String]()
while (chunk.size < maxFilesPerBatch.get && remainingFiles.nonEmpty) {
chunk += remainingFiles.dequeue()
}
batches += chunk
}
} else {
batches += remainingFiles
}

newFiles.foreach { file =>
seenFiles.add(file)
logDebug(s"New file: $file")
}

batches.foreach {
case batchFiles =>
maxBatchId += 1
metadataLog.add(maxBatchId, batchFiles)
logInfo(s"Max batch id increased to $maxBatchId with ${batchFiles.size} new files")
}

new LongOffset(maxBatchId)
}


private def fetchAllFiles(): Seq[String] = {
val startTime = System.nanoTime
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
@@ -132,6 +270,16 @@
files
}

private def getDeleteCommittedFiles(): Boolean = {
val str = options.getOrElse("deleteCommittedFiles", "false")
Try(str.toBoolean).getOrElse {
throw new IllegalArgumentException(
s"Invalid value '$str' for option 'deleteCommittedFiles', must be true or false")
}
}

private def getMaxFilesPerBatch(): Option[Int] = {
new CaseInsensitiveMap(options)
.get("maxFilesPerTrigger")
@@ -143,9 +291,6 @@
}
}

}
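To make the two options documented in the FileStreamSource class comment concrete, here is a minimal usage sketch. The session, paths, and option values are illustrative assumptions, not part of this PR, and `deleteCommittedFiles` only exists with this patch applied.

// Illustrative only: stream a directory of text files, capping each microbatch at 10 new
// files and letting the source delete inputs once their batch has been committed.
val lines = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", "10")        // bound the number of files per batch
  .option("deleteCommittedFiles", "true")    // clean up inputs after Source.commit()
  .load("/data/incoming")

val query = lines.writeStream
  .format("parquet")
  .option("path", "/data/output")
  .option("checkpointLocation", "/data/checkpoints/ingest")
  .start()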
@@ -32,11 +32,15 @@ trait Source {

/**
* Returns the highest offset that this source has <b>removed</b> from its internal buffer
* in response to a call to `commit`.
* Returns `None` if this source has not removed any data.
*/
def getMinOffset: Option[Offset]

/**
* Returns the maximum available offset for this source.
* Returns `None` if this source has never received any data.
*/
def getMaxOffset: Option[Offset]

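As a reading aid for the revised trait, a minimal sketch of the lifecycle the scheduler is expected to drive. The method names come from the trait above; the driver function itself is hypothetical and elides query execution, error handling, and checkpointing details.

// Hypothetical driver round against the revised Source trait (not part of this PR):
// discover new data, process it, persist the offset, then tell the source it may
// discard (or, for FileStreamSource, delete) everything up to that offset.
def runOneRound(source: Source, committed: Option[Offset]): Option[Offset] = {
  source.getMaxOffset match {
    case Some(end) =>
      val batch = source.getBatch(committed, end)   // data in the range (committed, end]
      // ... run the query on `batch` and durably record `end` in the offset log ...
      source.commit(end)                            // source may now drop data <= end
      Some(end)
    case None =>
      committed                                     // no new data this round
  }
}

// After a restart, getMinOffset reports how much the source has already discarded, so the
// checkpointed offset can be skipped ahead if necessary (see the StreamExecution changes below).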
@@ -21,7 +21,9 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.locks.ReentrantLock

import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Map
import scala.util.control.NonFatal

import org.apache.hadoop.fs.Path
@@ -72,13 +74,17 @@ class StreamExecution(
/**
* Tracks how much data we have processed and committed to the sink or state store from each
* input source.
* Only the scheduler thread should modify this field, and only in atomic steps. Other threads
* must create a local copy before iterating over this data structure.
Contributor: I agree with the first sentence. I'm not sure I understand the second.

Contributor Author: What I meant to say is that two accesses to this field from a thread other than the scheduler thread may return different values. The functions that access committedOffsets from other threads should either access the field only once (toDebugString) or make a copy (as in the code that consumes StreamingQueryStatus objects). Updated the comment in my local copy to something that is hopefully more clear: "Other threads should make a shallow copy if they are going to access this field more than once, since the field's value may change at any time."

*/
@volatile
private[sql] var committedOffsets = new StreamProgress

/**
* Tracks the offsets that are available to be processed, but have not yet be committed to the
* sink.
* Only the scheduler thread should modify this field, and only in atomic steps. Other threads
* must create a local copy before iterating over this data structure.
*/
@volatile
private var availableOffsets = new StreamProgress
@@ -248,6 +254,21 @@ class StreamExecution(
logDebug(s"Resuming with committed offsets: $committedOffsets")
}

// Compare the offsets we just read from the checkpoint against the
// sources' own checkpoint data.
val offsetChanges = mutable.Map[Source, Offset]()
committedOffsets.foreach {
case (src, checkptOffset) =>
val srcOffset = src.getMinOffset
if (srcOffset.isDefined && srcOffset.get > checkptOffset) {
logWarning(s"Source $src lost offsets between $checkptOffset " +
s"and $srcOffset when resuming. Skipping ahead to $srcOffset.")
offsetChanges += (src -> srcOffset.get)
}
}
committedOffsets ++= offsetChanges


Contributor: Extra blank line

Contributor Author: Fixed in my local copy.

case None => // We are starting this stream for the first time.
logInfo(s"Starting new streaming query.")
currentBatchId = 0
@@ -277,7 +298,7 @@
val hasNewData = {
awaitBatchLock.lock()
try {
val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o))
val newData = uniqueSources.flatMap(s => s.getMaxOffset.map(o => s -> o))
availableOffsets ++= newData

if (dataAvailable) {
@@ -294,6 +315,12 @@
assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
logInfo(s"Committed offsets for batch $currentBatchId.")

// Now that we've updated the scheduler's persistent checkpoint, it is safe for the
// sources to discard batches from the *previous* batch.
committedOffsets.foreach {
case (src, off) => src.commit(off)
}
} else {
awaitBatchLock.lock()
try {
@@ -374,6 +401,8 @@
logInfo(s"Completed up to $availableOffsets in ${batchTime}ms")
// Update committed offsets.
committedOffsets ++= availableOffsets


postEvent(new QueryProgress(this.toInfo))
}

@@ -399,7 +428,7 @@

/**
* Blocks the current thread until processing for data from the given `source` has reached at
* least the given `Offset`. This method is intended for use primarily when writing tests.
*/
def awaitOffset(source: Source, newOffset: Offset): Unit = {
def notDone = {
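Finally, to illustrate the threading contract debated in the review comments on committedOffsets above, a self-contained sketch of the "read the volatile reference once, then iterate over that copy" pattern. All names here are hypothetical and not part of Spark's API.

// Hypothetical example of the pattern discussed above: the scheduler thread swaps in a
// new immutable map in one reference assignment, and any other thread reads the
// @volatile reference exactly once before iterating, so a concurrent update cannot
// change the data mid-iteration.
object ProgressReporter {
  @volatile private var progress: Map[String, Long] = Map.empty

  // Called only by the scheduler thread, as a single atomic reference swap.
  def update(source: String, offset: Long): Unit = {
    progress = progress + (source -> offset)
  }

  // Called from any thread: take a local snapshot, then use it as often as needed.
  def report(): String = {
    val snapshot = progress            // single volatile read
    snapshot.map { case (s, o) => s"$s -> $o" }.mkString(", ")
  }
}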