WIP: file source
marmbrus committed Dec 28, 2015
commit d133dbbed3d3ff7146129cd6532b1f26f2201315
@@ -71,8 +71,7 @@ import org.apache.spark.sql.types._
@Experimental
@implicitNotFound("Unable to find encoder for type stored in a Dataset. Primitive types " +
"(Int, String, etc) and Product types (case classes) are supported by importing " +
"sqlContext.implicits._ Support for serializing other types will be added in future " +
"releases.")
"sqlContext.implicits._ Use Encoder.kryo[T] to build an encoder for other objects.")
trait Encoder[T] extends Serializable {

/** Returns the schema of encoding this type of object as a Row. */
@@ -157,6 +156,8 @@ object Encoders {
*/
def TIMESTAMP: Encoder[java.sql.Timestamp] = ExpressionEncoder()

/** An encoder for arrays of bytes. */
def BINARY: Encoder[Array[Byte]] = ExpressionEncoder()

/**
* Creates an encoder for Java Bean of type T.
*
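A minimal sketch of how the new BINARY encoder might be used, assuming the Spark 1.6-era SQLContext.createDataset API; the helper name binaryDataset and the sample payloads are illustrative only.

import org.apache.spark.sql.{Dataset, Encoders, SQLContext}

// Hypothetical helper: build a Dataset[Array[Byte]] by supplying the encoder explicitly
// instead of relying on implicit derivation via sqlContext.implicits._.
def binaryDataset(sqlContext: SQLContext): Dataset[Array[Byte]] = {
  val payloads = Seq(Array[Byte](1, 2, 3), Array[Byte](4, 5, 6))
  sqlContext.createDataset(payloads)(Encoders.BINARY)
}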
@@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import java.io.{OutputStreamWriter, BufferedWriter}

import org.apache.hadoop.fs.{FileStatus, Path, FileSystem}
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.collection.OpenHashSet

import scala.collection.mutable.ArrayBuffer

class FileStream(
val sqlContext: SQLContext,
val metadataPath: String,
val path: String) extends Source with Logging {

import sqlContext.implicits._

/** Returns the schema of the data from this source */
override def schema: StructType =
StructType(Nil).add("value", StringType)

/** Returns the maximum offset that can be retrieved from the source. */
override def offset: Offset = synchronized {
val filesPresent = fetchAllFiles()
val newFiles = new ArrayBuffer[String]()
filesPresent.foreach { file =>
if (!seenFiles.contains(file)) {
logWarning(s"new file: $file")
newFiles.append(file)
seenFiles.add(file)
} else {
logDebug(s"old file: $file")
}
}

if (newFiles.nonEmpty) {
maxBatchFile += 1
writeBatch(maxBatchFile, newFiles)
}

new LongOffset(maxBatchFile)
}

/**
* Returns the data between the `start` and `end` offsets. This function must always return
* the same set of data for any given pair of offsets.
*/
override def getSlice(
sqlContext: SQLContext,
start: Option[Offset],
end: Offset): RDD[InternalRow] = {
val startId = start.map(_.asInstanceOf[LongOffset].offset).getOrElse(0L)
val endId = end.asInstanceOf[LongOffset].offset

log.warn(s"Producing files from batches $start:$end")
if (endId == -1) {
sparkContext.emptyRDD
} else {
val batchFiles = ((startId + 1) to endId).map(i => s"$metadataPath$i")
log.warn(s"Batch files: $batchFiles")

val files = sqlContext
.read
.text(batchFiles: _*)
.as[String]
.collect()
log.warn(s"Streaming ${files.mkString(", ")}")
sqlContext.read.text(files: _*).queryExecution.executedPlan.execute()
}

}

def restart(): FileStream = {
new FileStream(sqlContext, metadataPath, path)
}

private def sparkContext = sqlContext.sparkContext

val fs = FileSystem.get(sparkContext.hadoopConfiguration)
val existingBatchFiles = fetchAllBatchFiles()
val existingBatchIds = existingBatchFiles.map(_.getPath.getName.toInt)
var maxBatchFile = if (existingBatchIds.isEmpty) -1 else existingBatchIds.max
val seenFiles = new OpenHashSet[String]

if (existingBatchFiles.nonEmpty) {
sqlContext.read
.text(existingBatchFiles.map(_.getPath.toString): _*)
.as[String]
.collect()
.foreach { file =>
seenFiles.add(file)
}
}

private def fetchAllBatchFiles(): Seq[FileStatus] = {
fs.listStatus(new Path(metadataPath))
}

private def fetchAllFiles(): Seq[String] = {
fs.listStatus(new Path(path)).map(_.getPath.toUri.toString)
}

private def writeBatch(id: Int, files: Seq[String]): Unit = {
val path = new Path(metadataPath + id)
val fs = FileSystem.get(sparkContext.hadoopConfiguration)
val writer = new BufferedWriter(new OutputStreamWriter(fs.create(path, true)))
files.foreach { file =>
writer.write(file)
writer.write("\n")
}
writer.close()
logWarning(s"Wrote batch file $path")
}
}
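A rough sketch of how the FileStream source above could be driven by hand, using only the members it defines; the SQLContext and the two paths are placeholders, and the trailing slash on metadataPath matches the plain string concatenation used in writeBatch and getSlice.

import org.apache.spark.sql.SQLContext

// Hypothetical driver for the FileStream source above.
def driveFileStream(sqlContext: SQLContext): Unit = {
  val source = new FileStream(sqlContext, "/tmp/stream-metadata/", "/tmp/stream-input/")

  // offset scans the input directory, records any newly seen files in a batch file,
  // and returns the id of the newest batch as a LongOffset.
  val latest = source.offset

  // getSlice with start = None replays every batch recorded so far as an RDD[InternalRow].
  val rows = source.getSlice(sqlContext, None, latest)
}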
@@ -24,7 +24,7 @@ package org.apache.spark.sql.execution.streaming
* stream. In addition to being comparable, an [[Offset]] must have a notion of being empty
* which is used to denote a stream where no processing has yet occurred.
*/
trait Offset {
trait Offset extends Serializable {
def isEmpty: Boolean

def >(other: Offset): Boolean
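The concrete LongOffset used throughout this diff is not shown here, so as an illustration of the contract only, here is a hedged sketch of a long-valued implementation of the two members visible in this hunk (the truncated portion of the trait may declare further comparison methods):

// Illustrative only; not the actual LongOffset implementation.
case class ExampleLongOffset(offset: Long) extends Offset {
  // Mirrors how LongOffset(-1) is used elsewhere in this diff to mean "nothing processed yet".
  override def isEmpty: Boolean = offset == -1

  override def >(other: Offset): Boolean = other match {
    case ExampleLongOffset(o) => offset > o
    case _ => sys.error(s"Cannot compare ExampleLongOffset with $other")
  }
}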
@@ -18,16 +18,36 @@
package org.apache.spark.sql.execution.streaming

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode
import org.apache.spark.sql.types.StructType

trait Source {

/** Returns the schema of the data from this source */
def schema: StructType

trait Source {
/** Returns the maximum offset that can be retrieved from the source. */
def offset: Offset

/**
* Returns the data between the `start` and `end` offsets. This function must always return
* the same set of data for any given pair of offsets.
*/
def getSlice(sqlContext: SQLContext, start: Offset, end: Offset): RDD[InternalRow]
}
def getSlice(sqlContext: SQLContext, start: Option[Offset], end: Offset): RDD[InternalRow]

/** For testing. */
def restart(): Source
}

case class StreamingRelation(source: Source, output: Seq[Attribute]) extends LeafNode {
override def toString: String = source.toString
}

object StreamingRelation {
def apply(source: Source): StreamingRelation =
StreamingRelation(source, source.schema.toAttributes)
}
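To make the new contract concrete, a hedged sketch of a trivial source that never produces data, and of how StreamingRelation lifts it into a logical plan; EmptySource is hypothetical, and StringType plus LongOffset are assumed to be importable alongside the types already used in this file.

import org.apache.spark.sql.types.{StringType, StructType}

// Hypothetical source with a single string column that never has any data.
class EmptySource extends Source {
  override def schema: StructType = StructType(Nil).add("value", StringType)

  // LongOffset(-1) is used elsewhere in this diff to mean "no data yet".
  override def offset: Offset = LongOffset(-1)

  override def getSlice(
      sqlContext: SQLContext, start: Option[Offset], end: Offset): RDD[InternalRow] =
    sqlContext.sparkContext.emptyRDD[InternalRow]

  override def restart(): Source = this
}

// StreamingRelation derives the output attributes from the source's schema,
// so the source can be dropped straight into a query plan:
//   val plan = StreamingRelation(new EmptySource)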

@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.{LongType, DataType, StructField, StructType}
import org.apache.spark.{Accumulator, Logging}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -28,15 +29,21 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.execution.{SparkPlan, QueryExecution, LogicalRDD}

class EventTimeSource(val max: Accumulator[LongOffset]) extends Source with Serializable {

override def schema: StructType = StructType(Seq(
StructField("watermark", LongType, nullable = false)))

override def offset: Offset = max.value

override def getSlice(
sqlContext: SQLContext, start: Offset, end: Offset): RDD[InternalRow] = ???
sqlContext: SQLContext, start: Option[Offset], end: Offset): RDD[InternalRow] = ???

// HACK
override def equals(other: Any): Boolean = other.isInstanceOf[EventTimeSource]
override def hashCode: Int = 0

override def restart(): Source = this

override def toString: String = "EventTime"
}

@@ -62,21 +69,22 @@ class StreamExecution(

/** All stream sources present in the query plan. */
private val sources =
logicalPlan.collect { case s: Source => s: Source } :+ eventTimeSource
logicalPlan.collect { case s: StreamingRelation => s.source } :+ eventTimeSource

// Start the execution at the current Offset for the sink. (i.e. avoid reprocessing data
// that we have already processed).
sources.foreach { s =>
val sourceOffset = sink.currentOffset(s).getOrElse(LongOffset(-1))
currentOffsets.update(s, sourceOffset)
sink.currentOffset(s).foreach { offset =>
currentOffsets.update(s, offset)
}
}

// Restore the position of the eventtime Offset accumulator
currentOffsets.get(eventTimeSource).foreach {
case w: LongOffset => eventTimeSource.max.setValue(w)
}

logInfo(s"Stream running at $currentOffsets")
println(s"Stream running at $currentOffsets")

/** When false, signals to the microBatchThread that it should stop running. */
@volatile private var shouldRun = true
@@ -90,6 +98,7 @@
}
microBatchThread.setDaemon(true)
microBatchThread.start()
println("started")

var lastExecution: QueryExecution = null
Review comment: nit: volatile and private[streaming]?

@@ -98,10 +107,13 @@
* a batch is executed and passed to the sink, updating the currentOffsets.
*/
private def attemptBatch(): Unit = {
// Check to see if any of the input sources have data that has not been processed.
val newData = sources.flatMap {
case s if s.offset > currentOffsets(s) => s -> s.offset :: Nil
case _ => Nil

val newData = sources.flatMap { s =>
val prevOffset = currentOffsets.get(s)
val latestOffset = s.offset
if (prevOffset.isEmpty || latestOffset > prevOffset.get) {
Some(s -> latestOffset)
} else None
}.toMap

if (newData.nonEmpty) {
@@ -110,10 +122,13 @@

// Replace sources in the logical plan with data that has arrived since the last batch.
val newPlan = logicalPlan transform {
case s: Source if newData.contains(s) =>
val batchInput = s.getSlice(sqlContext, currentOffsets(s), newData(s))
LogicalRDD(s.output, batchInput)(sqlContext)
case s: Source => LocalRelation(s.output)
case StreamingRelation(source, output) =>
if (newData.contains(source)) {
val batchInput = source.getSlice(sqlContext, currentOffsets.get(source), newData(source))
LogicalRDD(output, batchInput)(sqlContext)
} else {
LocalRelation(output)
}
}

val optimizerStart = System.nanoTime()
@@ -169,9 +184,7 @@
* least the given `Offset`. This method is intended for use primarily when writing tests.
*/
def awaitOffset(source: Source, newOffset: Offset): Unit = {
assert(microBatchThread.isAlive)

while (currentOffsets(source) < newOffset) {
while (!currentOffsets.contains(source) || currentOffsets(source) < newOffset) {
logInfo(s"Waiting until $newOffset at $source")
synchronized { wait() }
}
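Condensed, the batch-selection logic above now reads roughly as follows; this is only a paraphrase of the attemptBatch diff, with timing, error handling, and the sink update omitted.

// 1. Poll every source and keep the ones whose latest offset is beyond what has been
//    processed, or that have never been processed at all.
val newData = sources.flatMap { s =>
  val latest = s.offset
  if (currentOffsets.get(s).forall(latest > _)) Some(s -> latest) else None
}.toMap

// 2. Replace each StreamingRelation leaf with either the rows of this batch or an
//    empty relation when the source has nothing new.
val newPlan = logicalPlan transform {
  case StreamingRelation(source, output) if newData.contains(source) =>
    val batchInput = source.getSlice(sqlContext, currentOffsets.get(source), newData(source))
    LogicalRDD(output, batchInput)(sqlContext)
  case StreamingRelation(_, output) =>
    LocalRelation(output)
}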
@@ -21,6 +21,7 @@ import scala.collection.mutable

class StreamProgress extends Serializable {
Review comment (Collaborator): How will external sinks create this object if it neither takes a map of offsets in the constructor, nor allows update() to be called?

Reply (Owner Author): I'll add a TODO for serialization API.
private val currentOffsets = new mutable.HashMap[Source, Offset]
with mutable.SynchronizedMap[Source, Offset]

def isEmpty: Boolean = currentOffsets.filterNot(_._2.isEmpty).isEmpty

@@ -34,6 +35,7 @@ class StreamProgress extends Serializable {

def apply(source: Source): Offset = currentOffsets(source)
def get(source: Source): Option[Offset] = currentOffsets.get(source)
def contains(source: Source): Boolean = currentOffsets.contains(source)

def ++(updates: Map[Source, Offset]): StreamProgress = {
val updated = new StreamProgress
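A small usage sketch of the StreamProgress members touched here; source is a placeholder, LongOffset is the concrete offset used elsewhere in the diff, and the copy semantics of the truncated ++ body are assumed rather than shown.

// Illustrative only.
def trackProgress(source: Source): Unit = {
  val progress = new StreamProgress
  val advanced = progress ++ Map(source -> LongOffset(5))

  assert(advanced.contains(source))                    // contains is new in this commit
  assert(advanced.get(source) == Some(LongOffset(5)))
  assert(!progress.contains(source))                   // ++ returns a fresh StreamProgress
}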
@@ -37,26 +37,31 @@ import org.apache.spark.storage.{StreamBlockId, BlockId}

object MemoryStream {
protected val currentBlockId = new AtomicInteger(0)
protected val memoryStreamId = new AtomicInteger(0)

def apply[A : Encoder]: MemoryStream[A] =
new MemoryStream[A](encoderFor[A].schema.toAttributes)
new MemoryStream[A](memoryStreamId.getAndIncrement())
}

case class MemoryStream[A : Encoder](output: Seq[Attribute]) extends LeafNode with Source {
case class MemoryStream[A : Encoder](id: Int) extends Source with Logging {
protected val encoder = encoderFor[A]
protected val logicalPlan = StreamingRelation(this)
protected val output = logicalPlan.output
protected var blocks = new ArrayBuffer[BlockId]
protected var currentOffset: LongOffset = new LongOffset(-1)
protected val encoder = encoderFor[A]

protected def blockManager = SparkEnv.get.blockManager

def schema: StructType = encoder.schema

def offset: Offset = currentOffset

def toDS()(implicit sqlContext: SQLContext): Dataset[A] = {
new Dataset(sqlContext, this)
new Dataset(sqlContext, logicalPlan)
}

def toDF()(implicit sqlContext: SQLContext): DataFrame = {
new DataFrame(sqlContext, this)
new DataFrame(sqlContext, logicalPlan)
}

def addData(data: TraversableOnce[A]): Offset = {
@@ -73,15 +78,17 @@
}
}

def getSlice(sqlContext: SQLContext, start: Offset, end: Offset): RDD[InternalRow] = {
def getSlice(sqlContext: SQLContext, start: Option[Offset], end: Offset): RDD[InternalRow] = {
val newBlocks =
blocks.slice(
start.asInstanceOf[LongOffset].offset.toInt + 1,
start.map(_.asInstanceOf[LongOffset]).getOrElse(LongOffset(-1)).offset.toInt + 1,
end.asInstanceOf[LongOffset].offset.toInt + 1).toArray
logDebug(s"Running [$start, $end] on blocks ${newBlocks.mkString(", ")}")
new BlockRDD[InternalRow](sqlContext.sparkContext, newBlocks)
}

def restart(): Source = this
Review comment (Collaborator): What is this for if it just returns this? Also, you have removed this from the Source trait, so what's the point of it now?

Reply (Owner Author): Left over from the file tests, where you need to restart in testing. Removed.

override def toString: String = s"MemoryStream[${output.mkString(",")}]"
}

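Finally, a short sketch of how MemoryStream is typically driven in a test, using only the members shown above; the implicit SQLContext and the import of its implicits (which supplies the Encoder[Int]) are assumed to come from the enclosing suite.

import org.apache.spark.sql.SQLContext

def memoryStreamDemo(implicit sqlContext: SQLContext): Unit = {
  import sqlContext.implicits._                        // provides the implicit Encoder[Int]

  val input = MemoryStream[Int]                        // MemoryStream.apply allocates a fresh stream id
  val ds = input.toDS()                                // a Dataset backed by StreamingRelation(input)

  input.addData(Seq(1, 2, 3))                          // stores a block and advances currentOffset
  val latest = input.offset                            // LongOffset of the newest block
  val rows = input.getSlice(sqlContext, None, latest)  // replays every block added so far
}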