more docs
marmbrus committed Jan 5, 2016
commit 92050688699b4c509f5ec5c2b636797cb5ae3c89
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming
Collaborator

Why are the basic abstractions of streaming, like Offset, in the sql.execution package? From a user's point of view, it's not intuitive to have "execution" in the middle, and it's painful to import that deep a package name.

I think basic abstractions like Offset, Source, Sink should be in sql.streaming, and things like StreamExecution can be in sql.execution.streaming or sql.streaming.execution.

Owner Author

My thought was to put everything in execution by default, since that package is hidden from scaladoc, etc. We can move things out as we decide to make them public API.

Collaborator

I am cool with that for now.


case class LongOffset(offset: Long) extends Offset {
Collaborator

Good to have some scala docs here.

override def >(other: Offset): Boolean = other match {
case l: LongOffset => offset > l.offset
case _ =>
throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}")
}

override def <(other: Offset): Boolean = other match {
case l: LongOffset => offset < l.offset
case _ =>
throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}")
}

def +(increment: Long): LongOffset = new LongOffset(offset + increment)
def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
}
Expand Up @@ -17,8 +17,6 @@

package org.apache.spark.sql.execution.streaming



/**
* An offset is a monotonically increasing metric used to track progress in the computation of a
* stream. An [[Offset]] must be comparable.
Collaborator

By comparable, do you mean only > and <, or equality as well? It's not clear from the docs; better to make it explicit.

Owner Author

Now implements a more general compare method.

Collaborator

This is better, I think. Any benefit in making it extend Java's Comparable?

Owner Author

I actually did that at first, but I wasn't sure if we wanted to be Ordered or Comparable. I've changed it to compareTo to make it easy to add either in the future.
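
For reference, a minimal sketch of the compareTo-based Offset described above, assuming the comparison operators remain and simply delegate to it; the exact signature used in the updated commit is not visible in this diff.

trait Offset extends Serializable {
  /**
   * Returns a negative, zero, or positive number depending on whether this offset is behind,
   * equal to, or ahead of `other`. Implementations may throw IllegalArgumentException when the
   * two offset types cannot be compared.
   */
  def compareTo(other: Offset): Int

  def >(other: Offset): Boolean = compareTo(other) > 0
  def <(other: Offset): Boolean = compareTo(other) < 0
}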

Expand All @@ -28,20 +26,3 @@ trait Offset extends Serializable {

def <(other: Offset): Boolean
}

case class LongOffset(offset: Long) extends Offset {
override def >(other: Offset): Boolean = other match {
case l: LongOffset => offset > l.offset
case _ =>
throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}")
}

override def <(other: Offset): Boolean = other match {
case l: LongOffset => offset < l.offset
case _ =>
throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}")
}

def +(increment: Long): LongOffset = new LongOffset(offset + increment)
def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
}
Expand Up @@ -21,7 +21,31 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.{DataFrame, SQLContext}

/**
* An interface for systems that can collect the results of a streaming query.
*
* When new data is produced by a query, a [[Sink]] must be able to transactionally collect the
* data and update the [[StreamProgress]]. In the case of a failure, the sink will be recreated
* and must be able to return the [[StreamProgress]] for all of the data that has been made durable.
* This contract allows Spark to process data with exactly-once semantics, even in the case
* of failures that require the computation to be restarted.
*/
trait Sink {
def currentOffset(source: Source): Option[Offset]
/**
* Returns the [[StreamProgress]] for all data that is currently present in the sink. This
* function will be called by Spark when restarting a stream in order to determine the point in
* the streamed input data from which computation should be resumed.
*/
def currentProgress: StreamProgress

/**
* Accepts a new batch of data as well as a [[StreamProgress]] that denotes how far into the input
* data computation has progressed. When computation restarts after a failure, it is important
* that a [[Sink]] returns the [[StreamProgress]] for the most recent batch of data that
* has been persisted durably. Note that this does not necessarily have to be the
* [[StreamProgress]] for the most recent batch of data that was given to the sink. For example,
* it is valid to buffer data before persisting, as long as the [[StreamProgress]] is stored
* transactionally as the data is eventually persisted.
*/
def addBatch(currentState: StreamProgress, rdd: RDD[InternalRow]): Unit
}
}
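
To make this contract concrete, here is a hedged sketch (not part of this patch) of a Sink that publishes each batch together with the StreamProgress describing it in a single atomic assignment, so a restarted query never observes progress for data that was not persisted:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

class AtomicBufferSink extends Sink {
  // Immutable snapshot of everything "durable" so far; swapped atomically for each new batch.
  @volatile private var committed: Vector[(StreamProgress, Array[InternalRow])] = Vector.empty

  override def currentProgress: StreamProgress =
    committed.lastOption.map(_._1).getOrElse(new StreamProgress)

  override def addBatch(currentState: StreamProgress, rdd: RDD[InternalRow]): Unit = {
    val rows = rdd.collect()                          // materialize the batch first
    committed = committed :+ (currentState -> rows)   // then publish data and progress together
  }
}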
Expand Up @@ -22,18 +22,34 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType


/**
* A source of continually arriving data for a streaming query. A [[Source]] must have a
* monotonically increasing notion of progress that can be represented as an [[Offset]]. Spark
* will regularly query each [[Source]] for its current [[Offset]] in order to determine when new
* data has arrived.
*/
trait Source {

/** Returns the schema of the data from this source */
def schema: StructType

/** Returns the maximum offset that can be retrieved from the source. */
/**
* Returns the maximum offset that can be retrieved from the source. This function will be called
* only before attempting to start a new batch, in order to determine if new data is available.
* Therefore, it is acceptable to perform potentially expensive work when this function is called that
Collaborator

Therefore, it

* is required in order to correctly replay sections of data when `getSlice` is called. For
* example, a [[Source]] might append to a write-ahead log in order to record what data is present
* at a given [[Offset]].
*/
def getCurrentOffset: Offset

/**
* Returns the data between the `start` and `end` offsets. This function must always return
* the same set of data for any given pair of offsets.
* the same set of data for any given pair of offsets in order to guarantee exactly-once semantics
* in the presence of failures.
*
* When `start` is [[None]], the stream should be replayed from the beginning. `getSlice` will
* never be called with an `end` that is greater than the result of `getCurrentOffset`.
*/
def getSlice(sqlContext: SQLContext, start: Option[Offset], end: Offset): RDD[InternalRow]
}
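
As a hedged illustration of both contracts (not part of this patch), the sketch below keeps an append-only in-memory log and interprets a LongOffset as the number of rows seen so far; that interpretation is an assumption made only for this example:

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

class BufferedSource(override val schema: StructType) extends Source {
  // Rows are only ever appended, so any (start, end] slice is stable and can be replayed.
  private val log = new ArrayBuffer[InternalRow]()

  def append(row: InternalRow): Unit = log.synchronized { log += row }

  // Cheap here, but an implementation could also write a write-ahead log entry at this point.
  override def getCurrentOffset: Offset = log.synchronized { LongOffset(log.size) }

  override def getSlice(
      sqlContext: SQLContext, start: Option[Offset], end: Offset): RDD[InternalRow] = {
    val from = start.map(toCount).getOrElse(0)
    val to = toCount(end)
    // Deterministic: the same (start, end) pair always yields exactly the same rows.
    sqlContext.sparkContext.parallelize(log.synchronized { log.slice(from, to).toList })
  }

  private def toCount(offset: Offset): Int = offset match {
    case LongOffset(n) => n.toInt
    case other => throw new IllegalArgumentException(s"Unknown offset type: $other")
  }
}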
Expand Up @@ -43,9 +43,12 @@ class StreamExecution(

// Start the execution at the current Offset for the sink. (i.e. avoid reprocessing data
// that we have already processed).
sources.foreach { s =>
sink.currentOffset(s).foreach { offset =>
currentOffsets.update(s, offset)
{
val storedProgress = sink.currentProgress
sources.foreach { s =>
storedProgress.get(s).foreach { offset =>
currentOffsets.update(s, offset)
}
}
}

Expand Down
Expand Up @@ -19,6 +19,11 @@ package org.apache.spark.sql.execution.streaming

import scala.collection.mutable

/**
* Tracks the progress of processing data from one or more [[Source]]s that are present in a
* streaming query. This is similar to a simplified, single-instance vector clock that progresses
* monotonically forward.
*/
class StreamProgress extends Serializable {
Collaborator

How will external sinks create this object if it neither takes a map of offsets in the constructor nor allows update() to be called?

Owner Author

I'll add a TODO for serialization API.

private val currentOffsets = new mutable.HashMap[Source, Offset]
with mutable.SynchronizedMap[Source, Offset]
Expand All @@ -42,6 +47,10 @@ class StreamProgress extends Serializable {
updated
}

/**
* Used to create a new copy of this [[StreamProgress]]. While this class is currently mutable,
* it should be copied before being passed to user code.
*/
private[streaming] def copy(): StreamProgress = {
val copied = new StreamProgress
currentOffsets.foreach(copied.update)
Expand All @@ -60,4 +69,4 @@ class StreamProgress extends Serializable {
override def hashCode: Int = {
currentOffsets.toSeq.sortBy(_._1.toString).hashCode()
}
}
}
Expand Up @@ -32,4 +32,3 @@ object StreamingRelation {
case class StreamingRelation(source: Source, output: Seq[Attribute]) extends LeafNode {
override def toString: String = source.toString
}

Expand Up @@ -43,6 +43,11 @@ object MemoryStream {
new MemoryStream[A](memoryStreamId.getAndIncrement())
}

/**
* A [[Source]] that produces the values stored in memory as they are added by the user. This [[Source]]
* is primarily intended for use in unit tests as it can only replay data when the object is still
* available.
*/
case class MemoryStream[A : Encoder](id: Int) extends Source with Logging {
protected val encoder = encoderFor[A]
protected val logicalPlan = StreamingRelation(this)
Expand Down Expand Up @@ -92,20 +97,32 @@ case class MemoryStream[A : Encoder](id: Int) extends Source with Logging {
override def toString: String = s"MemoryStream[${output.mkString(",")}]"
}

/**
* A sink that stores the results in memory. This [[Sink]] is primarily intended for use in unit
* tests and does not provide durability.
*/
class MemorySink(schema: StructType) extends Sink with Logging {
/** An ordered list of batches that have been written to this [[Sink]]. */
private var batches = new ArrayBuffer[(StreamProgress, Seq[Row])]()
private val output = schema.toAttributes

def currentOffsets: StreamProgress = batches.lastOption.map(_._1).getOrElse(new StreamProgress)
def currentOffset(source: Source): Option[Offset] = currentOffsets.get(source)
/** Used to convert an [[InternalRow]] to an external [[Row]] for comparison in testing. */
private val externalRowConverter = RowEncoder(schema)

def allData: Seq[Row] = batches.flatMap(_._2)
override def currentProgress: StreamProgress =
batches.lastOption.map(_._1).getOrElse(new StreamProgress)

val externalRowConverter = RowEncoder(schema)
def addBatch(currentState: StreamProgress, rdd: RDD[InternalRow]): Unit = {
override def addBatch(currentState: StreamProgress, rdd: RDD[InternalRow]): Unit = {
batches.append((currentState, rdd.collect().map(externalRowConverter.fromRow)))
}

/** Returns all rows that are stored in this [[Sink]]. */
def allData: Seq[Row] = batches.flatMap(_._2)

/**
* Atomically drops the most recent `num` batches and resets the [[StreamProgress]] to the
* corresponding point in the input. This function can be used when testing to simulate data
* that has been lost due to buffering.
*/
def dropBatches(num: Int): Unit = {
batches.remove(batches.size - num, num)
}
Expand Down
17 changes: 8 additions & 9 deletions sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
Expand Up @@ -358,17 +358,16 @@ object QueryTest {
isSorted: Boolean = false): Option[String] = {
if (prepareAnswer(expectedAnswer, isSorted) != prepareAnswer(sparkAnswer, isSorted)) {
val errorMessage =
s"""
|== Results ==
|${sideBySide(
s"== Correct Answer - ${expectedAnswer.size} ==" +:
prepareAnswer(expectedAnswer, isSorted).map(_.toString()),
s"== Spark Answer - ${sparkAnswer.size} ==" +:
prepareAnswer(sparkAnswer, isSorted).map(_.toString())).mkString("\n")}
""".stripMargin
s"""
|== Results ==
|${sideBySide(
s"== Correct Answer - ${expectedAnswer.size} ==" +:
prepareAnswer(expectedAnswer, isSorted).map(_.toString()),
s"== Spark Answer - ${sparkAnswer.size} ==" +:
prepareAnswer(sparkAnswer, isSorted).map(_.toString())).mkString("\n")}
""".stripMargin
return Some(errorMessage)
}

None
}

Expand Down
19 changes: 14 additions & 5 deletions sql/core/src/test/scala/org/apache/spark/sql/StreamTest.scala
Expand Up @@ -32,9 +32,9 @@ import org.apache.spark.sql.execution.streaming._
/**
* A framework for implementing tests for streaming queries and sources.
*
* A test consists of a set of steps (i.e. a [[StreamAction]]) that are executed in order, blocking
* as necessary to let the stream catch up. For example, the following adds some data to a stream
* and verifies that
* A test consists of a set of steps (expressed as a `StreamAction`) that are executed in order,
* blocking as necessary to let the stream catch up. For example, the following adds some data to
* a stream, blocking until it can verify that the correct values are eventually produced.
*
* {{{
* val inputData = MemoryStream[Int]
Expand All @@ -46,8 +46,12 @@ import org.apache.spark.sql.execution.streaming._
* }}}
*
* Note that while we do sleep to allow the other thread to progress without spinning,
* [[StreamAction]] checks should not depend on the amount of time spent sleeping. Instead they
* `StreamAction` checks should not depend on the amount of time spent sleeping. Instead they
* should check the actual progress of the stream before verifying the required test condition.
*
* Currently it is assumed that all streaming queries will eventually complete in 10 seconds to
* avoid hanging forever in the case of failures. However, individual suites can change this
* by overriding `streamingTimeout`.
*/
trait StreamTest extends QueryTest with Timeouts {

Expand Down Expand Up @@ -76,6 +80,11 @@ trait StreamTest extends QueryTest with Timeouts {
/** A trait that can be extended when testing other sources. */
trait AddData extends StreamAction {
def source: Source

/**
* Called to trigger adding the data. Should return the offset that, once reached by the stream,
* indicates that this new data has been processed.
*/
def addData(): Offset
}
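
As a hedged sketch, a concrete action for MemoryStream could look like the following; MemoryStream's body is collapsed in this diff, so the assumption that it exposes an addData method returning the new Offset is illustrative only.

case class AddDataMemory[A](source: MemoryStream[A], data: Seq[A]) extends AddData {
  // Assumes MemoryStream.addData returns the Offset at which `data` becomes visible.
  override def addData(): Offset = source.addData(data)
}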

Expand Down Expand Up @@ -267,4 +276,4 @@ trait StreamTest extends QueryTest with Timeouts {
}
}
}
}
}
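
For the 10-second default mentioned in the StreamTest scaladoc above, a suite might raise the limit roughly as sketched below; the exact type of `streamingTimeout` is not shown in this diff, so the scalatest Span used here is an assumption.

import org.scalatest.time.{Seconds, Span}

class SlowSourceSuite extends StreamTest with SharedSQLContext {
  // Allow each streaming query in this suite up to 60 seconds before the framework gives up.
  override val streamingTimeout = Span(60, Seconds)
}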
Expand Up @@ -80,4 +80,4 @@ class StreamSuite extends StreamTest with SharedSQLContext {
AddData(inputData, 1, 2, 3, 4),
CheckAnswer(2, 4))
}
}
}