Closed
60 commits
eabb65b
WIP nothing worked, just recording the progress
HeartSaVioR Sep 4, 2018
c3076d2
WIP not working yet... lots of implementations needed
HeartSaVioR Sep 6, 2018
9d59c7a
WIP Finished implementing UpdatingSessionIterator
HeartSaVioR Sep 6, 2018
b38f2b9
WIP add verification on precondition "rows in iterator are sorted by …
HeartSaVioR Sep 7, 2018
668c1f5
Rename SymmetricHashJoinStateManager to MultiValuesStateManager
HeartSaVioR Sep 8, 2018
9f63a3c
Move package of UpdatingSessionIterator
HeartSaVioR Sep 8, 2018
5d17ac8
WIP add MergingSortWithMultiValuesStateIterator, now integrating with…
HeartSaVioR Sep 10, 2018
ec33265
WIP the first version of working one! Still have lots of TODOs and FI…
HeartSaVioR Sep 13, 2018
8b210d5
Add more explanations
HeartSaVioR Sep 13, 2018
7255bca
Silly bugfix & block session window for batch query as of now
HeartSaVioR Sep 13, 2018
7b57fe5
More works: majorly split out updating session to individual physical…
HeartSaVioR Sep 13, 2018
969859b
Fix a silly bug and also add check for session window against batch q…
HeartSaVioR Sep 13, 2018
f5ecbdd
WIP Fixed eviction on update mode
HeartSaVioR Sep 13, 2018
b180772
WIP found root reason of broken UT... fixed it
HeartSaVioR Sep 13, 2018
0a2d731
WIP remove printing "explain" on UTs
HeartSaVioR Sep 13, 2018
6ee901e
WIP address session to batch query (+ python) as well... not having t…
HeartSaVioR Sep 13, 2018
395606b
WIP add more test on session batch query
HeartSaVioR Sep 13, 2018
f6bb34d
WIP add UT for sessions with keys overlapped
HeartSaVioR Sep 14, 2018
22fffd2
WIP refactor a bit
HeartSaVioR Sep 14, 2018
847f69e
WIP add more FIXMEs for javadoc, and remove invalid FIXMEs
HeartSaVioR Sep 14, 2018
104df13
WIP Repackage & remove unnecessary field
HeartSaVioR Sep 17, 2018
8108fc5
WIP addressed UPDATE mode, but doesn't look like performant
HeartSaVioR Sep 17, 2018
35c8fef
WIP remove FIXME since it is not relevant
HeartSaVioR Sep 17, 2018
6b1d1e0
WIP update numOutputRows for Append mode
HeartSaVioR Sep 17, 2018
86b3060
WIP apply aggregations when merging sessions
HeartSaVioR Sep 18, 2018
f7c2deb
WIP simplify the code a bit
HeartSaVioR Sep 18, 2018
a81616b
WIP address batch distinct query for sessionization
HeartSaVioR Sep 18, 2018
013785d
WIP remove debug statements for test code
HeartSaVioR Sep 18, 2018
37fffef
WIP remove debug informations
HeartSaVioR Sep 18, 2018
df95e72
WIP port Sessionization example to UT of session window
HeartSaVioR Sep 18, 2018
16d6421
WIP remove unnecessary thing
HeartSaVioR Sep 18, 2018
dc43300
WIP fix all the issues with sessionization example UTs
HeartSaVioR Sep 19, 2018
3637f60
WIP apply merging session in each partition before shuffling
HeartSaVioR Sep 19, 2018
0d53831
Fix scala checkstyle
HeartSaVioR Sep 20, 2018
a781400
Fix python style check
HeartSaVioR Sep 20, 2018
918dad2
WIP add complete mode, fix tricky bugs, apply ExternalAppendOnlyUnsaf…
HeartSaVioR Oct 8, 2018
fd6377b
WIP add "session" function to exclude list for description
HeartSaVioR Oct 8, 2018
e029e12
WIP rename function & column name "session" to "session_window"
HeartSaVioR Oct 10, 2018
a2fc652
WIP reducing unnecessary codegen which seriously harmed performance
HeartSaVioR Oct 15, 2018
2dc413b
WIP reduce codegen once again for MergingSessionsIterator
HeartSaVioR Oct 15, 2018
4dd0e89
WIP optimize a bit more on codegen...
HeartSaVioR Oct 15, 2018
cf52044
WIP make the feature "merge session in local partition" optional
HeartSaVioR Oct 15, 2018
5c74609
WIP add "session_window" to exclude list
HeartSaVioR Oct 17, 2018
fb6c59f
WIP Enable versioning of session window state format
HeartSaVioR Oct 18, 2018
dd29af2
WIP some correction in comment
HeartSaVioR Oct 19, 2018
1f6e496
WIP cover all the cases for session window in UTs
HeartSaVioR Oct 20, 2018
0673b6e
WIP Add Linked List data structure for storing session windows
HeartSaVioR Oct 23, 2018
4698f6d
WIP add SessionWindowLinkedListStateStoreRDD
HeartSaVioR Oct 23, 2018
5c67f72
WIP add more functionalities to SessionWindowLinkedListState
HeartSaVioR Oct 24, 2018
7bb0060
WIP it works but a bit suboptimal
HeartSaVioR Oct 25, 2018
35c9712
WIP optimized!
HeartSaVioR Oct 25, 2018
ede078a
WIP remove requirement on sort, add UT to test linked list state with…
HeartSaVioR Oct 27, 2018
f8e8ff6
WIP add code to print out information when task crashes with dangling…
HeartSaVioR Oct 27, 2018
b05abc7
WIP fixed the issue with benchmark run
HeartSaVioR Oct 29, 2018
17570f2
WIP optimize a bit on storing new sessions
HeartSaVioR Oct 30, 2018
958de31
WIP Fixed critical bug which tasks don't respect preference on state …
HeartSaVioR Oct 31, 2018
ee67bca
WIP Fix critical perf. issue: remove codegen on generating session ro…
HeartSaVioR Nov 1, 2018
8a0331e
WIP Rolling back unnecessary changes
HeartSaVioR Nov 2, 2018
b6ccecd
WIP Apply removing codegen to UpdatingSessionIterator as well
HeartSaVioR Nov 2, 2018
75c7611
WIP remove state version for now: it will be reintroduced when actual…
HeartSaVioR Nov 2, 2018
WIP fixed the issue with benchmark run
HeartSaVioR committed Oct 29, 2018
commit b05abc71a103662448db649fc215dd17a2df3c11
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeProjection
import org.apache.spark.sql.catalyst.util.TypeUtils
import org.apache.spark.sql.execution.streaming.state.SessionWindowLinkedListState
@@ -115,30 +115,13 @@ class MergingSortWithSessionWindowLinkedListStateIterator(
val stateSessionsEnclosingCurrentRow = findSessionPointerEnclosingEvent(currentRow,
startPointer = lastCheckpointOnStateRows)

-// FIXME: debugging...
-def loadSession(
-keys: UnsafeRow,
-sessionStart: Long,
-stateSessionsEnclosingCurrentRow: Option[(Option[Long], Option[Long])])
-: UnsafeRow = {
-val sessionState = state.get(currentRow.keys, sessionStart)
-require(sessionState != null,
-s"Session must be presented in state: it may represent dangling pointer - " +
-s"$sessionStart / key: $keys / currentRow: $currentRow" +
-s"sessionsEnclosingCurrentRow: $stateSessionsEnclosingCurrentRow" +
-s"Pointers: ${state.iteratePointers(currentRow.keys).toList}" +
-s"Values: ${state.get(currentRow.keys).toList}")
-sessionState
-}
-
var prevSessionToEmit: Option[SessionRowInformation] = None
stateSessionsEnclosingCurrentRow match {
case None =>
case Some(x) =>
x._1 match {
case Some(prev) =>
-val prevSession = SessionRowInformation.of(
-loadSession(currentRow.keys, prev, stateSessionsEnclosingCurrentRow))
+val prevSession = SessionRowInformation.of(state.get(currentRow.keys, prev))

val sessionLaterThanCheckpoint = lastCheckpointOnStateRows match {
case Some(lastCheckpoint) => lastCheckpoint < prevSession.sessionStart
@@ -161,8 +144,7 @@ class MergingSortWithSessionWindowLinkedListStateIterator(

x._2 match {
case Some(next) =>
-val nextSession = SessionRowInformation.of(
-loadSession(currentRow.keys, next, stateSessionsEnclosingCurrentRow))
+val nextSession = SessionRowInformation.of(state.get(currentRow.keys, next))

val sessionLaterThanCheckpoint = lastCheckpointOnStateRows match {
case Some(lastCheckpoint) => lastCheckpoint < nextSession.sessionStart
@@ -157,7 +157,6 @@ class SessionWindowLinkedListState(
def update(key: UnsafeRow, sessionStart: Long, newValue: UnsafeRow): Unit = {
val targetPointer = keyAndSessionStartToPointerStore.get(key, sessionStart)
assertValidPointer(targetPointer)
-
keyAndSessionStartToValueStore.put(key, sessionStart, newValue)
}

@@ -243,6 +242,9 @@ class SessionWindowLinkedListState(
val prevOption = targetPointer._1
val nextOption = targetPointer._2

+keyAndSessionStartToPointerStore.remove(key, sessionStart)
+keyAndSessionStartToValueStore.remove(key, sessionStart)
+
targetPointer match {
case (Some(prev), Some(next)) =>
keyAndSessionStartToPointerStore.updateNext(key, prev, nextOption)
@@ -260,11 +262,10 @@ class SessionWindowLinkedListState(
throw new IllegalStateException("The element has pointer information for head, " +
"but the list has different head.")
}
-keyAndSessionStartToPointerStore.remove(key, sessionStart)

keyToHeadSessionStartStore.remove(key)
}

-keyAndSessionStartToValueStore.remove(key, sessionStart)
}

def removeByValueCondition(removalCondition: UnsafeRow => Boolean,
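
In the `remove` hunks above, the removal of the node's own pointer and value entries moves to the top of the method, before the neighbours are relinked. A minimal in-memory sketch of that ordering follows. It assumes plain Scala maps in place of the state stores; `SessionList`, `append`, and `toList` are hypothetical names for illustration and are not part of this PR.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the pointer, value, and head stores.
final class SessionList {
  // sessionStart -> (prev sessionStart, next sessionStart)
  private val pointers = mutable.Map.empty[Long, (Option[Long], Option[Long])]
  private val values = mutable.Map.empty[Long, String]
  private var head: Option[Long] = None

  // Walk the list from the head, following "next" pointers.
  def toList: List[Long] = {
    val out = List.newBuilder[Long]
    var cur = head
    while (cur.isDefined) {
      out += cur.get
      cur = pointers(cur.get)._2
    }
    out.result()
  }

  // Append at the tail (the real state also supports addBefore/addAfter).
  def append(sessionStart: Long, value: String): Unit = {
    require(!pointers.contains(sessionStart), s"duplicate session start: $sessionStart")
    val tail = toList.lastOption
    tail match {
      case Some(t) => pointers(t) = (pointers(t)._1, Some(sessionStart))
      case None => head = Some(sessionStart)
    }
    pointers(sessionStart) = (tail, None)
    values(sessionStart) = value
  }

  def remove(sessionStart: Long): Unit = {
    val (prev, next) = pointers.getOrElse(sessionStart,
      throw new NoSuchElementException(s"no session starting at $sessionStart"))
    // Drop the node's own entries first, mirroring the order in the diff.
    pointers.remove(sessionStart)
    values.remove(sessionStart)
    // Then relink the neighbours around the removed node.
    prev.foreach { p => pointers(p) = (pointers(p)._1, next) }
    next.foreach { n => pointers(n) = (prev, pointers(n)._2) }
    if (head.contains(sessionStart)) head = next
  }

  def size: Int = values.size
}
```

Removing the node's entries in one place means every branch of the pointer match leaves the stores in the same state, which may be why the commit consolidated the two `remove` calls.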
@@ -454,15 +455,15 @@ class SessionWindowLinkedListState(
)
}

-private[state] def getIteratorOfHeadPointers: Iterator[KeyAndHeadSessionStart] = {
+private[sql] def getIteratorOfHeadPointers: Iterator[KeyAndHeadSessionStart] = {
keyToHeadSessionStartStore.iterator
}

-private[state] def getIteratorOfRawPointers: Iterator[KeyWithSessionStartAndPointers] = {
+private[sql] def getIteratorOfRawPointers: Iterator[KeyWithSessionStartAndPointers] = {
keyAndSessionStartToPointerStore.iterator
}

-private[state] def getIteratorOfRawValues: Iterator[KeyWithSessionStartAndValue] = {
+private[sql] def getIteratorOfRawValues: Iterator[KeyWithSessionStartAndValue] = {
keyAndSessionStartToValueStore.iterator
}

@@ -646,7 +647,7 @@ class SessionWindowLinkedListState(

def updatePrev(key: UnsafeRow, sessionStart: Long, prevSessionStart: Option[Long]): Unit = {
val actualKeyRow = keyWithSessionStartRow(key, sessionStart)
-val row = stateStore.get(actualKeyRow)
+val row = stateStore.get(actualKeyRow).copy()
setPrevSessionStart(row, prevSessionStart)
stateStore.put(actualKeyRow, row)
}
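
The commit does not say why `updatePrev` now calls `.copy()` before mutating the row. One plausible reading is that the row returned by `stateStore.get` is shared with the store's internal data, so mutating it in place can leak into anything else holding the same object. The sketch below illustrates only that general aliasing hazard. `Row`, `updateInPlace`, and `updateWithCopy` are hypothetical names, and the mutable map is an assumed stand-in, not Spark's `StateStore`.

```scala
import scala.collection.mutable

// Hypothetical mutable row; stands in for a row object handed out by a store.
final class Row(var prevSessionStart: Option[Long]) {
  def copy(): Row = new Row(prevSessionStart)
}

object CopyBeforeMutate {
  // Mutates the very object the store (and any snapshot of it) holds.
  def updateInPlace(store: mutable.Map[Long, Row], key: Long, prev: Option[Long]): Unit = {
    val row = store(key)
    row.prevSessionStart = prev
    store.put(key, row)
  }

  // Mutates a private copy, then publishes it through put.
  def updateWithCopy(store: mutable.Map[Long, Row], key: Long, prev: Option[Long]): Unit = {
    val row = store(key).copy()
    row.prevSessionStart = prev
    store.put(key, row)
  }
}
```

With a shallow snapshot taken before the update (for example `store.toMap`), `updateInPlace` changes what the snapshot reports, while `updateWithCopy` leaves the snapshot untouched and changes only the live store.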
@@ -395,7 +395,6 @@ class MergingSortWithSessionWindowLinkedListStateIteratorSuite extends SharedSQL
}

private def assertRowsEquals(expectedRow: InternalRow, retRow: InternalRow): Unit = {
-
val tupleFromExpectedRow = getTupleFromRow(expectedRow)
val tupleFromInternalRow = getTupleFromRow(retRow)
try {
@@ -217,200 +217,6 @@ class SessionWindowLinkedListStateSuite extends StreamTest {
removeByWatermarkTest(stopOnConditionMismatch = false)
}

-test("run chaos monkey") {
-
-// FIXME: too many args
-def printFailureInformation(
-ex: TestFailedException,
-state: SessionWindowLinkedListState,
-operation: Int,
-addBefore: Boolean,
-opIdx: Int,
-targetIdx: Int,
-key: UnsafeRow,
-headPointersBeforeOp: List[(Int, Long)],
-rawPointersBeforeOp: List[(Int, Long, Option[Long], Option[Long])],
-pointersBeforeOp: List[Long],
-valuesBeforeOp: List[(Int, Int)],
-refListBeforeOp: java.util.LinkedList[String],
-refList: java.util.LinkedList[String]): Unit = {
-logError("Assertion failure!", ex)
-
-logError("===== Operation information =====")
-val opString = operation match {
-case 0 => "Append"
-case 1 => "Remove"
-case 2 => "RemoveValuesByCondition"
-case _ => throw new IllegalStateException(s"Unknown operation $operation")
-}
-val addPositionString = if (addBefore) "AddBefore" else "AddAfter"
-logError(s"Operation Index: $opIdx")
-logError(s"Operation: $opString")
-logError(s"Position to add: $addPositionString")
-logError(s"Target index: $targetIdx")
-
-logError("===== Before applying operation =====")
-logError(s"Head pointers in state: $headPointersBeforeOp")
-logError(s"Raw pointers in state: $rawPointersBeforeOp")
-logError(s"Pointers in state via iteratePointers: $pointersBeforeOp")
-logError(s"Values in state: $valuesBeforeOp")
-logError(s"Values in reference list: $refListBeforeOp")
-
-logError("===== After applying operation =====")
-
-val headPointers = state.getIteratorOfHeadPointers.map { pair =>
-(toKeyInt(pair.key), pair.sessionStart)
-}.toList
-val pointers = state.iteratePointers(key).map(_._1).toList
-val rawPointers = state.getIteratorOfRawPointers.map { pointer =>
-(toKeyInt(pointer.key), pointer.sessionStart, pointer.prevSessionStart,
-pointer.nextSessionStart)
-}.toList
-val values = state.getIteratorOfRawValues.map { value =>
-(toKeyInt(value.key), toValueInt(value.value))
-}.toList
-
-logError(s"Head pointers in state: $headPointers")
-logError(s"Raw pointers in state: $rawPointers")
-logError(s"Pointers in state via iteratePointers: $pointers")
-logError(s"Values in state: $values")
-logError(s"Values in reference list: $refList")
-}
-
-withSessionWindowLinkedListState(inputValueAttribs, keyExprs) { state =>
-implicit val st = state
-
-assert(numRows === 0)
-
-val rand = new Random()
-
-val keys = (0 to 2).map(id => toKeyRow(id).copy())
-// using String type to avoid confusion in remove(int) vs remove(Object)
-// which LinkedList[Integer] will be remove(int) vs remove(Integer)
-val refLists = keys.map(_ => new java.util.LinkedList[String]())
-
-val maxOperations = 100000
-(0 until maxOperations).foreach { opIdx =>
-
-val selectedKeyIdx = rand.nextInt(keys.length)
-val selectedKey = keys(selectedKeyIdx)
-val selectedRefList = refLists(selectedKeyIdx)
-
-// 0: append, 1: remove, 2: removeValueByCondition
-val operation = rand.nextInt(3)
-val addBefore = rand.nextBoolean()
-val targetIdx = if (selectedRefList.isEmpty) -1 else rand.nextInt(selectedRefList.size())
-
-val headPointersBeforeOp = state.getIteratorOfHeadPointers.map { pair =>
-(toKeyInt(pair.key), pair.sessionStart)
-}.toList
-val pointersBeforeOp = state.iteratePointers(selectedKey).map(_._1).toList
-val rawPointersBeforeOp = state.getIteratorOfRawPointers.map { pointer =>
-(toKeyInt(pointer.key), pointer.sessionStart, pointer.prevSessionStart,
-pointer.nextSessionStart)
-}.toList
-val valuesBeforeOp = state.getIteratorOfRawValues.map { value =>
-(toKeyInt(value.key), toValueInt(value.value))
-}.toList
-
-val refListBeforeOp = new java.util.LinkedList[String](refLists(selectedKeyIdx))
-
-operation match {
-case 0 =>
-if (selectedRefList.isEmpty) {
-assert(state.isEmpty(selectedKey))
-state.setHead(selectedKey, opIdx, toInputValue(opIdx))
-selectedRefList.addFirst(String.valueOf(opIdx))
-} else {
-val addBefore = rand.nextBoolean()
-if (addBefore) {
-val idxToAddBefore = selectedRefList.get(targetIdx)
-selectedRefList.add(targetIdx, String.valueOf(opIdx))
-state.addBefore(selectedKey, opIdx, toInputValue(opIdx), idxToAddBefore.toInt)
-} else {
-val idxToAddAfter = selectedRefList.get(targetIdx)
-selectedRefList.add(targetIdx + 1, String.valueOf(opIdx))
-state.addAfter(selectedKey, opIdx, toInputValue(opIdx), idxToAddAfter.toInt)
-}
-}
-
-case 1 =>
-if (selectedRefList.isEmpty) {
-assert(state.isEmpty(selectedKey))
-// skip removing
-} else {
-val pointerToRemove = selectedRefList.get(targetIdx)
-selectedRefList.remove(targetIdx)
-state.remove(selectedKey, pointerToRemove.toInt)
-}
-
-case 2 =>
-if (selectedRefList.isEmpty) {
-assert(state.isEmpty(selectedKey))
-// skip removing
-} else {
-val pointerToRemove = selectedRefList.get(targetIdx)
-val removedIter = state.removeByValueCondition { r =>
-toValueInt(r) <= pointerToRemove.toInt
-}
-
-val valuesFromRef = new scala.collection.mutable.MutableList[Int]()
-refLists.foreach { refList =>
-val refIter = refList.iterator()
-while (refIter.hasNext) {
-val ref = refIter.next()
-if (ref.toInt <= pointerToRemove.toInt) {
-valuesFromRef += ref.toInt
-refIter.remove()
-}
-}
-}
-
-try {
-assert(removedIter.map(pair => toValueInt(pair.value)).toSet ==
-valuesFromRef.toSet)
-} catch {
-case ex: TestFailedException =>
-printFailureInformation(ex, state, operation, addBefore, opIdx, targetIdx,
-selectedKey, headPointersBeforeOp, rawPointersBeforeOp, pointersBeforeOp,
-valuesBeforeOp, refListBeforeOp, selectedRefList)
-
-throw ex
-}
-}
-}
-
-keys.indices.foreach { index =>
-val key = keys(index)
-val refList = refLists(index)
-
-try {
-if (refList.isEmpty) {
-assert(state.isEmpty(key), s"Reference list is empty but " +
-s"state list for $key is not empty")
-} else {
-import scala.collection.JavaConverters._
-val statePointers = state.iteratePointers(key).map(_._1).toList
-assert(refList.asScala.map(_.toInt) === statePointers,
-s"State pointers for $key is expected to be $refList but $statePointers")
-
-val stateValues = state.get(key).map(toValueInt).toList
-assert(refList.asScala.map(_.toInt) ===
-stateValues, s"State list for $key is expected to be $refList but $stateValues")
-}
-} catch {
-case ex: TestFailedException =>
-printFailureInformation(ex, state, operation, addBefore, opIdx, targetIdx,
-selectedKey, headPointersBeforeOp, rawPointersBeforeOp, pointersBeforeOp,
-valuesBeforeOp, refListBeforeOp, selectedRefList)
-
-throw ex
-}
-}
-}
-}
-}
-
private def removeByWatermarkTest(stopOnConditionMismatch: Boolean): Unit = {
withSessionWindowLinkedListState(inputValueAttribs, keyExprs) { state =>
implicit val st = state
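
The `run chaos monkey` test deleted in this hunk applies random operations to the state and to a `java.util.LinkedList` reference, then compares the two after every step. A much smaller sketch of that pattern follows, under stated assumptions: `ListUnderTest` is a hypothetical stand-in for the structure being checked, only insert and remove-by-index are exercised, and the `Random` is seeded (the original used an unseeded `new Random()`) so that a failing run can be replayed.

```scala
import java.util.{LinkedList => JLinkedList}
import scala.collection.mutable
import scala.util.Random

object ChaosSketch {
  // Hypothetical structure under test; values are unique, so removal by value is unambiguous.
  final class ListUnderTest {
    private val items = mutable.ArrayBuffer.empty[Int]
    def insertAt(idx: Int, v: Int): Unit = items.insert(idx, v)
    def removeValue(v: Int): Unit = items -= v
    def toList: List[Int] = items.toList
  }

  // Returns true if the structure matched the reference list after every operation.
  def run(seed: Long, ops: Int): Boolean = {
    val rand = new Random(seed)
    val sut = new ListUnderTest
    // String elements, as in the original test, so remove(Int) can only mean "remove at index".
    val ref = new JLinkedList[String]()
    (0 until ops).forall { opIdx =>
      val target = if (ref.isEmpty) -1 else rand.nextInt(ref.size())
      rand.nextInt(2) match {
        case 0 =>
          // Insert before or after the target; an empty list takes the element at position 0.
          val pos = if (target < 0) 0 else if (rand.nextBoolean()) target else target + 1
          ref.add(pos, opIdx.toString)
          sut.insertAt(pos, opIdx)
        case _ if target >= 0 =>
          val removed = ref.remove(target) // remove(int index) returns the removed element
          sut.removeValue(removed.toInt)
        case _ => // nothing to remove from an empty list
      }
      ref.toArray().toList.map(_.toString.toInt) == sut.toList
    }
  }
}
```

Calling `ChaosSketch.run(42L, 2000)` replays the same operation sequence on every run, which makes a mismatch easier to investigate than the verbose before/after logging the deleted test relied on.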