Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add interpretedOrdering to ArrayType.
  • Loading branch information
yhuai committed Nov 15, 2015
commit f43a7f9c92dd4b5dc5e37b1fa41f8f8c00ca3020
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ case class SortArray(base: Expression, ascendingOrder: Expression)
private lazy val lt: Comparator[Any] = {
val ordering = base.dataType match {
case _ @ ArrayType(n: AtomicType, _) => n.ordering.asInstanceOf[Ordering[Any]]
case _ @ ArrayType(a: ArrayType, _) => a.interpretedOrdering.asInstanceOf[Ordering[Any]]
case _ @ ArrayType(s: StructType, _) => s.interpretedOrdering.asInstanceOf[Ordering[Any]]
}

Expand All @@ -90,6 +91,7 @@ case class SortArray(base: Expression, ascendingOrder: Expression)
private lazy val gt: Comparator[Any] = {
val ordering = base.dataType match {
case _ @ ArrayType(n: AtomicType, _) => n.ordering.asInstanceOf[Ordering[Any]]
case _ @ ArrayType(a: ArrayType, _) => a.interpretedOrdering.asInstanceOf[Ordering[Any]]
case _ @ ArrayType(s: StructType, _) => s.interpretedOrdering.asInstanceOf[Ordering[Any]]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types._


Expand All @@ -30,76 +29,39 @@ class InterpretedOrdering(ordering: Seq[SortOrder]) extends Ordering[InternalRow
def this(ordering: Seq[SortOrder], inputSchema: Seq[Attribute]) =
this(ordering.map(BindReferences.bindReference(_, inputSchema)))

/**
 * Compares two already-evaluated values of type `dataType` under the given sort direction.
 *
 * Two nulls compare as equal. A null on the left yields -1 for `Ascending` and 1 otherwise;
 * a null on the right yields the mirrored result. Atomic and struct values are delegated to
 * the ordering exposed by their type (reversed for `Descending`). Arrays are compared element
 * by element, recursing with the same direction; when all shared positions are equal, the
 * shorter array gets the same result a null left operand would.
 *
 * @param left      value taken from the first operand; may be null
 * @param right     value taken from the second operand; may be null
 * @param dataType  type used to pick the comparison strategy for both values
 * @param direction sort direction the result must honor
 * @return a negative number, zero, or a positive number
 * @throws IllegalArgumentException if `dataType` matches none of the handled cases
 */
private def compareValue(
    left: Any,
    right: Any,
    dataType: DataType,
    direction: SortDirection): Int = {
  // Result reported when the left operand is the "lesser" side (null, or the shorter array),
  // and the mirrored result for the right operand.
  val leftIsLesser = if (direction == Ascending) -1 else 1
  val rightIsLesser = if (direction == Ascending) 1 else -1

  if (left == null) {
    if (right == null) 0 else leftIsLesser
  } else if (right == null) {
    rightIsLesser
  } else {
    dataType match {
      case atomic: AtomicType if direction == Ascending =>
        atomic.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
      case atomic: AtomicType if direction == Descending =>
        atomic.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
      case struct: StructType if direction == Ascending =>
        struct.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
      case struct: StructType if direction == Descending =>
        struct.interpretedOrdering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
      case array: ArrayType =>
        val lhs = left.asInstanceOf[ArrayData]
        val rhs = right.asInstanceOf[ArrayData]
        val elementType = array.elementType
        val sharedLength = scala.math.min(lhs.numElements(), rhs.numElements())
        var idx = 0
        while (idx < sharedLength) {
          val lhsNull = lhs.isNullAt(idx)
          val rhsNull = rhs.isNullAt(idx)
          if (lhsNull != rhsNull) {
            // Exactly one side is null at this position: that side is the lesser one.
            return if (lhsNull) leftIsLesser else rightIsLesser
          }
          if (!lhsNull) {
            // Both present: the first non-zero element comparison decides the result.
            val elementResult = compareValue(
              lhs.get(idx, elementType),
              rhs.get(idx, elementType),
              elementType,
              direction)
            if (elementResult != 0) {
              return elementResult
            }
          }
          idx += 1
        }
        // Every shared position tied, so the lengths break the tie.
        if (lhs.numElements() < rhs.numElements()) {
          leftIsLesser
        } else if (lhs.numElements() > rhs.numElements()) {
          rightIsLesser
        } else {
          0
        }
      case other =>
        throw new IllegalArgumentException(s"Type $other does not support ordered operations")
    }
  }
}

def compare(a: InternalRow, b: InternalRow): Int = {
var i = 0
while (i < ordering.size) {
val order = ordering(i)
val left = order.child.eval(a)
val right = order.child.eval(b)
val comparison = compareValue(left, right, order.dataType, order.direction)
if (comparison != 0) {
return comparison

if (left == null && right == null) {
// Both null, continue looking.
} else if (left == null) {
return if (order.direction == Ascending) -1 else 1
} else if (right == null) {
return if (order.direction == Ascending) 1 else -1
} else {
val comparison = order.dataType match {
case dt: AtomicType if order.direction == Ascending =>
dt.ordering.asInstanceOf[Ordering[Any]].compare(left, right)
case dt: AtomicType if order.direction == Descending =>
dt.ordering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
case a: ArrayType if order.direction == Ascending =>
a.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
case a: ArrayType if order.direction == Descending =>
a.interpretedOrdering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
case s: StructType if order.direction == Ascending =>
s.interpretedOrdering.asInstanceOf[Ordering[Any]].compare(left, right)
case s: StructType if order.direction == Descending =>
s.interpretedOrdering.asInstanceOf[Ordering[Any]].reverse.compare(left, right)
case other =>
throw new IllegalArgumentException(s"Type $other does not support ordered operations")
}
if (comparison != 0) {
return comparison
}
}
i += 1
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

package org.apache.spark.sql.types

import org.apache.spark.sql.catalyst.util.ArrayData
import org.json4s.JsonDSL._

import org.apache.spark.annotation.DeveloperApi

import scala.math.Ordering


object ArrayType extends AbstractDataType {
/** Constructs an [[ArrayType]] object with the given element type; `containsNull` is set to true. */
Expand Down Expand Up @@ -81,4 +84,49 @@ case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataT
/**
 * Returns true if `f` holds for this array type; otherwise returns whatever
 * `existsRecursively` on the element type returns for the same `f`.
 */
override private[spark] def existsRecursively(f: (DataType) => Boolean): Boolean = {
  if (f(this)) true else elementType.existsRecursively(f)
}

/**
 * Ordering over [[ArrayData]] values whose elements are of this array's `elementType`.
 *
 * Arrays are compared position by position. At a given position two nulls tie, a null sorts
 * before a non-null element, and two non-null elements are compared with the ordering of the
 * element type. If every shared position ties, the array with fewer elements sorts first.
 *
 * Instantiating the ordering throws `IllegalArgumentException` when `elementType` is not an
 * atomic, array, or struct type.
 */
@transient
private[sql] lazy val interpretedOrdering: Ordering[ArrayData] = new Ordering[ArrayData] {
  // Resolved once when this ordering is instantiated, not on every comparison.
  private[this] val elementOrdering: Ordering[Any] = elementType match {
    case atomic: AtomicType => atomic.ordering.asInstanceOf[Ordering[Any]]
    case nested: ArrayType => nested.interpretedOrdering.asInstanceOf[Ordering[Any]]
    case struct: StructType => struct.interpretedOrdering.asInstanceOf[Ordering[Any]]
    case other =>
      throw new IllegalArgumentException(s"Type $other does not support ordered operations")
  }

  def compare(x: ArrayData, y: ArrayData): Int = {
    // Fully qualified on purpose: inside an Ordering, a bare `min` resolves to Ordering.min.
    val sharedLength = scala.math.min(x.numElements(), y.numElements())
    var idx = 0
    while (idx < sharedLength) {
      val xNull = x.isNullAt(idx)
      val yNull = y.isNullAt(idx)
      if (xNull != yNull) {
        // Exactly one side is null here; the null element sorts first.
        return if (xNull) -1 else 1
      }
      if (!xNull) {
        // Both elements are present: the first non-zero comparison decides the result.
        val elementResult =
          elementOrdering.compare(x.get(idx, elementType), y.get(idx, elementType))
        if (elementResult != 0) {
          return elementResult
        }
      }
      idx += 1
    }
    // All shared positions tied, so the shorter array is the smaller one.
    if (x.numElements() < y.numElements()) {
      -1
    } else if (x.numElements() > y.numElements()) {
      1
    } else {
      0
    }
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,14 @@ class DataFrameFunctionsSuite extends QueryTest with SharedSQLContext {
Row(null, null))
)

val df2 = Seq((Array[Array[Int]](Array(2)), "x")).toDF("a", "b")
assert(intercept[AnalysisException] {
df2.selectExpr("sort_array(a)").collect()
}.getMessage().contains("does not support sorting array of type array<int>"))
val df2 = Seq((Array[Array[Int]](Array(2), Array(1), Array(2, 4), null), "x")).toDF("a", "b")
checkAnswer(
df2.selectExpr("sort_array(a, true)", "sort_array(a, false)"),
Seq(
Row(
Seq[Seq[Int]](null, Seq(1), Seq(2), Seq(2, 4)),
Seq[Seq[Int]](Seq(2, 4), Seq(2), Seq(1), null)))
)

val df3 = Seq(("xxx", "x")).toDF("a", "b")
assert(intercept[AnalysisException] {
Expand Down