Skip to content

Commit dcee7a6

Browse files
committed
[SPARK-34892][SS] Introduce MergingSortWithSessionWindowStateIterator sorting input rows and rows in state efficiently
Introduction: this PR is a part of SPARK-10816 (EventTime based sessionization (session window)). Please refer to #31937 to see the overall view of the code change. (Note that the code diff may have diverged a bit.) ### What changes were proposed in this pull request? This PR introduces MergingSortWithSessionWindowStateIterator, which does "merge sort" between input rows and sessions in state based on group key and session's start time. Note that the iterator does merge sort among input rows and sessions grouped by grouping key. The iterator doesn't provide sessions in state whose keys don't exist in input rows. For input rows, the iterator will provide all rows regardless of the existence of matching sessions in state. MergingSortWithSessionWindowStateIterator works on the precondition that the given iterator is sorted by "group keys + start time of session window", and the iterator still retains the characteristic of the sort. ### Why are the changes needed? This is one of the parts required to implement SPARK-10816. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New UT added. Closes #33077 from HeartSaVioR/SPARK-34892-SPARK-10816-PR-31570-part-4. Authored-by: Jungtaek Lim <[email protected]> Signed-off-by: Jungtaek Lim <[email protected]> (cherry picked from commit 12a576f) Signed-off-by: Jungtaek Lim <[email protected]>
1 parent 5bc06fd commit dcee7a6

File tree

3 files changed

+400
-1
lines changed

3 files changed

+400
-1
lines changed
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.streaming
19+
20+
import org.apache.spark.internal.Logging
21+
import org.apache.spark.sql.catalyst.InternalRow
22+
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, UnsafeRow}
23+
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
24+
import org.apache.spark.sql.execution.streaming.state.{ReadStateStore, StreamingSessionWindowStateManager}
25+
26+
/**
 * Merges input rows with the sessions already stored in state, producing a single stream
 * ordered by "group keys + start time of session window".
 *
 * Precondition: the input iterator is sorted by "group keys + start time of session window".
 * Relying on that, only the existing sessions of the group key currently being consumed are
 * materialized and sorted in memory; these are expected to be few per group key, so the cost
 * of sorting them should be minor. Input rows are never buffered.
 *
 * Sessions are fetched from state only for the keys that show up in input rows.
 */
class MergingSortWithSessionWindowStateIterator(
    iter: Iterator[InternalRow],
    stateManager: StreamingSessionWindowStateManager,
    store: ReadStateStore,
    groupWithoutSessionExpressions: Seq[Attribute],
    sessionExpression: Attribute,
    inputSchema: Seq[Attribute]) extends Iterator[InternalRow] with Logging {

  // Extracts the grouping key columns (without the session column) out of a row.
  private val keysProjection: UnsafeProjection = GenerateUnsafeProjection.generate(
    groupWithoutSessionExpressions, inputSchema)

  // Extracts the session column out of a row.
  private val sessionProjection: UnsafeProjection =
    GenerateUnsafeProjection.generate(Seq(sessionExpression), inputSchema)

  // A row paired with the values the merge needs: its group keys and its session boundaries.
  private case class SessionRowInformation(
      keys: UnsafeRow,
      sessionStart: Long,
      sessionEnd: Long,
      row: InternalRow)

  private object SessionRowInformation {
    // Builds the information for `row`; projection results are copied before being retained.
    def of(row: InternalRow): SessionRowInformation = {
      val groupKeys = keysProjection(row).copy()
      val projectedSession = sessionProjection(row).copy()
      val sessionStruct = projectedSession.getStruct(0, 2)
      val start = sessionStruct.getLong(0)
      val end = sessionStruct.getLong(1)

      SessionRowInformation(groupKeys, start, end, row)
    }
  }

  // Most recent row pulled from the input iterator that has not been returned yet.
  private var currentRowFromInput: SessionRowInformation = _

  // Most recent row pulled from the state-side iterator that has not been returned yet.
  private var currentRowFromState: SessionRowInformation = _

  // Sorted sessions from state for the current session key.
  private var sessionIterFromState: Iterator[InternalRow] = _

  // Group key whose sessions were most recently loaded from state.
  private var currentSessionKey: UnsafeRow = _

  override def hasNext: Boolean = {
    val hasPendingRow = currentRowFromInput != null || currentRowFromState != null
    def stateHasMore: Boolean = sessionIterFromState != null && sessionIterFromState.hasNext

    hasPendingRow || stateHasMore || iter.hasNext
  }

  override def next(): InternalRow = {
    if (currentRowFromInput == null) mayFillCurrentRow()
    if (currentRowFromState == null) mayFillCurrentStateRow()

    if (currentRowFromInput == null && currentRowFromState == null) {
      throw new IllegalStateException("No Row to provide in next() which should not happen!")
    }

    // The input row is emitted only when there is no pending state row, or when both rows
    // share the same keys and the input row's session starts strictly earlier. A pending
    // state row with different keys always goes first, since state rows cannot advance to
    // the keys of the next input row.
    val emitInputRow: Boolean =
      currentRowFromInput != null && (currentRowFromState == null ||
        (currentRowFromInput.keys == currentRowFromState.keys &&
          currentRowFromInput.sessionStart < currentRowFromState.sessionStart))

    if (emitInputRow) {
      val picked = currentRowFromInput
      currentRowFromInput = null
      picked.row
    } else {
      val picked = currentRowFromState
      currentRowFromState = null
      picked.row
    }
  }

  // Pulls the next row from the input iterator, if there is one.
  private def mayFillCurrentRow(): Unit = {
    if (iter.hasNext) currentRowFromInput = SessionRowInformation.of(iter.next())
  }

  // Pulls the next session from state. When the sessions of the current key are used up and
  // the pending input row carries a different key, the sessions of that key are loaded first.
  private def mayFillCurrentStateRow(): Unit = {
    val stateHasMore = sessionIterFromState != null && sessionIterFromState.hasNext
    if (stateHasMore) {
      currentRowFromState = SessionRowInformation.of(sessionIterFromState.next())
    } else {
      sessionIterFromState = null

      val inputMovedToOtherKey =
        currentRowFromInput != null && currentRowFromInput.keys != currentSessionKey
      if (inputMovedToOtherKey) {
        sessionIterFromState = loadSortedSessions(currentRowFromInput.keys)
        currentSessionKey = currentRowFromInput.keys

        if (sessionIterFromState.hasNext) {
          currentRowFromState = SessionRowInformation.of(sessionIterFromState.next())
        }
      }
    }
  }

  // Reads the session start time out of a row.
  private def sessionStartOf(r: InternalRow): Long = {
    val projectedSession = sessionProjection(r)
    projectedSession.getStruct(0, 2).getLong(0)
  }

  // Materializes the sessions stored for `keys` and orders them by session start time.
  // We expect a small number of sessions per group key, so materializing and sorting them
  // wouldn't hurt much. The important thing is that input rows are not buffered in order to
  // be sorted together with existing sessions.
  private def loadSortedSessions(keys: UnsafeRow): Iterator[InternalRow] = {
    val materialized = stateManager.getSessions(store, keys).map(_.copy()).toList

    // All rows share the same keys here, so the start time alone decides the order.
    val ordered = materialized.sortWith { (left, right) =>
      sessionStartOf(left) < sessionStartOf(right)
    }
    ordered.iterator
  }
}

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ import org.apache.spark.util.{SizeEstimator, Utils}
7070
* to ensure re-executed RDD operations re-apply updates on the correct past version of the
7171
* store.
7272
*/
73-
private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider with Logging {
73+
private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with Logging {
7474

7575
class HDFSBackedReadStateStore(val version: Long, map: HDFSBackedStateStoreMap)
7676
extends ReadStateStore {

0 commit comments

Comments
 (0)