42 commits (showing changes from 1 commit)
5244aaf
[SPARK-22897][CORE] Expose stageAttemptId in TaskContext
advancedxy Jan 2, 2018
b96a213
[SPARK-22938] Assert that SQLConf.get is accessed only on the driver.
juliuszsompolski Jan 3, 2018
a05e85e
[SPARK-22934][SQL] Make optional clauses order insensitive for CREATE…
gatorsmile Jan 3, 2018
b962488
[SPARK-20236][SQL] dynamic partition overwrite
cloud-fan Jan 3, 2018
27c949d
[SPARK-22932][SQL] Refactor AnalysisContext
gatorsmile Jan 2, 2018
79f7263
[SPARK-22896] Improvement in String interpolation
chetkhatri Jan 3, 2018
a51212b
[SPARK-20960][SQL] make ColumnVector public
cloud-fan Jan 3, 2018
f51c8fd
[SPARK-22944][SQL] improve FoldablePropagation
cloud-fan Jan 4, 2018
1860a43
[SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, t…
felixcheung Jan 4, 2018
a7cfd6b
[SPARK-22950][SQL] Handle ChildFirstURLClassLoader's parent
yaooqinn Jan 4, 2018
eb99b8a
[SPARK-22945][SQL] add java UDF APIs in the functions object
cloud-fan Jan 4, 2018
1f5e354
[SPARK-22939][PYSPARK] Support Spark UDF in registerFunction
gatorsmile Jan 4, 2018
bcfeef5
[SPARK-22771][SQL] Add a missing return statement in Concat.checkInpu…
maropu Jan 4, 2018
cd92913
[SPARK-21475][CORE][2ND ATTEMPT] Change to use NIO's Files API for ex…
jerryshao Jan 4, 2018
bc4bef4
[SPARK-22850][CORE] Ensure queued events are delivered to all event q…
Jan 4, 2018
2ab4012
[SPARK-22948][K8S] Move SparkPodInitContainer to correct package.
Jan 4, 2018
84707f0
[SPARK-22953][K8S] Avoids adding duplicated secret volumes when init-…
liyinan926 Jan 4, 2018
ea9da61
[SPARK-22960][K8S] Make build-push-docker-images.sh more dev-friendly.
Jan 5, 2018
158f7e6
[SPARK-22957] ApproxQuantile breaks if the number of rows exceeds MaxInt
juliuszsompolski Jan 5, 2018
145820b
[SPARK-22825][SQL] Fix incorrect results of Casting Array to String
maropu Jan 5, 2018
5b524cc
[SPARK-22949][ML] Apply CrossValidator approach to Driver/Distributed…
MrBago Jan 5, 2018
f9dcdbc
[SPARK-22757][K8S] Enable spark.jars and spark.files in KUBERNETES mode
liyinan926 Jan 5, 2018
fd4e304
[SPARK-22961][REGRESSION] Constant columns should generate QueryPlanC…
adrian-ionescu Jan 5, 2018
0a30e93
[SPARK-22940][SQL] HiveExternalCatalogVersionsSuite should succeed on…
bersprockets Jan 5, 2018
d1f422c
[SPARK-13030][ML] Follow-up cleanups for OneHotEncoderEstimator
jkbradley Jan 5, 2018
55afac4
[SPARK-22914][DEPLOY] Register history.ui.port
gerashegalov Jan 6, 2018
bf85301
[SPARK-22937][SQL] SQL elt output binary for binary inputs
maropu Jan 6, 2018
3e3e938
[SPARK-22960][K8S] Revert use of ARG base_image in images
liyinan926 Jan 6, 2018
7236914
[SPARK-22930][PYTHON][SQL] Improve the description of Vectorized UDFs…
icexelloss Jan 6, 2018
e6449e8
[SPARK-22793][SQL] Memory leak in Spark Thrift Server
Jan 6, 2018
0377755
[SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 'Par…
fjh100456 Jan 6, 2018
b66700a
[SPARK-22901][PYTHON][FOLLOWUP] Adds the doc for asNondeterministic f…
HyukjinKwon Jan 6, 2018
f9e7b0c
[HOTFIX] Fix style checking failure
gatorsmile Jan 6, 2018
285d342
[SPARK-22973][SQL] Fix incorrect results of Casting Map to String
maropu Jan 7, 2018
516c0a1
Merge pull request #1 from apache/master
fjh100456 Jan 8, 2018
bd1a80a
Merge remote-tracking branch 'upstream/branch-2.3'
fjh100456 Jan 8, 2018
51f4418
Merge branch 'master' of https://github.com/fjh100456/spark
fjh100456 Jan 9, 2018
cf73803
Merge pull request #3 from apache/master
fjh100456 Apr 20, 2018
6515fb1
Merge remote-tracking branch 'origin/master'
fjh100456 Apr 20, 2018
0c39ead
Merge pull request #4 from apache/master
fjh100456 Aug 29, 2018
61a1028
Merge remote-tracking branch 'origin/master'
fjh100456 Aug 29, 2018
a98d1a1
[SPARK-21786][SQL][FOLLOWUP] Add compressionCodec test for CTAS
fjh100456 Aug 31, 2018
[SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, trigger, partitionBy

## What changes were proposed in this pull request?

R Structured Streaming API for withWatermark, trigger, partitionBy

## How was this patch tested?

manual, unit tests

Author: Felix Cheung <[email protected]>

Closes #20129 from felixcheung/rwater.

(cherry picked from commit df95a90)
Signed-off-by: Felix Cheung <[email protected]>
felixcheung authored and Felix Cheung committed Jan 4, 2018
commit 1860a43e9affb7619be0a5a1c786e264d09bc446
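
Before the diff, a quick sketch of how the API added in this commit is meant to be used, pieced together from the roxygen examples and parameter docs below. The input/output paths, schema, and column names are placeholders, not part of this change.

```r
library(SparkR)
sparkR.session()

# File-based streaming sources require an explicit schema.
schema <- structType(structField("time", "timestamp"), structField("name", "string"))
df <- read.stream("json", path = "/home/user/in", schema = schema)

# New in this commit: an event-time watermark. Rows arriving more than
# 10 minutes behind the max observed event time may be dropped.
df <- withWatermark(df, "time", "10 minutes")

# Also new: partitionBy and trigger.* arguments on write.stream.
# trigger.once = TRUE would instead process a single batch and stop.
q <- write.stream(df, "parquet",
                  path = "/home/user/out",
                  checkpointLocation = "/home/user/cp",
                  partitionBy = "name",
                  trigger.processingTime = "30 seconds")
```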
1 change: 1 addition & 0 deletions R/pkg/NAMESPACE
@@ -179,6 +179,7 @@ exportMethods("arrange",
"with",
"withColumn",
"withColumnRenamed",
"withWatermark",
"write.df",
"write.jdbc",
"write.json",
96 changes: 92 additions & 4 deletions R/pkg/R/DataFrame.R
@@ -3661,7 +3661,8 @@ setMethod("getNumPartitions",
#' isStreaming
#'
#' Returns TRUE if this SparkDataFrame contains one or more sources that continuously return data
#' as it arrives.
#' as it arrives. A dataset that reads data from a streaming source must be executed as a
#' \code{StreamingQuery} using \code{write.stream}.
#'
#' @param x A SparkDataFrame
#' @return TRUE if this SparkDataFrame is from a streaming source
@@ -3707,7 +3708,17 @@ setMethod("isStreaming",
#' @param df a streaming SparkDataFrame.
#' @param source a name for external data source.
#' @param outputMode one of 'append', 'complete', 'update'.
#' @param ... additional argument(s) passed to the method.
#' @param partitionBy a name or a list of names of columns to partition the output by on the file
#' system. If specified, the output is laid out on the file system similar to Hive's
#' partitioning scheme.
#' @param trigger.processingTime a processing time interval as a string, e.g. '5 seconds',
#' '1 minute'. This is a trigger that runs a query periodically based on the processing
#' time. If the value is '0 seconds', the query will run as fast as possible; this is the
#' default. Only one trigger can be set.
#' @param trigger.once a logical, must be set to \code{TRUE}. This is a trigger that processes only
#' one batch of data in a streaming query then terminates the query. Only one trigger can be
#' set.
#' @param ... additional external data source specific named options.
#'
#' @family SparkDataFrame functions
#' @seealso \link{read.stream}
@@ -3725,7 +3736,8 @@ setMethod("isStreaming",
#' # console
#' q <- write.stream(wordCounts, "console", outputMode = "complete")
#' # text stream
#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp")
#' q <- write.stream(df, "text", path = "/home/user/out", checkpointLocation = "/home/user/cp"
#' partitionBy = c("year", "month"), trigger.processingTime = "30 seconds")
#' # memory stream
#' q <- write.stream(wordCounts, "memory", queryName = "outs", outputMode = "complete")
#' head(sql("SELECT * from outs"))
@@ -3737,7 +3749,8 @@ setMethod("isStreaming",
#' @note experimental
setMethod("write.stream",
signature(df = "SparkDataFrame"),
function(df, source = NULL, outputMode = NULL, ...) {
function(df, source = NULL, outputMode = NULL, partitionBy = NULL,
trigger.processingTime = NULL, trigger.once = NULL, ...) {
if (!is.null(source) && !is.character(source)) {
stop("source should be character, NULL or omitted. It is the data source specified ",
"in 'spark.sql.sources.default' configuration by default.")
@@ -3748,12 +3761,43 @@ setMethod("write.stream",
if (is.null(source)) {
source <- getDefaultSqlSource()
}
cols <- NULL
if (!is.null(partitionBy)) {
if (!all(sapply(partitionBy, function(c) { is.character(c) }))) {
stop("All partitionBy column names should be characters.")
}
cols <- as.list(partitionBy)
}
jtrigger <- NULL
if (!is.null(trigger.processingTime) && !is.na(trigger.processingTime)) {
if (!is.null(trigger.once)) {
stop("Multiple triggers not allowed.")
}
interval <- as.character(trigger.processingTime)
if (nchar(interval) == 0) {
stop("Value for trigger.processingTime must be a non-empty string.")
}
jtrigger <- handledCallJStatic("org.apache.spark.sql.streaming.Trigger",
"ProcessingTime",
interval)
} else if (!is.null(trigger.once) && !is.na(trigger.once)) {
if (!is.logical(trigger.once) || !trigger.once) {
stop("Value for trigger.once must be TRUE.")
}
jtrigger <- callJStatic("org.apache.spark.sql.streaming.Trigger", "Once")
}
options <- varargsToStrEnv(...)
write <- handledCallJMethod(df@sdf, "writeStream")
write <- callJMethod(write, "format", source)
if (!is.null(outputMode)) {
write <- callJMethod(write, "outputMode", outputMode)
}
if (!is.null(cols)) {
write <- callJMethod(write, "partitionBy", cols)
}
if (!is.null(jtrigger)) {
write <- callJMethod(write, "trigger", jtrigger)
}
write <- callJMethod(write, "options", options)
ssq <- handledCallJMethod(write, "start")
streamingQuery(ssq)
@@ -3967,3 +4011,47 @@ setMethod("broadcast",
sdf <- callJStatic("org.apache.spark.sql.functions", "broadcast", x@sdf)
dataFrame(sdf)
})

#' withWatermark
#'
#' Defines an event time watermark for this streaming SparkDataFrame. A watermark tracks a point in
#' time before which we assume no more late data is going to arrive.
#'
#' Spark will use this watermark for several purposes:
#' \itemize{
#' \item{-} To know when a given time window aggregation can be finalized and thus can be emitted
#' when using output modes that do not allow updates.
#' \item{-} To minimize the amount of state that we need to keep for on-going aggregations.
#' }
#' The current watermark is computed by looking at the \code{MAX(eventTime)} seen across
#' all of the partitions in the query minus a user specified \code{delayThreshold}. Due to the cost
#' of coordinating this value across partitions, the actual watermark used is only guaranteed
#' to be at least \code{delayThreshold} behind the actual event time. In some cases we may still
#' process records that arrive more than \code{delayThreshold} late.
#'
#' @param x a streaming SparkDataFrame
#' @param eventTime a string specifying the name of the Column that contains the event time of the
#' row.
#' @param delayThreshold a string specifying the minimum delay to wait for data to arrive late,
#' relative to the latest record that has been processed in the form of an
#' interval (e.g. "1 minute" or "5 hours"). NOTE: This should not be negative.
#' @return a SparkDataFrame.
#' @aliases withWatermark,SparkDataFrame,character,character-method
#' @family SparkDataFrame functions
#' @rdname withWatermark
#' @name withWatermark
#' @export
#' @examples
#' \dontrun{
#' sparkR.session()
#' schema <- structType(structField("time", "timestamp"), structField("value", "double"))
#' df <- read.stream("json", path = jsonDir, schema = schema, maxFilesPerTrigger = 1)
#' df <- withWatermark(df, "time", "10 minutes")
#' }
#' @note withWatermark since 2.3.0
setMethod("withWatermark",
signature(x = "SparkDataFrame", eventTime = "character", delayThreshold = "character"),
function(x, eventTime, delayThreshold) {
sdf <- callJMethod(x@sdf, "withWatermark", eventTime, delayThreshold)
dataFrame(sdf)
})
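
A hedged illustration of the intended pattern: combining the new `withWatermark` with a time-window aggregation so that windows can be finalized and emitted in "append" mode. The schema, column names, and window sizes here are illustrative and not taken from this diff.

```r
# Count events per 5-minute event-time window. With a 10-minute watermark,
# a window is finalized (and, in "append" mode, emitted) once the watermark
# passes the end of that window.
events <- read.stream("json", path = "/data/events",
                      schema = "eventTime TIMESTAMP, word STRING")
events <- withWatermark(events, "eventTime", "10 minutes")
windowed <- count(group_by(events, window(events$eventTime, "5 minutes")))
q <- write.stream(windowed, "console", outputMode = "append")
```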
4 changes: 3 additions & 1 deletion R/pkg/R/SQLContext.R
@@ -727,7 +727,9 @@ read.jdbc <- function(url, tableName,
#' @param schema The data schema defined in structType or a DDL-formatted string, this is
#' required for file-based streaming data source
#' @param ... additional external data source specific named options, for instance \code{path} for
#' file-based streaming data source
#' file-based streaming data source. \code{timeZone} to indicate a timezone to be used to
#' parse timestamps in the JSON/CSV data sources or partition values; if it is not set, the
#' session local timezone is used by default.
#' @return SparkDataFrame
#' @rdname read.stream
#' @name read.stream
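
A small sketch of passing the `timeZone` option documented above through `read.stream`'s named options; the path and schema are placeholders.

```r
# timeZone is forwarded as a data source option and controls how timestamps
# in the JSON/CSV input (and partition values) are parsed.
df <- read.stream("json", path = "/data/in",
                  schema = "ts TIMESTAMP, value DOUBLE",
                  timeZone = "UTC")
```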
6 changes: 6 additions & 0 deletions R/pkg/R/generics.R
@@ -799,6 +799,12 @@ setGeneric("withColumn", function(x, colName, col) { standardGeneric("withColumn
setGeneric("withColumnRenamed",
function(x, existingCol, newCol) { standardGeneric("withColumnRenamed") })

#' @rdname withWatermark
#' @export
setGeneric("withWatermark", function(x, eventTime, delayThreshold) {
standardGeneric("withWatermark")
})

#' @rdname write.df
#' @export
setGeneric("write.df", function(df, path = NULL, ...) { standardGeneric("write.df") })
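
For orientation, the three places a new SparkR DataFrame method has to be wired up, condensed from the NAMESPACE, generics.R, and DataFrame.R hunks of this commit (withWatermark shown; the same pattern applies to any new method):

```r
# 1. R/pkg/NAMESPACE: export the method name so users can call it.
#      exportMethods(..., "withWatermark", ...)

# 2. R/pkg/R/generics.R: declare the S4 generic.
setGeneric("withWatermark", function(x, eventTime, delayThreshold) {
  standardGeneric("withWatermark")
})

# 3. R/pkg/R/DataFrame.R: implement the method by delegating to the JVM Dataset.
setMethod("withWatermark",
          signature(x = "SparkDataFrame", eventTime = "character",
                    delayThreshold = "character"),
          function(x, eventTime, delayThreshold) {
            sdf <- callJMethod(x@sdf, "withWatermark", eventTime, delayThreshold)
            dataFrame(sdf)
          })
```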
107 changes: 107 additions & 0 deletions R/pkg/tests/fulltests/test_streaming.R
@@ -172,6 +172,113 @@ test_that("Terminated by error", {
stopQuery(q)
})

test_that("PartitionBy", {
parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
checkpointPath <- tempfile(pattern = "sparkr-test", fileext = ".checkpoint")
textPath <- tempfile(pattern = "sparkr-test", fileext = ".text")
df <- read.df(jsonPath, "json", stringSchema)
write.df(df, parquetPath, "parquet", "overwrite")

df <- read.stream(path = parquetPath, schema = stringSchema)

expect_error(write.stream(df, "json", path = textPath, checkpointLocation = "append",
partitionBy = c(1, 2)),
"All partitionBy column names should be characters")

q <- write.stream(df, "json", path = textPath, checkpointLocation = "append",
partitionBy = "name")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

dirs <- list.files(textPath)
expect_equal(length(dirs[substring(dirs, 1, nchar("name=")) == "name="]), 3)

unlink(checkpointPath)
unlink(textPath)
unlink(parquetPath)
})

test_that("Watermark", {
parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
schema <- structType(structField("value", "string"))
t <- Sys.time()
df <- as.DataFrame(lapply(list(t), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
df <- read.stream(path = parquetPath, schema = "value STRING")
df <- withColumn(df, "eventTime", cast(df$value, "timestamp"))
df <- withWatermark(df, "eventTime", "10 seconds")
counts <- count(group_by(df, "eventTime"))
q <- write.stream(counts, "memory", queryName = "times", outputMode = "append")

# first events
df <- as.DataFrame(lapply(list(t + 1, t, t + 2), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

# advance watermark to 15
df <- as.DataFrame(lapply(list(t + 25), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

# old events, should be dropped
df <- as.DataFrame(lapply(list(t), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

# evict events less than previous watermark
df <- as.DataFrame(lapply(list(t + 25), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

times <- collect(sql("SELECT * FROM times"))
# looks like write timing can affect the first bucket; but it should be t
expect_equal(times[order(times$eventTime),][1, 2], 2)

stopQuery(q)
unlink(parquetPath)
})

test_that("Trigger", {
parquetPath <- tempfile(pattern = "sparkr-test", fileext = ".parquet")
schema <- structType(structField("value", "string"))
df <- as.DataFrame(lapply(list(Sys.time()), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
df <- read.stream(path = parquetPath, schema = "value STRING")

expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
trigger.processingTime = "", trigger.once = ""), "Multiple triggers not allowed.")

expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
trigger.processingTime = ""),
"Value for trigger.processingTime must be a non-empty string.")

expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
trigger.processingTime = "invalid"), "illegal argument")

expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
trigger.once = ""), "Value for trigger.once must be TRUE.")

expect_error(write.stream(df, "memory", queryName = "times", outputMode = "append",
trigger.once = FALSE), "Value for trigger.once must be TRUE.")

q <- write.stream(df, "memory", queryName = "times", outputMode = "append", trigger.once = TRUE)
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")
df <- as.DataFrame(lapply(list(Sys.time()), as.character), schema)
write.df(df, parquetPath, "parquet", "append")
awaitTermination(q, 5 * 1000)
callJMethod(q@ssq, "processAllAvailable")

expect_equal(nrow(collect(sql("SELECT * FROM times"))), 1)

stopQuery(q)
unlink(parquetPath)
})

unlink(jsonPath)
unlink(jsonPathNa)

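
A rough walk-through of the event-time arithmetic the Watermark test above depends on; the exact moment the watermark advances is internal to the engine, so treat this as a sketch of the intended behaviour rather than a guarantee:

```r
# delayThreshold = "10 seconds"; watermark ~= max(eventTime seen) - 10s
#   initial data:            event at t     -> bucket t has 1 row
#   "first events" batch:    t+1, t, t+2    -> bucket t has 2 rows; watermark ~ t - 8s
#   "advance watermark":     t+25           -> watermark advances to ~ t + 15s
#   "old events" batch:      t              -> now behind the watermark, dropped
#   "evict" batch:           t+25           -> buckets at or below ~ t + 15s are
#                                              finalized and emitted in "append" mode
# Hence the earliest bucket the sink sees is expected to be eventTime = t with count 2:
times <- collect(sql("SELECT * FROM times"))
expect_equal(times[order(times$eventTime), ][1, 2], 2)
```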
4 changes: 4 additions & 0 deletions python/pyspark/sql/streaming.py
@@ -793,6 +793,10 @@ def trigger(self, processingTime=None, once=None):
.. note:: Evolving.

:param processingTime: a processing time interval as a string, e.g. '5 seconds', '1 minute'.
Set a trigger that runs a query periodically based on the processing
time. Only one trigger can be set.
:param once: if set to True, set a trigger that processes only one batch of data in a
streaming query then terminates the query. Only one trigger can be set.

>>> # trigger the query for execution every 5 seconds
>>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.sql.streaming.Trigger

/**
* A [[Trigger]] that process only one batch of data in a streaming query then terminates
* A [[Trigger]] that processes only one batch of data in a streaming query then terminates
* the query.
*/
@Experimental