Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,9 +112,11 @@ case class WindowExec(
*
* @param frame to evaluate. This can either be a Row or Range frame.
* @param bound with respect to the row.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add param timeZone.

* @param timeZone the session local timezone for time related calculations.
* @return a bound ordering object.
*/
private[this] def createBoundOrdering(frame: FrameType, bound: Expression): BoundOrdering = {
private[this] def createBoundOrdering(
frame: FrameType, bound: Expression, timeZone: String): BoundOrdering = {
(frame, bound) match {
case (RowFrame, CurrentRow) =>
RowBoundOrdering(0)
Expand Down Expand Up @@ -144,7 +146,7 @@ case class WindowExec(
val boundExpr = (expr.dataType, boundOffset.dataType) match {
case (DateType, IntegerType) => DateAdd(expr, boundOffset)
case (TimestampType, CalendarIntervalType) =>
TimeAdd(expr, boundOffset, Some(conf.sessionLocalTimeZone))
TimeAdd(expr, boundOffset, Some(timeZone))
case (a, b) if a== b => Add(expr, boundOffset)
}
val bound = newMutableProjection(boundExpr :: Nil, child.output)
Expand Down Expand Up @@ -197,6 +199,7 @@ case class WindowExec(

// Map the groups to a (unbound) expression and frame factory pair.
var numExpressions = 0
val timeZone = conf.sessionLocalTimeZone
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: should we also touch up the use of sqlContext.conf in doExecute()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that one is OK because it's not in a closure and passed to executors.

framedFunctions.toSeq.map {
case (key, (expressions, functionSeq)) =>
val ordinal = numExpressions
Expand Down Expand Up @@ -237,7 +240,7 @@ case class WindowExec(
new UnboundedPrecedingWindowFunctionFrame(
target,
processor,
createBoundOrdering(frameType, upper))
createBoundOrdering(frameType, upper, timeZone))
}

// Shrinking Frame.
Expand All @@ -246,7 +249,7 @@ case class WindowExec(
new UnboundedFollowingWindowFunctionFrame(
target,
processor,
createBoundOrdering(frameType, lower))
createBoundOrdering(frameType, lower, timeZone))
}

// Moving Frame.
Expand All @@ -255,8 +258,8 @@ case class WindowExec(
new SlidingWindowFunctionFrame(
target,
processor,
createBoundOrdering(frameType, lower),
createBoundOrdering(frameType, upper))
createBoundOrdering(frameType, lower, timeZone),
createBoundOrdering(frameType, upper, timeZone))
}
}

Expand Down