Merged
Changes from 1 commit
working on state
marmbrus committed Dec 11, 2015
commit d2706b511f254feff7d4558550e403215ffb795f
@@ -56,7 +56,7 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
// Obviously a lot to do here still...
val iter = strategies.view.flatMap(_(plan)).toIterator
assert(iter.hasNext, s"No plan for $plan")
assert(iter.hasNext, s"No plan for\n$plan")
iter
}
}
@@ -138,6 +138,8 @@ class ExpressionEncoderSuite extends SparkFunSuite {
encodeDecodeTest(new JavaSerializable(15), "java object")(
encoderFor(Encoders.javaSerialization[JavaSerializable]))

encodeDecodeTest(Array(InnerClass(1)), "array of inner class")

// test product encoders
private def productTest[T <: Product : ExpressionEncoder](input: T): Unit = {
encodeDecodeTest(input, input.getClass.getSimpleName)
@@ -2141,7 +2141,7 @@ class DataFrame private[sql](
}

/** A convenient function to wrap a logical plan and produce a DataFrame. */
@inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
@inline private[sql] def withPlan(logicalPlan: => LogicalPlan): DataFrame = {
new DataFrame(sqlContext, logicalPlan)
}

@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.{DataFrame, SQLContext}

trait Sink {
def allData(sqlContext: SQLContext): DataFrame
def currentWatermark(source: Source): Option[Watermark]
def addBatch(watermarks: Map[Source, Watermark], rdd: RDD[InternalRow]): Unit
}
@@ -18,17 +18,25 @@
package org.apache.spark.sql.execution.streaming

import org.apache.spark.Logging
import org.apache.spark.sql.{DataFrame, Strategy, SQLContext}
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.execution.streaming.state.StatefulPlanner
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.execution.{SparkPlanner, LogicalRDD}
import org.apache.spark.sql.execution.{SparkPlan, QueryExecution, LogicalRDD}

class StreamExecution(
sqlContext: SQLContext,
logicalPlan: LogicalPlan,
private[sql] val logicalPlan: LogicalPlan,
val sink: Sink) extends Logging {

/** All stream sources present in the query plan. */
private val sources = logicalPlan.collect { case s: Source => s: Source }

private val currentWatermarks = new StreamProgress
/** Tracks how much data we have processed from each input source. */
private[sql] val currentWatermarks = new StreamProgress

import org.apache.spark.sql.execution.streaming.state.MaxWatermark
private[sql] val maxEventTime = sqlContext.sparkContext.accumulator(Watermark(-1))(org.apache.spark.sql.execution.streaming.state.MaxWatermark)

// Start the execution at the current watermark for the sink. (i.e. avoid reprocessing data
// that we have already processed).
@@ -37,43 +45,56 @@ class StreamExecution(
currentWatermarks.update(s, sourceWatermark)
}

@volatile
private var shouldRun = true
/** When false, signals to the microBatchThread that it should stop running. */
@volatile private var shouldRun = true

private val thread = new Thread("stream execution thread") {
/** The thread that runs the micro-batches of this stream. */
private[sql] val microBatchThread = new Thread("stream execution thread") {
override def run(): Unit = {
SQLContext.setActive(sqlContext)
while (shouldRun) { attemptBatch() }
Collaborator: Needs some exception handling. If this thread silently fails on some exception, what happens to execution? Is it terminated? How will the user get the exception? In Spark Streaming, the solution was to forward any terminating exception to the thread stuck in streamingContext.awaitTermination().

Owner Author: Good point. We do this for unit tests now in a manual way, but we should have a better user API. I've added a TODO.
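A minimal sketch of the exception-forwarding pattern discussed above, assuming a hypothetical awaitTermination() API that this PR does not yet add; the class, field, and method names are illustrative, not from the patch:

import java.util.concurrent.CountDownLatch

// Illustrative only: the micro-batch thread records the exception that killed it,
// and a caller blocked in awaitTermination() rethrows it instead of waiting forever.
class StreamExecutionWithTermination {
  @volatile private var shouldRun = true
  @volatile private var streamDeathCause: Option[Throwable] = None
  private val terminationLatch = new CountDownLatch(1)

  private val microBatchThread = new Thread("stream execution thread") {
    override def run(): Unit = {
      try {
        while (shouldRun) { /* attemptBatch() */ }
      } catch {
        case t: Throwable => streamDeathCause = Some(t) // remember why the stream died
      } finally {
        terminationLatch.countDown() // wake up any thread waiting for termination
      }
    }
  }

  /** Blocks until the stream stops; rethrows the exception that terminated it, if any. */
  def awaitTermination(): Unit = {
    terminationLatch.await()
    streamDeathCause.foreach(e => throw e)
  }
}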

}
}
microBatchThread.setDaemon(true)
microBatchThread.start()

thread.setDaemon(true)
thread.start()

/**
* Checks to see if any new data is present in any of the sources. When new data is available,
* a batch is executed and passed to the sink, updating the currentWatermarks.
*/
private def attemptBatch(): Unit = {
// Check to see if any of the input sources have data that has not been processed.
val newData = sources.flatMap {
case s if s.watermark > currentWatermarks(s) => s -> s.watermark :: Nil
case _ => Nil
}.toMap

if (newData.nonEmpty) {
val startTime = System.nanoTime()
logDebug(s"Running with new data upto: $newData")
logDebug(s"Running with new data up to: $newData")

// Replace sources in the logical plan with data that has arrived since the last batch.
val newPlan = logicalPlan transform {
case s: Source if newData.contains(s) =>
val batchInput = s.getSlice(sqlContext, currentWatermarks(s), newData(s))
LogicalRDD(s.output, batchInput)(sqlContext)
case s: Source => LocalRelation(s.output)
}

// TODO: Duplicated with QueryExecution
val analyzedPlan = sqlContext.analyzer.execute(newPlan)
val optimizedPlan = sqlContext.optimizer.execute(analyzedPlan)
val physicalPlan = sqlContext.planner.plan(optimizedPlan).next()
val executedPlan = sqlContext.prepareForExecution.execute(physicalPlan)
val optimizerStart = System.nanoTime()

val results = executedPlan.execute().map(_.copy()).cache()
logInfo(s"Processed ${results.count()} records using plan\n$executedPlan")
val executedPlan = new QueryExecution(sqlContext, newPlan) {
override lazy val optimizedPlan: LogicalPlan = EliminateSubQueries(analyzed)
override lazy val sparkPlan: SparkPlan = {
SQLContext.setActive(sqlContext)
new StatefulPlanner(sqlContext, maxEventTime).plan(optimizedPlan).next()
}
}.executedPlan

val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000
logDebug(s"Optimized batch in ${optimizerTime}ms")

val results = executedPlan.execute().map(_.copy())
sink.addBatch(newData, results)

StreamExecution.this.synchronized {
@@ -82,22 +103,28 @@
}

val batchTime = (System.nanoTime() - startTime).toDouble / 1000000
logWarning(s"Compete up to $newData in ${batchTime}ms")
logInfo(s"Compete up to $newData in ${batchTime}ms")
}

logDebug(s"Waiting for data, current: $currentWatermarks")
Thread.sleep(10)
Collaborator: Why 10? Should be a separate constant.
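One way to address this, as a sketch; the constant name is hypothetical, not from this PR:

// Hypothetical constant: how long the micro-batch thread waits before polling the sources again.
private val pollIntervalMs = 10L

// ...and in attemptBatch(), instead of the literal:
Thread.sleep(pollIntervalMs)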

}

def current: DataFrame = sink.allData(sqlContext)

/**
* Signals to the thread executing micro-batches that it should stop running after the next
* batch. This method blocks until the thread stops running.
*/
def stop(): Unit = {
shouldRun = false
while (thread.isAlive) { Thread.sleep(100) }
while (microBatchThread.isAlive) { Thread.sleep(100) }
Comment: sleep can be replaced with join: if (microBatchThread.isAlive) microBatchThread.join()
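A sketch of stop() using join() as suggested, assuming the shouldRun flag and microBatchThread from the class above:

def stop(): Unit = {
  shouldRun = false
  if (microBatchThread.isAlive) {
    // join() blocks until the micro-batch thread observes shouldRun == false and exits.
    microBatchThread.join()
  }
}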

}

def waitUntil(source: Source, newWatermark: Watermark): Unit = {
assert(thread.isAlive)
/**
* Blocks the current thread until processing for data from the given `source` has reached at
* least the given `watermark`. This method is intended primarily for use when writing tests.
*/
def awaitWatermark(source: Source, newWatermark: Watermark): Unit = {
assert(microBatchThread.isAlive)

while (currentWatermarks(source) < newWatermark) {
logInfo(s"Waiting until $newWatermark at $source")
@@ -31,6 +31,12 @@ class StreamProgress {
def apply(source: Source): Watermark = currentWatermarks(source)
def get(source: Source): Option[Watermark] = currentWatermarks.get(source)

def copy(): StreamProgress = {
val copied = new StreamProgress
currentWatermarks.foreach(copied.update)
copied
}

override def toString: String =
currentWatermarks.map { case (k, v) => s"$k: $v"}.mkString("[", ",", "]")
currentWatermarks.map { case (k, v) => s"$k: $v"}.mkString("{", ",", "}")
}
@@ -17,8 +17,13 @@

package org.apache.spark.sql.execution.streaming

class Watermark(val offset: Long) extends AnyVal {
object Watermark {
val min = Watermark(-1)
}

case class Watermark(offset: Long) {
def >(other: Watermark): Boolean = offset > other.offset
def <(other: Watermark): Boolean = offset < other.offset
def +=(increment: Long): Watermark = new Watermark(offset + increment)
def +(increment: Long): Watermark = new Watermark(offset + increment)
def -(decrement: Long): Watermark = new Watermark(offset - decrement)
}
@@ -29,8 +29,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{Logging, SparkEnv}
import org.apache.spark.api.java.StorageLevels

import org.apache.spark.sql.{DataFrame, SQLContext, Encoder}
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.{Dataset, DataFrame, SQLContext, Encoder, Row}
import org.apache.spark.sql.catalyst.encoders.{RowEncoder, encoderFor}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Union, LeafNode}
import org.apache.spark.storage.{StreamBlockId, BlockId}
@@ -51,6 +51,14 @@ case class MemoryStream[A : Encoder](output: Seq[Attribute]) extends LeafNode wi

def watermark: Watermark = currentWatermark

def toDS()(implicit sqlContext: SQLContext): Dataset[A] = {
new Dataset(sqlContext, this)
}

def toDF()(implicit sqlContext: SQLContext): DataFrame = {
new DataFrame(sqlContext, this)
}

def addData(data: TraversableOnce[A]): Watermark = {
val blockId = StreamBlockId(0, MemoryStream.currentBlockId.incrementAndGet())
Collaborator: Instead of putting it in the BlockManager, why not make it an RDD with makeRDD? Then it would run in distributed scenarios as well. MemoryStream is a very useful thing for development, not just for local unit tests.

Owner Author: Changed to store DataFrames.
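For reference, a hedged sketch of the reviewer's makeRDD alternative (not what the PR ended up doing; the author switched to storing DataFrames instead). Class and field names are hypothetical:

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext

class MakeRDDMemoryStreamSketch[A: ClassTag](sqlContext: SQLContext) {
  private val batches = new ArrayBuffer[RDD[A]]()
  private var currentOffset = -1L

  def addData(data: TraversableOnce[A]): Long = synchronized {
    // makeRDD partitions the data across the cluster, so downstream operators
    // also run distributed rather than reading locally stored blocks.
    batches.append(sqlContext.sparkContext.makeRDD(data.toSeq))
    currentOffset += 1
    currentOffset
  }
}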

blockManager.putIterator(
@@ -59,7 +67,7 @@ case class MemoryStream[A : Encoder](output: Seq[Attribute]) extends LeafNode wi
StorageLevels.MEMORY_ONLY_SER)

synchronized {
currentWatermark += 1
currentWatermark = currentWatermark + 1
blocks.append(blockId)
currentWatermark
}
@@ -76,22 +84,19 @@ case class MemoryStream[A : Encoder](output: Seq[Attribute]) extends LeafNode wi

class MemorySink(schema: StructType) extends Sink with Logging {
private val currentWatermarks = new StreamProgress
private var rdds = new ArrayBuffer[RDD[InternalRow]]
private var batches = new ArrayBuffer[(StreamProgress, Seq[Row])]()

private val output = schema.toAttributes

def allData(sqlContext: SQLContext): DataFrame =
new DataFrame(
sqlContext,
rdds
.map(LogicalRDD(output, _)(sqlContext))
.reduceOption(Union)
.getOrElse(LocalRelation(output)))

def currentWatermark(source: Source): Option[Watermark] = currentWatermarks.get(source)

def allData: Seq[Row] = batches.flatMap(_._2)

val externalRowConverter = RowEncoder(schema)
def addBatch(watermarks: Map[Source, Watermark], rdd: RDD[InternalRow]): Unit = {
watermarks.foreach(currentWatermarks.update)
rdds.append(rdd)
batches.append((currentWatermarks.copy(), rdd.collect().map(externalRowConverter.fromRow)))
}

override def toString: String = batches.map(b => s"${b._1}: ${b._2.mkString(" ")}").mkString("\n")
}