Skip to content
Closed
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Make few variables as lazy.
  • Loading branch information
viirya committed Mar 21, 2018
commit a592882d0c61eacdae7dfd9309635468535b416c
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,13 @@ case class InMemoryTableScanExec(
override def outputOrdering: Seq[SortOrder] =
relation.child.outputOrdering.map(updateAttribute(_).asInstanceOf[SortOrder])

// When we make canonicalized plan, we can't find a normalized attribute in this map.
// We return a `ColumnStatisticsSchema` for normalized attribute in this case.
private def statsFor(a: Attribute) = relation.partitionStatistics.forAttribute.get(a).
getOrElse(new ColumnStatisticsSchema(a))
// Keeps relation's partition statistics because we don't serialize relation.
private val stats = relation.partitionStatistics
private def statsFor(a: Attribute) = stats.forAttribute(a)

// Returned filter predicate should return false iff it is impossible for the input expression
// to evaluate to `true' based on statistics collected about this partition batch.
@transient val buildFilter: PartialFunction[Expression, Expression] = {
@transient lazy val buildFilter: PartialFunction[Expression, Expression] = {
case And(lhs: Expression, rhs: Expression)
if buildFilter.isDefinedAt(lhs) || buildFilter.isDefinedAt(rhs) =>
(buildFilter.lift(lhs) ++ buildFilter.lift(rhs)).reduce(_ && _)
Expand Down Expand Up @@ -221,14 +220,14 @@ case class InMemoryTableScanExec(
l.asInstanceOf[Literal] <= statsFor(a).upperBound).reduce(_ || _)
}

val partitionFilters: Seq[Expression] = {
lazy val partitionFilters: Seq[Expression] = {
predicates.flatMap { p =>
val filter = buildFilter.lift(p)
val boundFilter =
filter.map(
BindReferences.bindReference(
_,
relation.partitionStatistics.schema,
stats.schema,
allowFailures = true))

boundFilter.foreach(_ =>
Expand All @@ -251,7 +250,7 @@ case class InMemoryTableScanExec(
private def filteredCachedBatches(): RDD[CachedBatch] = {
// Using these variables here to avoid serialization of entire objects (if referenced directly)
// within the map Partitions closure.
val schema = relation.partitionStatistics.schema
val schema = stats.schema
val schemaIndex = schema.zipWithIndex
val buffers = relation.cachedColumnBuffers

Expand Down