Closed
@@ -30,19 +30,19 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState
* --------------------------------------------------------------
* Both, `mapGroupsWithState` and `flatMapGroupsWithState` in `KeyValueGroupedDataset`
* will invoke the user-given function on each group (defined by the grouping function in
- * `Dataset.groupByKey()`) while maintaining user-defined per-group state between invocations.
+ * `Dataset.groupByKey()`) while maintaining a user-defined per-group state between invocations.
* For a static batch Dataset, the function will be invoked once per group. For a streaming
* Dataset, the function will be invoked for each group repeatedly in every trigger.
* That is, in every batch of the `StreamingQuery`,
* the function will be invoked once for each group that has data in the trigger. Furthermore,
- * if timeout is set, then the function will invoked on timed out groups (more detail below).
+ * if timeout is set, then the function will be invoked on timed-out groups (more detail below).
*
- * The function is invoked with following parameters.
+ * The function is invoked with the following parameters.
* - The key of the group.
* - An iterator containing all the values for this group.
* - A user-defined state object set by previous invocations of the given function.
*
- * In case of a batch Dataset, there is only one invocation and state object will be empty as
+ * In case of a batch Dataset, there is only one invocation and the state object will be empty as
* there is no prior state. Essentially, for batch Datasets, `[map/flatMap]GroupsWithState`
* is equivalent to `[map/flatMap]Groups` and any updates to the state and/or timeouts have
* no effect.
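As a rough illustration of the invocation contract documented in this hunk, here is a minimal sketch of a user function that receives the group key, an iterator over the group's values, and the state object. The names `RunningCountSketch`, `addToCount`, and `updateCount` are hypothetical and do not appear in the PR; only the `GroupState` calls (`getOption`, `update`) are taken from Spark's API.

```scala
import org.apache.spark.sql.streaming.GroupState

object RunningCountSketch {
  // Hypothetical pure helper: fold one invocation's value count into the prior count.
  def addToCount(previous: Option[Long], batchSize: Int): Long =
    previous.getOrElse(0L) + batchSize

  // Called with the group key, the group's values, and the state set by
  // earlier invocations. For a batch Dataset the state is always empty on
  // entry, and the update below has no lasting effect.
  def updateCount(
      key: String,
      values: Iterator[String],
      state: GroupState[Long]): (String, Long) = {
    val newCount = addToCount(state.getOption, values.size)
    state.update(newCount)
    (key, newCount)
  }
}
```

Assuming a `Dataset[String]` named `words` and `spark.implicits._` in scope (both assumptions), this could be wired up roughly as `words.groupByKey(identity).mapGroupsWithState(RunningCountSketch.updateCount _)`.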
@@ -85,31 +85,31 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalGroupState
* - With `ProcessingTimeTimeout`, the timeout duration can be set by calling
* `GroupState.setTimeoutDuration`. The timeout will occur when the clock has advanced by the set
* duration. Guarantees provided by this timeout with a duration of D ms are as follows:
- * - Timeout will never be occur before the clock time has advanced by D ms
+ * - Timeout will never occur before the clock time has advanced by D ms
* - Timeout will occur eventually when there is a trigger in the query
- * (i.e. after D ms). So there is a no strict upper bound on when the timeout would occur.
+ * (i.e. after D ms). So there is no strict upper bound on when the timeout would occur.
* For example, the trigger interval of the query will affect when the timeout actually occurs.
- * If there is no data in the stream (for any group) for a while, then their will not be
+ * If there is no data in the stream (for any group) for a while, then there will not be
* any trigger and timeout function call will not occur until there is data.
* - Since the processing time timeout is based on the clock time, it is affected by the
* variations in the system clock (i.e. time zone changes, clock skew, etc.).
* - With `EventTimeTimeout`, the user also has to specify the event time watermark in
* the query using `Dataset.withWatermark()`. With this setting, data that is older than the
- * watermark are filtered out. The timeout can be set for a group by setting a timeout timestamp
+ * watermark is filtered out. The timeout can be set for a group by setting a timeout timestamp
* using`GroupState.setTimeoutTimestamp()`, and the timeout would occur when the watermark
* advances beyond the set timestamp. You can control the timeout delay by two parameters -
* (i) watermark delay and an additional duration beyond the timestamp in the event (which
* is guaranteed to be newer than watermark due to the filtering). Guarantees provided by this
* timeout are as follows:
- * - Timeout will never be occur before watermark has exceeded the set timeout.
- * - Similar to processing time timeouts, there is a no strict upper bound on the delay when
+ * - Timeout will never occur before the watermark has exceeded the set timeout.
+ * - Similar to processing time timeouts, there is no strict upper bound on the delay when
* the timeout actually occurs. The watermark can advance only when there is data in the
- * stream, and the event time of the data has actually advanced.
+ * stream and the event time of the data has actually advanced.
* - When the timeout occurs for a group, the function is called for that group with no values, and
* `GroupState.hasTimedOut()` set to true.
* - The timeout is reset every time the function is called on a group, that is,
* when the group has new data, or the group has timed out. So the user has to set the timeout
- * duration every time the function is called, otherwise there will not be any timeout set.
+ * duration every time the function is called, otherwise, there will not be any timeout set.
*
* Scala example of using GroupState in `mapGroupsWithState`:
* {{{
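The event-time timeout rules described in the comment above (the timeout is reset on every call, and a timed-out group is invoked with no values and `hasTimedOut` set to true) might be exercised along the following lines. This is a sketch under stated assumptions, not the example from the PR: the `Event` record, `LastSeenSketch`, `latestEventMs`, `trackLastSeen`, and the 30-second gap are all hypothetical. The `GroupState` calls used (`hasTimedOut`, `getOption`, `update`, `remove`, `setTimeoutTimestamp`) are from Spark's API.

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.GroupState

// Hypothetical record type, for illustration only.
case class Event(user: String, eventTime: Timestamp)

object LastSeenSketch {
  // Hypothetical pure helper: latest event time seen so far, in epoch ms.
  // Assumes at least one of the two inputs is non-empty.
  def latestEventMs(previous: Option[Long], batch: Seq[Long]): Long =
    (previous.toSeq ++ batch).max

  // Returns (user, lastSeenMs, expired).
  def trackLastSeen(
      user: String,
      events: Iterator[Event],
      state: GroupState[Long]): (String, Long, Boolean) = {
    if (state.hasTimedOut) {
      // Timed-out invocation: no values are passed, so only the state is used.
      val lastSeen = state.getOption.getOrElse(0L)
      state.remove()
      (user, lastSeen, true)
    } else {
      val lastSeen =
        latestEventMs(state.getOption, events.map(_.eventTime.getTime).toList)
      state.update(lastSeen)
      // The timeout is reset on every call, so it has to be set again here.
      // It fires once the watermark passes lastSeen plus the extra 30 seconds.
      state.setTimeoutTimestamp(lastSeen, "30 seconds")
      (user, lastSeen, false)
    }
  }
}
```

With a streaming `Dataset[Event]` named `events` and `spark.implicits._` in scope (assumptions), the query would need a watermark before grouping, roughly: `events.withWatermark("eventTime", "10 minutes").groupByKey(_.user).mapGroupsWithState(GroupStateTimeout.EventTimeTimeout)(LastSeenSketch.trackLastSeen _)`.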