Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
82e2f09
Fix part of undocumented/duplicated arguments warnings by CRAN-check
junyangq Aug 9, 2016
41d9dca
[SPARK-16950] [PYSPARK] fromOffsets parameter support in KafkaUtils.c…
Aug 9, 2016
44115e9
[SPARK-16956] Make ApplicationState.MAX_NUM_RETRY configurable
JoshRosen Aug 9, 2016
2d136db
[SPARK-16905] SQL DDL: MSCK REPAIR TABLE
Aug 9, 2016
901edbb
More fixes of the docs.
junyangq Aug 10, 2016
475ee38
Fixed typo
jupblb Aug 10, 2016
2285de7
[SPARK-16522][MESOS] Spark application throws exception on exit.
sun-rui Aug 10, 2016
20efb79
[SPARK-16324][SQL] regexp_extract should doc that it returns empty st…
srowen Aug 10, 2016
719ac5f
[SPARK-15899][SQL] Fix the construction of the file path with hadoop …
avulanov Aug 10, 2016
15637f7
Revert "[SPARK-15899][SQL] Fix the construction of the file path with…
srowen Aug 10, 2016
977fbbf
[SPARK-15639] [SPARK-16321] [SQL] Push down filter at RowGroups level…
viirya Aug 10, 2016
d3a30d2
[SPARK-16579][SPARKR] add install.spark function
junyangq Aug 10, 2016
1e40135
[SPARK-17010][MINOR][DOC] Wrong description in memory management docu…
WangTaoTheTonic Aug 11, 2016
8611bc2
[SPARK-16866][SQL] Infrastructure for file-based SQL end-to-end tests
petermaxlee Aug 10, 2016
51b1016
[SPARK-17008][SPARK-17009][SQL] Normalization and isolation in SQLQue…
petermaxlee Aug 11, 2016
ea8a198
[SPARK-17007][SQL] Move test data files into a test-data folder
petermaxlee Aug 11, 2016
4b434e7
[SPARK-17011][SQL] Support testing exceptions in SQLQueryTestSuite
petermaxlee Aug 11, 2016
0ed6236
Correct example value for spark.ssl.YYY.XXX settings
ash211 Aug 11, 2016
33a213f
[SPARK-15899][SQL] Fix the construction of the file path with hadoop …
avulanov Aug 11, 2016
b87ba8f
Fix remaining undocumented/duplicated warnings
junyangq Aug 11, 2016
6bf20cd
[SPARK-17015][SQL] group-by/order-by ordinal and arithmetic tests
petermaxlee Aug 11, 2016
bc683f0
[SPARK-17018][SQL] literals.sql for testing literal parsing
petermaxlee Aug 11, 2016
0fb0149
[SPARK-17022][YARN] Handle potential deadlock in driver handling mess…
WangTaoTheTonic Aug 11, 2016
d2c1d64
Keep to the convention where we have docs for generic and the function.
junyangq Aug 12, 2016
b4047fc
[SPARK-16975][SQL] Column-partition path starting '_' should be handl…
dongjoon-hyun Aug 12, 2016
bde94cd
[SPARK-17013][SQL] Parse negative numeric literals
petermaxlee Aug 12, 2016
38378f5
[SPARK-12370][DOCUMENTATION] Documentation should link to examples …
jagadeesanas2 Aug 13, 2016
a21ecc9
[SPARK-17023][BUILD] Upgrade to Kafka 0.10.0.1 release
lresende Aug 13, 2016
750f880
[SPARK-16966][SQL][CORE] App Name is a randomUUID even when "spark.ap…
srowen Aug 13, 2016
e02d0d0
[SPARK-17027][ML] Avoid integer overflow in PolynomialExpansion.getPo…
zero323 Aug 14, 2016
8f4cacd
[SPARK-16508][SPARKR] Split docs for arrange and orderBy methods
junyangq Aug 15, 2016
4503632
[SPARK-17065][SQL] Improve the error message when encountering an inc…
zsxwing Aug 15, 2016
e5771a1
Fix docs for window functions
junyangq Aug 16, 2016
2e2c787
[SPARK-16964][SQL] Remove private[hive] from sql.hive.execution package
hvanhovell Aug 16, 2016
237ae54
Revert "[SPARK-16964][SQL] Remove private[hive] from sql.hive.executi…
rxin Aug 16, 2016
1c56971
[SPARK-16964][SQL] Remove private[sql] and private[spark] from sql.ex…
hvanhovell Aug 16, 2016
022230c
[SPARK-16519][SPARKR] Handle SparkR RDD generics that create warnings…
felixcheung Aug 16, 2016
6cb3eab
[SPARK-17089][DOCS] Remove api doc link for mapReduceTriplets operator
phalodi Aug 16, 2016
3e0163b
[SPARK-17084][SQL] Rename ParserUtils.assert to validate
hvanhovell Aug 17, 2016
68a24d3
[MINOR][DOC] Fix the descriptions for `properties` argument in the do…
Aug 17, 2016
22c7660
[SPARK-15285][SQL] Generated SpecificSafeProjection.apply method grow…
kiszk Aug 17, 2016
394d598
[SPARK-17102][SQL] bypass UserDefinedGenerator for json format check
cloud-fan Aug 17, 2016
9406f82
[SPARK-17096][SQL][STREAMING] Improve exception string reported throu…
tdas Aug 17, 2016
585d1d9
[SPARK-17038][STREAMING] fix metrics retrieval source of 'lastReceive…
keypointt Aug 17, 2016
91aa532
[SPARK-16995][SQL] TreeNodeException when flat mapping RelationalGrou…
viirya Aug 18, 2016
5735b8b
[SPARK-16391][SQL] Support partial aggregation for reduceGroups
rxin Aug 18, 2016
ec5f157
[SPARK-17117][SQL] 1 / NULL should not fail analysis
petermaxlee Aug 18, 2016
0bc3753
Fix part of undocumented/duplicated arguments warnings by CRAN-check
junyangq Aug 9, 2016
6d5233e
More fixes of the docs.
junyangq Aug 10, 2016
0edfd7d
Fix remaining undocumented/duplicated warnings
junyangq Aug 11, 2016
e72a6aa
Keep to the convention where we have docs for generic and the function.
junyangq Aug 12, 2016
afa69ed
Fix docs for window functions
junyangq Aug 16, 2016
c9cfe43
some fixes of R doc
junyangq Aug 18, 2016
3aafaa7
Move param docs from generic function to method definition.
junyangq Aug 18, 2016
315a0dd
some fixes of R doc
junyangq Aug 18, 2016
aa3d233
Move param docs from generic function to method definition.
junyangq Aug 18, 2016
71170e9
Solve conflicts.
junyangq Aug 18, 2016
2682719
Revert "Fix docs for window functions"
junyangq Aug 18, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
[SPARK-17096][SQL][STREAMING] Improve exception string reported throu…
…gh the StreamingQueryListener

## What changes were proposed in this pull request?

Currently, the stackTrace (as `Array[StackTraceElements]`) reported through StreamingQueryListener.onQueryTerminated is useless as it has the stack trace of where StreamingQueryException is defined, not the stack trace of underlying exception.  For example, if a streaming query fails because of a / by zero exception in a task, the `QueryTerminated.stackTrace` will have
```
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:211)
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:124)
```
This is basically useless, as it is location where the StreamingQueryException was defined. What we want is

Here is the right way to reason about what should be posted as through StreamingQueryListener.onQueryTerminated
- The actual exception could either be a SparkException, or an arbitrary exception.
  - SparkException reports the relevant executor stack trace of a failed task as a string in the the exception message. The `Array[StackTraceElements]` returned by `SparkException.stackTrace()` is mostly irrelevant.
  - For any arbitrary exception, the `Array[StackTraceElements]` returned by `exception.stackTrace()` may be relevant.
- When there is an error in a streaming query, it's hard to reason whether the `Array[StackTraceElements]` is useful or not. In fact, it is not clear whether it is even useful to report the stack trace as this array of Java objects. It may be sufficient to report the strack trace as a string, along with the message. This is how Spark reported executor stra
- Hence, this PR simplifies the API by removing the array `stackTrace` from `QueryTerminated`. Instead the `exception` returns a string containing the message and the stack trace of the actual underlying exception that failed the streaming query (i.e. not that of the StreamingQueryException). If anyone is interested in the actual stack trace as an array, can always access them through `streamingQuery.exception` which returns the exception object.

With this change, if a streaming query fails because of a / by zero exception in a task, the `QueryTerminated.exception` will be
```
org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost): java.lang.ArithmeticException: / by zero
	at org.apache.spark.sql.streaming.StreamingQueryListenerSuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply$mcII$sp(StreamingQueryListenerSuite.scala:153)
	at org.apache.spark.sql.streaming.StreamingQueryListenerSuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(StreamingQueryListenerSuite.scala:153)
	at org.apache.spark.sql.streaming.StreamingQueryListenerSuite$$anonfun$5$$anonfun$apply$mcV$sp$4$$anonfun$apply$mcV$sp$5.apply(StreamingQueryListenerSuite.scala:153)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:232)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:226)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
	at org.apache.spark.scheduler.Task.run(Task.scala:86)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:744)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1429)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1417)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1416)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1416)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:802)
...
```
It contains the relevant executor stack trace. In a case non-SparkException, if the streaming source MemoryStream throws an exception, exception message will have the relevant stack trace.
```
java.lang.RuntimeException: this is the exception message
	at org.apache.spark.sql.execution.streaming.MemoryStream.getBatch(memory.scala:103)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:316)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$5.apply(StreamExecution.scala:313)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
	at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
	at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
	at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:313)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:197)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:187)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:124)
```

Note that this change in the public `QueryTerminated` class is okay as the APIs are still experimental.

## How was this patch tested?
Unit tests that test whether the right information is present in the exception message reported through QueryTerminated object.

Author: Tathagata Das <[email protected]>

Closes #14675 from tdas/SPARK-17096.

(cherry picked from commit d60af8f)
Signed-off-by: Tathagata Das <[email protected]>
  • Loading branch information
tdas committed Aug 17, 2016
commit 9406f82db1e96c84bfacb4cac9b74aab6d4fde06
Original file line number Diff line number Diff line change
Expand Up @@ -217,10 +217,7 @@ class StreamExecution(
} finally {
state = TERMINATED
sparkSession.streams.notifyQueryTermination(StreamExecution.this)
postEvent(new QueryTerminated(
this.toInfo,
exception.map(_.getMessage),
exception.map(_.getStackTrace.toSeq).getOrElse(Nil)))
postEvent(new QueryTerminated(this.toInfo, exception.map(_.cause).map(Utils.exceptionString)))
terminationLatch.countDown()
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import org.apache.spark.sql.execution.streaming.{Offset, StreamExecution}

/**
* :: Experimental ::
* Exception that stopped a [[StreamingQuery]].
* Exception that stopped a [[StreamingQuery]]. Use `cause` get the actual exception
* that caused the failure.
* @param query Query that caused the exception
* @param message Message of this exception
* @param cause Internal cause of this exception
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,5 @@ object StreamingQueryListener {
@Experimental
class QueryTerminated private[sql](
val queryInfo: StreamingQueryInfo,
val exception: Option[String],
val stackTrace: Seq[StackTraceElement]) extends Event
val exception: Option[String]) extends Event
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(status.id === query.id)
assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString))
assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString)
assert(listener.terminationStackTrace.isEmpty)
assert(listener.terminationException === None)
}
listener.checkAsyncErrors()
Expand Down Expand Up @@ -147,7 +146,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}

test("exception should be reported in QueryTerminated") {
testQuietly("exception should be reported in QueryTerminated") {
val listener = new QueryStatusCollector
withListenerAdded(listener) {
val input = MemoryStream[Int]
Expand All @@ -159,8 +158,11 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
spark.sparkContext.listenerBus.waitUntilEmpty(10000)
assert(listener.terminationStatus !== null)
assert(listener.terminationException.isDefined)
// Make sure that the exception message reported through listener
// contains the actual exception and relevant stack trace
assert(!listener.terminationException.get.contains("StreamingQueryException"))
assert(listener.terminationException.get.contains("java.lang.ArithmeticException"))
assert(listener.terminationStackTrace.nonEmpty)
assert(listener.terminationException.get.contains("StreamingQueryListenerSuite"))
}
)
}
Expand Down Expand Up @@ -205,8 +207,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
val exception = new RuntimeException("exception")
val queryQueryTerminated = new StreamingQueryListener.QueryTerminated(
queryTerminatedInfo,
Some(exception.getMessage),
exception.getStackTrace)
Some(exception.getMessage))
val json =
JsonProtocol.sparkEventToJson(queryQueryTerminated)
val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
Expand Down Expand Up @@ -262,7 +263,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
@volatile var startStatus: StreamingQueryInfo = null
@volatile var terminationStatus: StreamingQueryInfo = null
@volatile var terminationException: Option[String] = null
@volatile var terminationStackTrace: Seq[StackTraceElement] = null

val progressStatuses = new ConcurrentLinkedQueue[StreamingQueryInfo]

Expand Down Expand Up @@ -296,7 +296,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
terminationStatus = queryTerminated.queryInfo
terminationException = queryTerminated.exception
terminationStackTrace = queryTerminated.stackTrace
}
asyncTestWaiter.dismiss()
}
Expand Down