project/MimaExcludes.scala (8 additions, 1 deletion)
@@ -97,7 +97,14 @@ object MimaExcludes {
    // [SPARK-18034] Upgrade to MiMa 0.1.11 to fix flakiness.
    ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.aggregationDepth"),
    ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.getAggregationDepth"),
    ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_=")
    ProblemFilters.exclude[InheritedNewAbstractMethodProblem]("org.apache.spark.ml.param.shared.HasAggregationDepth.org$apache$spark$ml$param$shared$HasAggregationDepth$_setter_$aggregationDepth_="),

    // [SPARK-18694] Add StreamingQuery.explain and exception to Python and fix StreamingQueryException
    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.StreamingQueryException$"),
    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.startOffset"),
    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.endOffset"),
    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryException.this"),
    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryException.query")
  )
}

python/pyspark/sql/streaming.py (40 additions)
@@ -30,6 +30,7 @@
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql.readwriter import OptionUtils, to_str
from pyspark.sql.types import *
from pyspark.sql.utils import StreamingQueryException

__all__ = ["StreamingQuery", "StreamingQueryManager", "DataStreamReader", "DataStreamWriter"]

@@ -132,6 +133,45 @@ def stop(self):
"""
self._jsq.stop()

    @since(2.1)
    def explain(self, extended=False):
        """Prints the (logical and physical) plans to the console for debugging purposes.

        :param extended: boolean, default ``False``. If ``False``, prints only the physical plan.

        >>> sq = sdf.writeStream.format('memory').queryName('query_explain').start()
        >>> sq.processAllAvailable()  # Wait a bit to generate the runtime plans.
        >>> sq.explain()
        == Physical Plan ==
        ...
        >>> sq.explain(True)
        == Parsed Logical Plan ==
        ...
        == Analyzed Logical Plan ==
        ...
        == Optimized Logical Plan ==
        ...
        == Physical Plan ==
        ...
        >>> sq.stop()
        """
        # Cannot call `_jsq.explain(...)` because it would print in the JVM process.
        # We should print it in the Python process.
        print(self._jsq.explainInternal(extended))

    @since(2.1)
    def exception(self):
        """
        :return: the StreamingQueryException if the query was terminated by an exception, or None.
        """
        if self._jsq.exception().isDefined():
            je = self._jsq.exception().get()
            msg = je.toString().split(': ', 1)[1]  # Drop the Java StreamingQueryException type info
            stackTrace = '\n\t at '.join(map(lambda x: x.toString(), je.getStackTrace()))
            return StreamingQueryException(msg, stackTrace)
        else:
            return None


class StreamingQueryManager(object):
"""A class to manage all the :class:`StreamingQuery` StreamingQueries active.
Expand Down
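Taken together, `explain()` and `exception()` give PySpark parity with the Scala `StreamingQuery` API. A minimal usage sketch against a healthy query (assumptions: a running SparkSession `spark`, and the sample text files under `python/test_support/sql/streaming` used by the tests below; the query name is illustrative):

```python
# Sketch only: exercises the two new methods on a query that does not fail.
sdf = spark.readStream.format('text').load('python/test_support/sql/streaming')
sq = sdf.writeStream.format('memory').queryName('demo_query').start()

sq.processAllAvailable()   # block until all currently available input is processed
sq.explain()               # physical plan only
sq.explain(extended=True)  # parsed/analyzed/optimized logical plans + physical plan

assert sq.exception() is None  # no failure, so no StreamingQueryException
sq.stop()
```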
python/pyspark/sql/tests.py (29 additions)
@@ -1137,6 +1137,35 @@ def test_stream_await_termination(self):
        q.stop()
        shutil.rmtree(tmpPath)

    def test_stream_exception(self):
        sdf = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
        sq = sdf.writeStream.format('memory').queryName('query_explain').start()
        try:
            sq.processAllAvailable()
            self.assertEqual(sq.exception(), None)
        finally:
            sq.stop()

        from pyspark.sql.functions import col, udf
        from pyspark.sql.utils import StreamingQueryException
        bad_udf = udf(lambda x: 1 / 0)
        sq = sdf.select(bad_udf(col("value")))\
            .writeStream\
            .format('memory')\
            .queryName('this_query')\
            .start()
        try:
            # Process some data to fail the query
            sq.processAllAvailable()
            self.fail("bad udf should fail the query")
        except StreamingQueryException as e:
            # This is expected
            self.assertTrue("ZeroDivisionError" in e.desc)
        finally:
            sq.stop()
        self.assertTrue(type(sq.exception()) is StreamingQueryException)
        self.assertTrue("ZeroDivisionError" in sq.exception().desc)

    def test_query_manager_await_termination(self):
        df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
        for q in self.spark._wrapped.streams.active:
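Outside the unittest harness, the failure path exercised by `test_stream_exception` boils down to the pattern below (a sketch; `sdf` is a streaming DataFrame as above, and the query name is illustrative):

```python
# Sketch: a UDF that raises on every row, so the streaming query is guaranteed to fail.
from pyspark.sql.functions import col, udf
from pyspark.sql.utils import StreamingQueryException

bad_udf = udf(lambda x: 1 / 0)
sq = sdf.select(bad_udf(col("value"))) \
    .writeStream.format('memory').queryName('failing_query').start()
try:
    sq.processAllAvailable()  # surfaces the failure as StreamingQueryException
except StreamingQueryException as e:
    print(e.desc)        # message with the Java type prefix stripped
    print(e.stackTrace)  # flattened JVM stack trace
finally:
    sq.stop()

# After the query has died, exception() returns the same error object.
assert isinstance(sq.exception(), StreamingQueryException)
```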
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -93,7 +93,7 @@ class StreamExecution(
   * once, since the field's value may change at any time.
   */
  @volatile
  protected var availableOffsets = new StreamProgress
  var availableOffsets = new StreamProgress

  /** The current batchId or -1 if execution has not yet been initialized. */
  protected var currentBatchId: Long = -1
@@ -263,7 +263,8 @@
          this,
          s"Query $name terminated with exception: ${e.getMessage}",
          e,
          Some(committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)))
          committedOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString,
          availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json).toString)
        logError(s"Query $name terminated with error", e)
        updateStatusMessage(s"Terminated with exception: ${e.getMessage}")
        // Rethrow the fatal errors to allow the user using `Thread.UncaughtExceptionHandler` to
sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -24,32 +24,42 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecution}
 * :: Experimental ::
 * Exception that stopped a [[StreamingQuery]]. Use `cause` to get the actual exception
 * that caused the failure.
 * @param query Query that caused the exception
 * @param message Message of this exception
 * @param cause Internal cause of this exception
 * @param startOffset Starting offset (if known) of the range of data in which exception occurred
 * @param endOffset Ending offset (if known) of the range of data in exception occurred
 * @param startOffset Starting offset in JSON of the range of data in which the exception occurred
 * @param endOffset Ending offset in JSON of the range of data in which the exception occurred
 * @since 2.0.0
 */
@Experimental
class StreamingQueryException private[sql](
    @transient val query: StreamingQuery,
class StreamingQueryException private(
    causeString: String,
    val message: String,
    val cause: Throwable,
    val startOffset: Option[OffsetSeq] = None,
    val endOffset: Option[OffsetSeq] = None)
    val startOffset: String,
    val endOffset: String)
  extends Exception(message, cause) {

  private[sql] def this(
      query: StreamingQuery,
      message: String,
      cause: Throwable,
      startOffset: String,
      endOffset: String) {
    this(
      // scalastyle:off
      s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}
         |
         |${query.asInstanceOf[StreamExecution].toDebugString}
         """.stripMargin,
      // scalastyle:on
      message,
      cause,
      startOffset,
      endOffset)
  }

  /** Time when the exception occurred */
  val time: Long = System.currentTimeMillis

  override def toString(): String = {
    val causeStr =
      s"${cause.getMessage} ${cause.getStackTrace.take(10).mkString("", "\n|\t", "\n")}"
    s"""
       |$causeStr
       |
       |${query.asInstanceOf[StreamExecution].toDebugString}
       """.stripMargin
  }
  override def toString(): String = causeString
}
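Because `toString()` now returns the precomputed `causeString`, which always begins with the exception's Java class name followed by `": "` (see the interpolated string in the secondary constructor above), the one-line parse in streaming.py (`je.toString().split(': ', 1)[1]`) is guaranteed a prefix to strip. A hypothetical illustration of that step (the string literal is invented for the example):

```python
# Hypothetical je.toString() output after this change; only the format matters.
java_to_string = (
    "org.apache.spark.sql.streaming.StreamingQueryException: "
    "ZeroDivisionError: integer division or modulo by zero"
)
# Split on the first ': ' to drop the Java type info, as streaming.py does.
msg = java_to_string.split(': ', 1)[1]
print(msg)  # ZeroDivisionError: integer division or modulo by zero
```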
sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -38,6 +38,13 @@ import org.apache.spark.annotation.Experimental
class StateOperatorProgress private[sql](
    val numRowsTotal: Long,
    val numRowsUpdated: Long) {

  /** The compact JSON representation of this progress. */
  def json: String = compact(render(jsonValue))

  /** The pretty (i.e. indented) JSON representation of this progress. */
  def prettyJson: String = pretty(render(jsonValue))

  private[sql] def jsonValue: JValue = {
    ("numRowsTotal" -> JInt(numRowsTotal)) ~
    ("numRowsUpdated" -> JInt(numRowsUpdated))
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -412,8 +412,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
        eventually("microbatch thread not stopped after termination with failure") {
          assert(!currentStream.microBatchThread.isAlive)
        }
        verify(thrownException.query.eq(currentStream),
          s"incorrect query reference in exception")
        verify(currentStream.exception === Some(thrownException),
          s"incorrect exception returned by query.exception()")

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -103,10 +103,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
      TestAwaitTermination(ExpectException[SparkException]),
      TestAwaitTermination(ExpectException[SparkException], timeoutMs = 2000),
      TestAwaitTermination(ExpectException[SparkException], timeoutMs = 10),
      AssertOnQuery(
        q => q.exception.get.startOffset.get.offsets ===
          q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").offsets,
        "incorrect start offset on exception")
      AssertOnQuery(q => {
        q.exception.get.startOffset ===
          q.committedOffsets.toOffsetSeq(Seq(inputData), "{}").toString &&
        q.exception.get.endOffset ===
          q.availableOffsets.toOffsetSeq(Seq(inputData), "{}").toString
      }, "incorrect start offset or end offset on exception")
    )
  }
