Closed
Changes from 1 commit
31 commits
6c9acde
Initial version of changes to Source trait
frreiss Aug 9, 2016
dae72ff
Changes to files that depend on the Source trait
frreiss Aug 11, 2016
f78b4d5
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 11, 2016
cf426fa
Added method to garbage-collect the metadata log.
frreiss Aug 15, 2016
c028432
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 15, 2016
f92a9a7
Fixing problems with building from Maven.
frreiss Aug 16, 2016
4cd181d
Various bug fixes.
frreiss Aug 19, 2016
fcc90bd
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 19, 2016
35cdae9
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 22, 2016
9096c56
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 27, 2016
ecaf732
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 27, 2016
5638281
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 27, 2016
43ffbf3
Removed a few blank lines.
frreiss Aug 29, 2016
f5c15f8
Additional whitespace cleanup.
frreiss Aug 29, 2016
a79c557
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 30, 2016
7c6a30d
Narrowing the size of the diff by moving some changes out to future work.
frreiss Aug 31, 2016
5e340c2
Fixed a regression introduced in an earlier merge.
frreiss Sep 8, 2016
128f7fe
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Sep 28, 2016
6334a4b
Fixed compilation problem from merging someone else's PR.
frreiss Sep 28, 2016
09e4b8e
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Sep 29, 2016
aaf0307
Removed a safety check that was invalidated by SPARK-17643 and fixed …
frreiss Sep 29, 2016
947b510
Updating regression tests after merge.
frreiss Sep 30, 2016
ed887ca
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 15, 2016
ec67429
Changes to address review comments.
frreiss Oct 15, 2016
e7ef7ab
Fix compilation problems.
frreiss Oct 15, 2016
7d98c6b
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 19, 2016
c726549
Changes to address review comments.
frreiss Oct 21, 2016
47eee52
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 21, 2016
46f6411
Commit before merge.
frreiss Oct 26, 2016
d9eaf5a
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 26, 2016
0a56e4a
Addressing review comments.
frreiss Oct 26, 2016
Narrowing the size of the diff by moving some changes out to future work.
frreiss committed Aug 31, 2016
commit 7c6a30d2da8c31dcd5db8a4337913d5805264306
@@ -24,7 +24,6 @@ package org.apache.spark.sql.execution.streaming
  * - Allow the user to query the latest batch id.
  * - Allow the user to query the metadata object of a specified batch id.
  * - Allow the user to query metadata objects in a range of batch ids.
- * - Inform the log that it is safe to garbage-collect metadata from a batch
  */
 trait MetadataLog[T] {

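The bullet removed above described a hook for telling the log that old batches can be garbage-collected, which this commit defers to future work. As a rough sketch of where that hook could land, assuming the `purge` name used by the `offsetLog.purge` call removed from StreamExecution further down in this diff, the trait might eventually grow something like:

// Sketch only, not part of this commit: a possible shape for the deferred
// garbage-collection hook on the metadata log. The name `purge` mirrors the
// offsetLog.purge call removed from StreamExecution in this same commit.
trait MetadataLog[T] {
  // ... existing add/get/getLatest members elided ...

  /** Informs the log that metadata for batches earlier than `thresholdBatchId`
   *  is no longer needed and may be garbage-collected. */
  def purge(thresholdBatchId: Long): Unit
}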
@@ -25,7 +25,7 @@ import org.apache.spark.sql.types.StructType
  * monotonically increasing notion of progress that can be represented as an [[Offset]]. Spark
  * will regularly query each [[Source]] to see if any more data is available.
  */
-trait Source {
+trait Source {

   /** Returns the schema of the data from this source */
   def schema: StructType
@@ -59,7 +59,7 @@ trait Source {
   * Informs the source that Spark has completed processing all data for offsets less than or
   * equal to `end` and will only request offsets greater than `end` in the future.
   */
-  def commit(end: Offset)
+  def commit(end: Offset) : Unit = {}

   /** Stop this source and free any resources it has allocated. */
   def stop(): Unit
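Giving commit an empty default body lets existing sources compile unchanged, while a source that buffers data can override it to release everything at or below the committed offset. A minimal sketch of the second case, written against the Source/Offset/LongOffset API visible in this diff; the BufferedSource class and its in-memory buffer are hypothetical, not part of this change:

import org.apache.spark.sql.{DataFrame, Encoders, SQLContext}
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

// Hypothetical source that keeps produced records in memory until Spark
// confirms, via commit(), that they have been fully processed.
class BufferedSource(sqlContext: SQLContext) extends Source {
  private var buffer = Vector.empty[(Long, Int)]   // (offset, value) pairs

  override def schema: StructType =
    StructType(StructField("value", IntegerType) :: Nil)

  override def getOffset: Option[Offset] =
    buffer.lastOption.map { case (off, _) => LongOffset(off) }

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    val from = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(-1L)
    val to = end.asInstanceOf[LongOffset].offset
    val rows = buffer.collect { case (off, v) if off > from && off <= to => v }
    sqlContext.createDataset(rows)(Encoders.scalaInt).toDF("value")
  }

  // Spark is done with everything up to `end`; the buffered copies can go.
  override def commit(end: Offset): Unit = {
    val threshold = end.asInstanceOf[LongOffset].offset
    buffer = buffer.filter { case (off, _) => off > threshold }
  }

  override def stop(): Unit = {}
}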
@@ -317,11 +317,6 @@ class StreamExecution(
         case (src, off) => src.commit(off)
       }

-      // The log can also discard old metadata. Trim one batch less than we could, just
-      // in case.
-      if (currentBatchId > 2) {
-        offsetLog.purge(currentBatchId - 2)
-      }
     } else {
       awaitBatchLock.lock()
       try {
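The block removed here is the offset-log retention step that this commit defers. For reference, the same logic restated as a helper, assuming the MetadataLog purge sketch above and that purge(n) drops entries for batches earlier than n (both assumptions, not part of this commit):

// Sketch only: the deferred retention step. `offsetLog` and `currentBatchId`
// stand in for the StreamExecution members of the same names.
def trimOffsetLog(offsetLog: MetadataLog[_], currentBatchId: Long): Unit = {
  // Purge up to currentBatchId - 2 rather than currentBatchId - 1, i.e.
  // "trim one batch less than we could, just in case", as the removed
  // comment put it.
  if (currentBatchId > 2) {
    offsetLog.purge(currentBatchId - 2)
  }
}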
@@ -625,12 +625,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {

     /** Create a text file with a single data item */
     def createFile(data: Int): File = {
-      // Use 2 character file names padded with zeros so that alphabetical and
-      // numeric order are the same for the generated file names.
-      val file = stringToFile(new File(src, f"$data%02d.txt"), data.toString)
-
-      // File modification times aren't currently used to decide what goes into
-      // the next batch, but they may be used in the future.
+      val file = stringToFile(new File(src, s"$data.txt"), data.toString)
       if (lastFileModTime.nonEmpty) file.setLastModified(lastFileModTime.get + 1000)
       lastFileModTime = Some(file.lastModified)
       file
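The comment deleted here explained why the old test used zero-padded names: with plain names, alphabetical and numeric order diverge once the counter passes 9. A quick standalone illustration, not part of the change:

// Lexicographic vs. numeric ordering of generated file names.
val plain  = (1 to 12).map(i => s"$i.txt").sorted
// Vector(1.txt, 10.txt, 11.txt, 12.txt, 2.txt, 3.txt, ...)

val padded = (1 to 12).map(i => f"$i%02d.txt").sorted
// Vector(01.txt, 02.txt, ..., 09.txt, 10.txt, 11.txt, 12.txt)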
@@ -298,8 +298,6 @@ class FakeDefaultSource extends StreamSourceProvider {

       override def schema: StructType = StructType(StructField("a", IntegerType) :: Nil)

-      override def lastCommittedOffset: Option[Offset] = None
-
       override def getOffset: Option[Offset] = {
         if (offset >= 10) {
           None
@@ -314,8 +312,6 @@ class FakeDefaultSource extends StreamSourceProvider {
         spark.range(startOffset, end.asInstanceOf[LongOffset].offset + 1).toDF("a")
       }

-      override def commit(end: Offset): Unit = {}
-
       override def stop() {}
     }
   }
@@ -77,8 +77,6 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
     new Source {
       override def schema: StructType = fakeSchema

-      override def lastCommittedOffset: Option[Offset] = None
-
       override def getOffset: Option[Offset] = Some(new LongOffset(0))

       override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
@@ -87,8 +85,6 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
         Seq[Int]().toDS().toDF()
       }

-      override def commit(end: Offset): Unit = {}
-
       override def stop() {}
     }
   }
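Both test fakes above shrink because commit now has a default body in Source and lastCommittedOffset is gone from the trait. A minimal stub in the same style, relying on the inherited no-op commit; FakeSource and its parameters are illustrative, not the test classes in this diff:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.types.StructType

// Minimal test stub: no commit override and no lastCommittedOffset needed.
class FakeSource(fakeSchema: StructType, spark: SparkSession) extends Source {

  override def schema: StructType = fakeSchema

  override def getOffset: Option[Offset] = Some(new LongOffset(0))

  override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
    import spark.implicits._
    Seq.empty[Int].toDS().toDF()
  }

  // commit(end) is inherited as a no-op from Source; nothing to override.

  override def stop(): Unit = {}
}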