Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Removed stacktrace from QueryTerminated
  • Loading branch information
tdas committed Aug 16, 2016
commit dafdbb6c06dd2be49ede3fd6a4c745e0001bf272
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,7 @@ class StreamExecution(
} finally {
state = TERMINATED
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
postEvent(new QueryTerminated(
this.toInfo,
exception.map(_.getMessage),
exception.map(_.getStackTrace.toSeq).getOrElse(Nil)))
postEvent(new QueryTerminated(this.toInfo, exception.map(_.getMessage)))
terminationLatch.countDown()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}

/**
* :: Experimental ::
* Exception that stopped a [[StreamingQuery]].
* Exception that stopped a [[StreamingQuery]]. Use `cause` get the actual exception
* that caused the failure.
* @param query Query that caused the exception
* @param message Message of this exception
* @param cause Internal cause of this exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,5 @@ object StreamingQueryListener {
@Experimental
class QueryTerminated private[sql](
val queryInfo: StreamingQueryInfo,
val exception: Option[String],
val stackTrace: Seq[StackTraceElement]) extends Event
val exception: Option[String]) extends Event
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(status.id === query.id)
assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString))
assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString)
assert(listener.terminationStackTrace.isEmpty)
assert(listener.terminationException === None)
}
listener.checkAsyncErrors()
Expand Down Expand Up @@ -147,7 +146,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}

test("exception should be reported in QueryTerminated") {
testQuietly("exception should be reported in QueryTerminated") {
val listener = new QueryStatusCollector
withListenerAdded(listener) {
val input = MemoryStream[Int]
Expand All @@ -159,8 +158,12 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
assert(listener.terminationStatus !== null)
assert(listener.terminationException.isDefined)
// Make sure that the exception message reported through listener
// contains the actual exception and relevant stack trace
assert(!listener.terminationException.get.contains("StreamingQueryException"))
assert(listener.terminationException.get.contains("terminated with exception"))
assert(listener.terminationException.get.contains("java.lang.ArithmeticException"))
assert(listener.terminationStackTrace.nonEmpty)
assert(listener.terminationException.get.contains("StreamingQueryListenerSuite"))
}
)
}
Expand Down Expand Up @@ -205,8 +208,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
val exception = new RuntimeException("exception")
val queryQueryTerminated = new StreamingQueryListener.QueryTerminated(
queryTerminatedInfo,
Some(exception.getMessage),
exception.getStackTrace)
Some(exception.getMessage))
val json =
JsonProtocol.sparkEventToJson(queryQueryTerminated)
val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
Expand Down Expand Up @@ -262,7 +264,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
@volatile var startStatus: StreamingQueryInfo = null
@volatile var terminationStatus: StreamingQueryInfo = null
@volatile var terminationException: Option[String] = null
@volatile var terminationStackTrace: Seq[StackTraceElement] = null

val progressStatuses = new ConcurrentLinkedQueue[StreamingQueryInfo]

Expand Down Expand Up @@ -296,7 +297,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
terminationStatus = queryTerminated.queryInfo
terminationException = queryTerminated.exception
terminationStackTrace = queryTerminated.stackTrace
}
asyncTestWaiter.dismiss()
}
Expand Down