docs/sql-migration-guide.md (1 addition, 1 deletion)
@@ -28,7 +28,7 @@ license: |
  - Since Spark 3.5, Spark thrift server will interrupt task when canceling a running statement. To restore the previous behavior, set `spark.sql.thriftServer.interruptOnCancel` to `false`.
  - Since Spark 3.5, Avro will throw `AnalysisException` when reading Interval types as Date or Timestamp types, or reading Decimal types with lower precision. To restore the legacy behavior, set `spark.sql.legacy.avro.allowIncompatibleSchema` to `true`.
  - Since Spark 3.5, Row's json and prettyJson methods are moved to `ToJsonUtil`.
- - Since Spark 3.5, ParseException is a subclass of SparkException instead of AnalysisException.
+ - Since Spark 3.5, the `plan` field is moved from `AnalysisException` to `ExtendedAnalysisException`.
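
For illustration, a minimal sketch (not part of the diff) of what this note means for user code that previously read `plan` off a caught exception; it assumes a `SparkSession` named `spark` and a table `t`:

```scala
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.ExtendedAnalysisException

try {
  spark.sql("SELECT missing_col FROM t")
} catch {
  // The analyzer attaches the failed plan via the internal subclass.
  case e: ExtendedAnalysisException => e.plan.foreach(p => println(p))
  // A plain AnalysisException no longer carries a plan.
  case e: AnalysisException => println(e.getMessage)
}
```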

## Upgrading from Spark SQL 3.3 to 3.4

project/MimaExcludes.scala (3 additions, 1 deletion)
@@ -64,7 +64,9 @@ object MimaExcludes {
// [SPARK-43952][CORE][CONNECT][SQL] Add SparkContext APIs for query cancellation by tag
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.JobData.this"),
// [SPARK-44205][SQL] Extract Catalyst Code from DecimalType
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.types.DecimalType.unapply")
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.types.DecimalType.unapply"),
+    // [SPARK-44507][SQL][CONNECT] Move AnalysisException to sql/api.
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.AnalysisException")
)

// Default exclude rules
@@ -21,7 +21,6 @@ import scala.collection.JavaConverters._

import org.apache.spark.{QueryContext, SparkThrowable, SparkThrowableHelper}
import org.apache.spark.annotation.Stable
- import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.trees.{Origin, WithOrigin}

/**
@@ -34,8 +33,6 @@ class AnalysisException protected[sql] (
val message: String,
val line: Option[Int] = None,
val startPosition: Option[Int] = None,
-    // Some plans fail to serialize due to bugs in scala collections.
-    @transient val plan: Option[LogicalPlan] = None,
val cause: Option[Throwable] = None,
val errorClass: Option[String] = None,
val messageParameters: Map[String, String] = Map.empty,
@@ -102,12 +99,11 @@ class AnalysisException protected[sql] (
message: String = this.message,
line: Option[Int] = this.line,
startPosition: Option[Int] = this.startPosition,
-      plan: Option[LogicalPlan] = this.plan,
cause: Option[Throwable] = this.cause,
errorClass: Option[String] = this.errorClass,
messageParameters: Map[String, String] = this.messageParameters,
context: Array[QueryContext] = this.context): AnalysisException =
-    new AnalysisException(message, line, startPosition, plan, cause, errorClass,
+    new AnalysisException(message, line, startPosition, cause, errorClass,
messageParameters, context)

def withPosition(origin: Origin): AnalysisException = {
@@ -119,10 +115,7 @@
newException
}

-  override def getMessage: String = {
-    val planAnnotation = Option(plan).flatten.map(p => s";\n$p").getOrElse("")
-    getSimpleMessage + planAnnotation
-  }
+  override def getMessage: String = getSimpleMessage

// Outputs an exception without the logical plan.
// For testing only
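
As a quick illustration of the slimmed-down class, a sketch of hypothetical code inside the `org.apache.spark.sql` package (the constructor is `protected[sql]`): `getMessage` now equals `getSimpleMessage`, and `copy` no longer threads a plan through:

```scala
val base = new AnalysisException("cannot resolve 'x'")
val positioned = base.copy(line = Some(1), startPosition = Some(8))
// No plan annotation is appended anymore; attaching a plan is now the job
// of ExtendedAnalysisException, added later in this diff.
assert(positioned.getMessage == positioned.getSimpleMessage)
```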
@@ -23,9 +23,9 @@ import org.antlr.v4.runtime.atn.PredictionMode
import org.antlr.v4.runtime.misc.{Interval, ParseCancellationException}
import org.antlr.v4.runtime.tree.TerminalNodeImpl

- import org.apache.spark.{QueryContext, SparkException, SparkThrowable, SparkThrowableHelper}
+ import org.apache.spark.{QueryContext, SparkThrowable, SparkThrowableHelper}
import org.apache.spark.internal.Logging
- import org.apache.spark.sql.SqlApiConf
+ import org.apache.spark.sql.{AnalysisException, SqlApiConf}
import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, Origin, WithOrigin}
import org.apache.spark.sql.catalyst.util.SparkParserUtils
import org.apache.spark.sql.errors.QueryParsingErrors
@@ -184,15 +184,17 @@ case object ParseErrorListener extends BaseErrorListener {
*/
class ParseException(
val command: Option[String],
-    val message: String,
+    message: String,
     val start: Origin,
     val stop: Origin,
-    val errorClass: Option[String] = None,
-    val messageParameters: Map[String, String] = Map.empty,
-    val queryContext: Array[QueryContext] = ParseException.getQueryContext())
-  extends SparkException(
+    errorClass: Option[String] = None,
+    messageParameters: Map[String, String] = Map.empty,
+    queryContext: Array[QueryContext] = ParseException.getQueryContext())
+  extends AnalysisException(
     message,
-    cause = null,
+    start.line,
+    start.startPosition,
+    None,
errorClass,
messageParameters,
queryContext) {
@@ -222,14 +224,6 @@ class ParseException(
Some(errorClass),
messageParameters)

-  // Methods added to retain compatibility with AnalysisException.
-  @deprecated("Use start.line instead.")
-  def line: Option[Int] = start.line
-  @deprecated("Use start.startPosition instead.")
-  def startPosition: Option[Int] = start.startPosition
-  @deprecated("ParseException is never caused by another exception.")
-  def cause: Option[Throwable] = None

override def getMessage: String = {
val builder = new StringBuilder
builder ++= "\n" ++= message
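
A sketch of the user-visible effect of this change (assuming a `SparkSession` named `spark`): because `ParseException` now extends `AnalysisException` instead of `SparkException`, one handler covers both parse-time and analysis-time failures:

```scala
import org.apache.spark.sql.AnalysisException

try {
  spark.sql("SELEC 1") // syntax error, raises a ParseException
} catch {
  case e: AnalysisException => println(e.getMessage) // also matches ParseException
}
```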
ExtendedAnalysisException.scala (new file)
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst

import org.apache.spark.QueryContext
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

/**
* Internal [[AnalysisException]] that also captures a [[LogicalPlan]].
*/
class ExtendedAnalysisException(
[Review comment — Contributor]
This means users would see the new error class in stack traces? e.g.
org.apache.spark.sql.catalyst.AnalysisException -> org.apache.spark.sql.catalyst.ExtendedAnalysisException
Should we try to keep the original name by overriding the toString method?

Throwable.java:
    public String toString() {
        String s = getClass().getName();
        String message = getLocalizedMessage();
        return (message != null) ? (s + ": " + message) : s;
    }
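
A sketch of what the suggested override might look like (an assumption for illustration; it is not part of this revision of the diff):

```scala
// Mirror Throwable's default toString, but report the parent class name so
// stack traces keep showing AnalysisException.
override def toString: String = {
  val name = classOf[org.apache.spark.sql.AnalysisException].getName
  Option(getLocalizedMessage).fold(name)(msg => s"$name: $msg")
}
```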

[Review comment — Contributor Author]
I would be OK with that.

[Review comment — Contributor]
The current idea is that there are fewer places that use the plan field of AnalysisException, so this switch causes fewer changes.

message: String,
line: Option[Int] = None,
startPosition: Option[Int] = None,
// Some plans fail to serialize due to bugs in scala collections.
@transient val plan: Option[LogicalPlan] = None,
cause: Option[Throwable] = None,
errorClass: Option[String] = None,
messageParameters: Map[String, String] = Map.empty,
context: Array[QueryContext] = Array.empty)
extends AnalysisException(
message,
line,
startPosition,
cause,
errorClass,
messageParameters,
context) {

def this(e: AnalysisException, plan: LogicalPlan) = {
this(
e.message,
e.line,
e.startPosition,
Option(plan),
e.cause,
e.errorClass,
e.messageParameters,
e.context)
setStackTrace(e.getStackTrace)
}

override def copy(
message: String,
line: Option[Int],
startPosition: Option[Int],
cause: Option[Throwable],
errorClass: Option[String],
messageParameters: Map[String, String],
context: Array[QueryContext]): ExtendedAnalysisException = {
new ExtendedAnalysisException(message, line, startPosition, plan, cause, errorClass,
messageParameters, context)
}

override def getMessage: String = {
val planAnnotation = Option(plan).flatten.map(p => s";\n$p").getOrElse("")
getSimpleMessage + planAnnotation
}
}
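
A sketch of what the auxiliary constructor above guarantees (`failedPlan` is a hypothetical `LogicalPlan`; this would have to run from Spark-internal code, since the `AnalysisException` constructor is `protected[sql]`):

```scala
val original = new AnalysisException("cannot resolve 'x'")
val wrapped = new ExtendedAnalysisException(original, failedPlan)
// The plan is attached and appended to the message...
assert(wrapped.plan.contains(failedPlan))
assert(wrapped.getMessage.contains(failedPlan.toString))
// ...while the original stack trace is preserved via setStackTrace above.
assert(wrapped.getStackTrace.sameElements(original.getStackTrace))
```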
@@ -29,7 +29,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst._
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.encoders.OuterScopes
- import org.apache.spark.sql.catalyst.expressions.{Expression, FrameLessOffsetWindowFunction, _}
+ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.SubExprUtils._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.objects._
@@ -212,9 +212,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
analyzed
} catch {
case e: AnalysisException =>
-        val ae = e.copy(plan = Option(analyzed))
-        ae.setStackTrace(e.getStackTrace)
-        throw ae
+        throw new ExtendedAnalysisException(e, analyzed)
}
}
}
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.analysis

- import org.apache.spark.sql.AnalysisException
+ import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, CaseWhen, Cast, CreateNamedStruct, Expression, GetStructField, IsNotNull, LessThan, Literal, PreciseTimestampConversion, SessionWindow, Subtract, TimeWindow, WindowTime}
import org.apache.spark.sql.catalyst.plans.logical.{Expand, Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule
@@ -310,7 +310,7 @@ object ResolveWindowTime extends Rule[LogicalPlan] {
if (!metadata.contains(TimeWindow.marker) &&
!metadata.contains(SessionWindow.marker)) {
// FIXME: error framework?
-        throw new AnalysisException(
+        throw new ExtendedAnalysisException(
s"The input is not a correct window column: $windowTime", plan = Some(p))
}

@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
+ import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, BinaryComparison, CurrentDate, CurrentTimestampLike, Expression, GreaterThan, GreaterThanOrEqual, GroupingSets, LessThan, LessThanOrEqual, LocalTimestamp, MonotonicallyIncreasingID, SessionWindow, WindowExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans._
@@ -570,7 +571,7 @@ object UnsupportedOperationChecker extends Logging {
}

private def throwError(msg: String)(implicit operator: LogicalPlan): Nothing = {
-    throw new AnalysisException(
+    throw new ExtendedAnalysisException(
msg, operator.origin.line, operator.origin.startPosition, Some(operator))
}
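
For readability, the same call with named arguments (a sketch; behavior is identical to the positional form above):

```scala
throw new ExtendedAnalysisException(
  message = msg,
  line = operator.origin.line,
  startPosition = operator.origin.startPosition,
  plan = Some(operator))
```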

@@ -23,7 +23,7 @@ import org.apache.hadoop.fs.Path

import org.apache.spark.{SPARK_DOC_ROOT, SparkException, SparkThrowable, SparkThrowableHelper, SparkUnsupportedOperationException}
import org.apache.spark.sql.AnalysisException
- import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
+ import org.apache.spark.sql.catalyst.{ExtendedAnalysisException, FunctionIdentifier, QualifiedTableName, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.{CannotReplaceMissingTableException, FunctionAlreadyExistsException, NamespaceAlreadyExistsException, NoSuchFunctionException, NoSuchNamespaceException, NoSuchPartitionException, NoSuchTableException, ResolvedTable, Star, TableAlreadyExistsException, UnresolvedRegex}
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, InvalidUDFClassException}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
@@ -1901,7 +1901,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {

def streamJoinStreamWithoutEqualityPredicateUnsupportedError(plan: LogicalPlan): Throwable = {
val errorClass = "_LEGACY_ERROR_TEMP_1181"
-    new AnalysisException(
+    new ExtendedAnalysisException(
SparkThrowableHelper.getMessage(errorClass, Map.empty[String, String]),
errorClass = Some(errorClass),
messageParameters = Map.empty,
sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala (3 additions, 2 deletions)
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._

import org.scalatest.Assertions

+ import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution.SQLExecution
@@ -96,7 +97,7 @@ abstract class QueryTest extends PlanTest {

private def getResult[T](ds: => Dataset[T]): Array[T] = {
val analyzedDS = try ds catch {
-      case ae: AnalysisException =>
+      case ae: ExtendedAnalysisException =>
if (ae.plan.isDefined) {
fail(
s"""
@@ -131,7 +132,7 @@
*/
protected def checkAnswer(df: => DataFrame, expectedAnswer: Seq[Row]): Unit = {
val analyzedDF = try df catch {
-      case ae: AnalysisException =>
+      case ae: ExtendedAnalysisException =>
if (ae.plan.isDefined) {
fail(
s"""
@@ -30,6 +30,7 @@ import org.apache.commons.io.FileUtils

import org.apache.spark.{AccumulatorSuite, SPARK_DOC_ROOT, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
+ import org.apache.spark.sql.catalyst.ExtendedAnalysisException
import org.apache.spark.sql.catalyst.expressions.{GenericRow, Hex}
import org.apache.spark.sql.catalyst.expressions.Cast._
import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, Partial}
@@ -2634,11 +2635,12 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
assert(!jobStarted.get(), "Command should not trigger a Spark job.")
}

test("SPARK-20164: AnalysisException should be tolerant to null query plan") {
test("SPARK-20164: ExtendedAnalysisException should be tolerant to null query plan") {
try {
throw new AnalysisException("", None, None, plan = null)
throw new ExtendedAnalysisException("", None, None, plan = null)
} catch {
case ae: AnalysisException => assert(ae.plan == null && ae.getMessage == ae.getSimpleMessage)
case ae: ExtendedAnalysisException =>
assert(ae.plan == null && ae.getMessage == ae.getSimpleMessage)
}
}

@@ -140,8 +140,7 @@ trait SQLQueryTestHelper extends Logging {
// Do not output the logical plan tree which contains expression IDs.
// Also implement a crude way of masking expression IDs in the error message
// with a generic pattern "###".
-        val msg = if (a.plan.nonEmpty) a.getSimpleMessage else a.getMessage
-        (emptySchema, Seq(a.getClass.getName, msg.replaceAll("#\\d+", "#x")))
+        (emptySchema, Seq(a.getClass.getName, a.getSimpleMessage.replaceAll("#\\d+", "#x")))
case s: SparkException if s.getCause != null =>
// For a runtime exception, it is hard to match because its message contains
// information of stage, task ID, etc.
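
An illustration of the expression-ID masking applied above — the `#\d+` pattern rewrites every expression ID to a stable placeholder so golden test files do not churn between runs:

```scala
val raw = "cannot resolve 'a#123' given input columns: [b#456, c#789]"
val masked = raw.replaceAll("#\\d+", "#x")
// masked == "cannot resolve 'a#x' given input columns: [b#x, c#x]"
```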