continuous processing working
jose-torres committed Feb 9, 2018
commit 4dfe57dc9364752988f9f0ed6e00c3a1d4d39e83
@@ -134,7 +134,6 @@ class ForeachDataWriter[T : Encoder](
case OPENED_SKIP_PROCESSING => ()
case CLOSED =>
// First record of a new epoch, so we need to open a new writer for it.
currentEpochId += 1
openAndSetState(currentEpochId)
writer.process(encoder.fromRow(record))
}
@@ -146,7 +145,13 @@ class ForeachDataWriter[T : Encoder](
}

override def commit(): WriterCommitMessage = {
writer.close(null)
// Close if the writer got opened for this epoch.
state match {
case CLOSED => ()
case _ => writer.close(null)
}
state = CLOSED
currentEpochId += 1
ForeachWriterCommitMessage
}
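
The two hunks above reshape the per-epoch lifecycle of ForeachDataWriter: write() no longer bumps the epoch counter when it reopens the writer, and commit() now closes the writer only if it was actually opened, resets the state, and advances currentEpochId. Below is a minimal, hedged sketch of that state machine in isolation; the trait and class names are invented stand-ins (the real class wraps a user-supplied org.apache.spark.sql.ForeachWriter plus Spark's row encoder), so treat it as an illustration of the diff, not the Spark source.

// Hedged sketch of the state machine the diff implements; names prefixed
// "Simple"/"Sketch" are hypothetical, not Spark classes.
trait SimpleForeachWriter[T] {
  def open(partitionId: Long, epochId: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

class SketchForeachDataWriter[T](writer: SimpleForeachWriter[T], partitionId: Long) {
  sealed trait State
  case object Opened extends State
  case object OpenedSkipProcessing extends State
  case object Closed extends State

  private var state: State = Closed
  private var currentEpochId: Long = 0L

  private def openAndSetState(epochId: Long): Unit = {
    // open() returning false means this partition/epoch was already handled,
    // so records are skipped, but close() is still owed at commit time.
    state = if (writer.open(partitionId, epochId)) Opened else OpenedSkipProcessing
  }

  def write(record: T): Unit = state match {
    case Opened => writer.process(record)
    case OpenedSkipProcessing => ()
    case Closed =>
      // First record of a new epoch: reopen for the epoch id that commit()
      // advanced when the previous epoch finished.
      openAndSetState(currentEpochId)
      writer.process(record)
  }

  def commit(): Unit = {
    // Close only if the writer was opened during this epoch; an empty epoch
    // stays Closed and needs no close() call.
    if (state != Closed) writer.close(null)
    state = Closed
    currentEpochId += 1
  }
}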

@@ -565,6 +565,7 @@ abstract class StreamExecution(

object StreamExecution {
val QUERY_ID_KEY = "sql.streaming.queryId"
val BATCH_ID_KEY = "sql.streaming.batchId"
}
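
StreamExecution gains a BATCH_ID_KEY constant alongside the existing QUERY_ID_KEY. A hedged sketch of the usual pattern behind such keys follows: the driver tags jobs with a Spark local property, and executor-side code reads it back through TaskContext. The helper object and method names are invented for illustration; only the property strings come from the diff.

import org.apache.spark.{SparkContext, TaskContext}

// Hedged sketch: how a batch/epoch id is typically propagated to executors via
// local properties. Only the key strings are from the diff; the rest is illustrative.
object BatchIdPropagationSketch {
  val QUERY_ID_KEY = "sql.streaming.queryId"
  val BATCH_ID_KEY = "sql.streaming.batchId"

  // Driver side: tag every job launched for this query/batch.
  def tagBatch(sc: SparkContext, queryId: String, batchId: Long): Unit = {
    sc.setLocalProperty(QUERY_ID_KEY, queryId)
    sc.setLocalProperty(BATCH_ID_KEY, batchId.toString)
  }

  // Executor side: recover the batch id from the running task, if it was set.
  def currentBatchId(): Option[Long] =
    Option(TaskContext.get())
      .flatMap(tc => Option(tc.getLocalProperty(BATCH_ID_KEY)))
      .map(_.toLong)
}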

/**
@@ -17,6 +17,7 @@

package org.apache.spark.sql.execution.streaming

import java.io.Serializable
import java.util.concurrent.ConcurrentLinkedQueue

import scala.collection.mutable
@@ -26,7 +27,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.SparkException
import org.apache.spark.sql.ForeachWriter
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest}
import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest, Trigger}
import org.apache.spark.sql.test.SharedSQLContext

class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
@@ -281,8 +282,67 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAfter {
"writer was reused!")
}
}

testQuietly("foreach sink for continuous query") {
withTempDir { checkpointDir =>
val query = spark.readStream
.format("rate")
.option("numPartitions", "1")
.option("rowsPerSecond", "5")
.load()
.select('value.cast("INT"))
.map(r => r.getInt(0))
.writeStream
.option("checkpointLocation", checkpointDir.getCanonicalPath)
.trigger(Trigger.Continuous(500))
.foreach(new TestForeachWriter with Serializable {
override def process(value: Int): Unit = {
super.process(this.hashCode())
}
}).start()
try {
// Wait until we get 3 epochs with at least 3 events in them. This means we'll see
// open, close, and at least 1 process.
eventually(timeout(streamingTimeout)) {
// Check that three epochs have each logged open, at least one process, and close.
assert(ForeachSinkSuite.allEvents().count(_.size >= 3) === 3)
}

val allEvents = ForeachSinkSuite.allEvents().filter(_.size >= 3)
// Check open and close events.
allEvents(0).head match {
case ForeachSinkSuite.Open(0, _) =>
case e => assert(false, s"unexpected event $e")
}
allEvents(1).head match {
case ForeachSinkSuite.Open(0, _) =>
case e => assert(false, s"unexpected event $e")
}
allEvents(2).head match {
case ForeachSinkSuite.Open(0, _) =>
case e => assert(false, s"unexpected event $e")
}
assert(allEvents(0).last == ForeachSinkSuite.Close(None))
assert(allEvents(1).last == ForeachSinkSuite.Close(None))
assert(allEvents(2).last == ForeachSinkSuite.Close(None))

// Check the first Process event in each epoch, and also check the writer IDs
// we packed in to make sure none got reused.
val writerIds = (0 to 2).map { i =>
allEvents(i)(1).asInstanceOf[ForeachSinkSuite.Process[Int]].value
}
assert(
writerIds.toSet.size == 3,
s"writer was reused! expected 3 unique writers but saw $writerIds")
} finally {
query.stop()
}
}
}
}
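
The new test drives the foreach sink with Trigger.Continuous and uses the writer hash codes it records to assert that a fresh writer is opened per epoch. For readers who want the same pattern outside the test harness, here is a hedged, standalone sketch; the app name, checkpoint path, and writer body are placeholders rather than anything from the patch.

import org.apache.spark.sql.{ForeachWriter, SparkSession}
import org.apache.spark.sql.streaming.Trigger

// Hedged sketch of the user-facing pattern exercised by the test above:
// a rate source feeding a ForeachWriter under a continuous trigger.
object ContinuousForeachSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("continuous-foreach-sketch").getOrCreate()
    import spark.implicits._

    val query = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()
      .select($"value".cast("INT"))
      .map(_.getInt(0))
      .writeStream
      .option("checkpointLocation", "/tmp/continuous-foreach-sketch") // placeholder path
      .trigger(Trigger.Continuous("500 milliseconds"))
      .foreach(new ForeachWriter[Int] {
        // Under continuous processing, open/close bracket each partition's epoch.
        override def open(partitionId: Long, epochId: Long): Boolean = true
        override def process(value: Int): Unit = println(s"value: $value")
        override def close(errorOrNull: Throwable): Unit = ()
      })
      .start()

    query.awaitTermination()
  }
}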



/** A global object to collect events in the executor */
object ForeachSinkSuite {
