Made name default to null
tdas committed Dec 2, 2016
commit c20f4fe28d30768857c094d28473656065263a23
StreamExecution.scala
@@ -114,6 +114,13 @@ class StreamExecution(
 
   override val runId: UUID = UUID.randomUUID
 
+  /**
+   * Pretty identifier string for logging. Format: if name is set,
+   * "queryName [id = xyz, runId = abc]", otherwise "[id = xyz, runId = abc]".
+   */
+  private val prettyIdString =
+    Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"
+
   /** All stream sources present in the query plan. */
   protected val sources =
     logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
@@ -142,15 +149,16 @@ class StreamExecution(
   private val callSite = Utils.getCallSite()
 
   /** Used to report metrics to coda-hale. This uses id for easier tracking across restarts. */
-  lazy val streamMetrics = new MetricsReporter(this, s"spark.streaming.$id")
+  lazy val streamMetrics = new MetricsReporter(
+    this, s"spark.streaming.${Option(name).getOrElse(id)}")
 
   /**
    * The thread that runs the micro-batches of this stream. Note that this thread must be
    * [[org.apache.spark.util.UninterruptibleThread]] to avoid potential deadlocks in using
    * [[HDFSMetadataLog]]. See SPARK-14131 for more details.
    */
   val microBatchThread =
-    new StreamExecutionThread(s"stream execution thread for $name") {
+    new StreamExecutionThread(s"stream execution thread for $prettyIdString") {
       override def run(): Unit = {
         // To fix call site like "run at <unknown>:0", we bridge the call site from the caller
         // thread to this micro batch thread
@@ -261,10 +269,10 @@ class StreamExecution(
       case e: Throwable =>
         streamDeathCause = new StreamingQueryException(
           this,
-          s"Query $name terminated with exception: ${e.getMessage}",
+          s"Query $prettyIdString terminated with exception: ${e.getMessage}",
           e,
           Some(committedOffsets.toOffsetSeq(sources, offsetSeqMetadata)))
-        logError(s"Query $name terminated with error", e)
+        logError(s"Query $prettyIdString terminated with error", e)
         updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
         // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
         // handle them
@@ -502,7 +510,7 @@ class StreamExecution(
       microBatchThread.join()
     }
     uniqueSources.foreach(_.stop())
-    logInfo(s"Query $name was stopped")
+    logInfo(s"Query $prettyIdString was stopped")
   }
 
   /**
@@ -593,7 +601,7 @@ class StreamExecution(
   override def explain(): Unit = explain(extended = false)
 
   override def toString: String = {
-    s"Streaming Query - $name [state = $state]"
+    s"Streaming Query $prettyIdString [state = $state]"
   }
 
   def toDebugString: String = {
@@ -602,7 +610,7 @@ class StreamExecution(
     } else ""
     s"""
        |=== Streaming Query ===
-       |Name: $name
+       |Identifier: $prettyIdString
        |Current Offsets: $committedOffsets
        |
        |Current State: $state
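
Both changes above use the same null-safe idiom: `Option(null)` is `None`, so an unset name simply drops out of the log prefix, and the metrics source name falls back to the query id. A minimal standalone sketch of that behavior (the `NameFallbackDemo` object is hypothetical, not part of this patch):

```scala
import java.util.UUID

// Hypothetical demo, not part of the patch: shows how Option(name)
// behaves when name is null versus when it is set.
object NameFallbackDemo extends App {
  val id: UUID = UUID.randomUUID
  val runId: UUID = UUID.randomUUID

  // Mirrors prettyIdString: the name prefix appears only when name is non-null.
  def prettyIdString(name: String): String =
    Option(name).map(_ + " ").getOrElse("") + s"[id = $id, runId = $runId]"

  // Mirrors the metrics source name: prefers the name, falls back to the id.
  def metricsSourceName(name: String): String =
    s"spark.streaming.${Option(name).getOrElse(id)}"

  println(prettyIdString("myQuery")) // myQuery [id = ..., runId = ...]
  println(prettyIdString(null))      // [id = ..., runId = ...]
  println(metricsSourceName(null))   // spark.streaming.<id>
}
```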
StreamingQueryManager.scala
@@ -207,10 +207,14 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
       trigger: Trigger = ProcessingTime(0),
       triggerClock: Clock = new SystemClock()): StreamingQuery = {
     activeQueriesLock.synchronized {
-      val name = userSpecifiedName.getOrElse(s"query-${StreamingQueryManager.nextId}")
-      if (activeQueries.values.exists(_.name == name)) {
-        throw new IllegalArgumentException(
-          s"Cannot start query with name $name as a query with that name is already active")
+      val name = userSpecifiedName match {
+        case Some(n) =>
+          if (activeQueries.values.exists(_.name == n)) {
+            throw new IllegalArgumentException(
+              s"Cannot start query with name $n as a query with that name is already active")
+          }
+          n
+        case None => null
       }
       val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
         new Path(userSpecified).toUri.toString
@@ -295,8 +299,3 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
     }
   }
 }
-
-private object StreamingQueryManager {
-  private val _nextId = new AtomicLong(0)
-  private def nextId: Long = _nextId.getAndIncrement()
-}
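
Because names are no longer auto-generated as `query-N`, the `StreamingQueryManager` companion object with its `AtomicLong` counter becomes dead code and is deleted above. The uniqueness check now applies only to user-supplied names. A sketch of the resolution logic in isolation (`resolveName` and `activeNames` are hypothetical stand-ins for the real method and its surrounding state):

```scala
object NameResolutionSketch {
  // Duplicate names among active queries are rejected; an absent name
  // resolves to null rather than a generated identifier.
  def resolveName(userSpecifiedName: Option[String], activeNames: Set[String]): String =
    userSpecifiedName match {
      case Some(n) =>
        if (activeNames.contains(n)) {
          throw new IllegalArgumentException(
            s"Cannot start query with name $n as a query with that name is already active")
        }
        n
      case None => null // unnamed queries are allowed; any number may coexist
    }
}
```

Since the `None` branch never throws, `resolveName(None, activeNames)` always succeeds, which is what lets multiple unnamed queries run concurrently.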
StreamingQuerySuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.streaming
 
-import scala.util.Random
-
 import org.apache.commons.lang3.RandomStringUtils
 import org.scalactic.TolerantNumerics
 import org.scalatest.concurrent.Eventually._
@@ -49,22 +47,31 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
   test("name unique in active queries") {
     withTempDir { dir =>
       def startQuery(name: Option[String]): StreamingQuery = {
-        val writer = MemoryStream[Int].toDS.groupBy().count().writeStream
+        val writer = MemoryStream[Int].toDS.writeStream
         name.foreach(writer.queryName)
         writer
-          .format("memory")
-          .outputMode("complete")
+          .foreach(new TestForeachWriter)
           .start()
       }
-      val q1 = startQuery(name = Some("q1"))
-      assert(q1.name === "q1")
-      val q2 = startQuery(name = Some("q2"))
-      assert(q2.name === "q2")
+
+      // No name by default; multiple active queries can have no name
+      val q1 = startQuery(name = None)
+      assert(q1.name === null)
+      val q2 = startQuery(name = None)
+      assert(q2.name === null)
+
+      // Can be set by the user
+      val q3 = startQuery(name = Some("q3"))
+      assert(q3.name === "q3")
+
+      // Multiple active queries cannot have the same name
       val e = intercept[IllegalArgumentException] {
-        startQuery(name = Some("q2"))
+        startQuery(name = Some("q3"))
      }
+
       q1.stop()
       q2.stop()
+      q3.stop()
     }
   }
 
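
The updated test exercises the new contract end to end: unnamed queries coexist, while user-supplied names must still be unique among active queries. For callers, the practical consequence is that `StreamingQuery.name` must now be null-checked. A hedged usage sketch under stated assumptions — a local `SparkSession`, the internal `MemoryStream` test source, and the `console` sink, none of which this diff prescribes:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.streaming.MemoryStream

object NullNameUsage extends App {
  val spark = SparkSession.builder().master("local[2]").appName("demo").getOrCreate()
  import spark.implicits._
  implicit val sqlCtx = spark.sqlContext

  val input = MemoryStream[Int]

  // Unnamed query: name is now null instead of a generated "query-N".
  val unnamed = input.toDS.writeStream.format("console").start()
  assert(unnamed.name == null)

  // Named query: behaves as before.
  val named = input.toDS.writeStream.queryName("demo").format("console").start()
  assert(named.name == "demo")

  unnamed.stop(); named.stop(); spark.stop()
}
```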