Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix json schema inference
  • Loading branch information
cloud-fan committed May 20, 2018
commit a1519d4aa692adceef1f3878a2ccd1715bf6175a
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import org.apache.spark.internal.config._
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.catalyst.analysis.Resolver
import org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator
import org.apache.spark.util.Utils

////////////////////////////////////////////////////////////////////////////////////////////////////
// This file defines the configuration options for Spark SQL.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,10 @@ object SQLExecution {
}
}

/**
* Wrap an action with specified SQL configs. These configs will be propagated to the executor
* side via job local properties.
*/
def withSQLConfPropagated[T](sparkSession: SparkSession)(body: => T): T = {
val sc = sparkSession.sparkContext
// Set all the specified SQL configs to local properties, so that they can be available at
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ private[sql] object JsonInferSchema {
val parseMode = configOptions.parseMode
val columnNameOfCorruptRecord = configOptions.columnNameOfCorruptRecord

// perform schema inference on each row and merge afterwards
val rootType = json.mapPartitions { iter =>
// In each RDD partition, perform schema inference on each row and merge afterwards.
val typeMerger = compatibleRootType(columnNameOfCorruptRecord, parseMode)
val mergedTypesFromPartitions = json.mapPartitions { iter =>
val factory = new JsonFactory()
configOptions.setJacksonOptions(factory)
iter.flatMap { row =>
Expand All @@ -66,9 +67,13 @@ private[sql] object JsonInferSchema {
s"Parse Mode: ${FailFastMode.name}.", e)
}
}
}
}.fold(StructType(Nil))(
compatibleRootType(columnNameOfCorruptRecord, parseMode))
}.reduceOption(typeMerger).toIterator
}

// Here we get RDD local iterator then fold, instead of calling `RDD.fold` directly, because
// `RDD.fold` will run the fold function in DAGScheduler event loop thread, which may not have
// active SparkSession and `SQLConf.get` may point to the wrong configs.
val rootType = mergedTypesFromPartitions.toLocalIterator.fold(StructType(Nil))(typeMerger)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I re-ran the JsonBenchmark and no performance regression was observed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the same problem also happen in other places? This seems to be quite a tricky issue which may happen in general. Can we avoid it somehow?


canonicalizeType(rootType) match {
case Some(st: StructType) => st
Expand Down