
Commit ea205e3

frreiss authored and zsxwing committed
[SPARK-16963][STREAMING][SQL] Changes to Source trait and related implementation classes
## What changes were proposed in this pull request?

This PR contains changes to the Source trait such that the scheduler can notify data sources when it is safe to discard buffered data. Summary of changes:

* Added a method `commit(end: Offset)` that tells the Source that it is OK to discard all offsets up to `end`, inclusive.
* Changed the semantics of a `None` value for the `getBatch` method to mean "from the very beginning of the stream", as opposed to "all data present in the Source's buffer".
* Added notes that the upper layers of the system will never call `getBatch` with a start value less than the last value passed to `commit`.
* Added a `lastCommittedOffset` method to allow the scheduler to query the status of each Source on restart. This addition is not strictly necessary, but it seemed like a good idea -- Sources will be maintaining their own persistent state, and there may be bugs in the checkpointing code.
* The scheduler in `StreamExecution.scala` now calls `commit` on its stream sources after marking each batch as complete in its checkpoint.
* `MemoryStream` now cleans committed batches out of its internal buffer.
* `TextSocketSource` now cleans committed batches from its internal buffer.

## How was this patch tested?

Existing regression tests already exercise the new code.

Author: frreiss <[email protected]>

Closes #14553 from frreiss/fred-16963.

(cherry picked from commit 5b27598)
Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 72b3cff commit ea205e3

File tree

6 files changed: +154 lines, -36 lines

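To make the new contract concrete, here is a minimal sketch of a buffering `Source` that follows the rules described above. It is illustrative only, not code from this commit: it assumes `LongOffset`-style offsets and access to Spark's internal `org.apache.spark.sql.execution.streaming` classes, and it mirrors the bookkeeping that `MemoryStream` and `TextSocketSource` adopt in the diffs below.

```scala
import scala.collection.mutable.ListBuffer

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

/**
 * Hypothetical example only -- not part of this commit. A Source that buffers string
 * records in memory and honors the new commit() contract by trimming its buffer.
 */
class BufferingSource(sqlContext: SQLContext) extends Source {

  private val buffer = new ListBuffer[String]        // uncommitted records, one per offset
  private var currentOffset = LongOffset(-1)         // highest offset produced so far
  private var lastOffsetCommitted = LongOffset(-1)   // highest offset the scheduler committed

  override def schema: StructType = StructType(StructField("value", StringType) :: Nil)

  /** Test helper: append one record and advance the offset. */
  def addData(value: String): Unit = synchronized {
    buffer += value
    currentOffset = LongOffset(currentOffset.offset + 1)
  }

  /** `None` means "no data has ever arrived", per the updated getOffset contract. */
  override def getOffset: Option[Offset] = synchronized {
    if (currentOffset.offset == -1) None else Some(currentOffset)
  }

  /** Returns (`start`, `end`]; a `start` of `None` now means the very beginning of the stream. */
  override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
    val startOrdinal =
      start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
    val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
    // The buffer only holds offsets after lastOffsetCommitted, so shift the slice accordingly.
    val rows = buffer.slice(
      startOrdinal - lastOffsetCommitted.offset.toInt - 1,
      endOrdinal - lastOffsetCommitted.offset.toInt - 1).toList
    import sqlContext.implicits._
    rows.toDF("value")
  }

  /** The scheduler will never ask for offsets <= `end` again, so drop them from the buffer. */
  override def commit(end: Offset): Unit = synchronized {
    val newOffset = end.asInstanceOf[LongOffset]
    buffer.trimStart((newOffset.offset - lastOffsetCommitted.offset).toInt)
    lastOffsetCommitted = newOffset
  }

  override def stop(): Unit = {}
}
```

The key invariant is that the buffer only ever holds offsets greater than `lastOffsetCommitted`, which is exactly why the slice indices in `getBatch` are shifted by that amount.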

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala

Lines changed: 9 additions & 0 deletions
@@ -176,6 +176,15 @@ class FileStreamSource(
 
   override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
 
+  /**
+   * Informs the source that Spark has completed processing all data for offsets less than or
+   * equal to `end` and will only request offsets greater than `end` in the future.
+   */
+  override def commit(end: Offset): Unit = {
+    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
+    // and the value of the maxFileAge parameter.
+  }
+
   override def stop() {}
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala

Lines changed: 18 additions & 4 deletions
@@ -30,16 +30,30 @@ trait Source {
   /** Returns the schema of the data from this source */
   def schema: StructType
 
-  /** Returns the maximum available offset for this source. */
+  /**
+   * Returns the maximum available offset for this source.
+   * Returns `None` if this source has never received any data.
+   */
   def getOffset: Option[Offset]
 
   /**
-   * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
-   * the batch should begin with the first available record. This method must always return the
-   * same data for a particular `start` and `end` pair.
+   * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None`,
+   * then the batch should begin with the first record. This method must always return the
+   * same data for a particular `start` and `end` pair; even after the Source has been restarted
+   * on a different node.
+   *
+   * Higher layers will always call this method with a value of `start` greater than or equal
+   * to the last value passed to `commit` and a value of `end` less than or equal to the
+   * last value returned by `getOffset`
    */
   def getBatch(start: Option[Offset], end: Offset): DataFrame
 
+  /**
+   * Informs the source that Spark has completed processing all data for offsets less than or
+   * equal to `end` and will only request offsets greater than `end` in the future.
+   */
+  def commit(end: Offset) : Unit = {}
+
   /** Stop this source and free any resources it has allocated. */
   def stop(): Unit
 }

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala

Lines changed: 24 additions & 8 deletions
@@ -73,13 +73,19 @@ class StreamExecution(
   /**
    * Tracks how much data we have processed and committed to the sink or state store from each
    * input source.
+   * Only the scheduler thread should modify this field, and only in atomic steps.
+   * Other threads should make a shallow copy if they are going to access this field more than
+   * once, since the field's value may change at any time.
    */
   @volatile
   var committedOffsets = new StreamProgress
 
   /**
    * Tracks the offsets that are available to be processed, but have not yet be committed to the
    * sink.
+   * Only the scheduler thread should modify this field, and only in atomic steps.
+   * Other threads should make a shallow copy if they are going to access this field more than
+   * once, since the field's value may change at any time.
    */
   @volatile
   private var availableOffsets = new StreamProgress
@@ -337,17 +343,27 @@ class StreamExecution(
     }
     if (hasNewData) {
       reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
-        assert(
-          offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
+        assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
           s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
         logInfo(s"Committed offsets for batch $currentBatchId.")
 
+        // NOTE: The following code is correct because runBatches() processes exactly one
+        // batch at a time. If we add pipeline parallelism (multiple batches in flight at
+        // the same time), this cleanup logic will need to change.
+
+        // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
+        // sources to discard data from the previous batch.
+        val prevBatchOff = offsetLog.get(currentBatchId - 1)
+        if (prevBatchOff.isDefined) {
+          prevBatchOff.get.toStreamProgress(sources).foreach {
+            case (src, off) => src.commit(off)
+          }
+        }
+
         // Now that we have logged the new batch, no further processing will happen for
-        // the previous batch, and it is safe to discard the old metadata.
-        // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
-        // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
-        // flight at the same time), this cleanup logic will need to change.
-        offsetLog.purge(currentBatchId)
+        // the batch before the previous batch, and it is safe to discard the old metadata.
+        // Note that purge is exclusive, i.e. it purges everything before the target ID.
+        offsetLog.purge(currentBatchId - 1)
       }
     } else {
       awaitBatchLock.lock()
@@ -455,7 +471,7 @@ class StreamExecution(
 
   /**
   * Blocks the current thread until processing for data from the given `source` has reached at
-   * least the given `Offset`. This method is indented for use primarily when writing tests.
+   * least the given `Offset`. This method is intended for use primarily when writing tests.
   */
   private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
     def notDone = {
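
The ordering above (write the offset WAL entry for batch N, then `commit` the sources at batch N-1's offsets, then purge offset-log entries below N-1) is the heart of the change. The toy program below is a standalone sketch of that sequence, not Spark code; `FakeSource` and the in-memory `offsetLog` map are stand-ins for illustration.

```scala
import scala.collection.mutable

// Standalone sketch of the new per-batch cleanup order in runBatches() -- illustration only.
// FakeSource and the in-memory offsetLog map are stand-ins, not Spark APIs.
object BatchCleanupSketch {

  class FakeSource(name: String) {
    def commit(offset: Long): Unit =
      println(s"$name: safe to discard buffered data up to offset $offset")
  }

  def main(args: Array[String]): Unit = {
    val source = new FakeSource("source-0")
    val offsetLog = mutable.Map[Long, Long]()   // batchId -> offset recorded for source-0

    def runBatch(batchId: Long, newOffset: Long): Unit = {
      // 1. Persist this batch's offsets (the scheduler's write-ahead offset log).
      offsetLog(batchId) = newOffset

      // 2. The previous batch is now durably recorded, so its input can be discarded.
      offsetLog.get(batchId - 1).foreach(off => source.commit(off))

      // 3. Purge is exclusive: keep the previous batch's entry and this batch's entry.
      offsetLog.keys.filter(_ < batchId - 1).toList.foreach(offsetLog.remove)
    }

    (0L to 3L).foreach(b => runBatch(b, newOffset = b * 10))
    println(s"offset log entries remaining: ${offsetLog.keySet.toList.sorted}")  // List(2, 3)
  }
}
```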

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala

Lines changed: 40 additions & 7 deletions
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.util.control.NonFatal
 
 import org.apache.spark.internal.Logging
@@ -51,12 +51,23 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   protected val logicalPlan = StreamingExecutionRelation(this)
   protected val output = logicalPlan.output
 
+  /**
+   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
+   * Stored in a ListBuffer to facilitate removing committed batches.
+   */
   @GuardedBy("this")
-  protected val batches = new ArrayBuffer[Dataset[A]]
+  protected val batches = new ListBuffer[Dataset[A]]
 
   @GuardedBy("this")
   protected var currentOffset: LongOffset = new LongOffset(-1)
 
+  /**
+   * Last offset that was discarded, or -1 if no commits have occurred. Note that the value
+   * -1 is used in calculations below and isn't just an arbitrary constant.
+   */
+  @GuardedBy("this")
+  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
+
   def schema: StructType = encoder.schema
 
   def toDS()(implicit sqlContext: SQLContext): Dataset[A] = {
@@ -85,21 +96,25 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]"
 
   override def getOffset: Option[Offset] = synchronized {
-    if (batches.isEmpty) {
+    if (currentOffset.offset == -1) {
       None
     } else {
       Some(currentOffset)
     }
   }
 
-  /**
-   * Returns the data that is between the offsets (`start`, `end`].
-   */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+    // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
     val startOrdinal =
       start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
     val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
-    val newBlocks = synchronized { batches.slice(startOrdinal, endOrdinal) }
+
+    // Internal buffer only holds the batches after lastCommittedOffset.
+    val newBlocks = synchronized {
+      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
+      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+      batches.slice(sliceStart, sliceEnd)
+    }
 
     logDebug(
       s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}")
@@ -111,11 +126,29 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     }
   }
 
+  override def commit(end: Offset): Unit = synchronized {
+    end match {
+      case newOffset: LongOffset =>
+        val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+        if (offsetDiff < 0) {
+          sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+        }
+
+        batches.trimStart(offsetDiff)
+        lastOffsetCommitted = newOffset
+      case _ =>
+        sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " +
+          "an instance of this class")
+    }
+  }
+
   override def stop() {}
 
   def reset(): Unit = synchronized {
     batches.clear()
     currentOffset = new LongOffset(-1)
+    lastOffsetCommitted = new LongOffset(-1)
   }
 }
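
The index arithmetic in `getBatch` above is the subtle part: once committed batches have been trimmed from the front of `batches`, offset ordinal N no longer lives at buffer index N. The following standalone snippet (not commit code; the values are made up) walks through the mapping after a `commit`.

```scala
// Standalone walk-through of the slice arithmetic in MemoryStream.getBatch -- not commit code.
object SliceArithmeticDemo {
  def main(args: Array[String]): Unit = {
    // Suppose offsets 0..4 were added and commit(LongOffset(2)) already trimmed offsets 0..2,
    // so only the uncommitted batches remain in the buffer.
    val lastOffsetCommitted = 2L
    val buffer = List("batch-3", "batch-4")

    // getBatch(start = Some(LongOffset(2)), end = LongOffset(4)) asks for ordinals [3, 5).
    val startOrdinal = 2 + 1   // start.offset + 1
    val endOrdinal = 4 + 1     // end.offset + 1

    // Shift both bounds by the number of batches already dropped from the front.
    val sliceStart = startOrdinal - lastOffsetCommitted.toInt - 1   // 0
    val sliceEnd = endOrdinal - lastOffsetCommitted.toInt - 1       // 2

    println(buffer.slice(sliceStart, sliceEnd))   // List(batch-3, batch-4)
  }
}
```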

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala

Lines changed: 59 additions & 13 deletions
@@ -24,14 +24,15 @@ import java.text.SimpleDateFormat
 import java.util.Calendar
 import javax.annotation.concurrent.GuardedBy
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.ListBuffer
 import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
+import org.apache.spark.sql._
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 
+
 object TextSocketSource {
   val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
   val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
@@ -53,8 +54,18 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   @GuardedBy("this")
   private var readThread: Thread = null
 
+  /**
+   * All batches from `lastCommittedOffset + 1` to `currentOffset`, inclusive.
+   * Stored in a ListBuffer to facilitate removing committed batches.
+   */
+  @GuardedBy("this")
+  protected val batches = new ListBuffer[(String, Timestamp)]
+
+  @GuardedBy("this")
+  protected var currentOffset: LongOffset = new LongOffset(-1)
+
   @GuardedBy("this")
-  private var lines = new ArrayBuffer[(String, Timestamp)]
+  protected var lastOffsetCommitted : LongOffset = new LongOffset(-1)
 
   initialize()
 
@@ -74,10 +85,12 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
             return
           }
           TextSocketSource.this.synchronized {
-            lines += ((line,
+            val newData = (line,
              Timestamp.valueOf(
                TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-            ))
+            )
+            currentOffset = currentOffset + 1
+            batches.append(newData)
           }
         }
       } catch {
@@ -92,21 +105,54 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
   else TextSocketSource.SCHEMA_REGULAR
 
-  /** Returns the maximum available offset for this source. */
   override def getOffset: Option[Offset] = synchronized {
-    if (lines.isEmpty) None else Some(LongOffset(lines.size - 1))
+    if (currentOffset.offset == -1) {
+      None
+    } else {
+      Some(currentOffset)
+    }
   }
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
-    val startIdx = start.map(_.asInstanceOf[LongOffset].offset.toInt + 1).getOrElse(0)
-    val endIdx = end.asInstanceOf[LongOffset].offset.toInt + 1
-    val data = synchronized { lines.slice(startIdx, endIdx) }
+    val startOrdinal =
+      start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
+    val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
+
+    // Internal buffer only holds the batches after lastOffsetCommitted
+    val rawList = synchronized {
+      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
+      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+      batches.slice(sliceStart, sliceEnd)
+    }
+
     import sqlContext.implicits._
+    val rawBatch = sqlContext.createDataset(rawList)
+
+    // Underlying MemoryStream has schema (String, Timestamp); strip out the timestamp
+    // if requested.
     if (includeTimestamp) {
-      data.toDF("value", "timestamp")
+      rawBatch.toDF("value", "timestamp")
+    } else {
+      // Strip out timestamp
+      rawBatch.select("_1").toDF("value")
+    }
+  }
+
+  override def commit(end: Offset): Unit = synchronized {
+    if (end.isInstanceOf[LongOffset]) {
+      val newOffset = end.asInstanceOf[LongOffset]
+      val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+      if (offsetDiff < 0) {
+        sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+      }
+
+      batches.trimStart(offsetDiff)
+      lastOffsetCommitted = newOffset
     } else {
-      data.map(_._1).toDF("value")
+      sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
+        s"originate with an instance of this class")
     }
   }
 
@@ -141,7 +187,7 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
       providerName: String,
       parameters: Map[String, String]): (String, StructType) = {
     logWarning("The socket source should not be used for production applications! " +
-      "It does not support recovery and stores state indefinitely.")
+      "It does not support recovery.")
     if (!parameters.contains("host")) {
       throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
     }
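
For context on how the `TextSocketSource` above is reached from user code, the example below wires up the socket source through the public `readStream` API. The host and port values are placeholders (`nc -lk 9999` is a convenient way to feed it lines during local testing).

```scala
import org.apache.spark.sql.SparkSession

// Example of consuming the socket source through the public API -- host/port are placeholders.
object SocketSourceExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("socket-source-demo")
      .getOrCreate()

    // Each line received on the socket becomes one row with a single "value" column.
    // Add .option("includeTimestamp", "true") to also receive the arrival timestamp.
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()

    // Echo every micro-batch to the console; the source keeps lines buffered only until
    // the scheduler calls commit() for the batch that consumed them.
    val query = lines.writeStream
      .format("console")
      .start()

    query.awaitTermination()
  }
}
```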

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala

Lines changed: 4 additions & 4 deletions
@@ -252,8 +252,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map(6 / _)
 
-    // Run 3 batches, and then assert that only 1 metadata file is left at the end
-    // since the first 2 should have been purged.
+    // Run 3 batches, and then assert that only 2 metadata files are left at the end
+    // since the first should have been purged.
     testStream(mapped)(
       AddData(inputData, 1, 2),
       CheckAnswer(6, 3),
@@ -262,11 +262,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       AddData(inputData, 4, 6),
       CheckAnswer(6, 3, 6, 3, 1, 1),
 
-      AssertOnQuery("metadata log should contain only one file") { q =>
+      AssertOnQuery("metadata log should contain only two files") { q =>
         val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
         val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
         val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475
-        assert(toTest.size == 1 && toTest.head == "2")
+        assert(toTest.size == 2 && toTest.head == "1")
         true
       }
     )
