Merge branch 'master' into SPARK-25815
Marcelo Vanzin committed Dec 3, 2018
commit cda56a9eb2434de19e4681bb636e6b4918f9b1bc
2 changes: 2 additions & 0 deletions R/pkg/NAMESPACE
@@ -351,6 +351,8 @@ exportMethods("%<=>%",
"row_number",
"rpad",
"rtrim",
"schema_of_csv",
"schema_of_json",
"second",
"sha1",
"sha2",
22 changes: 16 additions & 6 deletions R/pkg/R/DataFrame.R
@@ -766,7 +766,6 @@ setMethod("repartition",
#' \item{2.} {Return a new SparkDataFrame range partitioned by the given column(s),
#' using \code{spark.sql.shuffle.partitions} as number of partitions.}
#'}
#'
#' At least one partition-by expression must be specified.
#' When no explicit sort order is specified, "ascending nulls first" is assumed.
#'
@@ -828,7 +827,6 @@ setMethod("repartitionByRange",
#' toJSON
#'
#' Converts a SparkDataFrame into a SparkDataFrame of JSON string.
#'
#' Each row is turned into a JSON document with columns as different fields.
#' The returned SparkDataFrame has a single character column with the name \code{value}
#'
@@ -2732,13 +2730,25 @@ setMethod("union",
dataFrame(unioned)
})

#' Return a new SparkDataFrame containing the union of rows
#' Return a new SparkDataFrame containing the union of rows.
#'
#' This is an alias for `union`.
#' This is an alias for \code{union}.
#'
#' @rdname union
#' @name unionAll
#' @param x a SparkDataFrame.
#' @param y a SparkDataFrame.
#' @return A SparkDataFrame containing the result of the unionAll operation.
#' @family SparkDataFrame functions
#' @aliases unionAll,SparkDataFrame,SparkDataFrame-method
#' @rdname unionAll
#' @name unionAll
#' @seealso \link{union}
#' @examples
#'\dontrun{
#' sparkR.session()
#' df1 <- read.json(path)
#' df2 <- read.json(path2)
#' unionAllDF <- unionAll(df1, df2)
#' }
#' @note unionAll since 1.4.0
setMethod("unionAll",
signature(x = "SparkDataFrame", y = "SparkDataFrame"),
77 changes: 70 additions & 7 deletions R/pkg/R/functions.R
@@ -205,11 +205,18 @@ NULL
#' also supported for the schema.
#' \item \code{from_csv}: a DDL-formatted string
#' }
#' @param ... additional argument(s). In \code{to_json}, \code{to_csv} and \code{from_json},
#' this contains additional named properties to control how it is converted, accepts
#' the same options as the JSON/CSV data source. Additionally \code{to_json} supports
#' the "pretty" option which enables pretty JSON generation. In \code{arrays_zip},
#' this contains additional Columns of arrays to be merged.
#' @param ... additional argument(s).
#' \itemize{
#' \item \code{to_json}, \code{from_json} and \code{schema_of_json}: this contains
#' additional named properties to control how it is converted and accepts the
#' same options as the JSON data source.
#' \item \code{to_json}: it supports the "pretty" option which enables pretty
#' JSON generation.
#' \item \code{to_csv}, \code{from_csv} and \code{schema_of_csv}: this contains
#' additional named properties to control how it is converted and accepts the
#' same options as the CSV data source.
#' \item \code{arrays_zip}: this contains additional Columns of arrays to be merged.
#' }
#' @name column_collection_functions
#' @rdname column_collection_functions
#' @family collection functions
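
A minimal SparkR sketch of the option forwarding described above (illustrative only, not part of this commit; it assumes an active SparkR session and the standard CSV/JSON data source option names):

# Extra named arguments to from_csv are forwarded as CSV data source options;
# here the "sep" option switches the delimiter to ';'.
df <- as.DataFrame(list(list(col = "1;Bob")))
head(select(df, alias(from_csv(df$col, "id INT, name STRING", sep = ";"), "rec")))

# to_json forwards options the same way; "pretty" enables pretty-printed JSON.
people <- sql("SELECT map('name', 'Bob') as people")
head(select(people, to_json(people$people, pretty = TRUE)))
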
@@ -1771,12 +1778,16 @@ setMethod("to_date",
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#'
#' # Converts a map into a JSON object
#' df2 <- sql("SELECT map('name', 'Bob')) as people")
#' df2 <- sql("SELECT map('name', 'Bob') as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#'
#' # Converts an array of maps into a JSON array
#' df2 <- sql("SELECT array(map('name', 'Bob'), map('name', 'Alice')) as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people))}
#' df2 <- mutate(df2, people_json = to_json(df2$people))
#'
#' # Converts a map into a pretty JSON object
#' df2 <- sql("SELECT map('name', 'Bob') as people")
#' df2 <- mutate(df2, people_json = to_json(df2$people, pretty = TRUE))}
#' @note to_json since 2.2.0
setMethod("to_json", signature(x = "Column"),
function(x, ...) {
@@ -2285,6 +2296,32 @@ setMethod("from_json", signature(x = "Column", schema = "characterOrstructType")
column(jc)
})

#' @details
#' \code{schema_of_json}: Parses a JSON string and infers its schema in DDL format.
#'
#' @rdname column_collection_functions
#' @aliases schema_of_json schema_of_json,characterOrColumn-method
#' @examples
#'
#' \dontrun{
#' json <- "{\"name\":\"Bob\"}"
#' df <- sql("SELECT * FROM range(1)")
#' head(select(df, schema_of_json(json)))}
#' @note schema_of_json since 3.0.0
setMethod("schema_of_json", signature(x = "characterOrColumn"),
function(x, ...) {
if (class(x) == "character") {
col <- callJStatic("org.apache.spark.sql.functions", "lit", x)
} else {
col <- x@jc
}
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
"schema_of_json",
col, options)
column(jc)
})
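
A quick usage sketch for schema_of_json (illustrative only, not part of this commit; it assumes an active SparkR session, and the exact struct<...> string may vary across Spark versions):

# The sample JSON can be passed as an R character vector or as a literal Column.
json_sample <- '{"name":"Bob","age":30}'
df <- sql("SELECT * FROM range(1)")
head(select(df, schema_of_json(json_sample)))        # e.g. struct<name:string,age:bigint>
head(select(df, schema_of_json(lit(json_sample))))   # same result via a Column
# JSON data source options may be forwarded as named arguments, for example
# schema_of_json(json_sample, allowUnquotedFieldNames = "true").
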

#' @details
#' \code{from_csv}: Parses a column containing a CSV string into a Column of \code{structType}
#' with the specified \code{schema}.
@@ -2315,6 +2352,32 @@ setMethod("from_csv", signature(x = "Column", schema = "characterOrColumn"),
column(jc)
})

#' @details
#' \code{schema_of_csv}: Parses a CSV string and infers its schema in DDL format.
#'
#' @rdname column_collection_functions
#' @aliases schema_of_csv schema_of_csv,characterOrColumn-method
#' @examples
#'
#' \dontrun{
#' csv <- "Amsterdam,2018"
#' df <- sql("SELECT * FROM range(1)")
#' head(select(df, schema_of_csv(csv)))}
#' @note schema_of_csv since 3.0.0
setMethod("schema_of_csv", signature(x = "characterOrColumn"),
function(x, ...) {
if (class(x) == "character") {
col <- callJStatic("org.apache.spark.sql.functions", "lit", x)
} else {
col <- x@jc
}
options <- varargsToStrEnv(...)
jc <- callJStatic("org.apache.spark.sql.functions",
"schema_of_csv",
col, options)
column(jc)
})
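
A matching sketch for schema_of_csv (illustrative only, not part of this commit; it assumes an active SparkR session):

csv_sample <- "Amsterdam,2018"
df <- sql("SELECT * FROM range(1)")
head(select(df, schema_of_csv(csv_sample)))          # e.g. struct<_c0:string,_c1:int>
head(select(df, schema_of_csv(lit(csv_sample))))     # same result via a Column
# CSV options can be forwarded as named arguments, e.g. a ';' delimiter via
# schema_of_csv("Amsterdam;2018", sep = ";").
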

#' @details
#' \code{from_utc_timestamp}: This is a common function for databases supporting TIMESTAMP WITHOUT
#' TIMEZONE. This function takes a timestamp which is timezone-agnostic, and interprets it as a
10 changes: 9 additions & 1 deletion R/pkg/R/generics.R
@@ -631,7 +631,7 @@ setGeneric("toRDD", function(x) { standardGeneric("toRDD") })
#' @rdname union
setGeneric("union", function(x, y) { standardGeneric("union") })

#' @rdname union
#' @rdname unionAll
setGeneric("unionAll", function(x, y) { standardGeneric("unionAll") })

#' @rdname unionByName
@@ -1206,6 +1206,14 @@ setGeneric("rpad", function(x, len, pad) { standardGeneric("rpad") })
#' @name NULL
setGeneric("rtrim", function(x, trimString) { standardGeneric("rtrim") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("schema_of_csv", function(x, ...) { standardGeneric("schema_of_csv") })

#' @rdname column_collection_functions
#' @name NULL
setGeneric("schema_of_json", function(x, ...) { standardGeneric("schema_of_json") })

#' @rdname column_aggregate_functions
#' @name NULL
setGeneric("sd", function(x, na.rm = FALSE) { standardGeneric("sd") })
16 changes: 14 additions & 2 deletions R/pkg/tests/fulltests/test_sparkSQL.R
@@ -1620,14 +1620,20 @@ test_that("column functions", {
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][1], 2)
expect_equal(collect(select(df, bround(df$x, 0)))[[1]][2], 4)

# Test from_csv()
# Test from_csv(), schema_of_csv()
df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, alias(from_csv(df$col, "a INT"), "csv")))
expect_equal(c[[1]][[1]]$a, 1)
c <- collect(select(df, alias(from_csv(df$col, lit("a INT")), "csv")))
expect_equal(c[[1]][[1]]$a, 1)

# Test to_json(), from_json()
df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, schema_of_csv("Amsterdam,2018")))
expect_equal(c[[1]], "struct<_c0:string,_c1:int>")
c <- collect(select(df, schema_of_csv(lit("Amsterdam,2018"))))
expect_equal(c[[1]], "struct<_c0:string,_c1:int>")

# Test to_json(), from_json(), schema_of_json()
df <- sql("SELECT array(named_struct('name', 'Bob'), named_struct('name', 'Alice')) as people")
j <- collect(select(df, alias(to_json(df$people), "json")))
expect_equal(j[order(j$json), ][1], "[{\"name\":\"Bob\"},{\"name\":\"Alice\"}]")
@@ -1654,6 +1660,12 @@ test_that("column functions", {
expect_true(any(apply(s, 1, function(x) { x[[1]]$age == 16 })))
}

df <- as.DataFrame(list(list("col" = "1")))
c <- collect(select(df, schema_of_json('{"name":"Bob"}')))
expect_equal(c[[1]], "struct<name:string>")
c <- collect(select(df, schema_of_json(lit('{"name":"Bob"}'))))
expect_equal(c[[1]], "struct<name:string>")

# Test to_json() supports arrays of primitive types and arrays
df <- sql("SELECT array(19, 42, 70) as age")
j <- collect(select(df, alias(to_json(df$age), "json")))
16 changes: 13 additions & 3 deletions bin/docker-image-tool.sh
@@ -146,6 +146,12 @@ function build {
fi

local BUILD_ARGS=(${BUILD_PARAMS})

# If a custom SPARK_UID was set, add it to the build arguments
if [ -n "$SPARK_UID" ]; then
BUILD_ARGS+=(--build-arg spark_uid=$SPARK_UID)
fi

local BINDING_BUILD_ARGS=(
${BUILD_PARAMS}
--build-arg
@@ -207,8 +213,10 @@ Options:
-t tag Tag to apply to the built image, or to identify the image to be pushed.
-m Use minikube's Docker daemon.
-n Build docker image with --no-cache
-b arg Build arg to build or push the image. For multiple build args, this option needs to
be used separately for each build arg.
-u uid UID to use in the USER directive to set the user the main Spark process runs as inside the
resulting container
-b arg Build arg to build or push the image. For multiple build args, this option needs to
be used separately for each build arg.

Using minikube when building images will do so directly into minikube's Docker daemon.
There is no need to push the images into minikube in that case, they'll be automatically
@@ -243,7 +251,8 @@ PYDOCKERFILE=
RDOCKERFILE=
NOCACHEARG=
BUILD_PARAMS=
while getopts f:p:R:mr:t:nb: option
SPARK_UID=
while getopts f:p:R:mr:t:nb:u: option
do
case "${option}"
in
@@ -263,6 +272,7 @@ do
fi
eval $(minikube docker-env)
;;
u) SPARK_UID=${OPTARG};;
esac
done

13 changes: 13 additions & 0 deletions core/pom.xml
@@ -408,6 +408,19 @@
<scope>provided</scope>
</dependency>

<!--
The following kafka dependency is used to obtain delegation tokens.
In order to prevent spark-core from depending on kafka, these deps have been placed in the
"provided" scope, rather than the "compile" scope, and NoClassDefFoundError exceptions are
handled when the user explicitly uses neither the spark-streaming-kafka nor the spark-sql-kafka modules.
-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>

</dependencies>
<build>
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
10 changes: 8 additions & 2 deletions core/src/main/resources/org/apache/spark/ui/static/stagepage.js
@@ -221,7 +221,10 @@ function createDataTableForTaskSummaryMetricsTable(taskSummaryMetricsTable) {
"searching": false,
"order": [[0, "asc"]],
"bSort": false,
"bAutoWidth": false
"bAutoWidth": false,
"oLanguage": {
"sEmptyTable": "No tasks have reported metrics yet"
}
};
taskSummaryMetricsDataTable = $(taskMetricsTable).DataTable(taskConf);
}
@@ -426,7 +429,10 @@ $(document).ready(function () {
}
],
"order": [[0, "asc"]],
"bAutoWidth": false
"bAutoWidth": false,
"oLanguage": {
"sEmptyTable": "No data to show yet"
}
}
var executorSummaryTableSelector =
$("#summary-executor-table").DataTable(executorSummaryConf);
29 changes: 25 additions & 4 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -660,6 +660,7 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
with Logging {

private var encryptionServer: PythonServer[Unit] = null
private var decryptionServer: PythonServer[Unit] = null

/**
* Read data from disks, then copy it to `out`
@@ -708,16 +709,36 @@ private[spark] class PythonBroadcast(@transient var path: String) extends Serial
override def handleConnection(sock: Socket): Unit = {
val env = SparkEnv.get
val in = sock.getInputStream()
val dir = new File(Utils.getLocalDir(env.conf))
val file = File.createTempFile("broadcast", "", dir)
path = file.getAbsolutePath
val out = env.serializerManager.wrapForEncryption(new FileOutputStream(path))
val abspath = new File(path).getAbsolutePath
val out = env.serializerManager.wrapForEncryption(new FileOutputStream(abspath))
DechunkedInputStream.dechunkAndCopyToOutput(in, out)
}
}
Array(encryptionServer.port, encryptionServer.secret)
}

def setupDecryptionServer(): Array[Any] = {
decryptionServer = new PythonServer[Unit]("broadcast-decrypt-server-for-driver") {
override def handleConnection(sock: Socket): Unit = {
val out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream()))
Utils.tryWithSafeFinally {
val in = SparkEnv.get.serializerManager.wrapForEncryption(new FileInputStream(path))
Utils.tryWithSafeFinally {
Utils.copyStream(in, out, false)
} {
in.close()
}
out.flush()
} {
JavaUtils.closeQuietly(out)
}
}
}
Array(decryptionServer.port, decryptionServer.secret)
}

def waitTillBroadcastDataSent(): Unit = decryptionServer.getResult()

def waitTillDataReceived(): Unit = encryptionServer.getResult()
}
// scalastyle:on no.finalize
8 changes: 6 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Original file line number Diff line number Diff line change
@@ -814,14 +814,14 @@ private[spark] class SparkSubmit extends Logging {
mainClass = Utils.classForName(childMainClass)
} catch {
case e: ClassNotFoundException =>
logWarning(s"Failed to load $childMainClass.", e)
logError(s"Failed to load class $childMainClass.")
if (childMainClass.contains("thriftserver")) {
logInfo(s"Failed to load main class $childMainClass.")
logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
}
throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
case e: NoClassDefFoundError =>
logWarning(s"Failed to load $childMainClass: ${e.getMessage()}")
logError(s"Failed to load $childMainClass: ${e.getMessage()}")
if (e.getMessage.contains("org/apache/hadoop/hive")) {
logInfo(s"Failed to load hive class.")
logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
@@ -916,13 +916,17 @@ object SparkSubmit extends CommandLineUtils with Logging {
override protected def logInfo(msg: => String): Unit = self.logInfo(msg)

override protected def logWarning(msg: => String): Unit = self.logWarning(msg)

override protected def logError(msg: => String): Unit = self.logError(msg)
}
}

override protected def logInfo(msg: => String): Unit = printMessage(msg)

override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")

override def doSubmit(args: Array[String]): Unit = {
try {
super.doSubmit(args)
You are viewing a condensed version of this merge commit.