Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
fix issue without whole stage codegen
  • Loading branch information
mengxr committed Oct 20, 2016
commit ccd2fe70a7e0d3f7f9a1bf9ce54b1a00a544d5cc
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,9 @@ object GenerateUnsafeProjection extends CodeGenerator[Seq[Expression], UnsafePro

public SpecificUnsafeProjection(Object[] references) {
this.references = references;
}

public void initializeStatesForPartition(int partitionIndex) {
${ctx.initMutableStates()}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,11 @@ package object expressions {
abstract class Projection extends (InternalRow => InternalRow) {

/**
* Initialize internal state given the current partition index.
* This is used by non-deterministic expressions to set the initial state
* Initialize internal states given the current partition index.
* This is used by non-deterministic expressions to set initial states.
* The default implementation does nothing.
*/
def initializeStateForPartition(partitionIndex: Int): Unit = {}
def initializeStatesForPartition(partitionIndex: Int): Unit = {}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,10 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
}

protected override def doExecute(): RDD[InternalRow] = {
child.execute().mapPartitionsInternal { iter =>
child.execute().mapPartitionsWithIndexInternal { (index, iter) =>
val project = UnsafeProjection.create(projectList, child.output,
subexpressionEliminationEnabled)
project.initializeStatesForPartition(index)
iter.map(project)
}
}
Expand Down