-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-48755][SS][PYTHON] transformWithState pyspark base implementation and ValueState support #47133
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-48755][SS][PYTHON] transformWithState pyspark base implementation and ValueState support #47133
Changes from 1 commit
b1175e4
0a98ed8
8e2b193
16e4c17
92ef716
c3eaf38
609d94e
a27f9d9
684939b
7f65fbd
c25d7da
9c8c616
c641192
8d3da4e
cc9bf95
f7df2dc
27cd169
3b5b3e5
5d910d8
df859ab
654f2f6
38832a6
0585ac0
0ee5029
6232c81
41f8234
d57633f
df9ea9e
68f7a7e
ca5216b
c9e3a7c
2320805
6e5de2e
200ec5e
dd3e46b
e8360d4
82983af
49dbc16
d4e04ea
e108f60
bae26c2
d96fa9e
92531db
d507793
5dcb4c8
37be02a
f63687f
263c087
c7b0a4f
c80b292
81276f3
5886b5c
23e54b4
2ba4fd0
2a9c20b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
- Loading branch information
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -34,24 +34,23 @@ import org.apache.spark.sql.streaming.ValueState | |
| import org.apache.spark.sql.types.StructType | ||
|
|
||
| /** | ||
| * This class is used to handle the state requests from the Python side. | ||
| * This class is used to handle the state requests from the Python side. It processes the following | ||
|
||
| * state requests and returns responses to the Python side: | ||
| * - Requests for managing explicit grouping key. | ||
| * - Stateful processor requests. | ||
| * - Requests for managing state variables (e.g. valueState). | ||
| */ | ||
| class TransformWithStateInPandasStateServer( | ||
bogao007 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| private val stateServerSocket: ServerSocket, | ||
|
||
| private val statefulProcessorHandle: StatefulProcessorHandleImpl, | ||
| private val groupingKeySchema: StructType) | ||
| extends Runnable | ||
| with Logging{ | ||
|
|
||
| extends Runnable with Logging { | ||
| private var inputStream: DataInputStream = _ | ||
| private var outputStream: DataOutputStream = _ | ||
|
|
||
| private val valueStates = mutable.HashMap[String, ValueState[Row]]() | ||
|
|
||
| def run(): Unit = { | ||
| val listeningSocket = stateServerSocket.accept() | ||
| logWarning(s"listening on socket - ${listeningSocket.getLocalAddress}") | ||
|
|
||
| inputStream = new DataInputStream( | ||
| new BufferedInputStream(listeningSocket.getInputStream)) | ||
| outputStream = new DataOutputStream( | ||
|
|
@@ -60,21 +59,15 @@ class TransformWithStateInPandasStateServer( | |
|
|
||
| while (listeningSocket.isConnected && | ||
| statefulProcessorHandle.getHandleState != StatefulProcessorHandleState.CLOSED) { | ||
|
|
||
| try { | ||
| val version = inputStream.readInt() | ||
|
|
||
| if (version != -1) { | ||
| assert(version == 0) | ||
| val messageLen = inputStream.readInt() | ||
|
|
||
| val messageBytes = new Array[Byte](messageLen) | ||
| inputStream.read(messageBytes) | ||
|
|
||
| val message = StateRequest.parseFrom(ByteString.copyFrom(messageBytes)) | ||
|
|
||
| handleRequest(message) | ||
|
|
||
| outputStream.flush() | ||
| } | ||
| } catch { | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.