Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,15 @@ abstract class Expression extends TreeNode[Expression] {
def foldable: Boolean = false

/**
* Returns true when the current expression always return the same result for fixed input values.
* Returns true when the current expression always return the same result for fixed inputs from
* children.
*
* Note that this means that an expression should be considered as non-deterministic if:
* - if it relies on some mutable internal state, or
* - if it relies on some implicit input that is not part of the children expression list.
*
* An example would be `SparkPartitionID` that relies on the partition id returned by TaskContext.
*/
// TODO: Need to define explicit input values vs implicit input values.
def deterministic: Boolean = true

def nullable: Boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ private[sql] case class MonotonicallyIncreasingID() extends LeafExpression {
*/
@transient private[this] var count: Long = 0L

@transient private lazy val partitionMask = TaskContext.getPartitionId.toLong << 33
@transient private lazy val partitionMask = TaskContext.getPartitionId().toLong << 33

override def deterministic: Boolean = false

override def nullable: Boolean = false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ import org.apache.spark.sql.types.{IntegerType, DataType}
*/
private[sql] case object SparkPartitionID extends LeafExpression {

override def deterministic: Boolean = false

override def nullable: Boolean = false

override def dataType: DataType = IntegerType

@transient private lazy val partitionId = TaskContext.getPartitionId
@transient private lazy val partitionId = TaskContext.getPartitionId()

override def eval(input: InternalRow): Int = partitionId

Expand Down