[SPARK-1845] [SQL] Use AllScalaRegistrar for SparkSqlSerializer to register serializers of ... #790
Conversation
…of Scala collections.
Merged build triggered.

Merged build started.

Merged build finished. All automated tests passed.

All automated tests passed.

Thanks. Merged into master & branch-1.0.
[SPARK-1845] [SQL] Use AllScalaRegistrar for SparkSqlSerializer to register serializers of Scala collections. (The commit message repeats the PR description; see the full description below.)

Author: Takuya UESHIN <[email protected]>

Closes #790 from ueshin/issues/SPARK-1845 and squashes the following commits:

d1ed992 [Takuya UESHIN] Use AllScalaRegistrar for SparkSqlSerializer to register serializers of Scala collections.

The same commit as cherry-picked into branch-1.0:

(cherry picked from commit db8cc6f)
Signed-off-by: Reynold Xin <[email protected]>
@ueshin I'm not sure why GitHub didn't close this pull request. Do you mind closing it? The change has been merged.
Thanks! I'll close it. |
[SPARK-1845] [SQL] Use AllScalaRegistrar for SparkSqlSerializer to register serializers of Scala collections. (same commit message as above)

Closes apache#790 from ueshin/issues/SPARK-1845 and squashes the following commits:

d1ed992 [Takuya UESHIN] Use AllScalaRegistrar for SparkSqlSerializer to register serializers of Scala collections.
…ock to avoid possible deadlock (apache#790)

### What changes were proposed in this pull request?

Do not hold the `uninterruptibleLock` monitor while calling `super.interrupt()` in `UninterruptibleThread`; instead, use the newly introduced `awaitInterruptThread` flag and wait for `super.interrupt()` to be called.

### Why are the changes needed?

There is a potential deadlock: `UninterruptibleThread` may be blocked on an NIO operation, and interrupting the channel while holding the `uninterruptibleLock` monitor can deadlock, as in:

```
Found one Java-level deadlock:
=============================
"pool-1-thread-1-ScalaTest-running-UninterruptibleThreadSuite":
  waiting to lock monitor 0x00006000036ee3c0 (object 0x000000070f3019d0, a java.lang.Object),
  which is held by "task thread"

"task thread":
  waiting to lock monitor 0x00006000036e75a0 (object 0x000000070f70fe80, a java.lang.Object),
  which is held by "pool-1-thread-1-ScalaTest-running-UninterruptibleThreadSuite"

Java stack information for the threads listed above:
===================================================
"pool-1-thread-1-ScalaTest-running-UninterruptibleThreadSuite":
  at java.nio.channels.spi.AbstractInterruptibleChannel$1.interrupt(java.base@17.0.14/AbstractInterruptibleChannel.java:157)
  - waiting to lock <0x000000070f3019d0> (a java.lang.Object)
  at java.lang.Thread.interrupt(java.base@17.0.14/Thread.java:1004)
  - locked <0x000000070f70fc90> (a java.lang.Object)
  at org.apache.spark.util.UninterruptibleThread.interrupt(UninterruptibleThread.scala:99)
  - locked <0x000000070f70fe80> (a java.lang.Object)
  at org.apache.spark.util.UninterruptibleThreadSuite.$anonfun$new$5(UninterruptibleThreadSuite.scala:159)
  - locked <0x000000070f70f9f8> (a java.lang.Object)
  at org.apache.spark.util.UninterruptibleThreadSuite$$Lambda$216/0x000000700120d6c8.apply$mcV$sp(Unknown Source)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
  at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
  at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
  at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
  at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
  at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
  at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
  at org.apache.spark.SparkFunSuite$$Lambda$205/0x0000007001207700.apply(Unknown Source)
  at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
  at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
  at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
  at org.scalatest.Transformer.apply(Transformer.scala:22)
  at org.scalatest.Transformer.apply(Transformer.scala:20)
  at org.scalatest.funsuite.AnyFunSuiteLike$$anon$1.apply(AnyFunSuiteLike.scala:226)
  at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:227)
  at org.scalatest.funsuite.AnyFunSuiteLike.invokeWithFixture$1(AnyFunSuiteLike.scala:224)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTest$1(AnyFunSuiteLike.scala:236)
  at org.scalatest.funsuite.AnyFunSuiteLike$$Lambda$343/0x00000070012867b0.apply(Unknown Source)
  at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTest(AnyFunSuiteLike.scala:236)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTest$(AnyFunSuiteLike.scala:218)
  at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(SparkFunSuite.scala:69)
  at org.scalatest.BeforeAndAfterEach.runTest(BeforeAndAfterEach.scala:234)
  at org.scalatest.BeforeAndAfterEach.runTest$(BeforeAndAfterEach.scala:227)
  at org.apache.spark.SparkFunSuite.runTest(SparkFunSuite.scala:69)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$runTests$1(AnyFunSuiteLike.scala:269)
  at org.scalatest.funsuite.AnyFunSuiteLike$$Lambda$339/0x00000070012833e0.apply(Unknown Source)
  at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
  at org.scalatest.SuperEngine$$Lambda$340/0x0000007001283998.apply(Unknown Source)
  at scala.collection.immutable.List.foreach(List.scala:334)
  at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
  at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
  at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTests(AnyFunSuiteLike.scala:269)
  at org.scalatest.funsuite.AnyFunSuiteLike.runTests$(AnyFunSuiteLike.scala:268)
  at org.scalatest.funsuite.AnyFunSuite.runTests(AnyFunSuite.scala:1564)
  at org.scalatest.Suite.run(Suite.scala:1114)
  at org.scalatest.Suite.run$(Suite.scala:1096)
  at org.scalatest.funsuite.AnyFunSuite.org$scalatest$funsuite$AnyFunSuiteLike$$super$run(AnyFunSuite.scala:1564)
  at org.scalatest.funsuite.AnyFunSuiteLike.$anonfun$run$1(AnyFunSuiteLike.scala:273)
  at org.scalatest.funsuite.AnyFunSuiteLike$$Lambda$332/0x000000700127b000.apply(Unknown Source)
  at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
  at org.scalatest.funsuite.AnyFunSuiteLike.run(AnyFunSuiteLike.scala:273)
  at org.scalatest.funsuite.AnyFunSuiteLike.run$(AnyFunSuiteLike.scala:272)
  at org.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:69)
  at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:213)
  at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
  at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
  at org.apache.spark.SparkFunSuite.run(SparkFunSuite.scala:69)
  at org.scalatest.tools.Framework.org$scalatest$tools$Framework$$runSuite(Framework.scala:321)
  at org.scalatest.tools.Framework$ScalaTestTask.execute(Framework.scala:517)
  at sbt.ForkMain$Run.lambda$runTest$1(ForkMain.java:414)
  at sbt.ForkMain$Run$$Lambda$107/0x0000007001110000.call(Unknown Source)
  at java.util.concurrent.FutureTask.run(java.base@17.0.14/FutureTask.java:264)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.14/ThreadPoolExecutor.java:1136)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.14/ThreadPoolExecutor.java:635)
  at java.lang.Thread.run(java.base@17.0.14/Thread.java:840)

"task thread":
  at org.apache.spark.util.UninterruptibleThread.interrupt(UninterruptibleThread.scala:96)
  - waiting to lock <0x000000070f70fe80> (a java.lang.Object)
  at org.apache.spark.util.UninterruptibleThreadSuite$InterruptibleChannel.implCloseChannel(UninterruptibleThreadSuite.scala:143)
  at java.nio.channels.spi.AbstractInterruptibleChannel.close(java.base@17.0.14/AbstractInterruptibleChannel.java:112)
  - locked <0x000000070f3019d0> (a java.lang.Object)
  at org.apache.spark.util.UninterruptibleThreadSuite$InterruptibleChannel.<init>(UninterruptibleThreadSuite.scala:138)
  at org.apache.spark.util.UninterruptibleThreadSuite$$anon$5.run(UninterruptibleThreadSuite.scala:153)

Found 1 deadlock.
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added 2 new test cases to the `UninterruptibleThreadSuite`

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#50810 from vrozov/SPARK-51821.

Authored-by: Vlad Rozov <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Co-authored-by: Vlad Rozov <[email protected]>
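The lock discipline the commit describes is easier to see in code. Below is a minimal sketch, not Spark's actual `UninterruptibleThread` implementation: the class name, the `pendingInterrupt` flag, and the simplified `runUninterruptibly` body are illustrative stand-ins for the `awaitInterruptThread` mechanism mentioned above. The point it demonstrates is that the decision to interrupt is made under `uninterruptibleLock`, but `super.interrupt()` is invoked only after the monitor is released, so the thread never holds `uninterruptibleLock` while contending for the channel monitor.

```scala
// Sketch only: illustrates the "don't call super.interrupt() while holding
// uninterruptibleLock" discipline from the commit message above.
class UninterruptibleThreadSketch(name: String) extends Thread(name) {
  private val uninterruptibleLock = new Object
  // Both fields are guarded by uninterruptibleLock.
  private var uninterruptible = false
  private var pendingInterrupt = false // stand-in for the PR's awaitInterruptThread flag

  /** Runs `body` with interrupts deferred; delivers any deferred interrupt afterwards. */
  def runUninterruptibly[T](body: => T): T = {
    uninterruptibleLock.synchronized { uninterruptible = true }
    try {
      body
    } finally {
      var deliverDeferred = false
      uninterruptibleLock.synchronized {
        uninterruptible = false
        deliverDeferred = pendingInterrupt
        pendingInterrupt = false
      }
      // Monitor released before interrupting: no lock-order inversion with
      // the channel's internal interrupt/close lock.
      if (deliverDeferred) super.interrupt()
    }
  }

  override def interrupt(): Unit = {
    var deliverNow = false
    uninterruptibleLock.synchronized {
      if (uninterruptible) pendingInterrupt = true else deliverNow = true
    }
    if (deliverNow) super.interrupt() // called outside uninterruptibleLock
  }
}
```

Compare the "task thread" stack above: it blocks in `UninterruptibleThread.interrupt` waiting for `uninterruptibleLock` while holding the channel's close lock. As long as no code path acquires those two monitors in the opposite order, the circular wait cannot form.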
...Scala collections.

When I execute `orderBy` or `limit` for a `SchemaRDD` including `ArrayType` or `MapType`, `SparkSqlSerializer` throws the following exception:

```
com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): scala.collection.immutable.$colon$colon
```

or

```
com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): scala.collection.immutable.Vector
```

or

```
com.esotericsoftware.kryo.KryoException: Class cannot be created (missing no-arg constructor): scala.collection.immutable.HashMap$HashTrieMap
```

and so on.

This is because `SparkSqlSerializer` is missing serializer registrations for the concrete collection classes. I believe it should use `AllScalaRegistrar`, which covers serializers for the concrete `Seq` and `Map` classes backing `ArrayType` and `MapType`.
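To make the fix concrete, here is a self-contained sketch. It assumes the Twitter chill library (which provides `AllScalaRegistrar`) and Kryo on the classpath, and `newKryo` mirrors only the spirit of `SparkSqlSerializer`'s factory method, not its exact code. Once the registrar is applied, Kryo has working serializers for `::`, `Vector`, `HashMap$HashTrieMap`, and the other concrete collection classes, so the round trip below succeeds.

```scala
import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import com.twitter.chill.AllScalaRegistrar

// Sketch of the fix: apply chill's AllScalaRegistrar when building the
// Kryo instance, so concrete Scala collection classes get serializers
// that don't rely on a no-arg constructor.
object KryoScalaCollections {
  def newKryo(): Kryo = {
    val kryo = new Kryo()
    kryo.setRegistrationRequired(false)
    // Registers serializers for the concrete Seq/Map implementations
    // that back ArrayType and MapType values.
    new AllScalaRegistrar().apply(kryo)
    kryo
  }

  def main(args: Array[String]): Unit = {
    val kryo = newKryo()
    val out = new Output(4096)
    kryo.writeClassAndObject(out, List(1, 2, 3))
    kryo.writeClassAndObject(out, Map("a" -> 1))
    out.close()

    val in = new Input(out.toBytes)
    // With AllScalaRegistrar both reads succeed.
    println(kryo.readClassAndObject(in)) // List(1, 2, 3)
    println(kryo.readClassAndObject(in)) // Map(a -> 1)
  }
}
```

Without the `new AllScalaRegistrar().apply(kryo)` line, the first `readClassAndObject` call reproduces the exception quoted above, because Kryo's default instantiation requires a no-arg constructor that `scala.collection.immutable.$colon$colon` does not have.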