[SPARK-11080] [SQL] Incorporate per-JVM id into ExprId to prevent unsafe cross-JVM comparisons

In the current implementation of named expressions' `ExprIds`, we rely on a per-JVM AtomicLong to ensure that expression ids are unique within a JVM. However, these expression ids will not be _globally_ unique. This opens the potential for id collisions if new expression ids happen to be created inside of tasks rather than on the driver.

There are currently a few cases where tasks allocate expression ids, which happen to be safe because those expressions are never compared to expressions created on the driver. In order to guard against the introduction of invalid comparisons between driver-created and executor-created expression ids, this patch extends `ExprId` to incorporate a UUID to identify the JVM that created the id, which prevents collisions.
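
To illustrate the collision described above, here is a minimal, self-contained sketch. It is not Spark code: `JvmIdAllocator`, `SketchExprId`, and `ExprIdCollisionSketch` are hypothetical names, and each allocator instance only simulates what a separate JVM (driver or executor) would do with its own counter and its own random UUID.

```scala
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong

// Hypothetical stand-in for the patched ExprId(id, jvmId); not Spark's class.
final case class SketchExprId(id: Long, jvmId: UUID)

// Hypothetical stand-in for one JVM's allocator state: a counter that starts
// at zero plus a UUID generated once per (simulated) JVM.
final class JvmIdAllocator {
  private val curId = new AtomicLong()
  val jvmId: UUID = UUID.randomUUID()

  // Pre-patch behaviour: the id is only the per-JVM counter value.
  def newBareId(): Long = curId.getAndIncrement()

  // Post-patch behaviour: the counter value is paired with the JVM's UUID.
  def newExprId(): SketchExprId = SketchExprId(curId.getAndIncrement(), jvmId)
}

object ExprIdCollisionSketch {
  def main(args: Array[String]): Unit = {
    val driver = new JvmIdAllocator   // simulates the driver JVM
    val executor = new JvmIdAllocator // simulates an executor JVM

    // Both counters start at zero, so counter-only ids coincide.
    val bareIdsCollide = driver.newBareId() == executor.newBareId()
    println(s"counter-only ids collide: $bareIdsCollide")

    // With the UUID included, the same counter value no longer compares equal.
    val d = driver.newExprId()
    val e = executor.newExprId()
    println(s"same counter value: ${d.id == e.id}")
    println(s"ids compare equal: ${d == e}")
  }
}
```

Because `SketchExprId` is a case class, equality covers both fields, so two ids with the same counter value compare equal only if they also carry the same UUID.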

Author: Josh Rosen <[email protected]>

Closes apache#9093 from JoshRosen/SPARK-11080.
JoshRosen authored and davies committed Dec 11, 2015
commit 5e603a51c09a94280c346bee12def0c49479d069
@@ -17,23 +17,32 @@

 package org.apache.spark.sql.catalyst.expressions

+import java.util.UUID
+
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.types._

 object NamedExpression {
   private val curId = new java.util.concurrent.atomic.AtomicLong()
-  def newExprId: ExprId = ExprId(curId.getAndIncrement())
+  private[expressions] val jvmId = UUID.randomUUID()
+  def newExprId: ExprId = ExprId(curId.getAndIncrement(), jvmId)
   def unapply(expr: NamedExpression): Option[(String, DataType)] = Some(expr.name, expr.dataType)
 }

 /**
- * A globally unique (within this JVM) id for a given named expression.
+ * A globally unique id for a given named expression.
  * Used to identify which attribute output by a relation is being
  * referenced in a subsequent computation.
+ *
+ * The `id` field is unique within a given JVM, while the `jvmId` is used to uniquely identify JVMs.
  */
-case class ExprId(id: Long)
+case class ExprId(id: Long, jvmId: UUID)
+
+object ExprId {
+  def apply(id: Long): ExprId = ExprId(id, NamedExpression.jvmId)
+}

 /**
  * An [[Expression]] that is named.