Merged
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.DataFrame

/**
* Used to pass a batch of data through a streaming query execution along with an indication
* of progress in the stream.
*/
class Batch(val end: Offset, val data: DataFrame)
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

/**
* An ordered collection of offsets, used to track the progress of processing data from one or more
* [[Source]]s that are present in a streaming query. This is similar to a simplified, single-instance
* vector clock that must progress linearly forward.
*/
case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
/**
* Returns a negative integer, zero, or a positive integer as this object is less than, equal to,
* or greater than the specified object.
*/
override def compareTo(other: Offset): Int = other match {
case otherComposite: CompositeOffset if otherComposite.offsets.size == offsets.size =>
val comparisons = offsets.zip(otherComposite.offsets).map {
case (Some(a), Some(b)) => a compareTo b
case (None, None) => 0
case (None, _) => -1
case (_, None) => 1
}
val signs = comparisons.map(sign).distinct
if (signs.size != 1) {
throw new IllegalArgumentException(
s"Invalid comparison between non-linear histories: $this <=> $other")
}
signs.head
case _ =>
throw new IllegalArgumentException(s"Cannot compare $this <=> $other")
}

private def sign(num: Int): Int = num match {
case i if i < 0 => -1
case i if i == 0 => 0
case i if i > 0 => 1
}
}
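A minimal sketch of how the comparison above behaves, under stated assumptions: it copies trimmed versions of `Offset`, `LongOffset`, and `CompositeOffset` from this diff so that it runs without Spark, and `CompositeOffsetDemo` and `composite` are hypothetical names used only for illustration. The third case is worth noting: when only some sources advance, the per-source signs differ, so `compareTo` as written rejects the comparison. Whether that is the intended contract is a question for the author.

```scala
import scala.util.Try

// Trimmed copies of the definitions in this diff, so the sketch compiles without Spark.
trait Offset extends Serializable {
  def compareTo(other: Offset): Int
}

case class LongOffset(offset: Long) extends Offset {
  override def compareTo(other: Offset): Int = other match {
    case l: LongOffset => java.lang.Long.compare(offset, l.offset)
    case _ =>
      throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}")
  }
}

case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset {
  override def compareTo(other: Offset): Int = other match {
    case otherComposite: CompositeOffset if otherComposite.offsets.size == offsets.size =>
      val comparisons = offsets.zip(otherComposite.offsets).map {
        case (Some(a), Some(b)) => a compareTo b
        case (None, None) => 0
        case (None, _) => -1
        case (_, None) => 1
      }
      // All per-source comparisons must agree on a single sign.
      val signs = comparisons.map(c => java.lang.Integer.signum(c)).distinct
      if (signs.size != 1) {
        throw new IllegalArgumentException(
          s"Invalid comparison between non-linear histories: $this <=> $other")
      }
      signs.head
    case _ =>
      throw new IllegalArgumentException(s"Cannot compare $this <=> $other")
  }
}

// Hypothetical demo object, not part of the PR.
object CompositeOffsetDemo {
  private def composite(values: Option[Long]*): CompositeOffset =
    CompositeOffset(values.map(_.map(v => LongOffset(v))))

  // Every source advanced, so the result is a well-defined "less than".
  val allAdvanced: Int =
    composite(Some(1L), Some(1L)).compareTo(composite(Some(2L), Some(2L)))

  // A source with no data yet (None) sorts before one that has an offset.
  val noneFirst: Int =
    composite(None, None).compareTo(composite(Some(0L), Some(0L)))

  // Only the second source advanced: the signs are (0, -1), so this throws.
  val mixed: Try[Int] =
    Try(composite(Some(1L), Some(1L)).compareTo(composite(Some(1L), Some(2L))))
}
```

If partial progress should count as progress, one option would be to ignore zero signs before checking for agreement; that would be a change in behavior rather than something this diff does.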
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming
Collaborator

Why are the basic streaming abstractions like Offset in the sql.execution package? From a user's point of view, it's not intuitive to have "execution" in the middle, and it is painful to import a package name that deep.

I think basic abstractions like Offset, Source, Sink should be in sql.streaming, and things like StreamExecution can be in sql.execution.streaming or sql.streaming.execution.

Owner Author

My thought was to put everything in execution by default, since it is hidden from the scaladoc, etc. We can move things out as we decide to make them public API.

Collaborator

I am cool with that for now.


/**
* A simple offset for sources that produce a single linear stream of data.
*/
case class LongOffset(offset: Long) extends Offset {
Collaborator

Good to have some scala docs here.


override def compareTo(other: Offset): Int = other match {
case l: LongOffset => offset.compareTo(l.offset)
case _ =>
throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}")
}

def +(increment: Long): LongOffset = new LongOffset(offset + increment)
def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
}
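A short usage sketch for `LongOffset`, assuming trimmed copies of the `Offset` and `LongOffset` definitions from this diff so that it runs without Spark. `LongOffsetDemo` is a hypothetical name, not part of the PR.

```scala
// Trimmed copy of the Offset trait from this diff (only the operators used below).
trait Offset extends Serializable {
  def compareTo(other: Offset): Int
  def >(other: Offset): Boolean = compareTo(other) > 0
  def <(other: Offset): Boolean = compareTo(other) < 0
}

case class LongOffset(offset: Long) extends Offset {
  override def compareTo(other: Offset): Int = other match {
    case l: LongOffset => java.lang.Long.compare(offset, l.offset)
    case _ =>
      throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${other.getClass}")
  }

  def +(increment: Long): LongOffset = new LongOffset(offset + increment)
  def -(decrement: Long): LongOffset = new LongOffset(offset - decrement)
}

// Hypothetical demo object, not part of the PR.
object LongOffsetDemo {
  val start: LongOffset = LongOffset(5L)
  // Advancing by three records yields a new, larger offset.
  val next: LongOffset = start + 3L
  val isAhead: Boolean = next > start
  // Subtraction moves backwards the same way.
  val rewound: LongOffset = next - 8L
}
```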
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

/**
* An offset is a monotonically increasing metric used to track progress in the computation of a
* stream. An [[Offset]] must be comparable.
Collaborator

By comparable, do you mean only > and <, or equality as well? This is not clear from the docs; better to make it explicit.

Owner Author

Now implements a more general compare method.

Collaborator

This is better, I think. Is there any benefit in making it extend the Java Comparable interface?

Owner Author

I actually did that at first, but I wasn't sure whether we wanted it to be Ordered or Comparable. I've changed it to compareTo to make it easy to add either in the future.

*/
trait Offset extends Serializable {

/**
* Returns a negative integer, zero, or a positive integer as this object is less than, equal to,
* or greater than the specified object.
*/
def compareTo(other: Offset): Int

def >(other: Offset): Boolean = compareTo(other) > 0
def <(other: Offset): Boolean = compareTo(other) < 0
def <=(other: Offset): Boolean = compareTo(other) <= 0
def >=(other: Offset): Boolean = compareTo(other) >= 0
def ==(other: Offset): Boolean = compareTo(other) == 0
}
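To make the Ordered versus Comparable discussion above concrete, here is a hypothetical variant that mixes in `scala.math.Ordered`. It is not what this PR does; `OrderedOffset`, `OrderedLongOffset`, and `OrderedOffsetDemo` are illustrative names. `Ordered[A]` extends `java.lang.Comparable[A]`, so this one mix-in supplies `<`, `>`, `<=`, `>=`, and `compareTo`, and also lets standard collection methods such as `max` find an ordering.

```scala
// Hypothetical alternative to the Offset trait in this diff: only `compare` is
// abstract, and the comparison operators are inherited from Ordered.
trait OrderedOffset extends Ordered[OrderedOffset] with Serializable

case class OrderedLongOffset(offset: Long) extends OrderedOffset {
  override def compare(that: OrderedOffset): Int = that match {
    case l: OrderedLongOffset => java.lang.Long.compare(offset, l.offset)
    case _ =>
      throw new IllegalArgumentException(s"Invalid comparison of $getClass with ${that.getClass}")
  }
}

// Hypothetical demo object.
object OrderedOffsetDemo {
  // Operators come from Ordered, with no hand-written definitions.
  val less: Boolean = OrderedLongOffset(1L) < OrderedLongOffset(2L)

  // compareTo is inherited as well, because Ordered extends Comparable.
  val viaCompareTo: Int = OrderedLongOffset(2L).compareTo(OrderedLongOffset(2L))

  // An implicit Ordering is derived from Comparable, so `max` works directly.
  val latest: OrderedOffset =
    Seq[OrderedOffset](OrderedLongOffset(3L), OrderedLongOffset(7L), OrderedLongOffset(5L)).max
}
```

The trade-off is that `Ordered` is a Scala-specific trait, whereas the hand-written `compareTo` in this diff keeps both options open, as the author notes.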
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.{DataFrame, SQLContext}

/**
* An interface for systems that can collect the results of a streaming query.
*
* When new data is produced by a query, a [[Sink]] must be able to transactionally collect the
* data and update the [[StreamProgress]]. In the case of a failure, the sink will be recreated
* and must be able to return the [[StreamProgress]] for all of the data that is made durable.
* This contract allows Spark to process data with exactly-once semantics, even in the case
* of failures that require the computation to be restarted.
*/
trait Sink {
/**
* Returns the [[Offset]] for all data that is currently present in the sink, if any. This
* function will be called by Spark when restarting a stream in order to determine the point in
* the streamed input data from which computation should resume.
*/
def currentProgress: Option[Offset]
Collaborator

It is probably better to call this getOffset to stay consistent with the verb-noun format, especially since this is not expected to be a static return value.

Owner Author

We pretty much never use get, and this function should not have side effects. If anything, I think I'd change getNextBatch to fetchNextBatch to make it clear that it can do work like caching if needed.

Collaborator

Nonetheless, it does not make sense to call this currentProgress when we are not returning "StreamingProgress".


/**
* Accepts a new batch of data as well as a [[StreamProgress]] that denotes how far in the input
* data computation has progressed to. When computation restarts after a failure, it is important
* that a [[Sink]] returns the same [[Offset]] as the most recent batch of data that
* has been persisted durably. Note that this does not necessarily have to be the
* [[Offset]] for the most recent batch of data that was given to the sink. For example,
* it is valid to buffer data before persisting, as long as the [[Offset]] is stored
* transactionally as data is eventually persisted.
*/
def addBatch(batch: Batch): Unit
}
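A minimal in-memory sketch of the contract described above, under loud assumptions: `Batch.data` is a `Seq[String]` stand-in for the `DataFrame` used in this PR, `Offset` is reduced to a marker trait, and `MemorySketchSink` and `MemorySketchSinkDemo` are hypothetical names. The point it illustrates is that the rows and the offset are published in a single step, so `currentProgress` never reports an offset whose data is missing. A real sink would need a durable equivalent of that step.

```scala
// Stand-ins so the sketch runs without Spark. In the PR, Batch.data is a DataFrame
// and Offset also declares compareTo.
trait Offset extends Serializable
case class LongOffset(offset: Long) extends Offset
class Batch(val end: Offset, val data: Seq[String])

trait Sink {
  def currentProgress: Option[Offset]
  def addBatch(batch: Batch): Unit
}

// Hypothetical sink: the offset and the rows live in one immutable pair that is
// swapped under a lock, so both change together or not at all.
class MemorySketchSink extends Sink {
  private var committed: (Option[Offset], Vector[String]) = (None, Vector.empty)

  override def currentProgress: Option[Offset] = synchronized { committed._1 }

  override def addBatch(batch: Batch): Unit = synchronized {
    committed = (Some(batch.end), committed._2 ++ batch.data)
  }

  def rows: Vector[String] = synchronized { committed._2 }
}

// Hypothetical demo object.
object MemorySketchSinkDemo {
  val sink = new MemorySketchSink

  // Nothing has been written yet, so there is no offset to resume from.
  val before: Option[Offset] = sink.currentProgress

  sink.addBatch(new Batch(LongOffset(2L), Seq("a", "b")))

  // After the batch is added, the sink reports the batch's end offset.
  val after: Option[Offset] = sink.currentProgress
}
```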
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType

/**
* A source of continually arriving data for a streaming query. A [[Source]] must have a
* monotonically increasing notion of progress that can be represented as an [[Offset]]. Spark
* will regularly query each [[Source]] to see if any more data is available.
*/
trait Source {

/** Returns the schema of the data from this source */
def schema: StructType

/**
* Returns the next batch of data that is available after `start`, if any is available.
*/
def getNextBatch(start: Option[Offset]): Option[Batch]
}
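A minimal sketch of a `Source` over a fixed set of records, again under stated assumptions: `Batch.data` is a `Seq[String]` stand-in for the `DataFrame` in this PR, `schema` is omitted, `Offset` is reduced to a marker trait, and `MemorySketchSource` and `MemorySketchSourceDemo` are hypothetical names. It treats a `LongOffset(n)` as "the first n records have been delivered", which is one possible convention rather than something the diff specifies.

```scala
// Stand-ins so the sketch runs without Spark.
trait Offset extends Serializable
case class LongOffset(offset: Long) extends Offset
class Batch(val end: Offset, val data: Seq[String])

// Trimmed copy of the Source trait in this diff (schema omitted).
trait Source {
  def getNextBatch(start: Option[Offset]): Option[Batch]
}

// Hypothetical source backed by an in-memory vector.
class MemorySketchSource(records: Vector[String]) extends Source {
  override def getNextBatch(start: Option[Offset]): Option[Batch] = {
    // Assumed convention: offset n means the first n records were already delivered.
    val from = start match {
      case Some(LongOffset(n)) => n.toInt
      case None => 0
      case Some(other) => throw new IllegalArgumentException(s"Unsupported offset: $other")
    }
    if (from >= records.size) None // nothing new after `start`
    else Some(new Batch(LongOffset(records.size.toLong), records.drop(from)))
  }
}

// Hypothetical demo object.
object MemorySketchSourceDemo {
  val source = new MemorySketchSource(Vector("a", "b", "c"))

  // No starting offset: everything available is returned.
  val first: Option[Batch] = source.getNextBatch(None)

  // Asking again from the end of the first batch yields nothing new.
  val second: Option[Batch] = source.getNextBatch(first.map(_.end))

  // Resuming from an earlier offset returns only the records after it.
  val resumed: Option[Batch] = source.getNextBatch(Some(LongOffset(1L)))
}
```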