Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
when colum is use alias ,the order by result is wrong #16890
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Uh oh!
There was an error while loading. Please reload this page.
when colum is use alias ,the order by result is wrong #16890
Changes from 1 commit
18ee55d84f0b64a774bca00190056c00c06b79cc7c20e6280a83accfa23debd843ec8efee20df83dff872992a0ee7f982b4494cd9eefdf9f17ce0b5f85f296d06172b278fa1e33791a8c050c12569e506a81e336fe409f30c923182e625608ccca91064fadd640f94263d8390148a84b0bf605c039ed9fd50d12be20d9b1552e5f09b7a03fea31f9254268b4f174cdc3c2ba9fbcdabaa6113fe7aa014eb3dcad9f0c589e374e65cb772035ede6ad3df067acec4a6519c9949210ef14214a11d02c8aea745b258b8e49747249f5b0afcfd5d03bdf3eee576c1e60bd91aec9493bf27e0247c61c2acca86803c86fdd752502b0ff67a159c184ed9783388f3f73acdb691e15ef37440a4cfc3fdce8176db3940e821ecf1ddca5f6480b192afaa9965c82d47d5d0d2338451256a3a89effc2cd3dcb637045b8b2969fb49f523d31191fe2c0ba28490817a64172ff84e35c5a385d738a7ab6f921aa8c3bb1a1fe1b5ee2042ad93bcfcfc92f7c07dbe2e7b12ade075a06fbc35c0eda7ef9156d2be7425e26a4cbace112ce57d70d2081b7ad90638359ac0522f1a1f2604ee8cf5ed397bdf4a27cc5fcb7fb0985768303e201d5d2a9c86a57fbf4936820b4ca152d4f612f523fa48aafed050c20c22d4aae2f3c20b0674e7eb94f4b6b3e8980317fa7565b10ff77304267beb2270f16ff5aff5302d33021bcb2677bd6dc603fab0d627a0a630d904309a97edc2aee2bd28fd178d3d314d0e99e34d6ed285c7a7ce27b7277e073ee7398df4444e33aaa2aeb8034d4cd975266c1e75a0569cd60dde2e8d3fca0077bfc4d4d0de15627ac1aeb9f6c618ccd64cae2250a99129d9d67c1a09cd63fc8e8c40645741af0deeaf63c526287c94303f00afd6c3a0d5593f78e8afb38640dc0c5a6635dadff5fde8a03e3a43ae7d785217226d3880fbecc73881f34bc0a0e62bdbc87855a1b78f03ad504ad8224321ff95e7cd33ab88b241c4d10b0417ce83dbff9b905fdf09af8f740169360e02ac306e45b541ab97319b5e460457850e9c4405ee0eeb0f7b64f7af776e3bab9872da3626cada7aef7f48c5a5447b2b53973403b55563c733c59e8b75f8c601b9c3d22db625ad10c5a8a1398acf71c66eca21bc97f4e1671bc083755da759dc26e6a9a85b865b2fdf6c3bba21b4ba208c1972fc02ef98487902f041e553871d940e240543b4376854a30c8dcc2d5421fde5754d2359ed338f7File filter
Filter by extension
Conversations
Uh oh!
There was an error while loading. Please reload this page.
Jump to
Uh oh!
There was an error while loading. Please reload this page.
## What changes were proposed in this pull request? `mapGroupsWithState` is a new API for arbitrary stateful operations in Structured Streaming, similar to `DStream.mapWithState` *Requirements* - Users should be able to specify a function that can do the following - Access the input row corresponding to a key - Access the previous state corresponding to a key - Optionally, update or remove the state - Output any number of new rows (or none at all) *Proposed API* ``` // ------------ New methods on KeyValueGroupedDataset ------------ class KeyValueGroupedDataset[K, V] { // Scala friendly def mapGroupsWithState[S: Encoder, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => U) def flatMapGroupsWithState[S: Encode, U: Encoder](func: (K, Iterator[V], KeyedState[S]) => Iterator[U]) // Java friendly def mapGroupsWithState[S, U](func: MapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], resultEncoder: Encoder[U]) def flatMapGroupsWithState[S, U](func: FlatMapGroupsWithStateFunction[K, V, S, R], stateEncoder: Encoder[S], resultEncoder: Encoder[U]) } // ------------------- New Java-friendly function classes ------------------- public interface MapGroupsWithStateFunction<K, V, S, R> extends Serializable { R call(K key, Iterator<V> values, state: KeyedState<S>) throws Exception; } public interface FlatMapGroupsWithStateFunction<K, V, S, R> extends Serializable { Iterator<R> call(K key, Iterator<V> values, state: KeyedState<S>) throws Exception; } // ---------------------- Wrapper class for state data ---------------------- trait State[S] { def exists(): Boolean def get(): S // throws Exception is state does not exist def getOption(): Option[S] def update(newState: S): Unit def remove(): Unit // exists() will be false after this } ``` Key Semantics of the State class - The state can be null. - If the state.remove() is called, then state.exists() will return false, and getOption will returm None. - After that state.update(newState) is called, then state.exists() will return true, and getOption will return Some(...). - None of the operations are thread-safe. This is to avoid memory barriers. *Usage* ``` val stateFunc = (word: String, words: Iterator[String, runningCount: KeyedState[Long]) => { val newCount = words.size + runningCount.getOption.getOrElse(0L) runningCount.update(newCount) (word, newCount) } dataset // type is Dataset[String] .groupByKey[String](w => w) // generates KeyValueGroupedDataset[String, String] .mapGroupsWithState[Long, (String, Long)](stateFunc) // returns Dataset[(String, Long)] ``` ## How was this patch tested? New unit tests. Author: Tathagata Das <[email protected]> Closes #16758 from tdas/mapWithState.Uh oh!
There was an error while loading. Please reload this page.
There are no files selected for viewing
Uh oh!
There was an error while loading. Please reload this page.