diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 115edf7ab2b6..a392b8299902 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -176,6 +176,15 @@ class FileStreamSource(
 
   override def toString: String = s"FileStreamSource[$qualifiedBasePath]"
 
+  /**
+   * Informs the source that Spark has completed processing all data for offsets less than or
+   * equal to `end` and will only request offsets greater than `end` in the future.
+   */
+  override def commit(end: Offset): Unit = {
+    // No-op for now; FileStreamSource currently garbage-collects files based on timestamp
+    // and the value of the maxFileAge parameter.
+  }
+
   override def stop() {}
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
index 971147840d2f..f3bd5bfe23fd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Source.scala
@@ -30,16 +30,30 @@ trait Source {
   /** Returns the schema of the data from this source */
   def schema: StructType
 
-  /** Returns the maximum available offset for this source. */
+  /**
+   * Returns the maximum available offset for this source.
+   * Returns `None` if this source has never received any data.
+   */
   def getOffset: Option[Offset]
 
   /**
-   * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None` then
-   * the batch should begin with the first available record. This method must always return the
-   * same data for a particular `start` and `end` pair.
+   * Returns the data that is between the offsets (`start`, `end`]. When `start` is `None`,
+   * the batch should begin with the first record. This method must always return the
+   * same data for a particular `start` and `end` pair, even after the Source has been restarted
+   * on a different node.
+   *
+   * Higher layers will always call this method with a value of `start` greater than or equal
+   * to the last value passed to `commit` and a value of `end` less than or equal to the
+   * last value returned by `getOffset`.
    */
   def getBatch(start: Option[Offset], end: Offset): DataFrame
 
+  /**
+   * Informs the source that Spark has completed processing all data for offsets less than or
+   * equal to `end` and will only request offsets greater than `end` in the future.
+   */
+  def commit(end: Offset): Unit = {}
+
   /** Stop this source and free any resources it has allocated. */
   def stop(): Unit
 }
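The contract added to `Source` is easiest to see as a driver loop. The sketch below shows the intended call sequence; `SourceContractSketch`, `processOneBatch`, and `runBatch` are illustrative stand-ins, not Spark APIs:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.streaming.{Offset, Source}

object SourceContractSketch {
  // Stand-in for running the batch: write to the sink and durably record `end`
  // in the offset log before commit() is allowed to run.
  def runBatch(df: DataFrame): Unit = df.collect()

  def processOneBatch(source: Source, lastCommitted: Option[Offset]): Option[Offset] = {
    source.getOffset match {
      case Some(end) =>
        // getBatch must return the same data for a given (start, end) pair,
        // even across restarts, because recovery may replay this call.
        val batch = source.getBatch(lastCommitted, end)
        runBatch(batch)    // persist results and checkpoint `end` first...
        source.commit(end) // ...only then may the source discard data <= end
        Some(end)
      case None =>
        lastCommitted      // no new data yet
    }
  }
}
```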
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ba8cf808e339..37af1a550aaf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -73,6 +73,9 @@ class StreamExecution(
   /**
    * Tracks how much data we have processed and committed to the sink or state store from each
    * input source.
+   * Only the scheduler thread should modify this field, and only in atomic steps.
+   * Other threads should make a shallow copy if they are going to access this field more than
+   * once, since the field's value may change at any time.
    */
   @volatile
   var committedOffsets = new StreamProgress
@@ -80,6 +83,9 @@ class StreamExecution(
   /**
    * Tracks the offsets that are available to be processed, but have not yet been committed to the
    * sink.
+   * Only the scheduler thread should modify this field, and only in atomic steps.
+   * Other threads should make a shallow copy if they are going to access this field more than
+   * once, since the field's value may change at any time.
    */
   @volatile
   private var availableOffsets = new StreamProgress
@@ -337,17 +343,27 @@ class StreamExecution(
     }
     if (hasNewData) {
       reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
-        assert(
-          offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
+        assert(offsetLog.add(currentBatchId, availableOffsets.toCompositeOffset(sources)),
           s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
         logInfo(s"Committed offsets for batch $currentBatchId.")
 
+        // NOTE: The following code is correct because runBatches() processes exactly one
+        // batch at a time. If we add pipeline parallelism (multiple batches in flight at
+        // the same time), this cleanup logic will need to change.
+
+        // Now that we've updated the scheduler's persistent checkpoint, it is safe for the
+        // sources to discard data from the previous batch.
+        val prevBatchOff = offsetLog.get(currentBatchId - 1)
+        if (prevBatchOff.isDefined) {
+          prevBatchOff.get.toStreamProgress(sources).foreach {
+            case (src, off) => src.commit(off)
+          }
+        }
+
         // Now that we have logged the new batch, no further processing will happen for
-        // the previous batch, and it is safe to discard the old metadata.
-        // Note that purge is exclusive, i.e. it purges everything before currentBatchId.
-        // NOTE: If StreamExecution implements pipeline parallelism (multiple batches in
-        // flight at the same time), this cleanup logic will need to change.
-        offsetLog.purge(currentBatchId)
+        // the batch before the previous batch, and it is safe to discard the old metadata.
+        // Note that purge is exclusive, i.e. it purges everything before the target ID.
+        offsetLog.purge(currentBatchId - 1)
       }
     } else {
       awaitBatchLock.lock()
@@ -455,7 +471,7 @@ class StreamExecution(
 
   /**
    * Blocks the current thread until processing for data from the given `source` has reached at
-   * least the given `Offset`. This method is indented for use primarily when writing tests.
+   * least the given `Offset`. This method is intended for use primarily when writing tests.
    */
   private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
     def notDone = {
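The reason `purge` now lags one batch behind: while logging batch `n`, the scheduler reads batch `n - 1`'s entry back from the offset log in order to call `commit` on each source, so that entry has to survive until batch `n` is durably written. A toy model of the resulting retention, with a plain `Map` standing in for the metadata log (illustrative only, not the `HDFSMetadataLog` API):

```scala
// Toy retention model: `offsetLog` maps batch id -> serialized offsets.
var offsetLog = Map.empty[Long, String]

def afterLoggingBatch(id: Long): Unit = {
  offsetLog += (id -> s"offsets-for-batch-$id")
  // Commit every source at the previous batch's offsets, if that batch exists...
  offsetLog.get(id - 1).foreach(prev => println(s"commit sources at $prev"))
  // ...then purge: keep only entries for id - 1 and id (purge is exclusive).
  offsetLog = offsetLog.filter { case (batchId, _) => batchId >= id - 1 }
}

(0L to 2L).foreach(afterLoggingBatch)
assert(offsetLog.keySet == Set(1L, 2L)) // exactly the two files the updated test expects
```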
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 788fcd0361be..48d9791faf1e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.streaming
 import java.util.concurrent.atomic.AtomicInteger
 import javax.annotation.concurrent.GuardedBy
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
 import scala.util.control.NonFatal
 
 import org.apache.spark.internal.Logging
@@ -51,12 +51,23 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   protected val logicalPlan = StreamingExecutionRelation(this)
   protected val output = logicalPlan.output
 
+  /**
+   * All batches from `lastOffsetCommitted + 1` to `currentOffset`, inclusive.
+   * Stored in a ListBuffer to facilitate removing committed batches.
+   */
   @GuardedBy("this")
-  protected val batches = new ArrayBuffer[Dataset[A]]
+  protected val batches = new ListBuffer[Dataset[A]]
 
   @GuardedBy("this")
   protected var currentOffset: LongOffset = new LongOffset(-1)
 
+  /**
+   * Last offset that was discarded, or -1 if no commits have occurred. Note that the value
+   * -1 is used in calculations below and isn't just an arbitrary constant.
+   */
+  @GuardedBy("this")
+  protected var lastOffsetCommitted: LongOffset = new LongOffset(-1)
+
   def schema: StructType = encoder.schema
 
   def toDS()(implicit sqlContext: SQLContext): Dataset[A] = {
@@ -85,21 +96,25 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
   override def toString: String = s"MemoryStream[${Utils.truncatedString(output, ",")}]"
 
   override def getOffset: Option[Offset] = synchronized {
-    if (batches.isEmpty) {
+    if (currentOffset.offset == -1) {
       None
     } else {
       Some(currentOffset)
     }
   }
 
-  /**
-   * Returns the data that is between the offsets (`start`, `end`].
-   */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+    // Compute the internal batch numbers to fetch: [startOrdinal, endOrdinal)
     val startOrdinal =
       start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
     val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
-    val newBlocks = synchronized { batches.slice(startOrdinal, endOrdinal) }
+
+    // Internal buffer only holds the batches after lastOffsetCommitted.
+    val newBlocks = synchronized {
+      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
+      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+      batches.slice(sliceStart, sliceEnd)
+    }
 
     logDebug(
       s"MemoryBatch [$startOrdinal, $endOrdinal]: ${newBlocks.flatMap(_.collect()).mkString(", ")}")
@@ -111,11 +126,29 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext)
     }
   }
 
+  override def commit(end: Offset): Unit = synchronized {
+    end match {
+      case newOffset: LongOffset =>
+        val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+        if (offsetDiff < 0) {
+          sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+        }
+
+        batches.trimStart(offsetDiff)
+        lastOffsetCommitted = newOffset
+      case _ =>
+        sys.error(s"MemoryStream.commit() received an offset ($end) that did not originate with " +
+          "an instance of this class")
+    }
+  }
+
   override def stop() {}
 
   def reset(): Unit = synchronized {
     batches.clear()
     currentOffset = new LongOffset(-1)
+    lastOffsetCommitted = new LongOffset(-1)
   }
 }
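The new slice arithmetic in `getBatch` converts absolute offsets into indices of the trimmed buffer: once `commit(k)` has run, the data for offset `o` sits at buffer index `o - k - 1`. A worked example with plain values (the buffer contents here are made up):

```scala
import scala.collection.mutable.ListBuffer

// Offsets 0..4 were produced and commit(LongOffset(1)) has already trimmed the
// buffer, so it now holds the data for offsets 2, 3, 4 at indices 0, 1, 2.
val lastOffsetCommitted = 1
val batches = ListBuffer("b2", "b3", "b4")

// getBatch(start = Some(LongOffset(2)), end = LongOffset(4)) wants (2, 4] = {3, 4}:
val startOrdinal = 2 + 1 // start.offset + 1
val endOrdinal = 4 + 1   // end.offset + 1
val sliceStart = startOrdinal - lastOffsetCommitted - 1 // = 1
val sliceEnd = endOrdinal - lastOffsetCommitted - 1     // = 3
assert(batches.slice(sliceStart, sliceEnd) == ListBuffer("b3", "b4"))
```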
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
index fb15239f9af9..c662e7c6bc77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala
@@ -24,14 +24,15 @@ import java.text.SimpleDateFormat
 import java.util.Calendar
 import javax.annotation.concurrent.GuardedBy
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.ListBuffer
 import scala.util.{Failure, Success, Try}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
+import org.apache.spark.sql._
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 
+
 object TextSocketSource {
   val SCHEMA_REGULAR = StructType(StructField("value", StringType) :: Nil)
   val SCHEMA_TIMESTAMP = StructType(StructField("value", StringType) ::
@@ -53,8 +54,18 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   @GuardedBy("this")
   private var readThread: Thread = null
 
+  /**
+   * All batches from `lastOffsetCommitted + 1` to `currentOffset`, inclusive.
+   * Stored in a ListBuffer to facilitate removing committed batches.
+   */
+  @GuardedBy("this")
+  protected val batches = new ListBuffer[(String, Timestamp)]
+
+  @GuardedBy("this")
+  protected var currentOffset: LongOffset = new LongOffset(-1)
+
   @GuardedBy("this")
-  private var lines = new ArrayBuffer[(String, Timestamp)]
+  protected var lastOffsetCommitted: LongOffset = new LongOffset(-1)
 
   initialize()
@@ -74,10 +85,12 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
             return
           }
           TextSocketSource.this.synchronized {
-            lines += ((line,
+            val newData = (line,
               Timestamp.valueOf(
                 TextSocketSource.DATE_FORMAT.format(Calendar.getInstance().getTime()))
-            ))
+            )
+            currentOffset = currentOffset + 1
+            batches.append(newData)
           }
         }
       } catch {
@@ -92,21 +105,54 @@ class TextSocketSource(host: String, port: Int, includeTimestamp: Boolean, sqlCo
   override def schema: StructType = if (includeTimestamp) TextSocketSource.SCHEMA_TIMESTAMP
   else TextSocketSource.SCHEMA_REGULAR
 
-  /** Returns the maximum available offset for this source. */
   override def getOffset: Option[Offset] = synchronized {
-    if (lines.isEmpty) None else Some(LongOffset(lines.size - 1))
+    if (currentOffset.offset == -1) {
+      None
+    } else {
+      Some(currentOffset)
+    }
   }
 
   /** Returns the data that is between the offsets (`start`, `end`]. */
   override def getBatch(start: Option[Offset], end: Offset): DataFrame = synchronized {
-    val startIdx = start.map(_.asInstanceOf[LongOffset].offset.toInt + 1).getOrElse(0)
-    val endIdx = end.asInstanceOf[LongOffset].offset.toInt + 1
-    val data = synchronized { lines.slice(startIdx, endIdx) }
+    val startOrdinal =
+      start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1
+    val endOrdinal = end.asInstanceOf[LongOffset].offset.toInt + 1
+
+    // Internal buffer only holds the batches after lastOffsetCommitted.
+    val rawList = synchronized {
+      val sliceStart = startOrdinal - lastOffsetCommitted.offset.toInt - 1
+      val sliceEnd = endOrdinal - lastOffsetCommitted.offset.toInt - 1
+      batches.slice(sliceStart, sliceEnd)
+    }
+
     import sqlContext.implicits._
+    val rawBatch = sqlContext.createDataset(rawList)
+
+    // The internal buffer holds rows of (String, Timestamp); strip out the timestamp
+    // column if it was not requested.
     if (includeTimestamp) {
-      data.toDF("value", "timestamp")
+      rawBatch.toDF("value", "timestamp")
+    } else {
+      // Strip out timestamp
+      rawBatch.select("_1").toDF("value")
+    }
+  }
+
+  override def commit(end: Offset): Unit = synchronized {
+    if (end.isInstanceOf[LongOffset]) {
+      val newOffset = end.asInstanceOf[LongOffset]
+      val offsetDiff = (newOffset.offset - lastOffsetCommitted.offset).toInt
+
+      if (offsetDiff < 0) {
+        sys.error(s"Offsets committed out of order: $lastOffsetCommitted followed by $end")
+      }
+
+      batches.trimStart(offsetDiff)
+      lastOffsetCommitted = newOffset
     } else {
-      data.map(_._1).toDF("value")
+      sys.error(s"TextSocketStream.commit() received an offset ($end) that did not " +
+        s"originate with an instance of this class")
     }
   }
@@ -141,7 +187,7 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis
       providerName: String,
       parameters: Map[String, String]): (String, StructType) = {
     logWarning("The socket source should not be used for production applications! " +
-      "It does not support recovery and stores state indefinitely.")
+      "It does not support recovery.")
     if (!parameters.contains("host")) {
       throw new AnalysisException("Set a host to read from with option(\"host\", ...).")
     }
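For manual testing, the reworked socket source can be exercised end to end against a local netcat session (`nc -lk 9999`): each typed line advances the offset by one, and buffered rows are dropped once the batch containing them is committed. A minimal driver using the standard structured streaming API:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("SocketSourceDemo")
  .master("local[2]")
  .getOrCreate()

val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .option("includeTimestamp", true) // exercises the (value, timestamp) schema
  .load()

// Each typed line becomes one row in the next micro-batch.
val query = lines.writeStream.format("console").start()
query.awaitTermination()
```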
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 92020be9789f..dad410486ed2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -252,8 +252,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map(6 / _)
 
-    // Run 3 batches, and then assert that only 1 metadata file is left at the end
-    // since the first 2 should have been purged.
+    // Run 3 batches, and then assert that only 2 metadata files are left at the end
+    // since the first one should have been purged.
     testStream(mapped)(
       AddData(inputData, 1, 2),
       CheckAnswer(6, 3),
@@ -262,11 +262,11 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
       AddData(inputData, 4, 6),
       CheckAnswer(6, 3, 6, 3, 1, 1),
 
-      AssertOnQuery("metadata log should contain only one file") { q =>
+      AssertOnQuery("metadata log should contain only two files") { q =>
         val metadataLogDir = new java.io.File(q.offsetLog.metadataPath.toString)
         val logFileNames = metadataLogDir.listFiles().toSeq.map(_.getName())
         val toTest = logFileNames.filter(! _.endsWith(".crc")) // Workaround for SPARK-17475
-        assert(toTest.size == 1 && toTest.head == "2")
+        assert(toTest.size == 2 && toTest.head == "1")
         true
       }
     )
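The updated expectation follows directly from the new purge target. A quick trace of the three batches, modeling `purge(n - 1)` as retaining batch ids greater than or equal to `n - 1`:

```scala
// Offset files "0", "1", "2" are written for the three batches; while logging
// batch n, StreamExecution now calls offsetLog.purge(n - 1):
//   after batch 0: purge(-1) keeps {0}
//   after batch 1: purge(0)  keeps {0, 1}
//   after batch 2: purge(1)  keeps {1, 2}
val remaining = (0 to 2).foldLeft(Set.empty[Int]) { (files, n) =>
  (files + n).filter(_ >= n - 1)
}
assert(remaining == Set(1, 2)) // two metadata files, the oldest named "1"
```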