set conf to lazy val
yaooqinn committed Feb 1, 2019
commit d57226a2823fec0c881a2a32d9d595725d0d481b
@@ -39,21 +39,28 @@ import org.apache.spark.util.{MutableURLClassLoader, Utils}

 /**
  * A class that holds all state shared across sessions in a given [[SQLContext]].
+ *
+ * @param sparkContext The Spark context associated with this SharedState
+ * @param initialConfigs The configs from the very first created SparkSession
  */
 private[sql] class SharedState(
     val sparkContext: SparkContext,
     initialConfigs: scala.collection.Map[String, String])
   extends Logging {
-  private val conf = sparkContext.conf.clone()
-  private val hadoopConf = new Configuration(sparkContext.hadoopConfiguration)
 
-  // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
-  // `SharedState`, all `SparkSession` level configurations have higher priority to generate a
-  // `SharedState` instance. This will be done only once then shared across `SparkSession`s
-  initialConfigs.foreach { case (k, v) =>
-    logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v")
-    conf.set(k, v)
-    hadoopConf.set(k, v)
-  }
 
+  private lazy val (conf, hadoopConf) = {
Review comment (Contributor): shall we just move this code block after `val warehousePath: String ...`? Then we don't need the lazy val.
+    val confClone = sparkContext.conf.clone()
+    val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
+    // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
+    // `SharedState`, all `SparkSession` level configurations have higher priority to generate a
+    // `SharedState` instance. This will be done only once then shared across `SparkSession`s
+    for ((k, v) <- initialConfigs
+        if k != "hive.metastore.warehouse.dir" && k != WAREHOUSE_PATH.key) {
+      logDebug(s"Applying initial SparkSession options to SparkConf/HadoopConf: $k -> $v")
+      confClone.set(k, v)
+      hadoopConfClone.set(k, v)
+    }
+    (confClone, hadoopConfClone)
+  }
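The review comment above turns on Scala's initializer semantics: plain vals run top to bottom at construction time, so moving this block below `val warehousePath` would also guarantee ordering, while a lazy val instead defers the clone until first use. A minimal, self-contained illustration of the difference (plain Scala, not Spark code):

// Plain vals initialize in declaration order, so `eager` reads `base` before
// `base` is assigned; the lazy val defers its body until first access.
class InitOrder {
  val eager: String = s"eager sees $base"         // evaluated now: base is still null
  lazy val deferred: String = s"lazy sees $base"  // evaluated on first access
  val base: String = "base"
}

object InitOrderDemo extends App {
  val d = new InitOrder
  println(d.eager)    // eager sees null
  println(d.deferred) // lazy sees base
}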

 // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
@@ -62,17 +69,17 @@ private[sql] class SharedState(
     val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
     if (configFile != null) {
       logInfo(s"loading hive config file: $configFile")
-      hadoopConf.addResource(configFile)
+      sparkContext.hadoopConfiguration.addResource(configFile)
     }
 
     // hive.metastore.warehouse.dir only stays in hadoopConf
-    conf.remove("hive.metastore.warehouse.dir")
+    sparkContext.conf.remove("hive.metastore.warehouse.dir")
     // Set the Hive metastore warehouse path to the one we use
-    val hiveWarehouseDir = hadoopConf.get("hive.metastore.warehouse.dir")
-    if (hiveWarehouseDir != null && !conf.contains(WAREHOUSE_PATH.key)) {
+    val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
+    if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
       // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
       // we will respect the value of hive.metastore.warehouse.dir.
-      conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
+      sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
       logInfo(s"${WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
         s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " +
         s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
@@ -82,14 +89,13 @@ private[sql] class SharedState(
       // the value of spark.sql.warehouse.dir.
       // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
       // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
-      val sparkWarehouseDir = conf.get(WAREHOUSE_PATH)
+      val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
       logInfo(s"Setting hive.metastore.warehouse.dir ('$hiveWarehouseDir') to the value of " +
         s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
-      hadoopConf.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
+      sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
       sparkWarehouseDir
     }
   }
-  sparkContext.conf.set(WAREHOUSE_PATH.key, warehousePath)
   logInfo(s"Warehouse path is '$warehousePath'.")
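Net effect of the two branches above: an explicit spark.sql.warehouse.dir always wins; otherwise a hive.metastore.warehouse.dir from hive-site.xml is respected; if neither is set, Spark's default applies. A standalone distillation of that precedence (hypothetical helper, not part of Spark's API):

// Hypothetical helper capturing the warehouse-path precedence shown above.
object WarehousePrecedence {
  def resolve(
      sparkWarehouseDir: Option[String],  // spark.sql.warehouse.dir, if explicitly set
      hiveWarehouseDir: Option[String],   // hive.metastore.warehouse.dir, if set
      default: String = "spark-warehouse"): String =
    (sparkWarehouseDir, hiveWarehouseDir) match {
      case (Some(spark), _)   => spark    // explicit Spark setting wins
      case (None, Some(hive)) => hive     // otherwise respect the Hive setting
      case (None, None)       => default  // neither set: Spark's default value
    }
}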


@@ -102,9 +108,9 @@ private[sql] class SharedState(
   * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
   * [[org.apache.spark.scheduler.SparkListenerEvent]]s.
   */
-  val statusStore: SQLAppStatusStore = {
+  lazy val statusStore: SQLAppStatusStore = {
     val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
-    val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = true)
+    val listener = new SQLAppStatusListener(conf, kvStore, live = true)
     sparkContext.listenerBus.addToStatusQueue(listener)
     val statusStore = new SQLAppStatusStore(kvStore, Some(listener))
     sparkContext.ui.foreach(new SQLTab(statusStore, _))
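Two things change here: statusStore becomes lazy, so the SQLAppStatusListener is registered and the SQL tab is created only when the store is first accessed, and the listener now receives the session-adjusted conf clone rather than sparkContext.conf. A self-contained sketch of the deferral (plain Scala, not Spark code):

import scala.collection.mutable

// The registration side effect runs once, and only on first access.
object LazyRegistrationDemo extends App {
  val bus = mutable.Buffer.empty[String]

  class State {
    lazy val statusStore: String = {
      bus += "listener registered" // deferred side effect
      "store"
    }
  }

  val s = new State
  println(bus.isEmpty) // true: constructing State registered nothing
  s.statusStore        // first access runs the initializer
  s.statusStore        // later accesses reuse the cached value
  println(bus.size)    // 1
}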
@@ -90,7 +90,7 @@ private[hive] class TestHiveExternalCatalog(
 private[hive] class TestHiveSharedState(
     sc: SparkContext,
     hiveClient: Option[HiveClient] = None)
-  extends SharedState(sc, Map.empty) {
+  extends SharedState(sc, initialConfigs = Map.empty) {
 
   override lazy val externalCatalog: ExternalCatalogWithListener = {
     new ExternalCatalogWithListener(new TestHiveExternalCatalog(
@@ -15,12 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.internal
+package org.apache.spark.sql.hive
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
 import org.apache.spark.sql.SparkSession
 
-class SharedStateSuite extends SparkFunSuite {
+class HiveSharedStateSuite extends SparkFunSuite {
 
   test("the catalog should be determined at the very first") {
     val conf = new SparkConf().setMaster("local").setAppName("SharedState Test")
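The test body is cut off in this view. A hedged sketch of how the scenario might continue (the builder call, config key, and `unwrapped` assertion below are assumptions for illustration, not the suite's actual code):

// Hypothetical continuation of the truncated test above: the first session
// picks the catalog implementation, and the shared state should keep it.
val sc = new SparkContext(conf)
val spark = SparkSession.builder()
  .config("spark.sql.catalogImplementation", "hive") // first session decides
  .getOrCreate()
try {
  // sharedState is private[sql], so it is visible from this test package.
  val catalog = spark.sharedState.externalCatalog.unwrapped
  assert(catalog.getClass.getName.contains("HiveExternalCatalog"))
} finally {
  spark.stop()
}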