Move added rule to IncrementalExecution.
viirya committed Jul 31, 2018
commit c127053b5521bf742e5ecfb7412f87da9dbeec43
@@ -2124,9 +2124,6 @@ class Analyzer(
     override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
       case p if p.resolved => p
       case p => p transformExpressionsUp {
-        // Produces a placeholder random seed for streaming query, the real random seed
-        // is given at the beginning of Optimizer.
-        case Uuid(None) if p.isStreaming => Uuid(Some(-1))
         case Uuid(None) => Uuid(Some(random.nextLong()))
       }
     }
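
Note: with the streaming placeholder gone, the Analyzer now gives every unseeded Uuid a real random seed, streaming or not; for streaming plans that seed is simply overwritten per micro-batch in IncrementalExecution (last file below). A minimal sketch of the surviving analysis behavior, using a simplified stand-in for Spark's Uuid expression (the case class and object here are illustrative names, not Spark's):

```scala
import scala.util.Random

// Simplified stand-in for Spark's Uuid expression (illustrative only).
case class Uuid(randomSeed: Option[Long] = None)

object AnalyzerSeedSketch {
  private val random = new Random()

  // Mirrors the surviving rule: `case Uuid(None) => Uuid(Some(random.nextLong()))`.
  def assignSeed(u: Uuid): Uuid = u match {
    case Uuid(None)    => Uuid(Some(random.nextLong()))
    case alreadySeeded => alreadySeeded
  }

  def main(args: Array[String]): Unit = {
    println(assignSeed(Uuid()))          // gets a fresh random seed
    println(assignSeed(Uuid(Some(42L)))) // left untouched
  }
}
```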
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.catalyst.optimizer
 
 import scala.collection.mutable
-import scala.util.Random
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis._
@@ -110,7 +109,6 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
       EliminateSubqueryAliases,
       EliminateView,
       ReplaceExpressions,
-      ResolvedUuidExpressionsForStreaming,
       ComputeCurrentTime,
       GetCurrentDatabase(sessionCatalog),
       RewriteDistinctAggregates,
@@ -1448,17 +1446,3 @@ object UpdateNullabilityInAttributeReferences extends Rule[LogicalPlan] {
     }
   }
 }
-
-/**
- * Set the seed for random number generation in Uuid expressions for streaming query.
- */
-object ResolvedUuidExpressionsForStreaming extends Rule[LogicalPlan] {
-  private lazy val random = new Random()
-
-  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformUp {
-    case p => p transformExpressionsUp {
-      case _: Uuid if p.isStreaming => Uuid(Some(random.nextLong()))
-    }
-  }
-}
-
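
Note: the deleted ResolvedUuidExpressionsForStreaming rule assigned the real per-batch seed at the start of optimization; after this commit that responsibility moves into IncrementalExecution, keeping the streaming-only step out of the shared Optimizer rule list. For background, a small illustrative sketch of why Uuid carries a seed in the plan at all (Spark's actual implementation uses a partition-aware generator; this toy version only shows the determinism property):

```scala
import java.util.UUID
import scala.util.Random

// Illustrative only: within one batch a retried task must regenerate
// identical values, so generation has to be a pure function of the seed;
// across batches a new seed yields new values.
object SeedDeterminismSketch {
  def uuidsFrom(seed: Long, n: Int): Seq[UUID] = {
    val rng = new Random(seed)
    Seq.fill(n)(new UUID(rng.nextLong(), rng.nextLong()))
  }

  def main(args: Array[String]): Unit = {
    assert(uuidsFrom(42L, 3) == uuidsFrom(42L, 3)) // retry: same seed, same UUIDs
    assert(uuidsFrom(42L, 3) != uuidsFrom(43L, 3)) // new seed: fresh UUIDs
    println("UUID generation is deterministic per seed")
  }
}
```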
@@ -20,9 +20,11 @@ package org.apache.spark.sql.execution.streaming
 import java.util.UUID
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.util.Random
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, SparkSession, Strategy}
-import org.apache.spark.sql.catalyst.expressions.CurrentBatchTimestamp
+import org.apache.spark.sql.catalyst.expressions.{CurrentBatchTimestamp, Uuid}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.physical.{AllTuples, ClusteredDistribution, HashPartitioning, SinglePartition}
 import org.apache.spark.sql.catalyst.rules.Rule
@@ -73,10 +75,14 @@ class IncrementalExecution(
    * with the desired literal
    */
   override lazy val optimizedPlan: LogicalPlan = {
+    val random = new Random()
+
     sparkSession.sessionState.optimizer.execute(withCachedData) transformAllExpressions {
       case ts @ CurrentBatchTimestamp(timestamp, _, _) =>
         logInfo(s"Current batch timestamp = $timestamp")
         ts.toLiteral
+      // SPARK-24896: Set the seed for random number generation in Uuid expressions.
+      case _: Uuid => Uuid(Some(random.nextLong()))
     }
   }
 
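
Note: a new IncrementalExecution is constructed for each micro-batch, so `val random = new Random()` runs per batch and this transform re-seeds every Uuid in the plan each time. A toy model of that behavior (UuidExpr and reseedForBatch are hypothetical names standing in for the Uuid expression and the transformAllExpressions step above):

```scala
import scala.util.Random

// Hypothetical stand-in for a Uuid expression in a streaming plan.
case class UuidExpr(seed: Option[Long])

object PerBatchReseedSketch {
  // Stands in for the `transformAllExpressions { case _: Uuid => ... }` step:
  // a fresh Random per batch overwrites whatever seed analysis assigned.
  def reseedForBatch(plan: Seq[UuidExpr]): Seq[UuidExpr] = {
    val random = new Random()
    plan.map(_ => UuidExpr(Some(random.nextLong())))
  }

  def main(args: Array[String]): Unit = {
    val analyzed = Seq(UuidExpr(Some(123L)), UuidExpr(Some(123L)))
    val batch1 = reseedForBatch(analyzed)
    val batch2 = reseedForBatch(analyzed)
    println(s"batch 1 seeds: ${batch1.map(_.seed)}")
    println(s"batch 2 seeds: ${batch2.map(_.seed)}") // differs from batch 1
  }
}
```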