Revise PR to set tmpdir in SQLUtils
Also add a unit test that checks that a new table is created in tmpdir
shivaram committed Mar 6, 2017
commit b14c3028e325c7803353aafa81ba37706eba537f
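For context, a minimal SparkR sketch (not part of the commit) of the behavior this change targets; the table name cran_check_tbl is hypothetical:

library(SparkR)
# Start a session without setting spark.sql.warehouse.dir explicitly;
# with this change the warehouse should default to a path under tempdir().
sparkR.session(enableHiveSupport = FALSE)
sql("CREATE TABLE cran_check_tbl (id INT) USING parquet")
sql("INSERT INTO cran_check_tbl VALUES (1)")
# The table data should land under tempdir(), not under the current working directory.
file.exists(file.path(tempdir(), "spark-warehouse", "cran_check_tbl"))    # expected TRUE
length(list.files(".", pattern = "spark-warehouse", include.dirs = TRUE)) # expected 0
sparkR.session.stop()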
13 changes: 6 additions & 7 deletions R/pkg/R/sparkR.R
@@ -377,12 +377,6 @@ sparkR.session <- function(
overrideEnvs(sparkConfigMap, paramMap)
}

# NOTE(shivaram): Set default warehouse dir to tmpdir to meet CRAN requirements
# See SPARK-18817 for more details
if (!exists("spark.sql.default.warehouse.dir", envir = sparkConfigMap)) {
assign("spark.sql.default.warehouse.dir", tempdir(), envir = sparkConfigMap)
}

deployMode <- ""
if (exists("spark.submit.deployMode", envir = sparkConfigMap)) {
deployMode <- sparkConfigMap[["spark.submit.deployMode"]]
@@ -407,11 +401,16 @@ sparkR.session <- function(
sparkConfigMap)
} else {
jsc <- get(".sparkRjsc", envir = .sparkREnv)
# NOTE(shivaram): Pass in a tempdir that is optionally used if the user has not
# overridden this. See SPARK-18817 for more details
warehouseTmpDir <- file.path(tempdir(), "spark-warehouse")

sparkSession <- callJStatic("org.apache.spark.sql.api.r.SQLUtils",
"getOrCreateSparkSession",
jsc,
sparkConfigMap,
enableHiveSupport)
enableHiveSupport,
warehouseTmpDir)
assign(".sparkRsession", sparkSession, envir = .sparkREnv)
}
sparkSession
14 changes: 0 additions & 14 deletions R/pkg/inst/tests/testthat/test_context.R
@@ -72,20 +72,6 @@ test_that("repeatedly starting and stopping SparkSession", {
}
})

test_that("Default warehouse dir should be set to tempdir", {
sparkR.session.stop()
sparkR.session(enableHiveSupport = FALSE)

# Create a temporary table
sql("CREATE TABLE people_warehouse_test")
# spark-warehouse should be written only tempdir() and not current working directory
res <- list.files(path = ".", pattern = ".*spark-warehouse.*",
recursive = TRUE, include.dirs = TRUE)
expect_equal(length(res), 0)
result <- sql("DROP TABLE people_warehouse_test")
sparkR.session.stop()
})

test_that("rdd GC across sparkR.stop", {
sc <- sparkR.sparkContext() # sc should get id 0
rdd1 <- parallelize(sc, 1:20, 2L) # rdd1 should get id 1
14 changes: 14 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2890,6 +2890,20 @@ test_that("Collect on DataFrame when NAs exists at the top of a timestamp column
expect_equal(class(ldf3$col3), c("POSIXct", "POSIXt"))
})

test_that("Default warehouse dir should be set to tempdir", {
setHiveContext(sc)

# Create a temporary database and a table in it
sql("CREATE DATABASE db1")
sql("USE db1")
sql("CREATE TABLE boxes (width INT, length INT, height INT)")
# spark-warehouse should be written only to tempdir() and not to the current working directory
expect_true(file.exists(file.path(tempdir(), "spark-warehouse", "db1.db", "boxes")))
sql("DROP TABLE boxes")
sql("DROP DATABASE db1")
unsetHiveContext(sc)
})

unlink(parquetPath)
unlink(orcPath)
unlink(jsonPath)
@@ -33,6 +33,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.execution.command.ShowTablesCommand
import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
import org.apache.spark.sql.internal.StaticSQLConf.WAREHOUSE_PATH
import org.apache.spark.sql.types._

private[sql] object SQLUtils extends Logging {
@@ -46,7 +47,17 @@ private[sql] object SQLUtils extends Logging {
def getOrCreateSparkSession(
jsc: JavaSparkContext,
sparkConfigMap: JMap[Object, Object],
enableHiveSupport: Boolean): SparkSession = {
enableHiveSupport: Boolean,
warehouseDir: String): SparkSession = {

// Check if the SparkContext conf or sparkConfigMap contains spark.sql.warehouse.dir
// If not, set it to warehouseDir chosen by the R process.
// NOTE: We need to do this before creating the SparkSession.
val sqlWarehouseKey = WAREHOUSE_PATH.key
if (!jsc.sc.conf.contains(sqlWarehouseKey) && !sparkConfigMap.containsKey(sqlWarehouseKey)) {
jsc.sc.conf.set(sqlWarehouseKey, warehouseDir)
}

val spark = if (SparkSession.hiveClassesArePresent && enableHiveSupport
&& jsc.sc.conf.get(CATALOG_IMPLEMENTATION.key, "hive").toLowerCase == "hive") {
SparkSession.builder().sparkContext(withHiveExternalCatalog(jsc.sc)).getOrCreate()
@@ -864,14 +864,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH)

def warehousePath: String = {
if (contains(StaticSQLConf.WAREHOUSE_PATH.key) &&
getConf(StaticSQLConf.WAREHOUSE_PATH).isDefined) {
new Path(getConf(StaticSQLConf.WAREHOUSE_PATH).get).toString
} else {
new Path(getConf(StaticSQLConf.DEFAULT_WAREHOUSE_PATH)).toString
}
}
def warehousePath: String = new Path(getConf(StaticSQLConf.WAREHOUSE_PATH)).toString

def hiveThriftServerSingleSession: Boolean =
getConf(StaticSQLConf.HIVE_THRIFT_SERVER_SINGLESESSION)
@@ -1019,7 +1012,7 @@ object StaticSQLConf {
val WAREHOUSE_PATH = buildStaticConf("spark.sql.warehouse.dir")
.doc("The default location for managed databases and tables.")
.stringConf
.createOptional
.createWithDefault(Utils.resolveURI("spark-warehouse").toString)

val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
.internal()
@@ -17,13 +17,23 @@

package org.apache.spark.sql.api.r

import org.apache.spark.sql.test.SharedSQLContext
import java.util.HashMap

class SQLUtilsSuite extends SharedSQLContext {
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.test.SharedSQLContext

import testImplicits._
class SQLUtilsSuite extends SparkFunSuite {

test("dfToCols should collect and transpose a data frame") {
val sparkSession = SparkSession.builder()
.master("local")
.config("spark.ui.enabled", value = false)
.getOrCreate()

import sparkSession.implicits._

val df = Seq(
(1, 2, 3),
(4, 5, 6)
@@ -33,6 +43,18 @@ class SQLUtilsSuite extends SharedSQLContext {
Array(2, 5),
Array(3, 6)
))
sparkSession.stop()
}

test("warehouse path is set correctly by R constructor") {
SparkSession.clearDefaultSession()
val conf = new SparkConf().setAppName("test").setMaster("local")
val sparkContext2 = new SparkContext(conf)
val jsc = new JavaSparkContext(sparkContext2)
val warehouseDir = "/tmp/test-warehouse-dir"
val session = SQLUtils.getOrCreateSparkSession(
jsc, new HashMap[Object, Object], false, warehouseDir)
assert(session.sessionState.conf.warehousePath == warehouseDir)
session.stop()
}
}
@@ -221,63 +221,6 @@ class SQLConfSuite extends QueryTest with SharedSQLContext {
.sessionState.conf.warehousePath.stripSuffix("/"))
}

test("changing default value of warehouse path") {
// Set sql.default.warehouse.dir but not sql.warehouse.dir
try {
val newWarehouseDefault = "spark-warehouse2"
val newWarehouseDefaultPath = new Path(Utils.resolveURI(newWarehouseDefault)).toString
sparkContext.conf.set("spark.sql.default.warehouse.dir", newWarehouseDefaultPath)
val spark = new SparkSession(sparkContext)
assert(newWarehouseDefaultPath.stripSuffix("/") === spark
.sessionState.conf.warehousePath.stripSuffix("/"))
assert(newWarehouseDefaultPath.stripSuffix("/") === spark
.sharedState.warehousePath.stripSuffix("/"))
} finally {
sparkContext.conf.remove("spark.sql.default.warehouse.dir")
}

// Set sql.warehouse.dir and sql.default.warehouse.dir. The first one should be used
try {
val newWarehouseDefault = "spark-warehouse2"
val newWarehouseDefaultPath = new Path(Utils.resolveURI(newWarehouseDefault)).toString
sparkContext.conf.set("spark.sql.default.warehouse.dir", newWarehouseDefaultPath)

val newWarehouse = "spark-warehouse3"
val newWarehousePath = new Path(Utils.resolveURI(newWarehouse)).toString
sparkContext.conf.set("spark.sql.warehouse.dir", newWarehousePath)
val spark = new SparkSession(sparkContext)
assert(newWarehousePath.stripSuffix("/") === spark
.sessionState.conf.warehousePath.stripSuffix("/"))
assert(newWarehousePath.stripSuffix("/") === spark
.sharedState.warehousePath.stripSuffix("/"))
} finally {
sparkContext.conf.remove("spark.sql.default.warehouse.dir")
sparkContext.conf.remove("spark.sql.warehouse.dir")
}

// Set sql.warehouse.dir but not sql.default.warehouse.dir
try {
val newWarehouse = "spark-warehouse4"
val newWarehousePath = new Path(Utils.resolveURI(newWarehouse)).toString
sparkContext.conf.set("spark.sql.warehouse.dir", newWarehousePath)
val spark = new SparkSession(sparkContext)
assert(newWarehousePath.stripSuffix("/") === spark
.sessionState.conf.warehousePath.stripSuffix("/"))
assert(newWarehousePath.stripSuffix("/") === spark
.sharedState.warehousePath.stripSuffix("/"))
} finally {
sparkContext.conf.remove("spark.sql.warehouse.dir")
}

// Set neither of the two configs. The default value should be "spark-warehouse"

val spark = new SparkSession(sparkContext)
assert(new Path(Utils.resolveURI("spark-warehouse")).toString.stripSuffix("/") === spark
.sessionState.conf.warehousePath.stripSuffix("/"))
assert(new Path(Utils.resolveURI("spark-warehouse")).toString.stripSuffix("/") === spark
.sharedState.warehousePath.stripSuffix("/"))
}

test("MAX_CASES_BRANCHES") {
withTable("tab1") {
spark.range(10).write.saveAsTable("tab1")