[SPARK-25159][SQL] json schema inference should only trigger one job #22152
JsonInferSchema.scala

```diff
@@ -26,6 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
 import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
```
```diff
@@ -69,10 +70,17 @@ private[sql] object JsonInferSchema {
       }.reduceOption(typeMerger).toIterator
     }

-    // Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
-    // `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
-    // active SparkSession and `SQLConf.get` may point to the wrong configs.
-    val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
+    // Here we manually submit a fold-like Spark job, so that we can set the SQLConf when running
+    // the fold functions in the scheduler event loop thread.
+    val existingConf = SQLConf.get
+    var rootType: DataType = StructType(Nil)
+    val foldPartition = (iter: Iterator[DataType]) => iter.fold(StructType(Nil))(typeMerger)
+    val mergeResult = (index: Int, taskResult: DataType) => {
+      rootType = SQLConf.withExistingConf(existingConf) {
+        typeMerger(rootType, taskResult)
+      }
+    }
+    json.sparkContext.runJob(mergedTypesFromPartitions, foldPartition, mergeResult)

     canonicalizeType(rootType, configOptions) match {
       case Some(st: StructType) => st
```

Review thread on the `SQLConf.withExistingConf(existingConf)` line:

Contributor: Just a question: wouldn't […] do the same without requiring these changes?

Contributor (Author): This can work, but the problem is that we have to keep a large result array, which can cause GC problems.

Contributor: It would contain one result per partition; do you think that is enough to cause GC problems?

Contributor (Author): The schema can be very complex (e.g. a very wide and deep schema).

Contributor: Yes, makes sense, thanks.

Member: The same question was in my mind, thanks for the clarification.
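For readers unfamiliar with the pattern, the heart of the change is replacing `RDD#toLocalIterator` (which may launch one Spark job per partition) with a single `SparkContext.runJob` call whose result handler performs the driver-side merge. The sketch below reproduces that pattern with a trivial sum instead of schema merging; the object name, the `local[2]` master, and the sum itself are illustrative, not part of the PR.

```scala
import org.apache.spark.sql.SparkSession

object SingleJobFoldSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("fold-sketch").getOrCreate()
    val sc = spark.sparkContext
    val rdd = sc.parallelize(1 to 100, numSlices = 4)

    var total = 0L
    // Runs on executors, once per partition.
    val foldPartition = (iter: Iterator[Int]) => iter.foldLeft(0L)(_ + _)
    // Runs on the driver inside the DAGScheduler event loop thread, which is why the PR wraps
    // the real merge in SQLConf.withExistingConf.
    val mergeResult = (index: Int, partial: Long) => { total += partial }

    // Exactly one Spark job, regardless of the number of partitions.
    sc.runJob(rdd, foldPartition, mergeResult)

    println(s"total = $total") // 5050
    spark.stop()
  }
}
```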
SQLConf.scala

```diff
@@ -82,6 +82,19 @@ object SQLConf {
   /** See [[get]] for more information. */
   def getFallbackConf: SQLConf = fallbackConf.get()

+  private lazy val existingConf = new ThreadLocal[SQLConf] {
+    override def initialValue: SQLConf = null
+  }
+
+  def withExistingConf[T](conf: SQLConf)(f: => T): T = {
+    existingConf.set(conf)
+    try {
+      f
+    } finally {
+      existingConf.remove()
+    }
+  }
+
   /**
    * Defines a getter that returns the SQLConf within scope.
    * See [[get]] for more information.
```
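`withExistingConf` is an instance of the usual thread-local scoping idiom: install a value for the duration of a block and always clear it in `finally` so later lookups on the same thread do not see a stale value. A plain-Scala sketch of the same idiom, with made-up names and a `String` standing in for `SQLConf`:

```scala
object ThreadLocalScopeSketch {
  private val current = new ThreadLocal[String] {
    override def initialValue: String = null
  }

  // Mirrors SQLConf.withExistingConf: the value is visible only while `body` runs on this thread.
  def withValue[T](value: String)(body: => T): T = {
    current.set(value)
    try {
      body
    } finally {
      current.remove()
    }
  }

  def main(args: Array[String]): Unit = {
    println(withValue("scoped-conf") { current.get() }) // prints "scoped-conf"
    println(current.get())                              // prints "null": cleared after the block
  }
}
```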
```diff
@@ -116,16 +129,24 @@
     if (TaskContext.get != null) {
       new ReadOnlySQLConf(TaskContext.get())
     } else {
-      if (Utils.isTesting && SparkContext.getActive.isDefined) {
-        // DAGScheduler event loop thread does not have an active SparkSession, the `confGetter`
-        // will return `fallbackConf` which is unexpected. Here we prevent it from happening.
-        val schedulerEventLoopThread =
-          SparkContext.getActive.get.dagScheduler.eventProcessLoop.eventThread
-        if (schedulerEventLoopThread.getId == Thread.currentThread().getId) {
-          throw new RuntimeException("Cannot get SQLConf inside scheduler event loop thread.")
-        }
-      }
-      confGetter.get()()
+      val isSchedulerEventLoopThread = SparkContext.getActive
+        .map(_.dagScheduler.eventProcessLoop.eventThread)
+        .exists(_.getId == Thread.currentThread().getId)
+      if (isSchedulerEventLoopThread) {
+        // DAGScheduler event loop thread does not have an active SparkSession, the `confGetter`
+        // will return `fallbackConf` which is unexpected. Here we require the caller to get the
+        // conf within `withExistingConf`, otherwise fail the query.
+        val conf = existingConf.get()
+        if (conf != null) {
+          conf
+        } else if (Utils.isTesting) {
+          throw new RuntimeException("Cannot get SQLConf inside scheduler event loop thread.")
+        } else {
+          confGetter.get()()
+        }
+      } else {
+        confGetter.get()()
+      }
     }
   }
```
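The new guard boils down to a thread-identity comparison: grab a reference to the thread that must be protected and compare its id with the calling thread's id. A small, Spark-free sketch of that check, with illustrative names:

```scala
object ThreadIdentityCheckSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the DAGScheduler event loop thread that SQLConf.get wants to detect.
    val protectedThread = Thread.currentThread()

    def onProtectedThread: Boolean = Thread.currentThread().getId == protectedThread.getId

    println(onProtectedThread) // true: called from the protected thread itself

    val worker = new Thread(new Runnable {
      override def run(): Unit = println(onProtectedThread) // false: a different thread
    })
    worker.start()
    worker.join()
  }
}
```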
DataFrameSuite.scala

```diff
@@ -27,6 +27,7 @@ import scala.util.Random
 import org.scalatest.Matchers._

 import org.apache.spark.SparkException
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Uuid
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, OneRowRelation, Union}
@@ -2528,4 +2529,27 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       checkAnswer(aggPlusFilter1, aggPlusFilter2.collect())
     }
   }
+
+  test("SPARK-25159: json schema inference should only trigger one job") {
+    withTempPath { path =>
+      // This test is to prove that the `JsonInferSchema` does not use `RDD#toLocalIterator` which
+      // triggers one Spark job per RDD partition.
+      Seq(1 -> "a", 2 -> "b").toDF("i", "p")
+        // The data set has 2 partitions, so Spark will write at least 2 json files.
+        // Use a non-splittable compression (gzip), to make sure the json scan RDD has at least 2
+        // partitions.
+        .write.partitionBy("p").option("compression", "gzip").json(path.getCanonicalPath)
+
+      var numJobs = 0
+      sparkContext.addSparkListener(new SparkListener {
+        override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
+          numJobs += 1
+        }
+      })
+
+      val df = spark.read.json(path.getCanonicalPath)
+      assert(df.columns === Array("i", "p"))
+      assert(numJobs == 1)
+    }
+  }
 }
```
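The job-counting technique in the test also works outside the suite. Below is a hedged, standalone sketch of the same idea against a local `SparkSession`; the object name, the temp-path handling, and the short sleep (listener events are delivered asynchronously) are assumptions added for illustration and are not part of the PR.

```scala
import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd}
import org.apache.spark.sql.SparkSession

object CountJobsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("count-jobs").getOrCreate()
    import spark.implicits._

    // Write a small partitioned, gzip-compressed JSON dataset, as the test does.
    val path = java.nio.file.Files.createTempDirectory("json-demo").resolve("data").toString
    Seq(1 -> "a", 2 -> "b").toDF("i", "p")
      .write.partitionBy("p").option("compression", "gzip").json(path)

    val numJobs = new AtomicInteger(0)
    spark.sparkContext.addSparkListener(new SparkListener {
      override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = numJobs.incrementAndGet()
    })

    val df = spark.read.json(path) // schema inference should submit a single job
    Thread.sleep(1000)             // crude wait for asynchronous listener delivery
    println(s"columns = ${df.columns.mkString(", ")}; jobs observed = ${numJobs.get()}")
    spark.stop()
  }
}
```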
A further review thread on the `typeMerger` closure passed to `runJob`:

- Need to do `sc.clean(typeMerger)` manually here?
- This closure is defined by us and I don't think we leak an outer reference here. If we do, it's a bug and we should fix it.
- Yeah, agreed.
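The "outer reference" in this exchange is the usual Scala closure-capture pitfall that `SparkContext#clean` and the `ClosureCleaner` guard against: a function literal that reads a member of its enclosing instance captures that instance and drags it into serialization. The sketch below is a generic, Spark-free illustration of the difference; the class and names are made up.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Deliberately NOT Serializable, to make an accidental capture visible.
class Enclosing {
  private val offset = 10
  val leaky: Int => Int = x => x + offset      // reads a member, so it captures `this`
  val selfContained: Int => Int = x => x + 1   // uses only its parameter, captures nothing
}

object ClosureCaptureSketch {
  private def trySerialize(f: AnyRef): String =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(f)
      "serializable"
    } catch {
      case _: NotSerializableException => "captured a non-serializable outer reference"
    }

  def main(args: Array[String]): Unit = {
    val e = new Enclosing
    println(trySerialize(e.selfContained)) // serializable
    println(trySerialize(e.leaky))         // captured a non-serializable outer reference
  }
}
```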