1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -10,6 +10,7 @@ export("sparkR.session")
 export("sparkR.init")
 export("sparkR.stop")
 export("sparkR.session.stop")
+export("sparkR.conf")
 export("print.jobj")
 
 export("sparkRSQL.init",
50 changes: 46 additions & 4 deletions R/pkg/R/SQLContext.R
@@ -110,11 +110,53 @@ infer_type <- function(x) {
   }
 }
 
-getDefaultSqlSource <- function() {
+#' Get Runtime Config from the current active SparkSession
+#'
+#' Get Runtime Config from the current active SparkSession.
+#' To change SparkSession Runtime Config, please see `sparkR.session()`.
+#'
+#' @param key (optional) The key of the config to get, if omitted, all config is returned
+#' @param defaultValue (optional) The default value of the config to return if the config is not
+#'        set, if omitted, the call fails if the config key is not set
+#' @return a list of config values with keys as their names
+#' @rdname sparkR.conf
+#' @name sparkR.conf
+#' @export
+#' @examples
+#' \dontrun{
+#' sparkR.session()
+#' allConfigs <- sparkR.conf()
+#' masterValue <- unlist(sparkR.conf("spark.master"))
+#' namedConfig <- sparkR.conf("spark.executor.memory", "0g")
+#' }
+#' @note sparkR.conf since 2.0.0
+sparkR.conf <- function(key, defaultValue) {
   sparkSession <- getSparkSession()
-  conf <- callJMethod(sparkSession, "conf")
-  source <- callJMethod(conf, "get", "spark.sql.sources.default", "org.apache.spark.sql.parquet")
-  source
+  if (missing(key)) {
+    m <- callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getSessionConf", sparkSession)
+    as.list(m, all.names = TRUE, sorted = TRUE)
+  } else {
+    conf <- callJMethod(sparkSession, "conf")
+    value <- if (missing(defaultValue)) {
+      tryCatch(callJMethod(conf, "get", key),
+               error = function(e) {
+                 if (any(grep("java.util.NoSuchElementException", as.character(e)))) {
+                   stop(paste0("Config '", key, "' is not set"))
+                 } else {
+                   stop(paste0("Unknown error: ", as.character(e)))
+                 }
+               })
+    } else {
+      callJMethod(conf, "get", key, defaultValue)
+    }
+    l <- setNames(list(value), key)
+    l
+  }
+}
+
+getDefaultSqlSource <- function() {
+  l <- sparkR.conf("spark.sql.sources.default", "org.apache.spark.sql.parquet")
+  l[["spark.sql.sources.default"]]
 }
 
 #' Create a SparkDataFrame
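For reviewers, a minimal sketch of the call shapes the new sparkR.conf supports, assuming an active session (the executor-memory value is illustrative):

  sparkR.session()
  sparkR.conf()                                # every config entry, as a named list
  sparkR.conf("spark.master")                  # one key; errors if the key is not set
  sparkR.conf("spark.executor.memory", "1g")   # one key, with a default if unset

Each result is a named list keyed by config name, which is why the refactored getDefaultSqlSource reads its value back with l[["spark.sql.sources.default"]].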
12 changes: 6 additions & 6 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2365,7 +2365,7 @@ test_that("randomSplit", {
   expect_true(all(sapply(abs(counts / num - weights / sum(weights)), function(e) { e < 0.05 })))
 })
 
-test_that("Change config on SparkSession", {
+test_that("Setting and getting config on SparkSession", {
   # first, set it to a random but known value
   conf <- callJMethod(sparkSession, "conf")
   property <- paste0("spark.testing.", as.character(runif(1)))
@@ -2378,17 +2378,17 @@ test_that("Change config on SparkSession", {
   names(l) <- property
   sparkR.session(sparkConfig = l)
 
-  conf <- callJMethod(sparkSession, "conf")
-  newValue <- callJMethod(conf, "get", property, "")
+  newValue <- unlist(sparkR.conf(property, ""), use.names = FALSE)
   expect_equal(value2, newValue)
 
   value <- as.character(runif(1))
   sparkR.session(spark.app.name = "sparkSession test", spark.testing.r.session.r = value)
-  conf <- callJMethod(sparkSession, "conf")
-  appNameValue <- callJMethod(conf, "get", "spark.app.name", "")
-  testValue <- callJMethod(conf, "get", "spark.testing.r.session.r", "")
+  allconf <- sparkR.conf()
+  appNameValue <- allconf[["spark.app.name"]]
+  testValue <- allconf[["spark.testing.r.session.r"]]
   expect_equal(appNameValue, "sparkSession test")
   expect_equal(testValue, value)
+  expect_error(sparkR.conf("completely.dummy"), "Config 'completely.dummy' is not set")
 })
 
 test_that("enableHiveSupport on SparkSession", {
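A note on the unlist(..., use.names = FALSE) above: sparkR.conf returns a named list, and expect_equal compares attributes too, so the name must be stripped before comparing against a bare string. A minimal sketch with a hypothetical key and value:

  l <- setNames(list("5g"), "spark.executor.memory")
  identical(unlist(l, use.names = FALSE), "5g")   # TRUE
  identical(unlist(l), "5g")                      # FALSE: the result carries a names attribute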
4 changes: 4 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -71,6 +71,10 @@ private[sql] object SQLUtils extends Logging {
     }
   }
 
+  def getSessionConf(spark: SparkSession): JMap[String, String] = {
+    spark.conf.getAll.asJava
+  }
+
   def getJavaSparkContext(spark: SparkSession): JavaSparkContext = {
     new JavaSparkContext(spark.sparkContext)
   }
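This Scala helper is what sparkR.conf() reaches over the JVM bridge when called with no key; spark.conf.getAll returns a Scala Map, so asJava hands the serializer a java.util.Map it can ship back to R. A sketch of that path from the R side, mirroring the SQLContext.R code above (SparkR internal helpers, active session assumed):

  sparkSession <- SparkR:::getSparkSession()
  m <- SparkR:::callJStatic("org.apache.spark.sql.api.r.SQLUtils", "getSessionConf", sparkSession)
  as.list(m, all.names = TRUE, sorted = TRUE)   # named list of all session config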