diff --git a/bin/docker-image-tool.sh b/bin/docker-image-tool.sh index 57b86254ab42..8a01b80c4164 100755 --- a/bin/docker-image-tool.sh +++ b/bin/docker-image-tool.sh @@ -19,6 +19,8 @@ # This script builds and pushes docker images when run from a release of Spark # with Kubernetes support. +set -x + function error { echo "$@" 1>&2 exit 1 @@ -172,6 +174,7 @@ function build { local BASEDOCKERFILE=${BASEDOCKERFILE:-"kubernetes/dockerfiles/spark/Dockerfile"} local PYDOCKERFILE=${PYDOCKERFILE:-false} local RDOCKERFILE=${RDOCKERFILE:-false} + local ARCHS=${ARCHS:-"--platform linux/amd64,linux/arm64"} (cd $(img_ctx_dir base) && docker build $NOCACHEARG "${BUILD_ARGS[@]}" \ -t $(image_ref spark) \ @@ -179,6 +182,11 @@ function build { if [ $? -ne 0 ]; then error "Failed to build Spark JVM Docker image, please refer to Docker build output for details." fi + if [ "${CROSS_BUILD}" != "false" ]; then + (cd $(img_ctx_dir base) && docker buildx build $ARCHS $NOCACHEARG "${BUILD_ARGS[@]}" \ + -t $(image_ref spark) \ + -f "$BASEDOCKERFILE" .) + fi if [ "${PYDOCKERFILE}" != "false" ]; then (cd $(img_ctx_dir pyspark) && docker build $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \ @@ -187,6 +195,11 @@ function build { if [ $? -ne 0 ]; then error "Failed to build PySpark Docker image, please refer to Docker build output for details." fi + if [ "${CROSS_BUILD}" != "false" ]; then + (cd $(img_ctx_dir pyspark) && docker buildx build $ARCHS $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \ + -t $(image_ref spark-py) \ + -f "$PYDOCKERFILE" .) + fi fi if [ "${RDOCKERFILE}" != "false" ]; then @@ -196,6 +209,11 @@ function build { if [ $? -ne 0 ]; then error "Failed to build SparkR Docker image, please refer to Docker build output for details." fi + if [ "${CROSS_BUILD}" != "false" ]; then + (cd $(img_ctx_dir sparkr) && docker buildx build $ARCHS $NOCACHEARG "${BINDING_BUILD_ARGS[@]}" \ + -t $(image_ref spark-r) \ + -f "$RDOCKERFILE" .) + fi fi } @@ -227,6 +245,8 @@ Options: -n Build docker image with --no-cache -u uid UID to use in the USER directive to set the user the main Spark process runs as inside the resulting container + -X Use docker buildx to cross build. Automatically pushes. + See https://docs.docker.com/buildx/working-with-buildx/ for steps to set up buildx. -b arg Build arg to build or push the image. For multiple build args, this option needs to be used separately for each build arg. @@ -252,6 +272,12 @@ Examples: - Build and push JDK11-based image with tag "v3.0.0" to docker.io/myrepo $0 -r docker.io/myrepo -t v3.0.0 -b java_image_tag=11-jre-slim build $0 -r docker.io/myrepo -t v3.0.0 push + + - Build and push JDK11-based image for multiple archs to docker.io/myrepo + $0 -r docker.io/myrepo -t v3.0.0 -X -b java_image_tag=11-jre-slim build + # Note: buildx, which does cross building, needs to do the push during build + # So there is no separate push step with -X + EOF } @@ -268,7 +294,8 @@ RDOCKERFILE= NOCACHEARG= BUILD_PARAMS= SPARK_UID= -while getopts f:p:R:mr:t:nb:u: option +CROSS_BUILD="false" +while getopts f:p:R:mr:t:Xnb:u: option do case "${option}" in @@ -279,6 +306,7 @@ do t) TAG=${OPTARG};; n) NOCACHEARG="--no-cache";; b) BUILD_PARAMS=${BUILD_PARAMS}" --build-arg "${OPTARG};; + X) CROSS_BUILD=1;; m) if ! which minikube 1>/dev/null; then error "Cannot find minikube.
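For context, the new -X path assumes a working buildx builder is already configured on the build machine; a minimal usage sketch (the builder name is illustrative, and the repo/tag come from the examples in the help text above) might look like:

    # One-time setup: create and select a multi-arch builder, then verify it starts.
    docker buildx create --name spark-multiarch --use
    docker buildx inspect --bootstrap

    # Cross-build the images for the platforms listed in ARCHS
    # (per the help text above, buildx performs the push as part of the build).
    ./bin/docker-image-tool.sh -r docker.io/myrepo -t v3.0.0 -X build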
diff --git a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js index ae02defd9bb9..fd4a48d2db33 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js +++ b/core/src/main/resources/org/apache/spark/ui/static/spark-dag-viz.js @@ -216,7 +216,7 @@ function renderDagVizForJob(svgContainer) { var dot = metadata.select(".dot-file").text(); var stageId = metadata.attr("stage-id"); var containerId = VizConstants.graphPrefix + stageId; - var isSkipped = metadata.attr("skipped") == "true"; + var isSkipped = metadata.attr("skipped") === "true"; var container; if (isSkipped) { container = svgContainer @@ -225,11 +225,8 @@ function renderDagVizForJob(svgContainer) { .attr("skipped", "true"); } else { // Link each graph to the corresponding stage page (TODO: handle stage attempts) - // Use the link from the stage table so it also works for the history server var attemptId = 0; - var stageLink = d3.select("#stage-" + stageId + "-" + attemptId) - .select("a.name-link") - .attr("href"); + var stageLink = uiRoot + appBasePath + "/stages/stage/?id=" + stageId + "&attempt=" + attemptId; container = svgContainer .append("a") .attr("xlink:href", stageLink) diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.js b/core/src/main/resources/org/apache/spark/ui/static/webui.js index 0ba461f02317..4f8409ca2b7c 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/webui.js +++ b/core/src/main/resources/org/apache/spark/ui/static/webui.js @@ -16,11 +16,16 @@ */ var uiRoot = ""; +var appBasePath = ""; function setUIRoot(val) { uiRoot = val; } +function setAppBasePath(path) { + appBasePath = path; +} + function collapseTablePageLoad(name, table){ if (window.localStorage.getItem(name) == "true") { // Set it to false so that the click function can revert it @@ -33,7 +38,7 @@ function collapseTable(thisName, table){ var status = window.localStorage.getItem(thisName) == "true"; status = !status; - var thisClass = '.' + thisName + var thisClass = '.' + thisName; // Expand the list of additional metrics. var tableDiv = $(thisClass).parent().find('.' + table); diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 90167858df66..087a22d6c614 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -292,6 +292,7 @@ private[spark] object UIUtils extends Logging { {commonHeaderNodes(request)} + {if (showVisualization) vizHeaderNodes(request) else Seq.empty} {if (useDataTables) dataTablesHeaderNodes(request) else Seq.empty} + sc.parallelize(1 to 100).map(v => (v, v)).repartition(10).reduceByKey(_ + _).collect + + eventually(timeout(10.seconds), interval(50.microseconds)) { + val pathWithPagedTable = + "/jobs/job/?id=0&completedStage.page=2&completedStage.sort=Stage+Id&" + + "completedStage.desc=true&completedStage.pageSize=1#completed" + goToUi(sc, pathWithPagedTable) + + // Open DAG Viz. 
+ webDriver.findElement(By.id("job-dag-viz")).click() + val stages = webDriver.findElements(By.cssSelector("svg[class='job'] > a")) + stages.size() should be (3) + + stages.get(0).getAttribute("href") should include ("/stages/stage/?id=0&attempt=0") + stages.get(1).getAttribute("href") should include ("/stages/stage/?id=1&attempt=0") + stages.get(2).getAttribute("href") should include ("/stages/stage/?id=2&attempt=0") + } + } + } + /** * Create a test SparkContext with the SparkUI enabled. * It is safe to `get` the SparkUI directly from the SparkContext returned here. diff --git a/dev/create-release/do-release.sh b/dev/create-release/do-release.sh index 4f18a55a3bce..64fba8a56aff 100755 --- a/dev/create-release/do-release.sh +++ b/dev/create-release/do-release.sh @@ -17,6 +17,8 @@ # limitations under the License. # +set -e + SELF=$(cd $(dirname $0) && pwd) . "$SELF/release-util.sh" @@ -52,9 +54,6 @@ function should_build { if should_build "tag" && [ $SKIP_TAG = 0 ]; then run_silent "Creating release tag $RELEASE_TAG..." "tag.log" \ "$SELF/release-tag.sh" - echo "It may take some time for the tag to be synchronized to github." - echo "Press enter when you've verified that the new tag ($RELEASE_TAG) is available." - read else echo "Skipping tag creation for $RELEASE_TAG." fi @@ -79,3 +78,12 @@ if should_build "publish"; then else echo "Skipping publish step." fi + +if should_build "tag" && [ $SKIP_TAG = 0 ]; then + git push origin $RELEASE_TAG + if [[ $RELEASE_TAG != *"preview"* ]]; then + git push origin HEAD:$GIT_BRANCH + else + echo "It's preview release. We only push $RELEASE_TAG to remote." + fi +fi diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh index e3bcb72ab5c6..66c51845cc1d 100755 --- a/dev/create-release/release-build.sh +++ b/dev/create-release/release-build.sh @@ -92,9 +92,12 @@ BASE_DIR=$(pwd) init_java init_maven_sbt -rm -rf spark -git clone "$ASF_REPO" +# Only clone repo fresh if not present, otherwise use checkout from the tag step +if [ ! -d spark ]; then + git clone "$ASF_REPO" +fi cd spark +git fetch git checkout $GIT_REF git_hash=`git rev-parse --short HEAD` echo "Checked out Spark git hash $git_hash" diff --git a/dev/create-release/release-tag.sh b/dev/create-release/release-tag.sh index 39856a995595..e37aa27fc0aa 100755 --- a/dev/create-release/release-tag.sh +++ b/dev/create-release/release-tag.sh @@ -25,6 +25,7 @@ function exit_with_usage { cat << EOF usage: $NAME Tags a Spark release on a particular branch. +You must push the tags after. Inputs are specified with the following environment variables: ASF_USERNAME - Apache Username @@ -105,19 +106,8 @@ sed -i".tmp7" 's/SPARK_VERSION_SHORT:.*$/SPARK_VERSION_SHORT: '"$R_NEXT_VERSION" git commit -a -m "Preparing development version $NEXT_VERSION" -if ! is_dry_run; then - # Push changes - git push origin $RELEASE_TAG - if [[ $RELEASE_VERSION != *"preview"* ]]; then - git push origin HEAD:$GIT_BRANCH - else - echo "It's preview release. We only push $RELEASE_TAG to remote." - fi - - cd .. - rm -rf spark -else - cd .. +cd .. +if is_dry_run; then mv spark spark.tag echo "Clone with version changes and tag available as spark.tag in the output directory." 
fi diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-1.2 b/dev/deps/spark-deps-hadoop-2.7-hive-1.2 index d0830a1ddd3d..0fd800558273 100644 --- a/dev/deps/spark-deps-hadoop-2.7-hive-1.2 +++ b/dev/deps/spark-deps-hadoop-2.7-hive-1.2 @@ -208,4 +208,4 @@ xmlenc/0.52//xmlenc-0.52.jar xz/1.5//xz-1.5.jar zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar zookeeper/3.4.14//zookeeper-3.4.14.jar -zstd-jni/1.4.4-3//zstd-jni-1.4.4-3.jar +zstd-jni/1.4.5-2//zstd-jni-1.4.5-2.jar diff --git a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 index 6d050d8a048f..e4df088e08b6 100644 --- a/dev/deps/spark-deps-hadoop-2.7-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-2.7-hive-2.3 @@ -222,4 +222,4 @@ xmlenc/0.52//xmlenc-0.52.jar xz/1.5//xz-1.5.jar zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar zookeeper/3.4.14//zookeeper-3.4.14.jar -zstd-jni/1.4.4-3//zstd-jni-1.4.4-3.jar +zstd-jni/1.4.5-2//zstd-jni-1.4.5-2.jar diff --git a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 index 6dab667522ec..7f3f74e3e039 100644 --- a/dev/deps/spark-deps-hadoop-3.2-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3.2-hive-2.3 @@ -236,4 +236,4 @@ xbean-asm7-shaded/4.15//xbean-asm7-shaded-4.15.jar xz/1.5//xz-1.5.jar zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar zookeeper/3.4.14//zookeeper-3.4.14.jar -zstd-jni/1.4.4-3//zstd-jni-1.4.4-3.jar +zstd-jni/1.4.5-2//zstd-jni-1.4.5-2.jar diff --git a/docs/monitoring.md b/docs/monitoring.md index 4da0f8e9d71e..32959b77c477 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -544,6 +544,24 @@ can be identified by their `[attempt-id]`. In the API listed below, when running /applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id] Details of the given operation and given batch. + + /applications/[app-id]/sql + A list of all queries for a given application. +
+ ?details=[true (default) | false] lists/hides details of Spark plan nodes. +
+ ?planDescription=[true (default) | false] enables/disables Physical planDescription on demand when Physical Plan size is high. +
+ ?offset=[offset]&length=[len] lists queries in the given range. + + + /applications/[app-id]/sql/[execution-id] + Details for the given query. +
+ ?details=[true (default) | false] lists/hides metric details in addition to given query details. +
+ ?planDescription=[true (default) | false] enables/disables Physical planDescription on demand for the given query when Physical Plan size is high. + /applications/[app-id]/environment Environment details of the given application. diff --git a/docs/sql-ref-ansi-compliance.md b/docs/sql-ref-ansi-compliance.md index b62834ebe906..eab194c71ec7 100644 --- a/docs/sql-ref-ansi-compliance.md +++ b/docs/sql-ref-ansi-compliance.md @@ -95,7 +95,7 @@ CREATE TABLE t (v INT); -- `spark.sql.storeAssignmentPolicy=ANSI` INSERT INTO t VALUES ('1'); org.apache.spark.sql.AnalysisException: Cannot write incompatible data to table '`default`.`t`': -- Cannot safely cast 'v': StringType to IntegerType; +- Cannot safely cast 'v': string to int; -- `spark.sql.storeAssignmentPolicy=LEGACY` (This is a legacy behaviour until Spark 2.x) INSERT INTO t VALUES ('1'); diff --git a/pom.xml b/pom.xml index bdcb4fd5584f..b3f7b7db1a79 100644 --- a/pom.xml +++ b/pom.xml @@ -665,7 +665,7 @@ com.github.luben zstd-jni - 1.4.4-3 + 1.4.5-2 com.clearspring.analytics diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 65b902cf3c4d..03e3b9ca4bd0 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -276,6 +276,8 @@ def explain(self, extended=None, mode=None): """Prints the (logical and physical) plans to the console for debugging purpose. :param extended: boolean, default ``False``. If ``False``, prints only the physical plan. + When this is a string without specifying the ``mode``, it works as the mode is + specified. :param mode: specifies the expected output format of plans. * ``simple``: Print only a physical plan. @@ -306,12 +308,17 @@ def explain(self, extended=None, mode=None): Output [2]: [age#0, name#1] ... + >>> df.explain("cost") + == Optimized Logical Plan == + ...Statistics... + ... + .. versionchanged:: 3.0.0 Added optional argument `mode` to specify the expected output format of plans. """ if extended is not None and mode is not None: - raise Exception("extended and mode can not be specified simultaneously") + raise Exception("extended and mode should not be set together.") # For the no argument case: df.explain() is_no_argument = extended is None and mode is None @@ -319,18 +326,22 @@ def explain(self, extended=None, mode=None): # For the cases below: # explain(True) # explain(extended=False) - is_extended_case = extended is not None and isinstance(extended, bool) + is_extended_case = isinstance(extended, bool) and mode is None - # For the mode specified: df.explain(mode="formatted") - is_mode_case = mode is not None and isinstance(mode, basestring) + # For the case when extended is mode: + # df.explain("formatted") + is_extended_as_mode = isinstance(extended, basestring) and mode is None - if not is_no_argument and not (is_extended_case or is_mode_case): - if extended is not None: - err_msg = "extended (optional) should be provided as bool" \ - ", got {0}".format(type(extended)) - else: # For mode case - err_msg = "mode (optional) should be provided as str, got {0}".format(type(mode)) - raise TypeError(err_msg) + # For the mode specified: + # df.explain(mode="formatted") + is_mode_case = extended is None and isinstance(mode, basestring) + + if not (is_no_argument or is_extended_case or is_extended_as_mode or is_mode_case): + argtypes = [ + str(type(arg)) for arg in [extended, mode] if arg is not None] + raise TypeError( + "extended (optional) and mode (optional) should be a string " + "and bool; however, got [%s]." 
% ", ".join(argtypes)) # Sets an explain mode depending on a given argument if is_no_argument: @@ -339,6 +350,8 @@ def explain(self, extended=None, mode=None): explain_mode = "extended" if extended else "simple" elif is_mode_case: explain_mode = mode + elif is_extended_as_mode: + explain_mode = extended print(self._sc._jvm.PythonSQLUtils.explainString(self._jdf.queryExecution(), explain_mode)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala index d2daaac72fc8..6e850267100f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Sum.scala @@ -62,38 +62,74 @@ case class Sum(child: Expression) extends DeclarativeAggregate with ImplicitCast private lazy val sum = AttributeReference("sum", sumDataType)() + private lazy val isEmpty = AttributeReference("isEmpty", BooleanType, nullable = false)() + private lazy val zero = Literal.default(sumDataType) - override lazy val aggBufferAttributes = sum :: Nil + override lazy val aggBufferAttributes = resultType match { + case _: DecimalType => sum :: isEmpty :: Nil + case _ => sum :: Nil + } - override lazy val initialValues: Seq[Expression] = Seq( - /* sum = */ Literal.create(null, sumDataType) - ) + override lazy val initialValues: Seq[Expression] = resultType match { + case _: DecimalType => Seq(Literal(null, resultType), Literal(true, BooleanType)) + case _ => Seq(Literal(null, resultType)) + } override lazy val updateExpressions: Seq[Expression] = { if (child.nullable) { - Seq( - /* sum = */ - coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum) - ) + val updateSumExpr = coalesce(coalesce(sum, zero) + child.cast(sumDataType), sum) + resultType match { + case _: DecimalType => + Seq(updateSumExpr, isEmpty && child.isNull) + case _ => Seq(updateSumExpr) + } } else { - Seq( - /* sum = */ - coalesce(sum, zero) + child.cast(sumDataType) - ) + val updateSumExpr = coalesce(sum, zero) + child.cast(sumDataType) + resultType match { + case _: DecimalType => + Seq(updateSumExpr, Literal(false, BooleanType)) + case _ => Seq(updateSumExpr) + } } } + /** + * For decimal type: + * If isEmpty is false and if sum is null, then it means we have had an overflow. + * + * update of the sum is as follows: + * Check if either portion of the left.sum or right.sum has overflowed + * If it has, then the sum value will remain null. + * If it did not have overflow, then add the sum.left and sum.right + * + * isEmpty: Set to false if either one of the left or right is set to false. This + * means we have seen atleast a value that was not null. + */ override lazy val mergeExpressions: Seq[Expression] = { - Seq( - /* sum = */ - coalesce(coalesce(sum.left, zero) + sum.right, sum.left) - ) + val mergeSumExpr = coalesce(coalesce(sum.left, zero) + sum.right, sum.left) + resultType match { + case _: DecimalType => + val inputOverflow = !isEmpty.right && sum.right.isNull + val bufferOverflow = !isEmpty.left && sum.left.isNull + Seq( + If(inputOverflow || bufferOverflow, Literal.create(null, sumDataType), mergeSumExpr), + isEmpty.left && isEmpty.right) + case _ => Seq(mergeSumExpr) + } } + /** + * If the isEmpty is true, then it means there were no values to begin with or all the values + * were null, so the result will be null. 
+ * If the isEmpty is false, then if sum is null that means an overflow has happened. + * So now, if ansi is enabled, then throw exception, if not then return null. + * If sum is not null, then return the sum. + */ override lazy val evaluateExpression: Expression = resultType match { - case d: DecimalType => CheckOverflow(sum, d, !SQLConf.get.ansiEnabled) + case d: DecimalType => + If(isEmpty, Literal.create(null, sumDataType), + CheckOverflowInSum(sum, d, !SQLConf.get.ansiEnabled)) case _ => sum } - } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala index c2c70b2ab08e..7e4560ab8161 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/decimalExpressions.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, EmptyBlock, ExprCode} +import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -144,3 +145,54 @@ case class CheckOverflow( override def sql: String = child.sql } + +// A variant `CheckOverflow`, which treats null as overflow. This is necessary in `Sum`. +case class CheckOverflowInSum( + child: Expression, + dataType: DecimalType, + nullOnOverflow: Boolean) extends UnaryExpression { + + override def nullable: Boolean = true + + override def eval(input: InternalRow): Any = { + val value = child.eval(input) + if (value == null) { + if (nullOnOverflow) null else throw new ArithmeticException("Overflow in sum of decimals.") + } else { + value.asInstanceOf[Decimal].toPrecision( + dataType.precision, + dataType.scale, + Decimal.ROUND_HALF_UP, + nullOnOverflow) + } + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + val childGen = child.genCode(ctx) + val nullHandling = if (nullOnOverflow) { + "" + } else { + s""" + |throw new ArithmeticException("Overflow in sum of decimals."); + |""".stripMargin + } + val code = code""" + |${childGen.code} + |boolean ${ev.isNull} = ${childGen.isNull}; + |Decimal ${ev.value} = null; + |if (${childGen.isNull}) { + | $nullHandling + |} else { + | ${ev.value} = ${childGen.value}.toPrecision( + | ${dataType.precision}, ${dataType.scale}, Decimal.ROUND_HALF_UP(), $nullOnOverflow); + | ${ev.isNull} = ${ev.value} == null; + |} + |""".stripMargin + + ev.copy(code = code) + } + + override def toString: String = s"CheckOverflowInSum($child, $dataType, $nullOnOverflow)" + + override def sql: String = child.sql +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala index 06e1cdc27e7d..fe20e546f5d2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateFormatter.scala @@ -117,7 +117,13 @@ class LegacySimpleDateFormatter(pattern: String, locale: Locale) extends LegacyD object DateFormatter { import LegacyDateFormats._ - val defaultLocale: Locale = Locale.US + /** + * Before Spark 3.0, the first day-of-week is always Monday. Since Spark 3.0, it depends on the + * locale. 
+ * We pick GB as the default locale instead of US, to be compatible with Spark 2.x, as US locale + * uses Sunday as the first day-of-week. See SPARK-31879. + */ + val defaultLocale: Locale = new Locale("en", "GB") val defaultPattern: String = "yyyy-MM-dd" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala index 3e302e217039..1f14c70164c1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/TimestampFormatter.scala @@ -278,7 +278,13 @@ object LegacyDateFormats extends Enumeration { object TimestampFormatter { import LegacyDateFormats._ - val defaultLocale: Locale = Locale.US + /** + * Before Spark 3.0, the first day-of-week is always Monday. Since Spark 3.0, it depends on the + * locale. + * We pick GB as the default locale instead of US, to be compatible with Spark 2.x, as US locale + * uses Sunday as the first day-of-week. See SPARK-31879. + */ + val defaultLocale: Locale = new Locale("en", "GB") def defaultPattern(): String = s"${DateFormatter.defaultPattern} HH:mm:ss" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala index 7449a28e069d..fe8d7efc9dc1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala @@ -457,7 +457,7 @@ object DataType { case (w: AtomicType, r: AtomicType) if storeAssignmentPolicy == STRICT => if (!Cast.canUpCast(w, r)) { - addError(s"Cannot safely cast '$context': $w to $r") + addError(s"Cannot safely cast '$context': ${w.catalogString} to ${r.catalogString}") false } else { true @@ -467,7 +467,7 @@ object DataType { case (w: AtomicType, r: AtomicType) if storeAssignmentPolicy == ANSI => if (!Cast.canANSIStoreAssign(w, r)) { - addError(s"Cannot safely cast '$context': $w to $r") + addError(s"Cannot safely cast '$context': ${w.catalogString} to ${r.catalogString}") false } else { true @@ -477,7 +477,8 @@ object DataType { true case (w, r) => - addError(s"Cannot write '$context': $w is incompatible with $r") + addError(s"Cannot write '$context': " + + s"${w.catalogString} is incompatible with ${r.catalogString}") false } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala index c01dea96fe2d..e466d558db1e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/DataSourceV2AnalysisSuite.scala @@ -21,7 +21,7 @@ import java.net.URI import java.util.Locale import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog} -import org.apache.spark.sql.catalyst.expressions.{Alias, AnsiCast, AttributeReference, Cast, Expression, LessThanOrEqual, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, AnsiCast, AttributeReference, Cast, LessThanOrEqual, Literal} import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.StoreAssignmentPolicy @@ -143,7 +143,7 @@ abstract class DataSourceV2StrictAnalysisSuite 
extends DataSourceV2AnalysisBaseS assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( "Cannot write", "'table-name'", - "Cannot safely cast", "'x'", "'y'", "DoubleType to FloatType")) + "Cannot safely cast", "'x'", "'y'", "double to float")) } test("byName: multiple field errors are reported") { @@ -160,7 +160,7 @@ abstract class DataSourceV2StrictAnalysisSuite extends DataSourceV2AnalysisBaseS assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( "Cannot write incompatible data to table", "'table-name'", - "Cannot safely cast", "'x'", "DoubleType to FloatType", + "Cannot safely cast", "'x'", "double to float", "Cannot write nullable values to non-null column", "'x'", "Cannot find data for output column", "'y'")) } @@ -176,7 +176,7 @@ abstract class DataSourceV2StrictAnalysisSuite extends DataSourceV2AnalysisBaseS assertNotResolved(parsedPlan) assertAnalysisError(parsedPlan, Seq( "Cannot write", "'table-name'", - "Cannot safely cast", "'x'", "'y'", "DoubleType to FloatType")) + "Cannot safely cast", "'x'", "'y'", "double to float")) } test("byPosition: multiple field errors are reported") { @@ -194,7 +194,7 @@ abstract class DataSourceV2StrictAnalysisSuite extends DataSourceV2AnalysisBaseS assertAnalysisError(parsedPlan, Seq( "Cannot write incompatible data to table", "'table-name'", "Cannot write nullable values to non-null column", "'x'", - "Cannot safely cast", "'x'", "DoubleType to FloatType")) + "Cannot safely cast", "'x'", "double to float")) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala index c47332f5d9fc..1a262d646ca1 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeWriteCompatibilitySuite.scala @@ -80,7 +80,7 @@ class StrictDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBa test("Check NullType is incompatible with all other types") { allNonNullTypes.foreach { t => assertSingleError(NullType, t, "nulls", s"Should not allow writing None to type $t") { err => - assert(err.contains(s"incompatible with $t")) + assert(err.contains(s"incompatible with ${t.catalogString}")) } } } @@ -145,12 +145,12 @@ class ANSIDataTypeWriteCompatibilitySuite extends DataTypeWriteCompatibilityBase test("Conversions between timestamp and long are not allowed") { assertSingleError(LongType, TimestampType, "longToTimestamp", "Should not allow long to timestamp") { err => - assert(err.contains("Cannot safely cast 'longToTimestamp': LongType to TimestampType")) + assert(err.contains("Cannot safely cast 'longToTimestamp': bigint to timestamp")) } assertSingleError(TimestampType, LongType, "timestampToLong", "Should not allow timestamp to long") { err => - assert(err.contains("Cannot safely cast 'timestampToLong': TimestampType to LongType")) + assert(err.contains("Cannot safely cast 'timestampToLong': timestamp to bigint")) } } @@ -209,8 +209,8 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { s"Should not allow writing $w to $r because cast is not safe") { err => assert(err.contains("'t'"), "Should include the field name context") assert(err.contains("Cannot safely cast"), "Should identify unsafe cast") - assert(err.contains(s"$w"), "Should include write type") - assert(err.contains(s"$r"), "Should include read type") + 
assert(err.contains(s"${w.catalogString}"), "Should include write type") + assert(err.contains(s"${r.catalogString}"), "Should include read type") } } } @@ -413,7 +413,7 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { assertNumErrors(writeType, readType, "top", "Should catch 14 errors", 14) { errs => assert(errs(0).contains("'top.a.element'"), "Should identify bad type") assert(errs(0).contains("Cannot safely cast")) - assert(errs(0).contains("StringType to DoubleType")) + assert(errs(0).contains("string to double")) assert(errs(1).contains("'top.a'"), "Should identify bad type") assert(errs(1).contains("Cannot write nullable elements to array of non-nulls")) @@ -430,11 +430,11 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { assert(errs(5).contains("'top.m.key'"), "Should identify bad type") assert(errs(5).contains("Cannot safely cast")) - assert(errs(5).contains("StringType to LongType")) + assert(errs(5).contains("string to bigint")) assert(errs(6).contains("'top.m.value'"), "Should identify bad type") assert(errs(6).contains("Cannot safely cast")) - assert(errs(6).contains("BooleanType to FloatType")) + assert(errs(6).contains("boolean to float")) assert(errs(7).contains("'top.m'"), "Should identify bad type") assert(errs(7).contains("Cannot write nullable values to map of non-nulls")) @@ -452,7 +452,7 @@ abstract class DataTypeWriteCompatibilityBaseSuite extends SparkFunSuite { assert(errs(11).contains("'top.x'"), "Should identify bad type") assert(errs(11).contains("Cannot safely cast")) - assert(errs(11).contains("StringType to IntegerType")) + assert(errs(11).contains("string to int")) assert(errs(12).contains("'top'"), "Should identify bad type") assert(errs(12).contains("expected 'x', found 'y'"), "Should detect name mismatch") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 357820a9d63d..db587dd98685 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.metric.SQLMetricInfo import org.apache.spark.sql.internal.SQLConf @@ -56,6 +57,7 @@ private[execution] object SparkPlanInfo { case ReusedSubqueryExec(child) => child :: Nil case a: AdaptiveSparkPlanExec => a.executedPlan :: Nil case stage: QueryStageExec => stage.plan :: Nil + case inMemTab: InMemoryTableScanExec => inMemTab.relation.cachedPlan :: Nil case _ => plan.children ++ plan.subqueries } val metrics = plan.metrics.toSeq.map { case (key, metric) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 7900693a8482..491977c61d3c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet import 
java.lang.{Boolean => JBoolean, Double => JDouble, Float => JFloat, Long => JLong} import java.math.{BigDecimal => JBigDecimal} import java.sql.{Date, Timestamp} -import java.time.LocalDate +import java.time.{Instant, LocalDate} import java.util.Locale import scala.collection.JavaConverters.asScalaBufferConverter @@ -129,6 +129,11 @@ class ParquetFilters( case ld: LocalDate => DateTimeUtils.localDateToDays(ld) } + private def timestampToMicros(v: Any): JLong = v match { + case i: Instant => DateTimeUtils.instantToMicros(i) + case t: Timestamp => DateTimeUtils.fromJavaTimestamp(t) + } + private def decimalToInt32(decimal: JBigDecimal): Integer = decimal.unscaledValue().intValue() private def decimalToInt64(decimal: JBigDecimal): JLong = decimal.unscaledValue().longValue() @@ -149,8 +154,7 @@ class ParquetFilters( } private def timestampToMillis(v: Any): JLong = { - val timestamp = v.asInstanceOf[Timestamp] - val micros = DateTimeUtils.fromJavaTimestamp(timestamp) + val micros = timestampToMicros(v) val millis = DateTimeUtils.microsToMillis(micros) millis.asInstanceOf[JLong] } @@ -186,8 +190,7 @@ class ParquetFilters( case ParquetTimestampMicrosType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.eq( longColumn(n), - Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp]) - .asInstanceOf[JLong]).orNull) + Option(v).map(timestampToMicros).orNull) case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.eq( longColumn(n), @@ -237,8 +240,7 @@ class ParquetFilters( case ParquetTimestampMicrosType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.notEq( longColumn(n), - Option(v).map(t => DateTimeUtils.fromJavaTimestamp(t.asInstanceOf[Timestamp]) - .asInstanceOf[JLong]).orNull) + Option(v).map(timestampToMicros).orNull) case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.notEq( longColumn(n), @@ -280,9 +282,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.lt(intColumn(n), dateToDays(v).asInstanceOf[Integer]) case ParquetTimestampMicrosType if pushDownTimestamp => - (n: Array[String], v: Any) => FilterApi.lt( - longColumn(n), - DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMicros(v)) case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.lt(longColumn(n), timestampToMillis(v)) @@ -319,9 +319,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.ltEq(intColumn(n), dateToDays(v).asInstanceOf[Integer]) case ParquetTimestampMicrosType if pushDownTimestamp => - (n: Array[String], v: Any) => FilterApi.ltEq( - longColumn(n), - DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMicros(v)) case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.ltEq(longColumn(n), timestampToMillis(v)) @@ -358,9 +356,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.gt(intColumn(n), dateToDays(v).asInstanceOf[Integer]) case ParquetTimestampMicrosType if pushDownTimestamp => - (n: Array[String], v: Any) => FilterApi.gt( - longColumn(n), - DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMicros(v)) case ParquetTimestampMillisType if pushDownTimestamp => (n: 
Array[String], v: Any) => FilterApi.gt(longColumn(n), timestampToMillis(v)) @@ -397,9 +393,7 @@ class ParquetFilters( (n: Array[String], v: Any) => FilterApi.gtEq(intColumn(n), dateToDays(v).asInstanceOf[Integer]) case ParquetTimestampMicrosType if pushDownTimestamp => - (n: Array[String], v: Any) => FilterApi.gtEq( - longColumn(n), - DateTimeUtils.fromJavaTimestamp(v.asInstanceOf[Timestamp]).asInstanceOf[JLong]) + (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMicros(v)) case ParquetTimestampMillisType if pushDownTimestamp => (n: Array[String], v: Any) => FilterApi.gtEq(longColumn(n), timestampToMillis(v)) @@ -475,7 +469,7 @@ class ParquetFilters( case ParquetDateType => value.isInstanceOf[Date] || value.isInstanceOf[LocalDate] case ParquetTimestampMicrosType | ParquetTimestampMillisType => - value.isInstanceOf[Timestamp] + value.isInstanceOf[Timestamp] || value.isInstanceOf[Instant] case ParquetSchemaType(DECIMAL, INT32, _, decimalMeta) => isDecimalMatched(value, decimalMeta) case ParquetSchemaType(DECIMAL, INT64, _, decimalMeta) => diff --git a/sql/core/src/test/resources/sql-tests/inputs/datetime.sql b/sql/core/src/test/resources/sql-tests/inputs/datetime.sql index 9bd936f6f441..5636e0b67036 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/datetime.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/datetime.sql @@ -164,3 +164,7 @@ select from_csv('26/October/2015', 'date Date', map('dateFormat', 'dd/MMMMM/yyyy select from_unixtime(1, 'yyyyyyyyyyy-MM-dd'); select date_format(timestamp '2018-11-17 13:33:33', 'yyyyyyyyyy-MM-dd HH:mm:ss'); select date_format(date '2018-11-17', 'yyyyyyyyyyy-MM-dd'); + +-- SPARK-31879: the first day of week +select date_format('2020-01-01', 'YYYY-MM-dd uu'); +select date_format('2020-01-01', 'YYYY-MM-dd uuuu'); diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part1.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part1.sql index 087d7a5befd1..6e95aca7aff6 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part1.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part1.sql @@ -146,7 +146,7 @@ SELECT count(*) OVER (PARTITION BY four) FROM (SELECT * FROM tenk1 WHERE FALSE)s -- mixture of agg/wfunc in the same window -- SELECT sum(salary) OVER w, rank() OVER w FROM empsalary WINDOW w AS (PARTITION BY depname ORDER BY salary DESC); --- Cannot safely cast 'enroll_date': StringType to DateType; +-- Cannot safely cast 'enroll_date': string to date; -- SELECT empno, depname, salary, bonus, depadj, MIN(bonus) OVER (ORDER BY empno), MAX(depadj) OVER () FROM( -- SELECT *, -- CASE WHEN enroll_date < '2008-01-01' THEN 2008 - extract(year FROM enroll_date) END * 500 AS bonus, diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part3.sql b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part3.sql index cd3b74b3aa03..f4b8454da0d8 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part3.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/window_part3.sql @@ -42,7 +42,7 @@ create table datetimes ( f_timestamp timestamp ) using parquet; --- Spark cannot safely cast StringType to TimestampType +-- Spark cannot safely cast string to timestamp -- [SPARK-29636] Spark can't parse '11:00 BST' or '2000-10-19 10:23:54+01' signatures to timestamp insert into datetimes values (1, timestamp '11:00', cast ('11:00 BST' as timestamp), cast ('1 year' as timestamp), cast ('2000-10-19 
10:23:54+01' as timestamp), timestamp '2000-10-19 10:23:54'), diff --git a/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out b/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out index ca04b008d653..3803460f3f08 100644 --- a/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/ansi/datetime.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 119 +-- Number of queries: 121 -- !query @@ -1025,3 +1025,19 @@ struct<> -- !query output org.apache.spark.SparkUpgradeException You may get a different result due to the upgrading of Spark 3.0: Fail to recognize 'yyyyyyyyyyy-MM-dd' pattern in the DateTimeFormatter. 1) You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0. 2) You can form a valid datetime pattern with the guide from https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html + + +-- !query +select date_format('2020-01-01', 'YYYY-MM-dd uu') +-- !query schema +struct +-- !query output +2020-01-01 03 + + +-- !query +select date_format('2020-01-01', 'YYYY-MM-dd uuuu') +-- !query schema +struct +-- !query output +2020-01-01 Wednesday diff --git a/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out b/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out index fe932d3a706a..99dd14d21e6f 100644 --- a/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/datetime-legacy.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 119 +-- Number of queries: 121 -- !query @@ -980,3 +980,19 @@ select date_format(date '2018-11-17', 'yyyyyyyyyyy-MM-dd') struct -- !query output 00000002018-11-17 + + +-- !query +select date_format('2020-01-01', 'YYYY-MM-dd uu') +-- !query schema +struct +-- !query output +2020-01-01 03 + + +-- !query +select date_format('2020-01-01', 'YYYY-MM-dd uuuu') +-- !query schema +struct +-- !query output +2020-01-01 0003 diff --git a/sql/core/src/test/resources/sql-tests/results/datetime.sql.out b/sql/core/src/test/resources/sql-tests/results/datetime.sql.out index 06a41da2671e..c8c568c736d7 100755 --- a/sql/core/src/test/resources/sql-tests/results/datetime.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/datetime.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 119 +-- Number of queries: 121 -- !query @@ -997,3 +997,19 @@ struct<> -- !query output org.apache.spark.SparkUpgradeException You may get a different result due to the upgrading of Spark 3.0: Fail to recognize 'yyyyyyyyyyy-MM-dd' pattern in the DateTimeFormatter. 1) You can set spark.sql.legacy.timeParserPolicy to LEGACY to restore the behavior before Spark 3.0. 
2) You can form a valid datetime pattern with the guide from https://spark.apache.org/docs/latest/sql-ref-datetime-pattern.html + + +-- !query +select date_format('2020-01-01', 'YYYY-MM-dd uu') +-- !query schema +struct +-- !query output +2020-01-01 03 + + +-- !query +select date_format('2020-01-01', 'YYYY-MM-dd uuuu') +-- !query schema +struct +-- !query output +2020-01-01 Wednesday diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 954a4bd9331e..8359dff674a8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -192,6 +192,28 @@ class DataFrameSuite extends QueryTest structDf.select(xxhash64($"a", $"record.*"))) } + private def assertDecimalSumOverflow( + df: DataFrame, ansiEnabled: Boolean, expectedAnswer: Row): Unit = { + if (!ansiEnabled) { + try { + checkAnswer(df, expectedAnswer) + } catch { + case e: SparkException if e.getCause.isInstanceOf[ArithmeticException] => + // This is an existing bug that we can write overflowed decimal to UnsafeRow but fail + // to read it. + assert(e.getCause.getMessage.contains("Decimal precision 39 exceeds max precision 38")) + } + } else { + val e = intercept[SparkException] { + df.collect + } + assert(e.getCause.isInstanceOf[ArithmeticException]) + assert(e.getCause.getMessage.contains("cannot be represented as Decimal") || + e.getCause.getMessage.contains("Overflow in sum of decimals") || + e.getCause.getMessage.contains("Decimal precision 39 exceeds max precision 38")) + } + } + test("SPARK-28224: Aggregate sum big decimal overflow") { val largeDecimals = spark.sparkContext.parallelize( DecimalData(BigDecimal("1"* 20 + ".123"), BigDecimal("1"* 20 + ".123")) :: @@ -200,14 +222,90 @@ class DataFrameSuite extends QueryTest Seq(true, false).foreach { ansiEnabled => withSQLConf((SQLConf.ANSI_ENABLED.key, ansiEnabled.toString)) { val structDf = largeDecimals.select("a").agg(sum("a")) - if (!ansiEnabled) { - checkAnswer(structDf, Row(null)) - } else { - val e = intercept[SparkException] { - structDf.collect + assertDecimalSumOverflow(structDf, ansiEnabled, Row(null)) + } + } + } + + test("SPARK-28067: sum of null decimal values") { + Seq("true", "false").foreach { wholeStageEnabled => + withSQLConf((SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStageEnabled)) { + Seq("true", "false").foreach { ansiEnabled => + withSQLConf((SQLConf.ANSI_ENABLED.key, ansiEnabled)) { + val df = spark.range(1, 4, 1).select(expr(s"cast(null as decimal(38,18)) as d")) + checkAnswer(df.agg(sum($"d")), Row(null)) + } + } + } + } + } + + test("SPARK-28067: Aggregate sum should not return wrong results for decimal overflow") { + Seq("true", "false").foreach { wholeStageEnabled => + withSQLConf((SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, wholeStageEnabled)) { + Seq(true, false).foreach { ansiEnabled => + withSQLConf((SQLConf.ANSI_ENABLED.key, ansiEnabled.toString)) { + val df0 = Seq( + (BigDecimal("10000000000000000000"), 1), + (BigDecimal("10000000000000000000"), 1), + (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum") + val df1 = Seq( + (BigDecimal("10000000000000000000"), 2), + (BigDecimal("10000000000000000000"), 2), + (BigDecimal("10000000000000000000"), 2), + (BigDecimal("10000000000000000000"), 2), + (BigDecimal("10000000000000000000"), 2), + (BigDecimal("10000000000000000000"), 2), + (BigDecimal("10000000000000000000"), 2), + (BigDecimal("10000000000000000000"), 
2), + (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum") + val df = df0.union(df1) + val df2 = df.withColumnRenamed("decNum", "decNum2"). + join(df, "intNum").agg(sum("decNum")) + + val expectedAnswer = Row(null) + assertDecimalSumOverflow(df2, ansiEnabled, expectedAnswer) + + val decStr = "1" + "0" * 19 + val d1 = spark.range(0, 12, 1, 1) + val d2 = d1.select(expr(s"cast('$decStr' as decimal (38, 18)) as d")).agg(sum($"d")) + assertDecimalSumOverflow(d2, ansiEnabled, expectedAnswer) + + val d3 = spark.range(0, 1, 1, 1).union(spark.range(0, 11, 1, 1)) + val d4 = d3.select(expr(s"cast('$decStr' as decimal (38, 18)) as d")).agg(sum($"d")) + assertDecimalSumOverflow(d4, ansiEnabled, expectedAnswer) + + val d5 = d3.select(expr(s"cast('$decStr' as decimal (38, 18)) as d"), + lit(1).as("key")).groupBy("key").agg(sum($"d").alias("sumd")).select($"sumd") + assertDecimalSumOverflow(d5, ansiEnabled, expectedAnswer) + + val nullsDf = spark.range(1, 4, 1).select(expr(s"cast(null as decimal(38,18)) as d")) + + val largeDecimals = Seq(BigDecimal("1"* 20 + ".123"), BigDecimal("9"* 20 + ".123")). + toDF("d") + assertDecimalSumOverflow( + nullsDf.union(largeDecimals).agg(sum($"d")), ansiEnabled, expectedAnswer) + + val df3 = Seq( + (BigDecimal("10000000000000000000"), 1), + (BigDecimal("50000000000000000000"), 1), + (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum") + + val df4 = Seq( + (BigDecimal("10000000000000000000"), 1), + (BigDecimal("10000000000000000000"), 1), + (BigDecimal("10000000000000000000"), 2)).toDF("decNum", "intNum") + + val df5 = Seq( + (BigDecimal("10000000000000000000"), 1), + (BigDecimal("10000000000000000000"), 1), + (BigDecimal("20000000000000000000"), 2)).toDF("decNum", "intNum") + + val df6 = df3.union(df4).union(df5) + val df7 = df6.groupBy("intNum").agg(sum("decNum"), countDistinct("decNum")). 
+ filter("intNum == 1") + assertDecimalSumOverflow(df7, ansiEnabled, Row(1, null, 2)) } - assert(e.getCause.getClass.equals(classOf[ArithmeticException])) - assert(e.getCause.getMessage.contains("cannot be represented as Decimal")) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala index c4cf5116c203..d20a07f420e8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.math.{BigDecimal => JBigDecimal} import java.nio.charset.StandardCharsets import java.sql.{Date, Timestamp} -import java.time.LocalDate +import java.time.{LocalDate, LocalDateTime, ZoneId} import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate, Operators} import org.apache.parquet.filter2.predicate.FilterApi._ @@ -143,7 +143,10 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } } - private def testTimestampPushdown(data: Seq[Timestamp]): Unit = { + private def testTimestampPushdown(data: Seq[String], java8Api: Boolean): Unit = { + implicit class StringToTs(s: String) { + def ts: Timestamp = Timestamp.valueOf(s) + } assert(data.size === 4) val ts1 = data.head val ts2 = data(1) @@ -151,7 +154,18 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared val ts4 = data(3) import testImplicits._ - withNestedDataFrame(data.map(i => Tuple1(i)).toDF()) { case (inputDF, colName, resultFun) => + val df = data.map(i => Tuple1(Timestamp.valueOf(i))).toDF() + withNestedDataFrame(df) { case (inputDF, colName, fun) => + def resultFun(tsStr: String): Any = { + val parsed = if (java8Api) { + LocalDateTime.parse(tsStr.replace(" ", "T")) + .atZone(ZoneId.systemDefault()) + .toInstant + } else { + Timestamp.valueOf(tsStr) + } + fun(parsed) + } withParquetDataFrame(inputDF) { implicit df => val tsAttr = df(colName).expr assert(df(colName).expr.dataType === TimestampType) @@ -160,26 +174,26 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared checkFilterPredicate(tsAttr.isNotNull, classOf[NotEq[_]], data.map(i => Row.apply(resultFun(i)))) - checkFilterPredicate(tsAttr === ts1, classOf[Eq[_]], resultFun(ts1)) - checkFilterPredicate(tsAttr <=> ts1, classOf[Eq[_]], resultFun(ts1)) - checkFilterPredicate(tsAttr =!= ts1, classOf[NotEq[_]], + checkFilterPredicate(tsAttr === ts1.ts, classOf[Eq[_]], resultFun(ts1)) + checkFilterPredicate(tsAttr <=> ts1.ts, classOf[Eq[_]], resultFun(ts1)) + checkFilterPredicate(tsAttr =!= ts1.ts, classOf[NotEq[_]], Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i)))) - checkFilterPredicate(tsAttr < ts2, classOf[Lt[_]], resultFun(ts1)) - checkFilterPredicate(tsAttr > ts1, classOf[Gt[_]], + checkFilterPredicate(tsAttr < ts2.ts, classOf[Lt[_]], resultFun(ts1)) + checkFilterPredicate(tsAttr > ts1.ts, classOf[Gt[_]], Seq(ts2, ts3, ts4).map(i => Row.apply(resultFun(i)))) - checkFilterPredicate(tsAttr <= ts1, classOf[LtEq[_]], resultFun(ts1)) - checkFilterPredicate(tsAttr >= ts4, classOf[GtEq[_]], resultFun(ts4)) - - checkFilterPredicate(Literal(ts1) === tsAttr, classOf[Eq[_]], resultFun(ts1)) - checkFilterPredicate(Literal(ts1) <=> tsAttr, classOf[Eq[_]], resultFun(ts1)) - checkFilterPredicate(Literal(ts2) 
> tsAttr, classOf[Lt[_]], resultFun(ts1)) - checkFilterPredicate(Literal(ts3) < tsAttr, classOf[Gt[_]], resultFun(ts4)) - checkFilterPredicate(Literal(ts1) >= tsAttr, classOf[LtEq[_]], resultFun(ts1)) - checkFilterPredicate(Literal(ts4) <= tsAttr, classOf[GtEq[_]], resultFun(ts4)) - - checkFilterPredicate(!(tsAttr < ts4), classOf[GtEq[_]], resultFun(ts4)) - checkFilterPredicate(tsAttr < ts2 || tsAttr > ts3, classOf[Operators.Or], + checkFilterPredicate(tsAttr <= ts1.ts, classOf[LtEq[_]], resultFun(ts1)) + checkFilterPredicate(tsAttr >= ts4.ts, classOf[GtEq[_]], resultFun(ts4)) + + checkFilterPredicate(Literal(ts1.ts) === tsAttr, classOf[Eq[_]], resultFun(ts1)) + checkFilterPredicate(Literal(ts1.ts) <=> tsAttr, classOf[Eq[_]], resultFun(ts1)) + checkFilterPredicate(Literal(ts2.ts) > tsAttr, classOf[Lt[_]], resultFun(ts1)) + checkFilterPredicate(Literal(ts3.ts) < tsAttr, classOf[Gt[_]], resultFun(ts4)) + checkFilterPredicate(Literal(ts1.ts) >= tsAttr, classOf[LtEq[_]], resultFun(ts1)) + checkFilterPredicate(Literal(ts4.ts) <= tsAttr, classOf[GtEq[_]], resultFun(ts4)) + + checkFilterPredicate(!(tsAttr < ts4.ts), classOf[GtEq[_]], resultFun(ts4)) + checkFilterPredicate(tsAttr < ts2.ts || tsAttr > ts3.ts, classOf[Operators.Or], Seq(Row(resultFun(ts1)), Row(resultFun(ts4)))) } } @@ -588,36 +602,41 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared } test("filter pushdown - timestamp") { - // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS - val millisData = Seq( - Timestamp.valueOf("1000-06-14 08:28:53.123"), - Timestamp.valueOf("1582-06-15 08:28:53.001"), - Timestamp.valueOf("1900-06-16 08:28:53.0"), - Timestamp.valueOf("2018-06-17 08:28:53.999")) - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> - ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) { - testTimestampPushdown(millisData) - } - - // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS - val microsData = Seq( - Timestamp.valueOf("1000-06-14 08:28:53.123456"), - Timestamp.valueOf("1582-06-15 08:28:53.123456"), - Timestamp.valueOf("1900-06-16 08:28:53.123456"), - Timestamp.valueOf("2018-06-17 08:28:53.123456")) - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> - ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) { - testTimestampPushdown(microsData) - } - - // spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> - ParquetOutputTimestampType.INT96.toString) { - import testImplicits._ - withParquetDataFrame(millisData.map(i => Tuple1(i)).toDF()) { implicit df => - val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema) - assertResult(None) { - createParquetFilters(schema).createFilter(sources.IsNull("_1")) + Seq(true, false).foreach { java8Api => + withSQLConf(SQLConf.DATETIME_JAVA8API_ENABLED.key -> java8Api.toString) { + // spark.sql.parquet.outputTimestampType = TIMESTAMP_MILLIS + val millisData = Seq( + "1000-06-14 08:28:53.123", + "1582-06-15 08:28:53.001", + "1900-06-16 08:28:53.0", + "2018-06-17 08:28:53.999") + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> + ParquetOutputTimestampType.TIMESTAMP_MILLIS.toString) { + testTimestampPushdown(millisData, java8Api) + } + + // spark.sql.parquet.outputTimestampType = TIMESTAMP_MICROS + val microsData = Seq( + "1000-06-14 08:28:53.123456", + "1582-06-15 08:28:53.123456", + "1900-06-16 08:28:53.123456", + "2018-06-17 08:28:53.123456") + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> + 
ParquetOutputTimestampType.TIMESTAMP_MICROS.toString) { + testTimestampPushdown(microsData, java8Api) + } + + // spark.sql.parquet.outputTimestampType = INT96 doesn't support pushdown + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> + ParquetOutputTimestampType.INT96.toString) { + import testImplicits._ + withParquetDataFrame( + millisData.map(i => Tuple1(Timestamp.valueOf(i))).toDF()) { implicit df => + val schema = new SparkToParquetSchemaConverter(conf).convert(df.schema) + assertResult(None) { + createParquetFilters(schema).createFilter(sources.IsNull("_1")) + } + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala new file mode 100644 index 000000000000..a702e00ff9f9 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SparkPlanInfoSuite.scala @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.ui + +import org.apache.spark.sql.execution.SparkPlanInfo +import org.apache.spark.sql.test.SharedSparkSession + +class SparkPlanInfoSuite extends SharedSparkSession{ + + import testImplicits._ + + def vaidateSparkPlanInfo(sparkPlanInfo: SparkPlanInfo): Unit = { + sparkPlanInfo.nodeName match { + case "InMemoryTableScan" => assert(sparkPlanInfo.children.length == 1) + case _ => sparkPlanInfo.children.foreach(vaidateSparkPlanInfo) + } + } + + test("SparkPlanInfo creation from SparkPlan with InMemoryTableScan node") { + val dfWithCache = Seq( + (1, 1), + (2, 2) + ).toDF().filter("_1 > 1").cache().repartition(10) + + val planInfoResult = SparkPlanInfo.fromSparkPlan(dfWithCache.queryExecution.executedPlan) + + vaidateSparkPlanInfo(planInfoResult) + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index 87a4d061b817..abd33ab8a8f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -623,12 +623,12 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { var msg = intercept[AnalysisException] { sql("insert into t select 1L, 2") }.getMessage - assert(msg.contains("Cannot safely cast 'i': LongType to IntegerType")) + assert(msg.contains("Cannot safely cast 'i': bigint to int")) msg = intercept[AnalysisException] { sql("insert into t select 1, 2.0") }.getMessage - assert(msg.contains("Cannot safely cast 'd': DecimalType(2,1) to DoubleType")) + assert(msg.contains("Cannot safely cast 'd': decimal(2,1) to double")) msg = intercept[AnalysisException] { sql("insert into t select 1, 2.0D, 3") @@ -660,18 
+660,18 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { var msg = intercept[AnalysisException] { sql("insert into t values('a', 'b')") }.getMessage - assert(msg.contains("Cannot safely cast 'i': StringType to IntegerType") && - msg.contains("Cannot safely cast 'd': StringType to DoubleType")) + assert(msg.contains("Cannot safely cast 'i': string to int") && + msg.contains("Cannot safely cast 'd': string to double")) msg = intercept[AnalysisException] { sql("insert into t values(now(), now())") }.getMessage - assert(msg.contains("Cannot safely cast 'i': TimestampType to IntegerType") && - msg.contains("Cannot safely cast 'd': TimestampType to DoubleType")) + assert(msg.contains("Cannot safely cast 'i': timestamp to int") && + msg.contains("Cannot safely cast 'd': timestamp to double")) msg = intercept[AnalysisException] { sql("insert into t values(true, false)") }.getMessage - assert(msg.contains("Cannot safely cast 'i': BooleanType to IntegerType") && - msg.contains("Cannot safely cast 'd': BooleanType to DoubleType")) + assert(msg.contains("Cannot safely cast 'i': boolean to int") && + msg.contains("Cannot safely cast 'd': boolean to double")) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index 9747840ce403..fe0a8439acc2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -333,7 +333,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with var msg = intercept[AnalysisException] { Seq((1L, 2.0)).toDF("i", "d").write.mode("append").saveAsTable("t") }.getMessage - assert(msg.contains("Cannot safely cast 'i': LongType to IntegerType")) + assert(msg.contains("Cannot safely cast 'i': bigint to int")) // Insert into table successfully. 
Seq((1, 2.0)).toDF("i", "d").write.mode("append").saveAsTable("t") @@ -354,14 +354,14 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with var msg = intercept[AnalysisException] { Seq(("a", "b")).toDF("i", "d").write.mode("append").saveAsTable("t") }.getMessage - assert(msg.contains("Cannot safely cast 'i': StringType to IntegerType") && - msg.contains("Cannot safely cast 'd': StringType to DoubleType")) + assert(msg.contains("Cannot safely cast 'i': string to int") && + msg.contains("Cannot safely cast 'd': string to double")) msg = intercept[AnalysisException] { Seq((true, false)).toDF("i", "d").write.mode("append").saveAsTable("t") }.getMessage - assert(msg.contains("Cannot safely cast 'i': BooleanType to IntegerType") && - msg.contains("Cannot safely cast 'd': BooleanType to DoubleType")) + assert(msg.contains("Cannot safely cast 'i': boolean to int") && + msg.contains("Cannot safely cast 'd': boolean to double")) } } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index d1dd13623650..8642a5ff1681 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -982,7 +982,7 @@ class VersionsSuite extends SparkFunSuite with Logging { """.stripMargin ) - val errorMsg = "Cannot safely cast 'f0': DecimalType(2,1) to BinaryType" + val errorMsg = "Cannot safely cast 'f0': decimal(2,1) to binary" if (isPartitioned) { val insertStmt = s"INSERT OVERWRITE TABLE $tableName partition (ds='a') SELECT 1.3"
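The expected error strings in the tests above change because the write-compatibility messages now render types with DataType.catalogString (the SQL-facing name) instead of the DataType object's toString. A small standalone sketch of the difference (CatalogStringDemo is an illustrative name; it assumes spark-catalyst is on the classpath):

    import org.apache.spark.sql.types._

    object CatalogStringDemo {
      def main(args: Array[String]): Unit = {
        // toString is the Scala class rendering used by the old messages;
        // catalogString is the SQL type name used by the new messages.
        Seq(LongType, StringType, BooleanType, TimestampType, DecimalType(2, 1)).foreach { t =>
          println(s"${t.toString} -> ${t.catalogString}")
        }
        // Prints e.g. "LongType -> bigint" and "DecimalType(2,1) -> decimal(2,1)",
        // matching rewritten expectations such as "Cannot safely cast 'i': bigint to int".
      }
    }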