diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index dce64e1e607c8..cd119b06c78f4 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -76,7 +76,9 @@ exportMethods("glm", export("setJobGroup", "clearJobGroup", "cancelJobGroup", - "setJobDescription") + "setJobDescription", + "setLocalProperty", + "getLocalProperty") # Export Utility methods export("setLogLevel") diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R index fb5f1d21fc723..965471f3b07a0 100644 --- a/R/pkg/R/sparkR.R +++ b/R/pkg/R/sparkR.R @@ -560,10 +560,55 @@ cancelJobGroup <- function(sc, groupId) { #'} #' @note setJobDescription since 2.3.0 setJobDescription <- function(value) { + if (!is.null(value)) { + value <- as.character(value) + } sc <- getSparkContext() invisible(callJMethod(sc, "setJobDescription", value)) } +#' Set a local property that affects jobs submitted from this thread, such as the +#' Spark fair scheduler pool. +#' +#' @param key The key for a local property. +#' @param value The value for a local property. +#' @rdname setLocalProperty +#' @name setLocalProperty +#' @examples +#'\dontrun{ +#' setLocalProperty("spark.scheduler.pool", "poolA") +#'} +#' @note setLocalProperty since 2.3.0 +setLocalProperty <- function(key, value) { + if (is.null(key) || is.na(key)) { + stop("key should not be NULL or NA.") + } + if (!is.null(value)) { + value <- as.character(value) + } + sc <- getSparkContext() + invisible(callJMethod(sc, "setLocalProperty", as.character(key), value)) +} + +#' Get a local property set in this thread, or \code{NULL} if it is missing. See +#' \code{setLocalProperty}. +#' +#' @param key The key for a local property. +#' @rdname getLocalProperty +#' @name getLocalProperty +#' @examples +#'\dontrun{ +#' getLocalProperty("spark.scheduler.pool") +#'} +#' @note getLocalProperty since 2.3.0 +getLocalProperty <- function(key) { + if (is.null(key) || is.na(key)) { + stop("key should not be NULL or NA.") + } + sc <- getSparkContext() + callJMethod(sc, "getLocalProperty", as.character(key)) +} + sparkConfToSubmitOps <- new.env() sparkConfToSubmitOps[["spark.driver.memory"]] <- "--driver-memory" sparkConfToSubmitOps[["spark.driver.extraClassPath"]] <- "--driver-class-path" diff --git a/R/pkg/tests/fulltests/test_context.R b/R/pkg/tests/fulltests/test_context.R index 77635c5a256b9..f0d0a5114f89f 100644 --- a/R/pkg/tests/fulltests/test_context.R +++ b/R/pkg/tests/fulltests/test_context.R @@ -100,7 +100,6 @@ test_that("job group functions can be called", { setJobGroup("groupId", "job description", TRUE) cancelJobGroup("groupId") clearJobGroup() - setJobDescription("job description") suppressWarnings(setJobGroup(sc, "groupId", "job description", TRUE)) suppressWarnings(cancelJobGroup(sc, "groupId")) @@ -108,6 +107,38 @@ test_that("job group functions can be called", { sparkR.session.stop() }) +test_that("job description and local properties can be set and got", { + sc <- sparkR.sparkContext(master = sparkRTestMaster) + setJobDescription("job description") + expect_equal(getLocalProperty("spark.job.description"), "job description") + setJobDescription(1234) + expect_equal(getLocalProperty("spark.job.description"), "1234") + setJobDescription(NULL) + expect_equal(getLocalProperty("spark.job.description"), NULL) + setJobDescription(NA) + expect_equal(getLocalProperty("spark.job.description"), NULL) + + setLocalProperty("spark.scheduler.pool", "poolA") + expect_equal(getLocalProperty("spark.scheduler.pool"), "poolA") + setLocalProperty("spark.scheduler.pool", NULL) + expect_equal(getLocalProperty("spark.scheduler.pool"), NULL) + setLocalProperty("spark.scheduler.pool", NA) + expect_equal(getLocalProperty("spark.scheduler.pool"), NULL) + + setLocalProperty(4321, 1234) + expect_equal(getLocalProperty(4321), "1234") + setLocalProperty(4321, NULL) + expect_equal(getLocalProperty(4321), NULL) + setLocalProperty(4321, NA) + expect_equal(getLocalProperty(4321), NULL) + + expect_error(setLocalProperty(NULL, "should fail"), "key should not be NULL or NA") + expect_error(getLocalProperty(NULL), "key should not be NULL or NA") + expect_error(setLocalProperty(NA, "should fail"), "key should not be NULL or NA") + expect_error(getLocalProperty(NA), "key should not be NULL or NA") + sparkR.session.stop() +}) + test_that("utility function can be called", { sparkR.sparkContext(master = sparkRTestMaster) setLogLevel("ERROR")