Initial draft of Streaming Dataframe infrastructure #21
`LongOffset.scala` (new file, +35 lines):
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.streaming

/** An [[Offset]] for sources whose progress can be represented by a single, increasing `Long`. */
case class LongOffset(offset: Long) extends Offset {

  override def >(other: Offset): Boolean = other match {
    case l: LongOffset => offset > l.offset
    case _ =>
      throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}")
  }

  override def <(other: Offset): Boolean = other match {
    case l: LongOffset => offset < l.offset
    case _ =>
      throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}")
  }

  def +(increment: Long): LongOffset = new LongOffset(offset + increment)
  def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
}
```

> **Collaborator** (on `case class LongOffset`): Good to have some scala docs here.
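For a sense of how these operators compose, a quick usage sketch (hypothetical, not from the PR):

```scala
// Hypothetical usage of LongOffset (illustration only).
val a = LongOffset(5)
val b = a + 3                  // LongOffset(8)
assert(b > a && a < b)
assert((b - 3) == a)           // case-class equality compares the underlying Long
// Comparing against a different Offset implementation throws IllegalArgumentException.
```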
`Offset.scala` (new file, +28 lines):
```scala
/* Apache License, Version 2.0 header, as in LongOffset.scala above. */

package org.apache.spark.sql.execution.streaming

/**
 * An offset is a monotonically increasing metric used to track progress in the computation of a
 * stream. An [[Offset]] must be comparable.
 */
trait Offset extends Serializable {
  def >(other: Offset): Boolean

  def <(other: Offset): Boolean
}
```

> **Collaborator** (on "must be comparable"): By comparable do you mean only …
>
> **Owner (author):** Now implements a more general …
>
> **Collaborator:** This is better I think. Any benefit in making it extend Java Comparable class?
>
> **Owner (author):** I actually did that at first, I wasn't sure if we wanted to be …
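Since the thread above weighs extending Java's `Comparable`, here is a sketch of what that alternative might look like (not part of this PR; all names are illustrative):

```scala
// A sketch of the java.lang.Comparable alternative discussed above (hypothetical).
trait ComparableOffset extends Serializable with Comparable[ComparableOffset] {
  // > and < come for free once compareTo is defined.
  def >(other: ComparableOffset): Boolean = compareTo(other) > 0
  def <(other: ComparableOffset): Boolean = compareTo(other) < 0
}

case class MyLongOffset(offset: Long) extends ComparableOffset {
  override def compareTo(other: ComparableOffset): Int = other match {
    case MyLongOffset(o) => java.lang.Long.compare(offset, o)
    case _ =>
      throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}")
  }
}
```

One design consideration: deriving `>` and `<` from a single `compareTo` avoids duplicated comparison logic, at the cost of tying the public trait to Java's interface.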
`Sink.scala` (new file, +51 lines):
```scala
/* Apache License, Version 2.0 header, as in LongOffset.scala above. */

package org.apache.spark.sql.execution.streaming

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.{DataFrame, SQLContext}

/**
 * An interface for systems that can collect the results of a streaming query.
 *
 * When new data is produced by a query, a [[Sink]] must be able to transactionally collect the
 * data and update the [[StreamProgress]]. In the case of a failure, the sink will be recreated
 * and must be able to return the [[StreamProgress]] for all of the data that has been made
 * durable. This contract allows Spark to process data with exactly-once semantics, even in the
 * case of failures that require the computation to be restarted.
 */
trait Sink {
  /**
   * Returns the [[StreamProgress]] for all data that is currently present in the sink. This
   * function will be called by Spark when restarting a stream, in order to determine the point
   * in the streamed input data from which computation should be resumed.
   */
  def currentProgress: StreamProgress

  /**
   * Accepts a new batch of data as well as a [[StreamProgress]] that denotes how far into the
   * input data computation has progressed. When computation restarts after a failure, it is
   * important that a [[Sink]] returns the same [[StreamProgress]] as the most recent batch of
   * data that has been persisted durably. Note that this does not necessarily have to be the
   * [[StreamProgress]] for the most recent batch of data that was given to the sink. For example,
   * it is valid to buffer data before persisting, as long as the [[StreamProgress]] is stored
   * transactionally as data is eventually persisted.
   */
  def addBatch(currentState: StreamProgress, rdd: RDD[InternalRow]): Unit
}
```
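To make the contract concrete, here is a minimal in-memory sink sketch (hypothetical, not part of this PR). On a single JVM, a lock that updates the buffered batches and the progress marker together stands in for a real transactional store; it also assumes it is declared in `org.apache.spark.sql.execution.streaming` so it can see the package-private API.

```scala
// A hypothetical in-memory Sink (illustration only, not part of this PR).
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow

class MemorySink extends Sink {
  // Batches and the progress marker are updated together under a lock; on a single JVM this
  // stands in for the "transactional" part of the contract.
  private val batches = new ArrayBuffer[RDD[InternalRow]]
  private var progress = new StreamProgress

  override def currentProgress: StreamProgress = synchronized { progress }

  override def addBatch(currentState: StreamProgress, rdd: RDD[InternalRow]): Unit =
    synchronized {
      batches += rdd
      progress = currentState // StreamExecution passes a defensive copy
    }
}
```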
`Source.scala` (new file, +55 lines):
```scala
/* Apache License, Version 2.0 header, as in LongOffset.scala above. */

package org.apache.spark.sql.execution.streaming

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

/**
 * A source of continually arriving data for a streaming query. A [[Source]] must have a
 * monotonically increasing notion of progress that can be represented as an [[Offset]]. Spark
 * will regularly query each [[Source]] for its current [[Offset]] in order to determine when
 * new data has arrived.
 */
trait Source {

  /** Returns the schema of the data from this source. */
  def schema: StructType

  /**
   * Returns the maximum offset that can be retrieved from the source. This function will be
   * called only before attempting to start a new batch, in order to determine if new data is
   * available. Therefore, it is acceptable to perform potentially expensive work when this
   * function is called, if that work is required in order to correctly replay sections of data
   * when `getSlice` is called. For example, a [[Source]] might append to a write-ahead log in
   * order to record what data is present at a given [[Offset]].
   */
  def getCurrentOffset: Offset

  /**
   * Returns the data between the `start` and `end` offsets. This function must always return
   * the same set of data for any given pair of offsets, in order to guarantee exactly-once
   * semantics in the presence of failures.
   *
   * When `start` is [[None]], the stream should be replayed from the beginning. `getSlice` will
   * never be called with an `end` that is greater than the result of `getCurrentOffset`.
   */
  def getSlice(sqlContext: SQLContext, start: Option[Offset], end: Offset): RDD[InternalRow]
}
```

> **Collaborator** (on the "Therfore is" typo): Therefore, it
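Similarly, a minimal in-memory source sketch (hypothetical, not part of this PR): it buffers rows on the driver and reports progress as a `LongOffset` counting rows added, so any given `(start, end]` pair always replays the same slice, as the exactly-once contract requires.

```scala
// A hypothetical in-memory Source (illustration only, not part of this PR). Assumed to live
// in org.apache.spark.sql.execution.streaming.
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{LongType, StructField, StructType}

class MemorySource extends Source {
  private val rows = new ArrayBuffer[InternalRow]

  override def schema: StructType = StructType(StructField("value", LongType) :: Nil)

  // Progress is simply the number of rows buffered so far.
  override def getCurrentOffset: Offset = synchronized { LongOffset(rows.size) }

  // Replays exactly the rows in (start, end]; the same offsets always yield the same data.
  override def getSlice(
      sqlContext: SQLContext,
      start: Option[Offset],
      end: Offset): RDD[InternalRow] = synchronized {
    val from = start match {
      case Some(LongOffset(o)) => o.toInt
      case _ => 0
    }
    val until = end.asInstanceOf[LongOffset].offset.toInt
    sqlContext.sparkContext.parallelize(rows.slice(from, until))
  }

  /** Adds a row on the driver, making it visible to the next getCurrentOffset call. */
  def add(row: InternalRow): Unit = synchronized { rows += row }
}
```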
`StreamExecution.scala` (new file, +145 lines):
```scala
/* Apache License, Version 2.0 header, as in LongOffset.scala above. */

package org.apache.spark.sql.execution.streaming

import org.apache.spark.Logging
import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.execution.{SparkPlan, QueryExecution, LogicalRDD}

/**
 * Manages the execution of a streaming Spark SQL query that is occurring in a separate thread.
 * Unlike a standard query, a streaming query executes repeatedly each time new data arrives at
 * any [[Source]] present in the query plan. Whenever new data arrives, a [[QueryExecution]] is
 * created and the results are committed transactionally to the given [[Sink]].
 */
class StreamExecution(
    sqlContext: SQLContext,
    private[sql] val logicalPlan: LogicalPlan,
    val sink: Sink) extends Logging {

  /** Tracks how much data we have processed from each input source. */
  private[sql] val currentOffsets = new StreamProgress

  /** All stream sources present in the query plan. */
  private val sources =
    logicalPlan.collect { case s: StreamingRelation => s.source }

  // Start execution at the current offsets stored in the sink (i.e. avoid reprocessing data
  // that we have already processed).
  {
    val storedProgress = sink.currentProgress
    sources.foreach { s =>
      storedProgress.get(s).foreach { offset =>
        currentOffsets.update(s, offset)
      }
    }
  }

  logInfo(s"Stream running at $currentOffsets")

  /** When false, signals to the microBatchThread that it should stop running. */
  @volatile private var shouldRun = true

  /** The thread that runs the micro-batches of this stream. */
  private[sql] val microBatchThread = new Thread("stream execution thread") {
    override def run(): Unit = {
      SQLContext.setActive(sqlContext)
      while (shouldRun) { attemptBatch() }
    }
  }
  microBatchThread.setDaemon(true)
  microBatchThread.start()

  var lastExecution: QueryExecution = null
```

> **Collaborator** (on the `while (shouldRun) { attemptBatch() }` loop): Needs some exception handling. If this thread silently fails on some exception, what happens to execution? Is it terminated? How will the user get the exception? In SS, the solution was to forward any terminating exception to the thread stuck in `streamingContext.awaitTermination()`.
>
> **Owner (author):** Good point. We do this for unit tests now in a manual way, but we should have a better user API. I've added a TODO.

> **Reviewer** (on `var lastExecution`): nit: volatile and `private[streaming]`?

```scala
  // (StreamExecution, continued)

  /**
   * Checks to see if any new data is present in any of the sources. When new data is available,
   * a batch is executed and passed to the sink, updating the currentOffsets.
   */
  private def attemptBatch(): Unit = {
    val newData = sources.flatMap { s =>
      val prevOffset = currentOffsets.get(s)
      val latestOffset = s.getCurrentOffset
      if (prevOffset.isEmpty || latestOffset > prevOffset.get) {
        Some(s -> latestOffset)
      } else None
    }.toMap

    if (newData.nonEmpty) {
      val startTime = System.nanoTime()
      logInfo(s"Running with new data up to: $newData")

      // Replace sources in the logical plan with data that has arrived since the last batch.
      val newPlan = logicalPlan transform {
        case StreamingRelation(source, output) =>
          if (newData.contains(source)) {
            val batchInput =
              source.getSlice(sqlContext, currentOffsets.get(source), newData(source))
            LogicalRDD(output, batchInput)(sqlContext)
          } else {
            LocalRelation(output)
          }
      }

      val optimizerStart = System.nanoTime()

      lastExecution = new QueryExecution(sqlContext, newPlan)
      val executedPlan = lastExecution.executedPlan
      val optimizerTime = (System.nanoTime() - optimizerStart).toDouble / 1000000
      logDebug(s"Optimized batch in ${optimizerTime}ms")

      val results = executedPlan.execute().map(_.copy())
      newData.foreach(currentOffsets.update)
      sink.addBatch(currentOffsets.copy(), results)

      StreamExecution.this.synchronized {
        StreamExecution.this.notifyAll()
      }

      val batchTime = (System.nanoTime() - startTime).toDouble / 1000000
      logInfo(s"Complete up to $newData in ${batchTime}ms")
    }

    logDebug(s"Waiting for data, current: $currentOffsets")
    Thread.sleep(10)
  }
```

> **Collaborator** (on `Thread.sleep(10)`): Why 10? Should be a separate constant.

```scala
  // (StreamExecution, continued)

  /**
   * Signals to the thread executing micro-batches that it should stop running after the next
   * batch. This method blocks until the thread stops running.
   */
  def stop(): Unit = {
    shouldRun = false
    while (microBatchThread.isAlive) { Thread.sleep(100) }
  }

  /**
   * Blocks the current thread until processing for data from the given `source` has reached at
   * least the given `Offset`. This method is intended for use primarily when writing tests.
   */
  def awaitOffset(source: Source, newOffset: Offset): Unit = {
    while (!currentOffsets.contains(source) || currentOffsets(source) < newOffset) {
      logInfo(s"Waiting until $newOffset at $source")
      synchronized { wait() }
    }
    logDebug(s"Unblocked at $newOffset for $source")
  }
}
```
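Putting the pieces together, a rough sketch of how a test might drive this loop. Everything here is hypothetical: `MemorySource` and `MemorySink` are the sketches above (not classes in this PR), and the `StreamingRelation(source, output)` constructor and `schema.toAttributes` call are assumed from the pattern match in `attemptBatch`.

```scala
// Hypothetical test wiring; assumes the MemorySource/MemorySink sketches above and that
// StreamingRelation(source, output) can be constructed as it is matched in attemptBatch.
import org.apache.spark.sql.catalyst.InternalRow

val source = new MemorySource
val sink = new MemorySink
val plan = StreamingRelation(source, source.schema.toAttributes)

val stream = new StreamExecution(sqlContext, plan, sink) // starts the micro-batch thread

source.add(InternalRow(1L))                 // new data becomes visible via getCurrentOffset
stream.awaitOffset(source, LongOffset(1))   // block until the batch is committed to the sink
stream.stop()
```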
`StreamProgress.scala` (new file, +72 lines):
```scala
/* Apache License, Version 2.0 header, as in LongOffset.scala above. */

package org.apache.spark.sql.execution.streaming

import scala.collection.mutable

/**
 * Tracks the progress of processing data from one or more [[Source]]s that are present in a
 * streaming query. This is similar to a simplified, single-instance vector clock that progresses
 * monotonically forward.
 */
class StreamProgress extends Serializable {
  private val currentOffsets = new mutable.HashMap[Source, Offset]
    with mutable.SynchronizedMap[Source, Offset]

  private[streaming] def update(source: Source, newOffset: Offset): Unit = {
    currentOffsets.get(source).foreach(old => assert(newOffset > old))
    currentOffsets.put(source, newOffset)
  }

  private[streaming] def update(newOffset: (Source, Offset)): Unit =
    update(newOffset._1, newOffset._2)

  private[streaming] def apply(source: Source): Offset = currentOffsets(source)
  private[streaming] def get(source: Source): Option[Offset] = currentOffsets.get(source)
  private[streaming] def contains(source: Source): Boolean = currentOffsets.contains(source)

  private[streaming] def ++(updates: Map[Source, Offset]): StreamProgress = {
    val updated = new StreamProgress
    currentOffsets.foreach(updated.update)
    updates.foreach(updated.update)
    updated
  }

  /**
   * Used to create a new copy of this [[StreamProgress]]. While this class is currently mutable,
   * it should be copied before being passed to user code.
   */
  private[streaming] def copy(): StreamProgress = {
    val copied = new StreamProgress
    currentOffsets.foreach(copied.update)
    copied
  }

  override def toString: String =
    currentOffsets.map { case (k, v) => s"$k: $v" }.mkString("{", ",", "}")

  override def equals(other: Any): Boolean = other match {
    case s: StreamProgress =>
      s.currentOffsets.keys.toSet == currentOffsets.keys.toSet &&
        s.currentOffsets.forall(w => currentOffsets(w._1) == w._2)
    case _ => false // previously missing, causing a MatchError for other argument types
  }

  override def hashCode: Int = {
    currentOffsets.toSeq.sortBy(_._1.toString).hashCode()
  }
}
```

> **Collaborator** (on `class StreamProgress`): How will external sinks create this object if it neither takes a map of offsets in the constructor, nor can `update()` be called?
>
> **Owner (author):** I'll add a TODO for serialization API.
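A short sketch of the intended semantics, assuming code that lives inside `org.apache.spark.sql.execution.streaming` (the mutators are `private[streaming]`) and a hypothetical `src: Source`:

```scala
// Hypothetical usage of StreamProgress from within the streaming package.
val progress = new StreamProgress
progress.update(src, LongOffset(1))
progress.update(src -> LongOffset(5))       // tuple form of update
// progress.update(src, LongOffset(2))      // would fail: assert(newOffset > old)

val snapshot = progress.copy()              // defensive copy before handing to user code
val advanced = progress ++ Map(src -> LongOffset(10))
assert(advanced(src) == LongOffset(10) && snapshot(src) == LongOffset(5))
```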
> **Collaborator:** Why are the basic abstractions of streaming, like `Offset`, in the `sql.execution` package? From a user's point of view, it's not intuitive to have "execution" in the middle, and it's painful to import that deep a package name. I think basic abstractions like `Offset`, `Source`, and `Sink` should be in `sql.streaming`, and things like `StreamExecution` can be in `sql.execution.streaming` or `sql.streaming.execution`.
>
> **Owner (author):** My thought was to put everything in `execution` by default, which is hidden from Scaladoc etc. We can move things out as we decide to make them public API.
>
> **Collaborator:** I am cool with that for now.