Commits (31)
Changes from 1 commit
6c9acde
Initial version of changes to Source trait
frreiss Aug 9, 2016
dae72ff
Changes to files that depend on the Source trait
frreiss Aug 11, 2016
f78b4d5
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 11, 2016
cf426fa
Added method to garbage-collect the metadata log.
frreiss Aug 15, 2016
c028432
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 15, 2016
f92a9a7
Fixing problems with building from Maven.
frreiss Aug 16, 2016
4cd181d
Various bug fixes.
frreiss Aug 19, 2016
fcc90bd
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 19, 2016
35cdae9
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 22, 2016
9096c56
Merge branch 'master' of https://github.com/apache/spark into fred-16…
frreiss Aug 27, 2016
ecaf732
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 27, 2016
5638281
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 27, 2016
43ffbf3
Removed a few blank lines.
frreiss Aug 29, 2016
f5c15f8
Additional whitespace cleanup.
frreiss Aug 29, 2016
a79c557
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Aug 30, 2016
7c6a30d
Narrowing the size of the diff by moving some changes out to future w…
frreiss Aug 31, 2016
5e340c2
Fixed a regression introduced in an earlier merge.
frreiss Sep 8, 2016
128f7fe
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Sep 28, 2016
6334a4b
Fixed compilation problem from merging someone else's PR.
frreiss Sep 28, 2016
09e4b8e
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Sep 29, 2016
aaf0307
Removed a safety check that was invalidated by SPARK-17643 and fixed …
frreiss Sep 29, 2016
947b510
Updating regression tests after merge.
frreiss Sep 30, 2016
ed887ca
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 15, 2016
ec67429
Changes to address review comments.
frreiss Oct 15, 2016
e7ef7ab
Fix compilation problems.
frreiss Oct 15, 2016
7d98c6b
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 19, 2016
c726549
Changes to address review comments.
frreiss Oct 21, 2016
47eee52
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 21, 2016
46f6411
Commit before merge.
frreiss Oct 26, 2016
d9eaf5a
Merge branch 'master' of https://github.com/apache/spark into fred-16963
frreiss Oct 26, 2016
0a56e4a
Addressing review comments.
frreiss Oct 26, 2016
Fixing problems with building from Maven.
frreiss committed Aug 16, 2016
commit f92a9a7ec2e6ee47e47698bec896504b771c27d5
@@ -181,20 +181,10 @@ class FileStreamSource(

minBatchId = lastCommittedBatch

val offsetDiff = (newOffset.offset - lastCommittedOffset.offset).toInt

if (offsetDiff < 0) {
sys.error(s"Offsets committed out of order: $lastCommittedOffset followed by $end")
}

batches.trimStart(offsetDiff)
lastCommittedOffset = newOffset
} else {
sys.error(s"FileStreamSource.commit() received an offset ($end) that did not " +
s"originate with an instance of this class")
}


}

override def stop() {}
@@ -252,7 +242,6 @@ class FileStreamSource(
new LongOffset(maxBatchId)
}


private def fetchAllFiles(): Seq[String] = {
val startTime = System.nanoTime
val globbedPaths = SparkHadoopUtil.get.globPathIfNecessary(qualifiedBasePath)
@@ -275,7 +264,7 @@ class FileStreamSource(
try {
str.toBoolean
} catch {
case _ => throw new IllegalArgumentException(
case _ : Throwable => throw new IllegalArgumentException(
s"Invalid value '$str' for option 'deleteCommittedFiles', must be true or false")
}
}
@@ -290,7 +279,4 @@ class FileStreamSource(
}
}
}



}
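
The `case _ : Throwable =>` line above replaces a bare `case _ =>`, and is most likely the Maven-related fix in this file: recent Scala compilers warn "This catches all Throwables" on a bare wildcard in a catch clause, and builds that escalate warnings to errors fail on it. Below is a minimal standalone sketch of the same option validation; the helper and object names are hypothetical, not part of the PR, and it assumes plain Scala with no Spark dependencies. Using scala.util.Try sidesteps the catch clause, and the warning, entirely.

import scala.util.{Failure, Success, Try}

object OptionParsing {
  // Hypothetical helper mirroring the diff's validation of 'deleteCommittedFiles'.
  def parseBooleanOption(name: String, str: String): Boolean =
    Try(str.toBoolean) match {
      case Success(value) => value
      case Failure(_) => throw new IllegalArgumentException(
        s"Invalid value '$str' for option '$name', must be true or false")
    }

  def main(args: Array[String]): Unit = {
    println(parseBooleanOption("deleteCommittedFiles", "true")) // prints: true
    // parseBooleanOption("deleteCommittedFiles", "maybe") throws IllegalArgumentException
  }
}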
@@ -23,37 +23,52 @@ import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.Calendar
import javax.annotation.concurrent.GuardedBy
import javax.xml.transform.stream.StreamSource

import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
import org.apache.spark.sql._
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}


object TextSocketSource {
val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
StructField("timestamp", TimestampType) :: Nil)
val DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
}


Contributor
Extra blank line

Contributor Author
Fixed in my local copy.


/**
* A source that reads text lines through a TCP socket, designed only for tutorials and debugging.
* This source will *not* work in production applications due to multiple reasons, including no
* support for fault recovery and keeping all of the text read in memory forever.
*/
class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlContext: SQLContext)
extends MemoryStream[(String, Timestamp)](-1, sqlContext) with Logging
extends Source with Logging
{
@GuardedBy("this")
private var socket: Socket = null

@GuardedBy("this")
private var readThread: Thread = null

/**
* All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
* Stored in a ListBuffer to facilitate removing committed batches.
*/
@GuardedBy("this")
protected val batches = new ListBuffer[(String, Timestamp)]

@GuardedBy("this")
protected var currentOffset: LongOffset = new LongOffset(-1)

@GuardedBy("this")
protected var lastCommittedOffset : LongOffset = new LongOffset(-1)

initialize()

private def initialize(): Unit = synchronized {
@@ -72,10 +87,12 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
return
}
TextSocketSource.this.synchronized {
addData((line,
val newData = (line,
Timestamp.valueOf(
TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
))
)
currentOffset = currentOffset + 1
batches.append(newData)
}
}
} catch {
@@ -90,10 +107,37 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
else TextSocketSource.SCHEMA_REGULAR

override def getMinOffset: Option[Offset] = synchronized {
if (lastCommittedOffset.offset == -1) {
None
} else {
Some(lastCommittedOffset)
}
}

override def getMaxOffset: Option[Offset] = synchronized {
if (currentOffset.offset == -1) {
None
} else {
Some(currentOffset)
}
}

/** Returns the data that is between the offsets (`start`, `end`]. */
override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
val rawBatch = super.getBatch(start, end)
val startOrdinal =
start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1

// Internal buffer only holds the batches after lastCommittedOffset
val rawList = synchronized {
val sliceStart = startOrdinal - lastCommittedOffset.offset.toInt
val sliceEnd = endOrdinal - lastCommittedOffset.offset.toInt
batches.slice(sliceStart, sliceEnd)
}

import sqlContext.implicits._
val rawBatch = sqlContext.createDataset(rawList)

// Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
// if requested.
@@ -105,6 +149,23 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
}
}

override def commit(end: Offset): Unit = synchronized {
if (end.isInstanceOf[LongOffset]) {
val newOffset = end.asInstanceOf[LongOffset]
val offsetDiff = (newOffset.offset - lastCommittedOffset.offset).toInt

if (offsetDiff < 0) {
sys.error(s"Offsets committed out of order: $lastCommittedOffset followed by $end")
}

batches.trimStart(offsetDiff)
lastCommittedOffset = newOffset
} else {
sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
s"originate with an instance of this class")
}
}

/** Stop this source. */
override def stop(): Unit = synchronized {
if (socket != null) {
@@ -136,7 +197,7 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
providerName: String,
parameters: Map[String, String]): (String, StructType) = {
logWarning("The socket source should not be used for production applications! " +
"It does not support recovery and stores state indefinitely.")
"It does not support recovery.")
if (!parameters.contains("host")) {
throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
}
@@ -160,6 +221,7 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
parameters: Map[String, String]): Source = {
val host = parameters("host")
val port = parameters("port").toInt

new TextSocketSource(host, port, parseIncludeTimestamp(parameters), sqlContext)
}

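
Summing up the socket-source changes: the class no longer extends MemoryStream but implements Source directly, buffering rows after the last committed offset in a ListBuffer guarded by `this`; getMinOffset/getMaxOffset report the buffer's bounds, getBatch slices it by offset arithmetic, and the new commit() trims the committed prefix so the buffer stops growing without bound. A self-contained model of that scheme follows, with simplified stand-in types and no Spark dependencies. One caveat: with lastCommittedOffset = -1 and a request for the range (None, 0], the slice arithmetic as written in this commit yields indices (1, 2) rather than (0, 1) and would skip the first row; the model below subtracts an extra 1, so the row with offset k sits at buffer index k - lastCommittedOffset - 1.

import scala.collection.mutable.ListBuffer

final case class LongOffset(offset: Long)

// Self-contained model (no Spark types) of the buffering scheme above.
class BufferedSource[T] {
  private val batches = new ListBuffer[T]          // rows after lastCommittedOffset
  private var currentOffset = LongOffset(-1)       // offset of the newest row
  private var lastCommittedOffset = LongOffset(-1) // newest offset released by commit()

  def addData(row: T): Unit = synchronized {
    currentOffset = LongOffset(currentOffset.offset + 1)
    batches.append(row)
  }

  def getMinOffset: Option[LongOffset] = synchronized {
    if (lastCommittedOffset.offset == -1) None else Some(lastCommittedOffset)
  }

  def getMaxOffset: Option[LongOffset] = synchronized {
    if (currentOffset.offset == -1) None else Some(currentOffset)
  }

  // Rows in the range (start, end]. The row at offset k lives at buffer
  // index k - lastCommittedOffset - 1, hence the extra -1 in `base`.
  def getBatch(start: Option[LongOffset], end: LongOffset): List[T] = synchronized {
    val startOrdinal = start.getOrElse(LongOffset(-1)).offset.toInt + 1
    val endOrdinal = end.offset.toInt + 1
    val base = lastCommittedOffset.offset.toInt + 1
    batches.slice(startOrdinal - base, endOrdinal - base).toList
  }

  // Drop everything up to and including `end` so memory is reclaimed.
  def commit(end: LongOffset): Unit = synchronized {
    val offsetDiff = (end.offset - lastCommittedOffset.offset).toInt
    if (offsetDiff < 0) {
      sys.error(s"Offsets committed out of order: $lastCommittedOffset followed by $end")
    }
    batches.trimStart(offsetDiff)
    lastCommittedOffset = end
  }
}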
@@ -62,18 +62,18 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before

failAfter(streamingTimeout) {
serverThread.enqueue("hello")
while (source.getOffset.isEmpty) {
while (source.getMaxOffset.isEmpty) {
Thread.sleep(10)
}
val offset1 = source.getOffset.get
val offset1 = source.getMaxOffset.get
val batch1 = source.getBatch(None, offset1)
assert(batch1.as[String].collect().toSeq === Seq("hello"))

serverThread.enqueue("world")
while (source.getOffset.get === offset1) {
while (source.getMaxOffset.get === offset1) {
Thread.sleep(10)
}
val offset2 = source.getOffset.get
val offset2 = source.getMaxOffset.get
val batch2 = source.getBatch(Some(offset1), offset2)
assert(batch2.as[String].collect().toSeq === Seq("world"))

@@ -101,20 +101,20 @@ class TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before

failAfter(streamingTimeout) {
serverThread.enqueue("hello")
while (source.getOffset.isEmpty) {
while (source.getMaxOffset.isEmpty) {
Thread.sleep(10)
}
val offset1 = source.getOffset.get
val offset1 = source.getMaxOffset.get
val batch1 = source.getBatch(None, offset1)
val batch1Seq = batch1.as[(String, Timestamp)].collect().toSeq
assert(batch1Seq.map(_._1) === Seq("hello"))
val batch1Stamp = batch1Seq(0)._2

serverThread.enqueue("world")
while (source.getOffset.get === offset1) {
while (source.getMaxOffset.get === offset1) {
Thread.sleep(10)
}
val offset2 = source.getOffset.get
val offset2 = source.getMaxOffset.get
val batch2 = source.getBatch(Some(offset1), offset2)
val batch2Seq = batch2.as[(String, Timestamp)].collect().toSeq
assert(batch2Seq.map(_._1) === Seq("world"))
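
The suite's change is mechanical: source.getOffset becomes source.getMaxOffset while the enqueue-and-poll structure stays the same. A sketch of that polling pattern against the hypothetical BufferedSource model above; the real tests poll a TextSocketSource and collect DataFrames instead of plain lists.

object PollingExample {
  def main(args: Array[String]): Unit = {
    val source = new BufferedSource[String] // model class from the sketch above

    // Producer thread, standing in for the socket reader thread.
    new Thread(new Runnable {
      override def run(): Unit = source.addData("hello")
    }).start()

    // Spin until the source reports data, exactly as the updated suite does.
    while (source.getMaxOffset.isEmpty) {
      Thread.sleep(10)
    }
    val offset1 = source.getMaxOffset.get
    assert(source.getBatch(None, offset1) == List("hello"))

    source.commit(offset1) // releases "hello" from the buffer
    assert(source.getMinOffset == Some(offset1))
  }
}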