
Conversation

@rednaxelafx
Contributor

What changes were proposed in this pull request?

Use Utils.getSimpleName to avoid hitting the Malformed class name error in NewInstance.doGenCode.

Why are the changes needed?

On older JDK versions (e.g. JDK8u), nested Scala classes may trigger java.lang.Class.getSimpleName to throw a java.lang.InternalError: Malformed class name error.
In this particular case, creating an ExpressionEncoder on such a nested Scala class creates a NewInstance expression under the hood, which then triggers the problem during codegen.
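
For illustration, here is a minimal sketch of the kind of nesting that trips JDK8u's Class.getSimpleName; the names mirror the added test case, while the surrounding class is illustrative:

```scala
class OuterSuite {
  object MalformedClassObject {
    case class MalformedNameExample(x: Int)
  }

  def demo(): Unit = {
    val cls = MalformedClassObject.MalformedNameExample(42).getClass
    // On JDK8u, the next line throws:
    //   java.lang.InternalError: Malformed class name
    // Spark's Utils.getSimpleName handles this case by deriving a name
    // from cls.getName instead.
    println(cls.getSimpleName)
  }
}
```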

Similar to #29050, we should use Spark's Utils.getSimpleName utility function in place of Class.getSimpleName to avoid hitting the issue.

There are two other occurrences of java.lang.Class.getSimpleName in the same file, but they're safe because they're guaranteed to be used only on Java classes, which don't have this problem, e.g.:

```scala
    // Make a copy of the data if it's unsafe-backed
    def makeCopyIfInstanceOf(clazz: Class[_ <: Any], value: String) =
      s"$value instanceof ${clazz.getSimpleName}? ${value}.copy() : $value"
    val genFunctionValue: String = lambdaFunction.dataType match {
      case StructType(_) => makeCopyIfInstanceOf(classOf[UnsafeRow], genFunction.value)
      case ArrayType(_, _) => makeCopyIfInstanceOf(classOf[UnsafeArrayData], genFunction.value)
      case MapType(_, _, _) => makeCopyIfInstanceOf(classOf[UnsafeMapData], genFunction.value)
      case _ => genFunction.value
    }
```

The Unsafe-* family of types are all Java types, so they're okay.

Does this PR introduce any user-facing change?

Fixes a bug where an error is thrown when using ExpressionEncoder on some nested Scala types; otherwise no changes.

How was this patch tested?

Added a test case to org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite. It'll fail on JDK8u before the fix, and pass after the fix.

@rednaxelafx rednaxelafx changed the title [SPARK-34596] Use Utils.getSimpleName to avoid hitting Malformed class name in NewInstance.doGenCode [SPARK-34596][SQL] Use Utils.getSimpleName to avoid hitting Malformed class name in NewInstance.doGenCode Mar 2, 2021
Member

@maropu left a comment

I've checked that the added test fails on jdk8. LGTM if the running tests pass.

Member

@dongjoon-hyun left a comment

+1, LGTM (if CI passes).

@cloud-fan
Contributor

retest this please

@SparkQA

SparkQA commented Mar 2, 2021

Test build #135671 has started for PR 31709 at commit d4e92aa.

@rednaxelafx
Contributor Author

Looks like the AMPLab Jenkins is unstable; there are test failures due to java.io.IOException: Backing channel 'amp-jenkins-staging-worker-02' is disconnected.

@github-actions bot added the SQL label Mar 2, 2021
@dongjoon-hyun
Member

Ya, it looks like that, @rednaxelafx.

@dongjoon-hyun
Member

Retest this please.

@SparkQA

SparkQA commented Mar 2, 2021

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40257/

@SparkQA

SparkQA commented Mar 3, 2021

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/40257/

@SparkQA

SparkQA commented Mar 3, 2021

Test build #135675 has finished for PR 31709 at commit d4e92aa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

HyukjinKwon pushed a commit that referenced this pull request Mar 3, 2021
… class name in NewInstance.doGenCode

Closes #31709 from rednaxelafx/spark-34596-master.

Authored-by: Kris Mok <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit ecf4811)
Signed-off-by: HyukjinKwon <[email protected]>
HyukjinKwon pushed a commit that referenced this pull request Mar 3, 2021
… class name in NewInstance.doGenCode

Closes #31709 from rednaxelafx/spark-34596-master.

Authored-by: Kris Mok <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit ecf4811)
Signed-off-by: HyukjinKwon <[email protected]>
@HyukjinKwon
Member

Merged to master, branch-3.1 and branch-3.0. @rednaxelafx, should we backport to branch-2.4 too?

@rednaxelafx
Contributor Author

> Merged to master, branch-3.1 and branch-3.0. @rednaxelafx, should we backport to branch-2.4 too?

Since the Spark 2.4 line is already EOL, I think we're safe to skip backporting to 2.4?

@HyukjinKwon
Member

I don't think it's been explicitly announced as EOL, though it will likely be EOL according to https://spark.apache.org/versioning-policy.html. Sure, so let's not backport for now.

@dongjoon-hyun
Member

Well, we had better try to EOL. :)

> I think it's not explicitly announced as an EOL

Could you backport it to branch-2.4, @HyukjinKwon? Recently, the ZSTD library was also upgraded there.

@dongjoon-hyun
Member

@rednaxelafx. Or is your patch not applicable to branch-2.4?

@rednaxelafx
Contributor Author

@dongjoon-hyun this bug does affect the Spark 2.4 branch as well (in fact, previous versions too). The same fix could be applied to 2.4.

@dongjoon-hyun
Member

Thank you for the confirmation. I'll try to cherry-pick and test then.

@dongjoon-hyun
Member

The fix seems to be applicable, but unfortunately the test fails.

```
$ git diff HEAD~1
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
index 3189e6841a..ba9e9ecebb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala
@@ -487,7 +487,7 @@ case class NewInstance(
     ev.isNull = resultIsNull

     val constructorCall = outer.map { gen =>
-      s"${gen.value}.new ${cls.getSimpleName}($argString)"
+      s"${gen.value}.new ${Utils.getSimpleName(cls)}($argString)"
     }.getOrElse {
       s"new $className($argString)"
}
```
```
[info] - encode/decode for nested Scala class should work: MalformedNameExample(42) (codegen path) *** FAILED *** (34 milliseconds)
[info]   Exception thrown while decoding
[info]   Converted: [0,2a]
[info]   Schema: x#836
[info]   root
[info]   -- x: integer (nullable = false)
[info]
[info]
[info]   Encoder:
[info]   class[x[0]: int] (ExpressionEncoderSuite.scala:399)
[info]   org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions$class.newAssertionFailedException(Assertions.scala:528)
[info]   at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)
[info]   at org.scalatest.Assertions$class.fail(Assertions.scala:1105)
[info]   at org.scalatest.FunSuite.fail(FunSuite.scala:1560)
[info]   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$$anonfun$encodeDecodeTest$1.apply(ExpressionEncoderSuite.scala:399)
[info]   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite.org$apache$spark$sql$catalyst$encoders$ExpressionEncoderSuite$$verifyNotLeakingReflectionObjects(ExpressionEncoderSuite.scala:477)
[info]   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$$anonfun$testAndVerifyNotLeakingReflectionObjects$1.apply(ExpressionEncoderSuite.scala:484)

...

[info]   Cause: java.lang.RuntimeException: Error while decoding: java.util.concurrent.ExecutionException: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 32, Column 124: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 32, Column 124: "org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$MalformedClassObject$" has no member type "ExpressionEncoderSuite$MalformedClassObject$MalformedNameExample"
[info] newInstance(class org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$MalformedClassObject$MalformedNameExample)
[info]   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.fromRow(ExpressionEncoder.scala:305)
[info]   at org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$$anonfun$encodeDecodeTest$1.apply(ExpressionEncoderSuite.scala:397)
```

@dongjoon-hyun
Member

Could you please make a backporting PR to make sure it passes CIs, since Apache Spark 2.4 is not officially EOL and this patch is important?

@HyukjinKwon
Member

Yeah, let's backport. Looks like we'll have another maintenance release, which will likely be the EOL one.

@rednaxelafx
Contributor Author

The test failure on Spark 2.4 may be due to a difference in either the Scala version or the Janino version in use vs. Spark 3.0+.

@dongjoon-hyun
Member

Ur, @rednaxelafx and @HyukjinKwon. The test fails even in branch-3.1.

dongjoon-hyun pushed a commit that referenced this pull request Mar 5, 2021
… class name in NewInstance.doGenCode

Closes #31709 from rednaxelafx/spark-34596-master.

Authored-by: Kris Mok <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit ecf4811)
Signed-off-by: HyukjinKwon <[email protected]>
@rednaxelafx
Contributor Author

So I did some investigation and found that:

  • For the test case I gave in this PR (emulating a kind of Scala class nesting scenario), before this PR it would fail to codegen because of the "Malformed class name" issue, so that's something that never worked before. In practice it isn't that hard to hit: if you run Spark code through a Scala REPL, the REPL uses either nested objects or nested classes to implement the notion of multiple commands. (Well, there are some REPL-specific code paths in Spark that avoid hitting this problem in some cases though... e.g. this one)
  • After applying this PR, the use of Utils.getSimpleName actually returns a wrong name for the generated Java code's compilation purposes. We do need a real "simple name", but when the Scala nested class is not generated by the Scala REPL, the name returned by Utils.getSimpleName only strips off the package name and maybe the trailing dollar sign, but that's it (see the sketch after this list). So in the case of the test case in this PR, we'd want the simple name to be MalformedNameExample, but we'd actually get back ExpressionEncoderSuite$MalformedClassObject$MalformedNameExample instead. That causes the Janino compilation to fail.
  • Actually, what's more interesting is the difference between Java's nested-class rules and Scala's. Since there is no notion of a "companion object" or a built-in "singleton object" at the Java language level, at least not in Java 8, the Java syntax for creating a new instance of an inner class doesn't work for Scala's class Foo { object Bar { class Baz {...} } } kind of nesting. That is, even when we do use the correct "simple name" (e.g. MalformedNameExample), there is still no valid Java syntax to create it (outerObj.new InnerClass(...) doesn't work when outerObj is a Scala object).
  • Then comes the interesting part. In Spark's tests, some environment setups cause the codegen fallback to be in no-fallback mode, meaning a failed compilation fails the test; in other setups the fallback to the interpreted path is allowed, so a compilation failure does not fail the test. That's exactly why my test case in this PR sometimes passes and sometimes fails: the compilation always fails due to the incorrect generated code, but in some test setups it's allowed to fall back, so the test passes, while in other setups falling back is banned, thus failing the test.
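
To make the second bullet concrete, here is a simplified sketch of the package-stripping behavior described there (my illustration, not Spark's actual Utils.getSimpleName code, which first tries Class.getSimpleName and only falls back on InternalError):

```scala
// Simplified sketch, not the real implementation.
def simpleNameFallback(cls: Class[_]): String = {
  val fullName = cls.getName
  // Strip only the package prefix, e.g.
  //   "org.apache.spark....ExpressionEncoderSuite$MalformedClassObject$MalformedNameExample"
  // becomes
  //   "ExpressionEncoderSuite$MalformedClassObject$MalformedNameExample"
  val stripped = fullName.substring(fullName.lastIndexOf('.') + 1)
  // Drop a trailing '$' (the marker for a Scala object's class).
  stripped.stripSuffix("$")
}
// The generated code would need just "MalformedNameExample" here, which is
// why the compiled path still fails for this nesting pattern.
```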

At this point I think it's a bit moot to make the generated code actually work for the class Foo { object Bar { class Baz {...} } } pattern mentioned here, as there's no Java syntax available for it anyway. Maybe there are workarounds, but we'll have to investigate a bit more to find them.

The original intent of this PR was to make sure that, when using Dataset on this kind of Scala class nesting, it can still run without user-visible failures. I think that intent is fulfilled with the fix in this PR when running in non-testing mode -- the codegen fallback to the interpreter would step in and make it work, instead of getting a JDK java.lang.InternalError that Spark's codegen doesn't handle.

cc @retronym I wonder if we can get some help from some wonderful Scala folks ^_^ Thanks!

cc @maropu @viirya @kiszk @cloud-fan @dongjoon-hyun @HyukjinKwon: do you have any idea what the environment setup differences are between running build/sbt catalyst/test vs build/sbt "catalyst/testOnly *ExpressionEncoderSuite"? On branch-3.1, the former shows the compilation error as a test failure, whereas the latter doesn't show it, but under the hood the same compilation error actually happens as well...


Some more background on Java's "object creation expression", aka the new expression.

Roughly speaking, there are two variants of the new expression in Java:

  1. Unqualified: that's the regular new ClassName(args) form. The way to create non-nested object instances in the normal case, but can also create nested object instances if the enclosing object is this.
  2. Qualified: the way to create nested objects. The syntax looks like outerObjRef.new InnerClassSimpleName(args). The outerObjRef here is an expression that refers to the enclosing object of the inner object, and InnerClassSimpleName is the simple class name of the inner class. Note that both javac and Janino demand that this InnerClassSimpleName not contain dots (.), so you can't use a fully qualified class name in this place. The compiler checks that InnerClassSimpleName is indeed a member class of the static type of outerObjRef.

In Scala, when you declare:

```scala
object Foo {
  class Bar { }
}
```

then Foo's class name is Foo$, but Bar's class name doesn't follow the Java convention of outerClassName + '$' + innerClassSimpleName (which would have been Foo$$Bar); instead it's just Foo$Bar.
Quite a bit of information is recorded in the class file so that, at runtime, reflection is still able to recognize the enclosing/member relationships, e.g.:

```scala
val fooDollarClazz = Foo.getClass
fooDollarClazz.getDeclaredClasses //=> Array[Class[_]] = Array(class Foo$Bar)

val barClazz = classOf[Foo.Bar]
barClazz.getEnclosingClass //=> Class[_] = class Foo$
barClazz.getDeclaringClass //=> Class[_] = class Foo$
```

When Janino tries to find a member type, unfortunately it doesn't use runtime reflection; instead it just relies on the Java naming convention of outerClassName + '$' + innerClassSimpleName,
https://github.com/janino-compiler/janino/blob/8142ec43a3be288e214de30c9f57fe50e9047766/janino/src/main/java/org/codehaus/janino/IClass.java#L803-L876 , like this:

```java
            String memberDescriptor = Descriptor.fromClassName(
                Descriptor.toClassName(this.getDescriptor())
                + '$'
                + name
            );
```

which will fail to find the Bar type in a fooObjRef.new Bar() expression.
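
Spelled out for the Foo/Bar example above (a REPL-style sketch of the names involved):

```scala
Foo.getClass.getName     //=> "Foo$"
classOf[Foo.Bar].getName //=> "Foo$Bar"
// Janino's member-type lookup computes "Foo$" + '$' + "Bar" == "Foo$$Bar",
// which doesn't match the actual binary name "Foo$Bar", so resolution fails.
```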

Same deal with javac. Here's an example:
foo.scala

```scala
object Foo {
  class Bar {
  }
}
```

TestScalaNestedType.java

```java
import java.lang.reflect.*;

public class TestScalaNestedType {
  public static void main(String[] args) throws Exception {
    Class<?> fooDollarClazz = Foo$.class; // Class.forName("Foo$");
    Field fooModuleField = fooDollarClazz.getDeclaredField("MODULE$");
    Foo$ fooObjRef = (Foo$) fooModuleField.get(null);
    // compiles fine up to the above

    fooObjRef.new Bar(); // fails compilation: error: cannot find symbol Bar in Foo$
  }
}
```

command line:

```
$ javac -g TestScalaNestedType.java
TestScalaNestedType.java:10: error: cannot find symbol
    fooObjRef.new Bar();
                  ^
  symbol:   class Bar
  location: class Foo$
1 error
```

If we change the new expression to fooObjRef.new Foo.Bar(), then the error becomes:

```
$ javac -g TestScalaNestedType.java
TestScalaNestedType.java:10: error: '(' expected
    fooObjRef.new Foo.Bar();
                     ^
1 error
```

obviously the dot (.) isn't accepted there.

@cloud-fan
Contributor

Maybe we set CODEGEN_FACTORY_MODE somewhere without setting it back

@rednaxelafx
Contributor Author

> Maybe we set CODEGEN_FACTORY_MODE somewhere without setting it back

That is my guess as well; hence my earlier guess that the test is somewhat context/order sensitive.

@dongjoon-hyun
Member

Ya. I agree with you on the following.

> I think that intent is fulfilled with the fix in this PR when running in non-testing mode -- the codegen fallback to interpreter would step in and make it work, instead of getting a JDK java.lang.InternalError that Spark's codegen doesn't handle.

And in that case, we should wrap the unit test case with an explicit CODEGEN_FACTORY_MODE setting, because it works only in the fallback mode. So, are you going to proceed with revising the test case in that way, @rednaxelafx?
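
For reference, a sketch of what that wrapping might look like, reusing the withSQLConf pattern that appears in the reproducer diff further down this thread (a sketch of the idea, not the final test change):

```scala
// Sketch only: pin the factory mode so the nested-class round-trip always
// has the interpreter fallback available (test body elided).
withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key ->
    CodegenObjectFactoryMode.FALLBACK.toString) {
  // Run the encode/decode round-trip for MalformedNameExample(42) here;
  // a codegen compilation failure then falls back to the interpreted
  // path instead of failing the test.
}
```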

@viirya
Member

viirya commented Mar 6, 2021

> 👌 I reverted it from branch-3.1.

Do we? I still see the commit in branch-3.1.

@dongjoon-hyun
Member

@viirya, it was merged and reverted once. And when @maropu's patch arrived, I merged it back together with @maropu's patch. The as-is status is broken due to the CODEGEN_FACTORY_MODE issue.

@viirya
Member

viirya commented Mar 6, 2021

@dongjoon-hyun I see. Thanks for clarifying the situation.

@dongjoon-hyun
Member

dongjoon-hyun commented Mar 6, 2021

Sorry for the inconvenience. I'm looking at this.

@dongjoon-hyun
Member

dongjoon-hyun commented Mar 6, 2021

Hi, All.
I created a WIP PR to always put the newly added test cases into FALLBACK mode.

According to @rednaxelafx's comment, it should pass conceptually. However, for now it is still failing. So I made it a WIP to share first and get your opinions.

@kiszk
Member

kiszk commented Mar 6, 2021

I would like to share the finding that the following commands can reproduce the error. I hope it helps.

```
% build/sbt "catalyst/testOnly *AnsiCastSuiteWithAnsiModeOn  *ExpressionEncoderSuite"
```

or

```
% build/sbt "catalyst/testOnly *AnsiCastSuiteWithAnsiModeOff  *ExpressionEncoderSuite"
```

@kiszk
Member

kiszk commented Mar 6, 2021

I think this is almost the minimum set, which can be easily specified, to reproduce the problem.

`ANSI mode: cast string to timestamp with parse error` and `encode/decode for nested Scala class should work` may have an implicit dependency. I think that `SPARK-27671: cast from nested null type in struct` is not related to this reproduction.

```
% build/sbt "catalyst/testOnly  *AnsiCastSuiteWithAnsiModeOn *ExpressionEncoderSuite -- -z nested -z parse "
...
[info] AnsiCastSuiteWithAnsiModeOn:
[info] - SPARK-27671: cast from nested null type in struct (1 second, 407 milliseconds)
[info] - ANSI mode: cast string to timestamp with parse error (1 second, 309 milliseconds)
[info] ExpressionEncoderSuite:
[info] - encode/decode for nested Scala class should work: MalformedNameExample(42) (codegen path) *** FAILED *** (203 milliseconds)
[info]   Exception thrown while decoding
[info]   Converted: [0,2a]
[info]   Schema: x#18318
[info]   root
[info]   -- x: integer (nullable = false)
[info]   
...
```

@kiszk
Member

kiszk commented Mar 6, 2021

In the above command, when the following line is replaced with `{`, we cannot see any failure.

@dongjoon-hyun
Member

Thank you, @kiszk!

@dongjoon-hyun
Member

I updated my PR according to @kiszk's comment.

@kiszk
Member

kiszk commented Mar 6, 2021

After adding the attached code, build/sbt "catalyst/testOnly *AnsiCastSuiteWithAnsiModeOn *ExpressionEncoderSuite -- -z nested" can reproduce the same error.

```
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
@@ -960,6 +960,19 @@ abstract class AnsiCastSuiteBase extends CastSuiteBase {
       "invalid input syntax for type numeric")
   }
 
+  test("DUMMY nested") {
+    val activeConf = conf
+    new ParVector(ALL_TIMEZONES.toVector).foreach { zid =>
+      SQLConf.withExistingConf(activeConf) {
+        withSQLConf(SQLConf.CODEGEN_FACTORY_MODE.key ->
+          org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode.CODEGEN_ONLY
+            .toString) {
+          val i = 1
+        }
+      }
+    }
+  }
+
```

@dongjoon-hyun
Member

@kiszk The problematic part is `val activeConf = conf` and `SQLConf.withExistingConf(activeConf)`, isn't it?

@kiszk
Member

kiszk commented Mar 6, 2021

I think so in branch-3.1. Your change also works in the above case.

But I am curious why the same code works on the master branch. cc @rednaxelafx

@dongjoon-hyun
Member

Got it, @kiszk ~

@dongjoon-hyun
Member

I created a PR against the master branch, too. And I'll track the discussion and adjust according to the decision.

maropu pushed a commit to maropu/spark that referenced this pull request Mar 19, 2021
… class name in NewInstance.doGenCode

Closes apache#31709 from rednaxelafx/spark-34596-master.

Authored-by: Kris Mok <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit ecf4811)
Signed-off-by: HyukjinKwon <[email protected]>
viirya pushed a commit that referenced this pull request Mar 24, 2021
…ormed class name in NewInstance.doGenCode

### What changes were proposed in this pull request?

Use `Utils.getSimpleName` to avoid hitting `Malformed class name` error in `NewInstance.doGenCode`.

NOTE: branch-2.4 does not have the interpreted implementation of `SafeProjection`, so it does not fall back into the interpreted mode if the compilation fails. Therefore, the test in this PR just checks that the compilation error happens instead of checking that the interpreted mode works well.

This is the backport PR of #31709 and the credit should go to rednaxelafx.

Closes #31888 from maropu/SPARK-34596-BRANCH2.4.

Lead-authored-by: Takeshi Yamamuro <[email protected]>
Co-authored-by: Kris Mok <[email protected]>
Signed-off-by: Liang-Chi Hsieh <[email protected]>
flyrain pushed a commit to flyrain/spark that referenced this pull request Sep 21, 2021
… class name in NewInstance.doGenCode

Closes apache#31709 from rednaxelafx/spark-34596-master.

Authored-by: Kris Mok <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit ecf4811)
Signed-off-by: HyukjinKwon <[email protected]>
flyrain pushed a commit to flyrain/spark that referenced this pull request Sep 21, 2021
… class name in NewInstance.doGenCode

Closes apache#31709 from rednaxelafx/spark-34596-master.

Authored-by: Kris Mok <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit ecf4811)
Signed-off-by: HyukjinKwon <[email protected]>
fishcus pushed a commit to fishcus/spark that referenced this pull request Jan 12, 2022
… class name in NewInstance.doGenCode

Closes apache#31709 from rednaxelafx/spark-34596-master.

Authored-by: Kris Mok <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit ecf4811)
Signed-off-by: HyukjinKwon <[email protected]>
fishcus added a commit to Kyligence/spark that referenced this pull request Feb 16, 2022
* [SPARK-34361][K8S] In case of downscaling avoid killing of executors already known by the scheduler backend in the pod allocator

### What changes were proposed in this pull request?

This PR modifies the POD allocator to use the scheduler backend to get the known executors and remove those from the pending and newly created list.

This is different from the normal `ExecutorAllocationManager`-requested killing of executors, where `spark.dynamicAllocation.executorIdleTimeout` is used.
In this case the POD allocator should only be responsible for terminating unsatisfied POD allocations (new requests where no POD state has been received yet, and PODs in the pending state).

### Why are the changes needed?

Because there is a race between the executor POD allocator and the cluster scheduler backend.
Running several experiments during downscaling, we saw many freshly started executors killed even though they already had running tasks on them.

The pattern in the log was the following (see executor 312 and TID 2079):

```
21/02/01 15:12:03 INFO ExecutorMonitor: New executor 312 has registered (new total is 138)
...
21/02/01 15:12:03 INFO TaskSetManager: Starting task 247.0 in stage 4.0 (TID 2079, 100.100.18.138, executor 312, partition 247, PROCESS_LOCAL, 8777 bytes)
21/02/01 15:12:03 INFO ExecutorPodsAllocator: Deleting 3 excess pod requests (408,312,307).
...
21/02/01 15:12:04 ERROR TaskSchedulerImpl: Lost executor 312 on 100.100.18.138: The executor with id 312 was deleted by a user or the framework.
21/02/01 15:12:04 INFO TaskSetManager: Task 2079 failed because while it was being computed, its executor exited for a reason unrelated to the task. Not counting this failure towards the maximum number of failures for the task.
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

#### Manually

With this change there was no executor lost with running task on it.

##### With unit test

A new test is added and an existing test is modified to check these cases.

Closes #31513 from attilapiros/SPARK-34361.

Authored-by: “attilapiros” <[email protected]>
Signed-off-by: Holden Karau <[email protected]>
(cherry picked from commit 6c5322de6176726955b4bc941f92ecaa54a7f539)
Signed-off-by: Holden Karau <[email protected]>

* [SPARK-34948][K8S] Add ownerReference to executor configmap to fix leakages

This PR aims to add `ownerReference` to the executor ConfigMap to fix leakage.

SPARK-30985 maintains the executor config map explicitly inside Spark. However, this config map can be leaked when Spark drivers die accidentally or are killed by K8s. We need to add `ownerReference` to make K8s garbage-collect these automatically.

The number of ConfigMaps counts against the resource quota, so the leaked ConfigMaps currently cause Spark job submission failures.

No.

Pass the CIs and check manually.

K8s IT is tested manually.
```
KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Use SparkLauncher.NO_RESOURCE
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- All pods have the same service account by default
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Verify logging configuration is picked from the provided SPARK_CONF_DIR/log4j.properties
- Run SparkPi with env and mount secrets.
- Run PySpark on simple pi.py example
- Run PySpark to test a pyfiles example
- Run PySpark with memory customization
- Run in client mode.
- Start pod creation from template
- PVs with local storage
- Launcher client dependencies
- SPARK-33615: Launcher client archives
- SPARK-33748: Launcher python client respecting PYSPARK_PYTHON
- SPARK-33748: Launcher python client respecting spark.pyspark.python and spark.pyspark.driver.python
- Launcher python client dependencies using a zip file
- Test basic decommissioning
- Test basic decommissioning with shuffle cleanup
- Test decommissioning with dynamic allocation & shuffle cleanups
- Test decommissioning timeouts
- Run SparkR on simple dataframe.R example
Run completed in 19 minutes, 2 seconds.
Total number of tests run: 27
Suites: completed 2, aborted 0
Tests: succeeded 27, failed 0, canceled 0, ignored 0, pending 0
All tests passed.
```

**BEFORE**
```
$ k get cm spark-exec-450b417895b3b2c7-conf-map -oyaml | grep ownerReferences
```

**AFTER**
```
$ k get cm spark-exec-bb37a27895b1c26c-conf-map -oyaml | grep ownerReferences
        f:ownerReferences:
```

Closes #32042 from dongjoon-hyun/SPARK-34948.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit a42dc93a2abf9490d68146b3586aec7fe2f9c102)
Signed-off-by: Dongjoon Hyun <[email protected]>

* [SPARK-35482][K8S] Use `spark.blockManager.port` not the wrong `spark.blockmanager.port` in BasicExecutorFeatureStep

### What changes were proposed in this pull request?

Most Spark conf keys are case-sensitive, including `spark.blockManager.port`; we cannot get the correct port number with `spark.blockmanager.port`.

This PR changes the wrong key to `spark.blockManager.port` in `BasicExecutorFeatureStep`.

This PR also ensures a fast fail when the port value is invalid for executor containers. When 0 is specified (valid as a random port, but invalid as a K8s request), it should not be put in the `containerPort` field of the executor pod spec. We do not expect executor pods to continuously fail to be created because of invalid requests.

### Why are the changes needed?

bugfix

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new tests

Closes #32621 from yaooqinn/SPARK-35482.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit d957426351149dd1b4e1106d1230f395934f61d2)
Signed-off-by: Dongjoon Hyun <[email protected]>

* [SPARK-35493][K8S] make `spark.blockManager.port` fallback for `spark.driver.blockManager.port` as same as other cluster managers

### What changes were proposed in this pull request?

`spark.blockManager.port` does not work for K8s driver pods now; we should make it work as it does with other cluster managers.

### Why are the changes needed?

`spark.blockManager.port` should be able to work for the Spark driver pod

### Does this PR introduce _any_ user-facing change?

Yes, `spark.blockManager.port` will be respected iff it is present and `spark.driver.blockManager.port` is absent.

### How was this patch tested?

new tests

Closes #32639 from yaooqinn/SPARK-35493.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 96b0548ab6d5fe36833812f7b6424c984f75c6dd)
Signed-off-by: Dongjoon Hyun <[email protected]>

* [SPARK-32975][K8S] Add config for driver readiness timeout before executors start

### What changes were proposed in this pull request?
Add a new config that controls the timeout of waiting for the driver pod's readiness before allocating executor pods. This wait only happens once, on application start.

### Why are the changes needed?
The driver's headless service can be resolved by DNS only after the driver pod is ready. If the executor tries to connect to the headless service before the driver pod is ready, it will hit UnknownHostException and get into an error state, but will not be restarted. **This case usually happens when the driver pod has sidecar containers that haven't finished being created when executors start.** So basically there is a race condition. This issue can be mitigated by tweaking this config.

### Does this PR introduce _any_ user-facing change?
A new config `spark.kubernetes.allocation.driver.readinessTimeout` added.

### How was this patch tested?
Existing tests.

Closes #32752 from cchriswu/SPARK-32975-fix.

Lead-authored-by: Chris Wu <[email protected]>
Co-authored-by: Chris Wu <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 497c80a1ad7fdd605b75c8a6601fce35c7449578)
Signed-off-by: Dongjoon Hyun <[email protected]>

* [SPARK-32975][K8S][FOLLOWUP] Avoid None.get exception

### What changes were proposed in this pull request?

A follow-up for SPARK-32975 to avoid the unexpected `None.get` exception.

Running SparkPi with Docker Desktop, as podName is an Option, we will get:
```logtalk
21/06/09 01:09:12 ERROR Utils: Uncaught exception in thread main
java.util.NoSuchElementException: None.get
	at scala.None$.get(Option.scala:529)
	at scala.None$.get(Option.scala:527)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.$anonfun$start$1(ExecutorPodsAllocator.scala:110)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1417)
	at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.start(ExecutorPodsAllocator.scala:111)
	at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterSchedulerBackend.start(KubernetesClusterSchedulerBackend.scala:99)
	at org.apache.spark.scheduler.TaskSchedulerImpl.start(TaskSchedulerImpl.scala:220)
	at org.apache.spark.SparkContext.<init>(SparkContext.scala:581)
	at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2686)
	at org.apache.spark.sql.SparkSession$Builder.$anonfun$getOrCreate$2(SparkSession.scala:948)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.SparkSession$Builder.getOrCreate(SparkSession.scala:942)
	at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:30)
	at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:955)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1043)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1052)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

### Why are the changes needed?

fix a regression

### Does this PR introduce _any_ user-facing change?

no
### How was this patch tested?

Manual.

Closes #32830 from yaooqinn/SPARK-32975.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit b4b78ce26567ce7ab83d47ce3b6af87c866bcacb)
Signed-off-by: Dongjoon Hyun <[email protected]>

* [MINOR][K8S] Print the driver pod name instead of Some(name) if absent

Print the driver pod name instead of Some(name) if absent

fix error hint

no

new test

Closes #32889 from yaooqinn/minork8s.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 1125afd4622e6d3f7f14fca1ebcfebdfba6d9529)
Signed-off-by: Dongjoon Hyun <[email protected]>

* [SPARK-37049][K8S] executorIdleTimeout should check `creationTimestamp` instead of `startTime`

SPARK-33099 added support to respect `spark.dynamicAllocation.executorIdleTimeout` in `ExecutorPodsAllocator`. However, when it checks if a pending executor pod is timed out, it checks against the pod's [startTime](https://github.com/kubernetes/api/blob/2a5dae08c42b1e8fdc1379432d8898efece65363/core/v1/types.go#L3664-L3667), see the code [here](https://github.com/apache/spark/blob/c2ba498ff678ddda034cedf45cc17fbeefe922fd/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala#L459). A pending pod's `startTime` is empty, and this causes the function `isExecutorIdleTimedOut()` to always return true for pending pods.

This can be reproduced locally; run the following job:

```
${SPARK_HOME}/bin/spark-submit --master k8s://http://localhost:8001 --deploy-mode cluster --name spark-group-example \
  --master k8s://http://localhost:8001 --deploy-mode cluster \
  --class org.apache.spark.examples.GroupByTest \
  --conf spark.executor.instances=1 \
  --conf spark.kubernetes.namespace=spark-test \
  --conf spark.kubernetes.executor.request.cores=1 \
  --conf spark.dynamicAllocation.enabled=true \
  --conf spark.shuffle.service.enabled=true \
  --conf spark.dynamicAllocation.shuffleTracking.enabled=true \
  --conf spark.shuffle.service.enabled=false \
  --conf spark.kubernetes.container.image=local/spark:3.3.0 \
  --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
  local:///opt/spark/examples/jars/spark-examples_2.12-3.3.0-SNAPSHOT.jar \
  1000 1000 100 1000
```

the local cluster doesn't have enough resources to run more than 4 executors, so the rest of the executor pods will be pending. The job will have task backlogs, triggering requests for more executors from K8s:

```
21/10/19 22:51:45 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 1 running: 0.
21/10/19 22:51:51 INFO ExecutorPodsAllocator: Going to request 1 executors from Kubernetes for ResourceProfile Id: 0, target: 2 running: 1.
21/10/19 22:51:52 INFO ExecutorPodsAllocator: Going to request 2 executors from Kubernetes for ResourceProfile Id: 0, target: 4 running: 2.
21/10/19 22:51:53 INFO ExecutorPodsAllocator: Going to request 4 executors from Kubernetes for ResourceProfile Id: 0, target: 8 running: 4.
...
21/10/19 22:52:14 INFO ExecutorPodsAllocator: Deleting 39 excess pod requests (23,59,32,41,50,68,35,44,17,8,53,62,26,71,11,56,29,38,47,20,65,5,14,46,64,73,55,49,40,67,58,13,22,31,7,16,52,70,43).
21/10/19 22:52:18 INFO ExecutorPodsAllocator: Deleting 28 excess pod requests (25,34,61,37,10,19,28,60,69,63,45,54,72,36,18,9,27,21,57,12,48,30,39,66,15,42,24,33).
```

At `22:51:45`, it starts to request executors; and at  `22:52:14` it starts to delete excess executor pods. This is 29s but spark.dynamicAllocation.executorIdleTimeout is set to 60s. The config was not honored.

### What changes were proposed in this pull request?
Change the check from using pod's `startTime` to `creationTimestamp`. [creationTimestamp](https://github.com/kubernetes/apimachinery/blob/e6c90c4366be1504309a6aafe0d816856450f36a/pkg/apis/meta/v1/types.go#L193-L201) is the timestamp when a pod gets created on K8s:

```
// CreationTimestamp is a timestamp representing the server time when this object was
// created. It is not guaranteed to be set in happens-before order across separate operations.
// Clients may not set this value. It is represented in RFC3339 form and is in UTC.
```

[startTime](https://github.com/kubernetes/api/blob/2a5dae08c42b1e8fdc1379432d8898efece65363/core/v1/types.go#L3664-L3667) is the timestamp when pod gets started:

```
// RFC 3339 date and time at which the object was acknowledged by the Kubelet.
// This is before the Kubelet pulled the container image(s) for the pod.
// +optional
```

a pending pod's startTime is empty. Here is an example of a pending pod:

```
NAMESPACE     NAME                                     READY   STATUS    RESTARTS   AGE
default       pending-pod-example                      0/1     Pending   0          2s

kubectl get pod pending-pod-example -o yaml | grep creationTimestamp
--->  creationTimestamp: "2021-10-19T16:17:52Z"

// pending pod has no startTime
kubectl get pod pending-pod-example -o yaml | grep startTime
---> // empty

// running pod has startTime set to the timestamp when the pod gets started
kubectl get pod coredns-558bd4d5db-6qrtx -n kube-system -o yaml | grep startTime
        f:startTime: {}
---> startTime: "2021-08-04T23:44:44Z"
```

### Why are the changes needed?
This fixes the issue that `spark.dynamicAllocation.executorIdleTimeout` is currently not honored for pending executor pods.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
The PR includes UT changes that provide test coverage for this issue.

Closes #34319 from yangwwei/SPARK-37049.

Authored-by: Weiwei Yang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 041cd5d7d15ec4184ae51a8a10a26bef05bd261f)
Signed-off-by: Dongjoon Hyun <[email protected]>

* [SPARK-37497][K8S] Promote `ExecutorPods[PollingSnapshot|WatchSnapshot]Source` to DeveloperApi

### What changes were proposed in this pull request?

This PR aims to promote `ExecutorPodsWatchSnapshotSource` and `ExecutorPodsPollingSnapshotSource` as **stable** `DeveloperApi` in order to maintain it officially in a backward compatible way at Apache Spark 3.3.0.

### Why are the changes needed?

- Since SPARK-24248 at Apache Spark 2.4.0, `ExecutorPodsWatchSnapshotSource` and `ExecutorPodsPollingSnapshotSource` have been used to monitor executor pods without any interface changes for over 3 years.

- Apache Spark 3.1.1 makes the `Kubernetes` module GA and provides an extensible external cluster manager framework. A new `ExternalClusterManager` for the K8s environment needs to depend on this to monitor pods.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual review.

Closes #34751 from dongjoon-hyun/SPARK-37497.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 2b044962cd6eff5a3a76f2808ee93b40bdf931df)
Signed-off-by: Dongjoon Hyun <[email protected]>

* [SPARK-34949][CORE] Prevent BlockManager reregister when Executor is shutting down

### What changes were proposed in this pull request?

This PR prevents reregistering the BlockManager when an Executor is shutting down. It is achieved by checking `executorShutdown` before calling `env.blockManager.reregister()`.

### Why are the changes needed?

This change is required since Spark reports executors as active even when they are removed.
I was testing Dynamic Allocation on K8s with about 300 executors. While doing so, when the executors were torn down due to `spark.dynamicAllocation.executorIdleTimeout`, I noticed all the executor pods being removed from K8s; however, under the "Executors" tab in the Spark UI, I could see some executors listed as alive. [spark.sparkContext.statusTracker.getExecutorInfos.length](https://github.com/apache/spark/blob/65da9287bc5112564836a555cd2967fc6b05856f/core/src/main/scala/org/apache/spark/SparkStatusTracker.scala#L105) also returned a value greater than 1.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a new test.

## Logs
Following are the logs of the executor(Id:303) which re-registers `BlockManager`
```
21/04/02 21:33:28 INFO CoarseGrainedExecutorBackend: Got assigned task 1076
21/04/02 21:33:28 INFO Executor: Running task 4.0 in stage 3.0 (TID 1076)
21/04/02 21:33:28 INFO MapOutputTrackerWorker: Updating epoch to 302 and clearing cache
21/04/02 21:33:28 INFO TorrentBroadcast: Started reading broadcast variable 3
21/04/02 21:33:28 INFO TransportClientFactory: Successfully created connection to /100.100.195.227:33703 after 76 ms (62 ms spent in bootstraps)
21/04/02 21:33:28 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.4 KB, free 168.0 MB)
21/04/02 21:33:28 INFO TorrentBroadcast: Reading broadcast variable 3 took 168 ms
21/04/02 21:33:28 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 3.9 KB, free 168.0 MB)
21/04/02 21:33:29 INFO MapOutputTrackerWorker: Don't have map outputs for shuffle 1, fetching them
21/04/02 21:33:29 INFO MapOutputTrackerWorker: Doing the fetch; tracker endpoint = NettyRpcEndpointRef(spark://MapOutputTrackerda-lite-test-4-7a57e478947d206d-driver-svc.dex-app-n5ttnbmg.svc:7078)
21/04/02 21:33:29 INFO MapOutputTrackerWorker: Got the output locations
21/04/02 21:33:29 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks including 1 local blocks and 1 remote blocks
21/04/02 21:33:30 INFO TransportClientFactory: Successfully created connection to /100.100.80.103:40971 after 660 ms (528 ms spent in bootstraps)
21/04/02 21:33:30 INFO ShuffleBlockFetcherIterator: Started 1 remote fetches in 1042 ms
21/04/02 21:33:31 INFO Executor: Finished task 4.0 in stage 3.0 (TID 1076). 1276 bytes result sent to driver
.
.
.
21/04/02 21:34:16 INFO CoarseGrainedExecutorBackend: Driver commanded a shutdown
21/04/02 21:34:16 INFO Executor: Told to re-register on heartbeat
21/04/02 21:34:16 INFO BlockManager: BlockManager BlockManagerId(303, 100.100.122.34, 41265, None) re-registering with master
21/04/02 21:34:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(303, 100.100.122.34, 41265, None)
21/04/02 21:34:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(303, 100.100.122.34, 41265, None)
21/04/02 21:34:16 INFO BlockManager: Reporting 0 blocks to the master.
21/04/02 21:34:16 INFO MemoryStore: MemoryStore cleared
21/04/02 21:34:16 INFO BlockManager: BlockManager stopped
21/04/02 21:34:16 INFO FileDataSink: Closing sink with output file = /tmp/safari-events/.des_analysis/safari-events/hdp_spark_monitoring_random-container-037caf27-6c77-433f-820f-03cd9c7d9b6e-spark-8a492407d60b401bbf4309a14ea02ca2_events.tsv
21/04/02 21:34:16 INFO HonestProfilerBasedThreadSnapshotProvider: Stopping agent
21/04/02 21:34:16 INFO HonestProfilerHandler: Stopping honest profiler agent
21/04/02 21:34:17 INFO ShutdownHookManager: Shutdown hook called
21/04/02 21:34:17 INFO ShutdownHookManager: Deleting directory /var/data/spark-d886588c-2a7e-491d-bbcb-4f58b3e31001/spark-4aa337a0-60c0-45da-9562-8c50eaff3cea

```

Closes #32043 from sumeetgajjar/SPARK-34949.

Authored-by: Sumeet Gajjar <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit a9ca1978ae8ecc53e2ef9e14b4be70dc8f5d9341)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>

* [SPARK-34674][CORE][K8S] Close SparkContext after the Main method has finished

### What changes were proposed in this pull request?
Close the SparkContext after the Main method has finished, to allow a SparkApplication on K8S to complete.
This is a fixed version of the [merged and reverted PR](https://github.com/apache/spark/pull/32081).

### Why are the changes needed?
If sparkContext.stop() is not called explicitly, the Spark driver process does not terminate even after its Main method has completed. This behaviour differs from Spark on YARN, where manually stopping the SparkContext is not required. The problem appears to be the use of non-daemon threads, which prevent the driver JVM process from terminating.
So I have inserted code that closes the sparkContext automatically; a sketch of the idea follows.
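
A hedged sketch of the inserted behavior; the stand-in types below replace the real `SparkSubmit`/`SparkContext` plumbing:

```scala
// Stand-ins for SparkContext and its active-context accessor.
trait StoppableContext { def stop(): Unit }
object ActiveContext { @volatile var get: Option[StoppableContext] = None }

// After the user's main method returns, stop any still-active context so its
// non-daemon threads cannot keep the driver JVM alive on K8s.
def runMainAndStop(userMain: Array[String] => Unit, args: Array[String]): Unit = {
  try {
    userMain(args)
  } finally {
    ActiveContext.get.foreach(_.stop())
  }
}
```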

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested manually on the production AWS EKS environment at my company.

Closes #32283 from kotlovs/close-spark-context-on-exit-2.

Authored-by: skotlov <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit b17a0e6931cac98cc839c047b1b5d4ea6d052009)
Signed-off-by: Dongjoon Hyun <[email protected]>

* [SPARK-36193][CORE] Recover SparkSubmit.runMain not to stop SparkContext in non-K8s env

### What changes were proposed in this pull request?

According to the discussion on https://github.com/apache/spark/pull/32283 , this PR aims to limit the feature of SPARK-34674 to K8s environment only.

### Why are the changes needed?

To reduce the behavior change in non-K8s environment.

### Does this PR introduce _any_ user-facing change?

The changed behavior is consistent with 3.1.1 and older Spark releases.

### How was this patch tested?

N/A

Closes #33403 from dongjoon-hyun/SPARK-36193.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit fd3e9ce0b9ee09c7dce9f2e029fe96eac51eab96)
Signed-off-by: Dongjoon Hyun <[email protected]>

* [SPARK-35879][CORE][SHUFFLE] Fix performance regression caused by collectFetchRequests

### What changes were proposed in this pull request?

This PR fixes a perf regression on the executor side when creating fetch requests with a large number of initial partitions.

![image](https://user-images.githubusercontent.com/8326978/123270865-dd21e800-d532-11eb-8447-ad80e47b034f.png)

At NetEase, we had an online job that took `45min` to "fetch" about 100MB of shuffle data; it turned out the time was actually spent slowly collecting fetch requests. Normally, such a task should finish in seconds.

See the `DEBUG` log

```
21/06/22 11:52:26 DEBUG BlockManagerStorageEndpoint: Sent response: 0 to kyuubi.163.org:
21/06/22 11:53:05 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 3941440 at BlockManagerId(12, .., 43559, None) with 19 blocks
21/06/22 11:53:44 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 3716400 at BlockManagerId(20, .., 38287, None) with 18 blocks
21/06/22 11:54:41 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 4559280 at BlockManagerId(6, .., 39689, None) with 22 blocks
21/06/22 11:55:08 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 3120160 at BlockManagerId(33, .., 39449, None) with 15 blocks
```

I also created a test case locally with my laptop's Docker env to give a reproducible case.

```
bin/spark-sql --conf spark.kubernetes.file.upload.path=./ --master k8s://https://kubernetes.docker.internal:6443 --conf spark.kubernetes.container.image=yaooqinn/spark:v20210624-5 -c spark.kubernetes.context=docker-for-desktop_1 --num-executors 5 --driver-memory 5g --conf spark.kubernetes.executor.podNamePrefix=sparksql
```

```sql
 SET spark.sql.adaptive.enabled=true;
 SET spark.sql.shuffle.partitions=3000;
 SELECT /*+ REPARTITION */ 1 as pid, id from range(1, 1000000, 1, 500);
 SELECT /*+ REPARTITION(pid, id) */ 1 as pid, id from range(1, 1000000, 1, 500);
 ```

### Why are the changes needed?

Fix a perf regression introduced by SPARK-29292 (3ad4863673fc46080dda963be3055a3e554cfbc7) in v3.1.0.

3ad4863673fc46080dda963be3055a3e554cfbc7 was made to support compilation with Scala 2.13, but the performance loss is huge. We need to consider backporting this PR to branch-3.1. A sketch of the slow vs. fast pattern follows.
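
A self-contained sketch of the slow vs. fast pattern; `BlockInfo` and the function names are illustrative stand-ins, not the actual `ShuffleBlockFetcherIterator` code:

```scala
import scala.collection.mutable.ArrayBuffer

case class BlockInfo(id: String, size: Long)

// Slow pattern (the regression): re-summing the buffer on every append makes
// the loop quadratic in the number of blocks.
def collectSlow(blocks: Seq[BlockInfo], targetSize: Long): Seq[Seq[BlockInfo]] = {
  val requests = ArrayBuffer.empty[Seq[BlockInfo]]
  val cur = ArrayBuffer.empty[BlockInfo]
  for (b <- blocks) {
    cur += b
    if (cur.map(_.size).sum >= targetSize) { // O(n) re-scan per block
      requests += cur.toSeq
      cur.clear()
    }
  }
  if (cur.nonEmpty) requests += cur.toSeq
  requests.toSeq
}

// Fast pattern (the idea of the fix): carry a running total so each block is
// processed in O(1).
def collectFast(blocks: Seq[BlockInfo], targetSize: Long): Seq[Seq[BlockInfo]] = {
  val requests = ArrayBuffer.empty[Seq[BlockInfo]]
  val cur = ArrayBuffer.empty[BlockInfo]
  var curSize = 0L
  for (b <- blocks) {
    cur += b
    curSize += b.size
    if (curSize >= targetSize) {
      requests += cur.toSeq
      cur.clear()
      curSize = 0L
    }
  }
  if (cur.nonEmpty) requests += cur.toSeq
  requests.toSeq
}
```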

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

Manually.

#### Before
```log
 21/06/23 13:54:22 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRemoteRequestSize: 10066329, maxBlocksInFlightPerAddress: 2147483647
 21/06/23 13:54:38 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2314708 at BlockManagerId(2, 10.1.3.114, 36423, None) with 86 blocks
 21/06/23 13:54:59 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2636612 at BlockManagerId(3, 10.1.3.115, 34293, None) with 87 blocks
 21/06/23 13:55:18 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2508706 at BlockManagerId(4, 10.1.3.116, 41869, None) with 90 blocks
 21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 2350854 at BlockManagerId(5, 10.1.3.117, 45787, None) with 85 blocks
 21/06/23 13:55:34 INFO ShuffleBlockFetcherIterator: Getting 438 (11.8 MiB) non-empty blocks including 90 (2.5 MiB) local and 0 (0.0 B) host-local and 348 (9.4 MiB) remote blocks
 21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Sending request for 87 blocks (2.5 MiB) from 10.1.3.115:34293
 21/06/23 13:55:34 INFO TransportClientFactory: Successfully created connection to /10.1.3.115:34293 after 1 ms (0 ms spent in bootstraps)
 21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Sending request for 90 blocks (2.4 MiB) from 10.1.3.116:41869
 21/06/23 13:55:34 INFO TransportClientFactory: Successfully created connection to /10.1.3.116:41869 after 2 ms (0 ms spent in bootstraps)
 21/06/23 13:55:34 DEBUG ShuffleBlockFetcherIterator: Sending request for 85 blocks (2.2 MiB) from 10.1.3.117:45787
 ```
```log
 21/06/23 14:00:45 INFO MapOutputTracker: Broadcast outputstatuses size = 411, actual size = 828997
 21/06/23 14:00:45 INFO MapOutputTrackerWorker: Got the map output locations
 21/06/23 14:00:45 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRemoteRequestSize: 10066329, maxBlocksInFlightPerAddress: 2147483647
 21/06/23 14:00:55 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1894389 at BlockManagerId(2, 10.1.3.114, 36423, None) with 99 blocks
 21/06/23 14:01:04 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1919993 at BlockManagerId(3, 10.1.3.115, 34293, None) with 100 blocks
 21/06/23 14:01:14 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1977186 at BlockManagerId(5, 10.1.3.117, 45787, None) with 103 blocks
 21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Creating fetch request of 1938336 at BlockManagerId(4, 10.1.3.116, 41869, None) with 101 blocks
 21/06/23 14:01:23 INFO ShuffleBlockFetcherIterator: Getting 500 (9.1 MiB) non-empty blocks including 97 (1820.3 KiB) local and 0 (0.0 B) host-local and 403 (7.4 MiB) remote blocks
 21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 101 blocks (1892.9 KiB) from 10.1.3.116:41869
 21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 103 blocks (1930.8 KiB) from 10.1.3.117:45787
 21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 99 blocks (1850.0 KiB) from 10.1.3.114:36423
 21/06/23 14:01:23 DEBUG ShuffleBlockFetcherIterator: Sending request for 100 blocks (1875.0 KiB) from 10.1.3.115:34293
 21/06/23 14:01:23 INFO ShuffleBlockFetcherIterator: Started 4 remote fetches in 37889 ms
 ```

#### After

```log
21/06/24 13:01:16 DEBUG ShuffleBlockFetcherIterator: maxBytesInFlight: 50331648, targetRemoteRequestSize: 10066329, maxBlocksInFlightPerAddress: 2147483647
21/06/24 13:01:16 INFO ShuffleBlockFetcherIterator: ==> Call blockInfos.map(_._2).sum: 40 ms
21/06/24 13:01:16 INFO ShuffleBlockFetcherIterator: ==> Call mergeFetchBlockInfo for shuffle_0_9_2990_2997/9: 0 ms
21/06/24 13:01:16 INFO ShuffleBlockFetcherIterator: ==> Call mergeFetchBlockInfo for shuffle_0_15_2395_2997/15: 0 ms
```

Closes #33063 from yaooqinn/SPARK-35879.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
(cherry picked from commit 14d4decf736297e2bf4d824ccbd604c9da49ccf4)
Signed-off-by: Kent Yao <[email protected]>

* [SPARK-34473][SQL] Avoid NPE in DataFrameReader.schema(StructType)

### What changes were proposed in this pull request?

This fixes a regression in `DataFrameReader.schema(StructType)` to avoid an NPE if the given `StructType` is null. Note that passing null to Spark public APIs leads to undefined behavior. There is no document mentioning the null behavior, and it's just an accident that `DataFrameReader.schema(StructType)` worked before, so I think this is not a 3.1 blocker. A minimal sketch of the null-tolerant behavior follows.
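
A minimal sketch with simplified names (this is not the exact Spark source):

```scala
import org.apache.spark.sql.types.StructType

class ReaderSketch {
  private var userSpecifiedSchema: Option[StructType] = None

  // Treat a null schema as a no-op instead of dereferencing it later.
  def schema(schema: StructType): ReaderSketch = {
    if (schema != null) {
      userSpecifiedSchema = Option(schema)
    }
    this
  }
}
```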

### Why are the changes needed?

It fixes a 3.1 regression

### Does this PR introduce _any_ user-facing change?

Yes: `df.read.schema(null: StructType)` is now a no-op as before, while in the current branch-3.1 it throws an NPE.

### How was this patch tested?

It's undefined behavior and is very obvious, so I didn't add a test. We should add tests when we clearly define and fix the null behavior for all public APIs.

Closes #31593 from cloud-fan/minor.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 02c784ca686fc675b63ce37f03215bc6c2fec869)
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-34490][SQL] Analysis should fail if the view refers a dropped table

### What changes were proposed in this pull request?

When resolving a view, we use the captured view name in `AnalysisContext` to distinguish whether a relation name is a view or a table. But if the resolution fails, other rules (e.g. `ResolveTables`) will try to resolve the relation again, but without `AnalysisContext`. So, in this case, the resolution may be incorrect: for example, if the view refers to a dropped table while a view with the same name exists, the dropped table will be resolved as a view rather than raising an unresolved-table error.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Newly added test cases.

Closes #31606 from linhongliu-db/fix-temp-view-master.

Lead-authored-by: Linhong Liu <[email protected]>
Co-authored-by: Linhong Liu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit be675a052c38a36ce5e33ba56bdc69cc8972b3e8)
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-34515][SQL] Fix NPE if InSet contains null value during getPartitionsByFilter

### What changes were proposed in this pull request?

Skip null values when rewriting `InSet` to `>= and <=` at getPartitionsByFilter; a sketch of the rewrite follows.
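
A hedged sketch of the rewrite with the null guard; the function is illustrative, while the real logic lives in the Hive shim's filter conversion:

```scala
// Rewrite a large IN set into a range filter, dropping nulls first so the
// sort cannot hit a NullPointerException while comparing values.
def inSetToRangeFilter(attr: String, values: Set[String]): Option[String] = {
  val sorted = values.filter(_ != null).toSeq.sorted
  if (sorted.isEmpty) None
  else Some(s"""$attr >= "${sorted.head}" and $attr <= "${sorted.last}"""")
}
```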

### Why are the changes needed?

Spark converts `InSet` to `>= and <=` if its number of values exceeds `spark.sql.hive.metastorePartitionPruningInSetThreshold` during partition pruning. In this case, if the values contain a null, we get an exception such as:
```
java.lang.NullPointerException
 at org.apache.spark.unsafe.types.UTF8String.compareTo(UTF8String.java:1389)
 at org.apache.spark.unsafe.types.UTF8String.compareTo(UTF8String.java:50)
 at scala.math.LowPriorityOrderingImplicits$$anon$3.compare(Ordering.scala:153)
 at java.util.TimSort.countRunAndMakeAscending(TimSort.java:355)
 at java.util.TimSort.sort(TimSort.java:220)
 at java.util.Arrays.sort(Arrays.java:1438)
 at scala.collection.SeqLike.sorted(SeqLike.scala:659)
 at scala.collection.SeqLike.sorted$(SeqLike.scala:647)
 at scala.collection.AbstractSeq.sorted(Seq.scala:45)
 at org.apache.spark.sql.hive.client.Shim_v0_13.convert$1(HiveShim.scala:772)
 at org.apache.spark.sql.hive.client.Shim_v0_13.$anonfun$convertFilters$4(HiveShim.scala:826)
 at scala.collection.immutable.Stream.flatMap(Stream.scala:489)
 at org.apache.spark.sql.hive.client.Shim_v0_13.convertFilters(HiveShim.scala:826)
 at org.apache.spark.sql.hive.client.Shim_v0_13.getPartitionsByFilter(HiveShim.scala:848)
 at org.apache.spark.sql.hive.client.HiveClientImpl.$anonfun$getPartitionsByFilter$1(HiveClientImpl.scala:750)
```

### Does this PR introduce _any_ user-facing change?

Yes, bug fix.

### How was this patch tested?

Add test.

Closes #31632 from ulysses-you/SPARK-34515.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 999d3b89b6df14a5ccb94ffc2ffadb82964e9f7d)
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-34436][SQL] DPP support LIKE ANY/ALL expression

### What changes were proposed in this pull request?

This PR makes DPP support LIKE ANY/ALL expressions:
```sql
SELECT date_id, product_id FROM fact_sk f
JOIN dim_store s
ON f.store_id = s.store_id WHERE s.country LIKE ANY ('%D%E%', '%A%B%')
```

### Why are the changes needed?

Improve query performance.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #31563 from wangyum/SPARK-34436.

Lead-authored-by: Yuming Wang <[email protected]>
Co-authored-by: Yuming Wang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 4a3200b08ac3e7733b5a3dc7271d35e6872c5967)
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-34550][SQL] Skip InSet null value during push filter to Hive metastore

### What changes were proposed in this pull request?

Skip `InSet` null value during push filter to Hive metastore.

### Why are the changes needed?

If `InSet` contains a null value, we should skip it and push the other values to the metastore, to keep the same behavior as `In`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Add test.

Closes #31659 from ulysses-you/SPARK-34550.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 82267acfe8c78a70d56a6ae6ab9a1135c0dc0836)
Signed-off-by: HyukjinKwon <[email protected]>

* [SPARK-34556][SQL] Checking duplicate static partition columns should respect case sensitive conf

### What changes were proposed in this pull request?

This PR makes partition spec parsing respect case sensitive conf.

### Why are the changes needed?

When parsing the partition spec, Spark will call `org.apache.spark.sql.catalyst.parser.ParserUtils.checkDuplicateKeys` to check if there are duplicate partition column names in the list. But this method is always case sensitive and doesn't detect duplicate partition column names that differ only in case; a sketch of a case-aware check follows.
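
A sketch of a case-aware duplicate check; the helper is hypothetical, while the real change threads the conf through the parser's duplicate-key check:

```scala
import java.util.Locale

// Normalize keys according to the case sensitivity conf, then report any
// partition column name that appears more than once.
def duplicateKeys(keys: Seq[String], caseSensitive: Boolean): Seq[String] = {
  val normalized = if (caseSensitive) keys else keys.map(_.toLowerCase(Locale.ROOT))
  normalized.groupBy(identity).collect { case (k, vs) if vs.size > 1 => k }.toSeq
}

// duplicateKeys(Seq("c", "C"), caseSensitive = false) == Seq("c")
```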

### Does this PR introduce _any_ user-facing change?

Yes. This prevents users from writing incorrect queries such as `INSERT OVERWRITE t PARTITION (c='2', C='3') VALUES (1)` when the case sensitive conf is not enabled.

### How was this patch tested?

The newly added test fails without this change.

Closes #31669 from zsxwing/SPARK-34556.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 62737e140c7b04805726a33c392c297335db7b45)
Signed-off-by: HyukjinKwon <[email protected]>

* [SPARK-34417][SQL] org.apache.spark.sql.DataFrameNaFunctions.fillMap fails for column name having a dot

**What changes were proposed in this pull request?**

This PR fixes dataframe.na.fillMap() for column having a dot in the name as mentioned in [SPARK-34417](https://issues.apache.org/jira/browse/SPARK-34417).

Use resolved attributes of a column for replacing null values.

**Why are the changes needed?**
dataframe.na.fillMap() does not work for a column having a dot in its name; a reproducer sketch follows.
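
A reproducer sketch, assuming a live `SparkSession` named `spark`:

```scala
import spark.implicits._

// A column whose name contains a dot must be treated as a single column,
// not parsed as a nested field reference.
val df = Seq((1, null.asInstanceOf[String])).toDF("id", "a.b")
df.na.fill(Map("a.b" -> "unknown")).show()  // failed before the fix
```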

**Does this PR introduce any user-facing change?**
None

**How was this patch tested?**
Added a unit test for the same.

Closes #31545 from amandeep-sharma/master.

Lead-authored-by: Amandeep Sharma <[email protected]>
Co-authored-by: Amandeep Sharma <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 4bda3c0f0225817456c4e423d4c85cc6b796f0c9)
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-34547][SQL] Only use metadata columns for resolution as last resort

### What changes were proposed in this pull request?

Today, child expressions may be resolved based on "real" or metadata output attributes. We should prefer the real attribute during resolution if one exists.

### Why are the changes needed?

Today, attempting to resolve an expression when there is a "real" output attribute and a metadata attribute with the same name results in resolution failure. This is likely unexpected, as the user may not know about the metadata attribute.

### Does this PR introduce _any_ user-facing change?

Yes. Previously, the user would see an error message when resolving a column whose name matched both a "real" output attribute and a metadata attribute, as below:
```
org.apache.spark.sql.AnalysisException: Reference 'index' is ambiguous, could be: testcat.ns1.ns2.tableTwo.index, testcat.ns1.ns2.tableOne.index.; line 1 pos 71
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:363)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:107)
```

Now, resolution succeeds and provides the "real" output attribute.

### How was this patch tested?

Added a unit test.

Closes #31654 from karenfeng/fallback-resolve-metadata.

Authored-by: Karen Feng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 2e54d68eb94cf39b59166f2b1bbb8f6c317760b8)
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-34596][SQL] Use Utils.getSimpleName to avoid hitting Malformed class name in NewInstance.doGenCode

### What changes were proposed in this pull request?

Use `Utils.getSimpleName` to avoid hitting `Malformed class name` error in `NewInstance.doGenCode`.

### Why are the changes needed?

On older JDK versions (e.g. JDK8u), nested Scala classes may trigger `java.lang.Class.getSimpleName` to throw a `java.lang.InternalError: Malformed class name` error.
In this particular case, creating an `ExpressionEncoder` on such a nested Scala class would create a `NewInstance` expression under the hood, which will trigger the problem during codegen.

Similar to https://github.com/apache/spark/pull/29050, we should use Spark's `Utils.getSimpleName` utility function in place of `Class.getSimpleName` to avoid hitting the issue.

There are two other occurrences of `java.lang.Class.getSimpleName` in the same file, but they're safe because they're guaranteed to be used only on Java classes, which don't have this problem, e.g.:
```scala
    // Make a copy of the data if it's unsafe-backed
    def makeCopyIfInstanceOf(clazz: Class[_ <: Any], value: String) =
      s"$value instanceof ${clazz.getSimpleName}? ${value}.copy() : $value"
    val genFunctionValue: String = lambdaFunction.dataType match {
      case StructType(_) => makeCopyIfInstanceOf(classOf[UnsafeRow], genFunction.value)
      case ArrayType(_, _) => makeCopyIfInstanceOf(classOf[UnsafeArrayData], genFunction.value)
      case MapType(_, _, _) => makeCopyIfInstanceOf(classOf[UnsafeMapData], genFunction.value)
      case _ => genFunction.value
    }
```
The Unsafe-* family of types are all Java types, so they're okay.

### Does this PR introduce _any_ user-facing change?

Fixes a bug that throws an error when using `ExpressionEncoder` on some nested Scala types, otherwise no changes.

### How was this patch tested?

Added a test case to `org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite`. It'll fail on JDK8u before the fix, and pass after the fix.

Closes #31709 from rednaxelafx/spark-34596-master.

Authored-by: Kris Mok <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit ecf4811764f1ef91954c865a864e0bf6691f99a6)
Signed-off-by: HyukjinKwon <[email protected]>

* [SPARK-34534] Fix blockIds order when use FetchShuffleBlocks to fetch blocks

### What changes were proposed in this pull request?

Fix a problem that can lead to data correctness issues after some blocks are retried in `OneForOneBlockFetcher` when using `FetchShuffleBlocks`.

### Why are the changes needed?
This is a data correctness bug. There was no problem when the old protocol sent `OpenBlocks` before fetching chunks in `OneForOneBlockFetcher`.
In the latest branch, `OpenBlocks` has been replaced by `FetchShuffleBlocks`. However, the order in which `FetchShuffleBlocks` reads shuffle blocks is not the same as the order of `blockIds` in `OneForOneBlockFetcher`; the `blockIds` array is used to match each block ID with its shuffle data by index, so it is now out of order.
This leads to reading the wrong block chunk when some blocks fail to fetch in `OneForOneBlockFetcher`, because it retries the rest of the blocks based on the order of `blockIds`.

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Closes #31643 from seayoun/yuhaiyang_fix_use_FetchShuffleBlocks_order.

Lead-authored-by: yuhaiyang <[email protected]>
Co-authored-by: yuhaiyang <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 4e438196114eff2e1fc4dd726fdc1bda1af267da)
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-34555][SQL] Resolve metadata output from DataFrame

### What changes were proposed in this pull request?

Add metadataOutput as a fallback to resolution.
Builds off https://github.com/apache/spark/pull/31654.

### Why are the changes needed?

The metadata columns could not be resolved via `df.col("metadataColName")` from the DataFrame API.

### Does this PR introduce _any_ user-facing change?

Yes, the metadata columns can now be resolved as described above.

### How was this patch tested?

Scala unit test.

Closes #31668 from karenfeng/spark-34555.

Authored-by: Karen Feng <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit b01dd12805f7b40318f183ee48bc0012bb4e847f)
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-34584][SQL] Static partition should also follow StoreAssignmentPolicy when insert into v2 tables

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/27597 and simply applies the fix in the v2 table insertion code path.

### Why are the changes needed?

bug fix

### Does this PR introduce _any_ user-facing change?

Yes: v2 table insertion with static partitions now also follows StoreAssignmentPolicy.

### How was this patch tested?

Moved the test from https://github.com/apache/spark/pull/27597 to the general test suite `SQLInsertTestSuite`, which covers DS v2, file source, and Hive tables.

Closes #31726 from cloud-fan/insert.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit 8f1eec4d138da604b890111d0a6daaef86d44ef2)
Signed-off-by: HyukjinKwon <[email protected]>

* [SPARK-34599][SQL] Fix the issue that INSERT INTO OVERWRITE doesn't support partition columns containing dot for DSv2

### What changes were proposed in this pull request?

`ResolveInsertInto.staticDeleteExpression` should use `UnresolvedAttribute.quoted` to create the delete expression so that the entire `attr.name` is treated as a single column name; the distinction is sketched below.
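
The distinction being relied on, in a short sketch:

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute

// The default factory parses a dotted name into parts, which would look up a
// nested field; `quoted` keeps the whole string as one column name.
UnresolvedAttribute("a.b").nameParts        // Seq("a", "b")
UnresolvedAttribute.quoted("a.b").nameParts // Seq("a.b")
```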

### Why are the changes needed?

When users use a dot in a partition column name, queries like ```INSERT OVERWRITE $t1 PARTITION (`a.b` = 'a') (`c.d`) VALUES('b')``` do not work.

### Does this PR introduce _any_ user-facing change?

Without this fix, the above query throws
```
[info]   org.apache.spark.sql.AnalysisException: cannot resolve '`a.b`' given input columns: [a.b, c.d];
[info] 'OverwriteByExpression RelationV2[a.b#17, c.d#18] default.tbl, ('a.b <=> cast(a as string)), false
[info] +- Project [a.b#19, ansi_cast(col1#16 as string) AS c.d#20]
[info]    +- Project [cast(a as string) AS a.b#19, col1#16]
[info]       +- LocalRelation [col1#16]
```

With the fix, the query will run correctly.

### How was this patch tested?

The new added test.

Closes #31713 from zsxwing/SPARK-34599.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 53e4dba7c489ac5c0ad61f0121c4e247de5b485c)
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-34567][SQL] CreateTableAsSelect should update metrics too

### What changes were proposed in this pull request?
For the `CreateTableAsSelect` command we use `InsertIntoHiveTable` or `InsertIntoHadoopFsRelationCommand` to insert data.
The metrics of `InsertIntoHiveTable` and `InsertIntoHadoopFsRelationCommand` are updated in `FileFormatWriter.write()`, but only `CreateTableAsSelectCommand` is shown in the WebUI SQL tab.
We need to update `CreateTableAsSelectCommand`'s metrics too.

Before this PR:
![image](https://user-images.githubusercontent.com/46485123/109411226-81f44480-79db-11eb-99cb-b9686b15bf61.png)

After this PR:
![image](https://user-images.githubusercontent.com/46485123/109411232-8ae51600-79db-11eb-9111-3bea0bc2d475.png)

![image](https://user-images.githubusercontent.com/46485123/109905192-62aa2f80-7cd9-11eb-91f9-04b16c9238ae.png)

### Why are the changes needed?
Complete SQL Metrics

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Manually tested (MT).

Closes #31679 from AngersZhuuuu/SPARK-34567.

Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 401e270c179021dd7bfd136b143ccc6d01c04755)
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-34482][SS] Correct the active SparkSession for StreamExecution.logicalPlan

### What changes were proposed in this pull request?

Set the active SparkSession to `sparkSessionForStream` and disable AQE & CBO before initializing `StreamExecution.logicalPlan`; a sketch of the pattern follows.
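
A hedged sketch of the pattern using the public session accessors; the helper name is illustrative and this is not the exact `StreamExecution` code:

```scala
import org.apache.spark.sql.SparkSession

// Pin the streaming session as the active one while building the logical
// plan, so analysis rules read confs from it rather than from whatever
// session happens to be active on the calling thread.
def withActiveSession[T](session: SparkSession)(body: => T): T = {
  val prev = SparkSession.getActiveSession
  SparkSession.setActiveSession(session)
  try body
  finally prev.fold(SparkSession.clearActiveSession())(SparkSession.setActiveSession)
}
```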

### Why are the changes needed?

The active session should be `sparkSessionForStream`. Otherwise, settings like

https://github.com/apache/spark/blob/6b34745cb9b294c91cd126c2ea44c039ee83cb84/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala#L332-L335

wouldn't take effect if callers access them from the active SQLConf, e.g., in the `InsertAdaptiveSparkPlan` rule. Besides, unlike `InsertAdaptiveSparkPlan` (which skips streaming plans), `CostBasedJoinReorder` could theoretically still take effect.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Tested manually. Before the fix, `InsertAdaptiveSparkPlan` would try to apply AQE on the plan (though it wouldn't take effect). After this fix, the rule returns directly.

Closes #31600 from Ngone51/active-session-for-stream.

Authored-by: yi.wu <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit e7e016192f882cfb430d706c2099e58e1bcc014c)
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-32924][WEBUI] Make duration column in master UI sorted in the correct order

### What changes were proposed in this pull request?

Make the "duration" column in the standalone mode master UI sort by numeric duration, so that the column is ordered correctly.

Before changes:
![image](https://user-images.githubusercontent.com/26694233/110025426-f5a49300-7cf4-11eb-86f0-2febade86be9.png)

After changes:
![image](https://user-images.githubusercontent.com/26694233/110025604-33092080-7cf5-11eb-8b34-215688faf56d.png)

### Why are the changes needed?

Fix a UI bug to make the sorting consistent across different pages.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Ran several apps with different durations and verified the duration column on the master page can be sorted correctly.

Closes #31743 from baohe-zhang/SPARK-32924.

Authored-by: Baohe Zhang <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 9ac5ee2e17ca491eabf2e6e7d33ce7cfb5a002a7)
Signed-off-by: Dongjoon Hyun <[email protected]>

* [SPARK-34613][SQL] Fix view does not capture disable hint config

### What changes were proposed in this pull request?

Add allow list to capture sql config for view.

### Why are the changes needed?

Spark uses the original SQL text to store a view, then captures and stores SQL configs in the view metadata.

Config capture skips configs with certain prefixes, e.g. `spark.sql.optimizer.`, but unfortunately `spark.sql.optimizer.disableHints` starts with `spark.sql.optimizer.`.

We need an allow list to help capture such configs; a sketch follows.
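
A sketch of the capture rule with an allow list; the prefix list and names are illustrative:

```scala
// Skip configs under excluded prefixes unless explicitly allow-listed, so
// flags like spark.sql.optimizer.disableHints survive the capture.
val excludedPrefixes = Seq("spark.sql.optimizer.")
val allowList = Set("spark.sql.optimizer.disableHints")

def shouldCaptureConfig(key: String): Boolean =
  allowList.contains(key) || !excludedPrefixes.exists(key.startsWith)
```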

### Does this PR introduce _any_ user-facing change?

Yes, bug fix.

### How was this patch tested?

Add test.

Closes #31732 from ulysses-you/SPARK-34613.

Authored-by: ulysses-you <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit 43aacd5069294e1215e86cd43bd0810bda998be2)
Signed-off-by: Wenchen Fan <[email protected]>

* [SPARK-34607][SQL][3.1] Add `Utils.isMemberClass` to fix a malformed class name error on jdk8u

### What changes were proposed in this pull request?

This PR intends to fix a bug in `objects.NewInstance` when a user runs Spark on jdk8u and a given `cls` in `NewInstance` is a deeply-nested inner class, e.g.:
```
  object OuterLevelWithVeryVeryVeryLongClassName1 {
    object OuterLevelWithVeryVeryVeryLongClassName2 {
      object OuterLevelWithVeryVeryVeryLongClassName3 {
        object OuterLevelWithVeryVeryVeryLongClassName4 {
          object OuterLevelWithVeryVeryVeryLongClassName5 {
            object OuterLevelWithVeryVeryVeryLongClassName6 {
              object OuterLevelWithVeryVeryVeryLongClassName7 {
                object OuterLevelWithVeryVeryVeryLongClassName8 {
                  object OuterLevelWithVeryVeryVeryLongClassName9 {
                    object OuterLevelWithVeryVeryVeryLongClassName10 {
                      object OuterLevelWithVeryVeryVeryLongClassName11 {
                        object OuterLevelWithVeryVeryVeryLongClassName12 {
                          object OuterLevelWithVeryVeryVeryLongClassName13 {
                            object OuterLevelWithVeryVeryVeryLongClassName14 {
                              object OuterLevelWithVeryVeryVeryLongClassName15 {
                                object OuterLevelWithVeryVeryVeryLongClassName16 {
                                  object OuterLevelWithVeryVeryVeryLongClassName17 {
                                    object OuterLevelWithVeryVeryVeryLongClassName18 {
                                      object OuterLevelWithVeryVeryVeryLongClassName19 {
                                        object OuterLevelWithVeryVeryVeryLongClassName20 {
                                          case class MalformedNameExample2(x: Int)
                                        }}}}}}}}}}}}}}}}}}}}
```

The root cause, which Kris (@rednaxelafx) investigated, is as follows (kudos to Kris):

The reason why the test case above is so convoluted lies in the way Scala generates the class name for nested classes. In general, Scala generates a class name for a nested class by inserting the dollar-sign ( `$` ) between each level of class nesting. The problem is that this format can concatenate into a very long string that goes beyond certain limits, so Scala changes the class name format beyond a certain length threshold.

For the example above, we can see that the first two levels of class nesting have class names that look like this:
```
org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$OuterLevelWithVeryVeryVeryLongClassName1$
org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$OuterLevelWithVeryVeryVeryLongClassName1$OuterLevelWithVeryVeryVeryLongClassName2$
```
If we leave out the fact that Scala uses a dollar-sign ( `$` ) suffix for the class name of the companion object, `OuterLevelWithVeryVeryVeryLongClassName1`'s full name is a prefix (substring) of `OuterLevelWithVeryVeryVeryLongClassName2`.

But if we keep going deeper into the levels of nesting, you'll find names that look like:
```
org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$OuterLevelWithVeryVeryVeryLongClassNam$$$$2a1321b953c615695d7442b2adb1$$$$ryVeryLongClassName8$OuterLevelWithVeryVeryVeryLongClassName9$OuterLevelWithVeryVeryVeryLongClassName10$
org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$OuterLevelWithVeryVeryVeryLongClassNam$$$$2a1321b953c615695d7442b2adb1$$$$ryVeryLongClassName8$OuterLevelWithVeryVeryVeryLongClassName9$OuterLevelWithVeryVeryVeryLongClassName10$OuterLevelWithVeryVeryVeryLongClassName11$
org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$OuterLevelWithVeryVeryVeryLongClassNam$$$$85f068777e7ecf112afcbe997d461b$$$$VeryLongClassName11$OuterLevelWithVeryVeryVeryLongClassName12$
org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$OuterLevelWithVeryVeryVeryLongClassNam$$$$85f068777e7ecf112afcbe997d461b$$$$VeryLongClassName11$OuterLevelWithVeryVeryVeryLongClassName12$OuterLevelWithVeryVeryVeryLongClassName13$
org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$OuterLevelWithVeryVeryVeryLongClassNam$$$$85f068777e7ecf112afcbe997d461b$$$$VeryLongClassName11$OuterLevelWithVeryVeryVeryLongClassName12$OuterLevelWithVeryVeryVeryLongClassName13$OuterLevelWithVeryVeryVeryLongClassName14$
org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$OuterLevelWithVeryVeryVeryLongClassNam$$$$5f7ad51804cb1be53938ea804699fa$$$$VeryLongClassName14$OuterLevelWithVeryVeryVeryLongClassName15$
org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$OuterLevelWithVeryVeryVeryLongClassNam$$$$5f7ad51804cb1be53938ea804699fa$$$$VeryLongClassName14$OuterLevelWithVeryVeryVeryLongClassName15$OuterLevelWithVeryVeryVeryLongClassName16$
org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$OuterLevelWithVeryVeryVeryLongClassNam$$$$5f7ad51804cb1be53938ea804699fa$$$$VeryLongClassName14$OuterLevelWithVeryVeryVeryLongClassName15$OuterLevelWithVeryVeryVeryLongClassName16$OuterLevelWithVeryVeryVeryLongClassName17$
org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$OuterLevelWithVeryVeryVeryLongClassNam$$$$69b54f16b1965a31e88968df1a58d8$$$$VeryLongClassName17$OuterLevelWithVeryVeryVeryLongClassName18$
org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$OuterLevelWithVeryVeryVeryLongClassNam$$$$69b54f16b1965a31e88968df1a58d8$$$$VeryLongClassName17$OuterLevelWithVeryVeryVeryLongClassName18$OuterLevelWithVeryVeryVeryLongClassName19$
org.apache.spark.sql.catalyst.encoders.ExpressionEncoderSuite$OuterLevelWithVeryVeryVeryLongClassNam$$$$69b54f16b1965a31e88968df1a58d8$$$$VeryLongClassName17$OuterLevelWithVeryVeryVeryLongClassName18$OuterLevelWithVeryVeryVeryLongClassName19$OuterLevelWithVeryVeryVeryLongClassName20$
```
with a hash code in the middle and various levels of nesting omitted.

The `java.lang.Class.isMemberClass` method is implemented in JDK8u as:
http://hg.openjdk.java.net/jdk8u/jdk8u/jdk/file/tip/src/share/classes/java/lang/Class.java#l1425
```
    /**
     * Returns {@code true} if and only if the underlying class
     * is a member class.
     *
     * @return {@code true} if and only if this class is a member class.
     * @since 1.5
     */
    public boolean isMemberClass() {
        return getSimpleBinaryName() != null && !isLocalOrAnonymousClass();
    }

    /**
     * Returns the "simple binary name" of the underlying class, i.e.,
     * the binary name without the leading enclosing class name.
     * Returns {@code null} if the underlying class is a top level
     * class.
     */
    private String getSimpleBinaryName() {
        Class<?> enclosingClass = getEnclosingClass();
        if (enclosingClass == null) // top level class
            return null;
        // Otherwise, strip the enclosing class' name
        try {
            return getName().substring(enclosingClass.getName().length());
        } catch (IndexOutOfBoundsException ex) {
            throw new InternalError("Malformed class name", ex);
        }
    }
```
and the problematic code is `getName().substring(enclosingClass.getName().length())` -- if a class's enclosing class's full name is *longer* than the nested class's full name, this logic would end up going out of bounds.

The bug has been fixed in JDK9 by https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8057919, but it still exists in the latest JDK8u release, so from the Spark side we'd need to do something to avoid hitting this problem; a sketch of one possible workaround shape follows.
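
A hedged sketch of such a workaround (not necessarily the exact `Utils.isMemberClass` implementation; the local/anonymous-class distinction is omitted for brevity):

```scala
// Fall back to a check that does not substring class names when jdk8u's
// isMemberClass throws the "Malformed class name" InternalError.
def isMemberClassSafe(cls: Class[_]): Boolean = {
  try {
    cls.isMemberClass
  } catch {
    case _: InternalError =>
      // getEnclosingClass is null for top-level classes and does not suffer
      // from the substring bug shown above.
      cls.getEnclosingClass != null
  }
}
```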

This is the backport of #31733.

### Why are the changes needed?

Bugfix on jdk8u.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added tests.

* [SPARK-34681][SQL] Fix bug for full outer shuffled hash join when building left side with non-equal condition

### What changes were proposed in this pull request?

For a full outer shuffled hash join that builds the hash map on the left side and has a non-equi condition, the join can produce wrong results.

The root cause is that `boundCondition` in `HashJoin.scala` always assumes the left side row is `streamedPlan` and the right side row is `buildPlan` ([streamedPlan.output ++ buildPlan.output](https://github.com/apache/spark/blob/branch-3.1/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala#L141)). This is a valid assumption, except for the full outer + build left case.

The fix corrects `boundCondition` in `HashJoin.scala` to handle the full outer + build left case properly; a sketch follows. See the reproduction in https://issues.apache.org/jira/browse/SPARK-32399?focusedCommentId=17298414&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17298414 .
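
A self-contained sketch of the corrected binding order; the types are stand-ins for the real Catalyst ones:

```scala
sealed trait BuildSideSketch
case object BuildLeft extends BuildSideSketch
case object BuildRight extends BuildSideSketch

// The row fed to the bound condition is assembled as streamed ++ build,
// except for full outer join with the build side on the left, where the
// joined row is build ++ streamed. Bind the condition against the same order.
def bindingOrder[A](isFullOuter: Boolean,
                    buildSide: BuildSideSketch,
                    streamedOutput: Seq[A],
                    buildOutput: Seq[A]): Seq[A] = {
  if (isFullOuter && buildSide == BuildLeft) buildOutput ++ streamedOutput
  else streamedOutput ++ buildOutput
}
```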

### Why are the changes needed?

Fix data correctness bug.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Changed the test in `OuterJoinSuite.scala` to cover full outer shuffled hash join.
Before this change, the unit test `basic full outer join using ShuffledHashJoin` in `OuterJoinSuite.scala` is failed.

Closes #31792 from c21/join-bugfix.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit a916690dd9aac40df38922dbea233785354a2f2a)
Signed-off-by: Dongjoon Hyun <[email protected]>

* [SPARK-34682][SQL] Fix regression in canonicalization error check in CustomShuffleReaderExec

### What changes were proposed in this pull request?
There is a regression in 3.1.1 compared to 3.0.2 in the check for a canonicalized plan when executing CustomShuffleReaderExec.

The regression was caused by the call to `sendDriverMetrics`, which happens before the check and always fails if the plan is canonicalized; a sketch of the reordering follows.
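
A sketch of the reordering; the class and fields are stand-ins for the real `CustomShuffleReaderExec`:

```scala
// Perform the canonicalization check before touching metrics: a canonicalized
// plan carries no metrics, so sendDriverMetrics would fail on it.
class ReaderExecSketch(isCanonicalizedPlan: Boolean) {
  private def sendDriverMetrics(): Unit = ()

  def doExecute(): Unit = {
    if (isCanonicalizedPlan) {
      throw new IllegalStateException("operating on canonicalized plan")
    }
    sendDriverMetrics() // now runs only for a real, executable plan
  }
}
```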

### Why are the changes needed?
This is a regression in a useful error check.

### Does this PR introduce _any_ user-facing change?
No. This is not an error that a user would typically see, as far as I know.

### How was this patch tested?
I tested this change locally by making a distribution from this PR branch. Before fixing the regression I saw:

```
java.util.NoSuchElementException: key not found: numPartitions
```

After fixing this regression I saw:

```
java.lang.IllegalStateException: operating on canonicalized plan
```

Closes #31793 from andygrove/SPARK-34682.

Lead-authored-by: Andy Grove <[email protected]>
Co-authored-by: Andy Grove <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
(cherry picked from commit fd4843803c4670c656a94c1af652fb4b945bc82c)
Signed-off-by: HyukjinKwon <[email protected]>

* [SPARK-34682][SQL] Use PrivateMethodTester instead of reflection

### Why are the changes needed?
SPARK-34682 was merged prematurely. This PR implements feedback from the review. I wasn't sure whether I should create a new JIRA or not.

### Does this PR introduce _any_ user-facing change?
No. Just improves the test.

### How was this patch tested?
Updated test.

Closes #31798 from andygrove/SPARK-34682-follow-up.

Authored-by: Andy Grove <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit fc182f7e7f9ff55a6a005044ae0968340cf6f30d)
Signed-off-by: Dongjoon Hyun <[email protected]>

* [MINOR][SQL] Remove unnecessary extend from BroadcastHashJoinExec

### What changes were proposed in this pull request?

This is just a minor fix. `HashJoin` already extends `JoinCodegenSupport`. So we don't need `CodegenSupport` here for `BroadcastHashJoinExec`. Submitted separately as a PR here per https://github.com/apache/spark/pull/31802#discussion_r592066686 .

### Why are the changes needed?

Clean up code.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing unit tests.

Closes #31805 from c21/bhj-minor.

Authored-by: Cheng Su <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 14ad7afa1aa0f3bfd75f1bf076a27af792721190)
Signed-off-by: Dongjoon Hyun <[email protected]>

* [SPARK-34713][SQL] Fix group by CreateStruct with ExtractValue

### What changes were proposed in this pull request?

This is a bug caused by https://issues.apache.org/jira/browse/SPARK-31670. We remove the `Alias` when resolving column references in grouping expressions, which breaks `ResolveCreateNamedStruct`.

### Why are the changes needed?

bug fix

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new tests

Closes #31808 from cloud-fan/bug.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 6a42b633bf39981242f6f0d13ae40919f3fa7f8b)
Signed-off-by: Dongjoon Hyun <[email protected]>

* [SPARK-34724][SQL] Fix Interpreted evaluation by using getMethod instead of getDeclaredMethod

### What changes were proposed in this pull request?

This bug was introduced by SPARK-23583 at Apache Spark 2.4.0.

This PR aims to use `getMethod` instead of `getDeclaredMethod`.
```scala
- obj.getClass.getDeclaredMethod(functionName, argClasses: _*)
+ obj.getClass.getMethod(functionName, argClasses: _*)
```

### Why are the changes needed?

`getDeclaredMethod` does not search the superclass's methods. To invoke `GenericArrayData.toIntArray`, we need to use `getMethod` because the method is declared on the superclass `ArrayData`; a simplified illustration follows.
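
A simplified illustration with stand-in classes:

```scala
// Stand-ins for ArrayData/GenericArrayData: the method lives on the parent.
abstract class Parent { def toIntArray(): Array[Int] = Array.empty }
class Child extends Parent

classOf[Child].getMethod("toIntArray")         // found: searches superclasses
classOf[Child].getDeclaredMethod("toIntArray") // throws NoSuchMethodException
```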

```
[info] - encode/decode …
```