Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
ValidateExternalType should support interpreted execution
  • Loading branch information
maropu committed Apr 19, 2018
commit 78de6c8fb7be454f1f0dcdb49a460c4fa086fcd9
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,19 @@ object ScalaReflection extends ScalaReflection {
case _ => false
}

def classForNativeTypeOf(dt: DataType): Class[_] = dt match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this match CodeGenerator.boxedType in terms of functionality? See my previous comments, but I am also missing complex types (struct, map & array).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC this function would be better to only handles the native types that have the same format between the Spark SQL internals formats and externals (struct, map, and array have different formats between them). https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/RowEncoder.scala#L209 So, this function needs to handle more wider types than CodeGenerator.boxedType.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a difference between how we implement an expression and how we use an expression. In this case the implementations should behave the same, and not only in the context in which it is being used.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, I'll brush up the code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hvanhovell How about the latest commit?

case NullType => classOf[Object]
case BooleanType => classOf[java.lang.Boolean]
case ByteType => classOf[java.lang.Byte]
case ShortType => classOf[java.lang.Short]
case IntegerType => classOf[java.lang.Integer]
case LongType => classOf[java.lang.Long]
case FloatType => classOf[java.lang.Float]
case DoubleType => classOf[java.lang.Double]
case BinaryType => classOf[Array[Byte]]
case CalendarIntervalType => classOf[CalendarInterval]
}

/**
* Returns an expression that can be used to deserialize an input row to an object of type `T`
* with a compatible schema. Fields of the row will be extracted using UnresolvedAttributes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1672,11 +1672,37 @@ case class ValidateExternalType(child: Expression, expected: DataType)

override def dataType: DataType = RowEncoder.externalDataTypeForInput(expected)

override def eval(input: InternalRow): Any =
throw new UnsupportedOperationException("Only code-generated evaluation is supported")

private val errMsg = s" is not a valid external type for schema of ${expected.simpleString}"

private lazy val checkType = expected match {
case _: DecimalType =>
(value: Any) => {
Seq(classOf[java.math.BigDecimal], classOf[scala.math.BigDecimal], classOf[Decimal])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we do 3 instanceOf checks instead? This seems a bit expensive.

.exists { x => value.getClass.isAssignableFrom(x) }
}
case _: ArrayType =>
(value: Any) => {
value.getClass.isAssignableFrom(classOf[Seq[_]]) || value.getClass.isArray
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is cheaper? isAssignableFrom or isArray? If it is the latter then we should swap the order.

Copy link
Member Author

@maropu maropu Mar 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have no idea for that and what do u know about it in jvm? @kiszk @rednaxelafx I feel isArray seems cheaper...?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi guys, sorry I'm late.

In your new code you're doing:

+    case _: ArrayType =>
+      (value: Any) => {
+        value.getClass.isArray || value.isInstanceOf[Seq[_]]
+      }

which is good. xxx.getClass().isAssignableFrom(some_class_literal) in the old version of this PR is actually backwards, it should have been some_class_literal.isAssignableFrom(xxx.getClass()), e.g.

scala> classOf[String].isAssignableFrom(classOf[Object])
res0: Boolean = false

scala> classOf[Object].isAssignableFrom(classOf[String])
res1: Boolean = true

and the latter is semantically the same as xxx.isInstanceOf[some_class]. isInstanceOf[] is guaranteed to be at least as fast as some_class_literal.isAssignableFrom(xxx.getClass()), and in general isInstanceOf[] is faster.

xxx.getClass().isArray() has a fixed overhead, whereas isInstanceOf[] can have a fast path slightly faster than the isArray and a slow path that can be much slower than isArray. So putting the isArray check first in your new code makes more sense to me.

Copy link
Contributor

@rednaxelafx rednaxelafx Mar 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For those curious:

In HotSpot, the straightforward interpreter/C1 implementation of xxx.getClass().isArray() path is actually something like:

// for getClass()
klazz = xxx._klass; // read the hidden klass pointer field from the object header
clazz = klazz._java_mirror; // read the java.lang.Class reference from the Klass
// for clazz.isArray(): go through JNI and call the native JVM_IsArrayClass() inside HotSpot
klazz1 = clazz->_klass;
result = klazz1->oop_is_array();

So a JNI native method call is involved and that's not really fast. But C2 will optimize this into something similar to:

klazz = xxx._klass;
result = inlined klazz->oop_is_array();

So that's pretty fast. No need to load the java.lang.Class (aka "Java Mirror") reference anymore.

In the xxx.isInstanceOf[Seq[_]] case, again the interpreter version would go through a JNI native method call, whereas the C1/C2 versions will inline a fast path logic and do a quick comparison against a per-type cache. This fast path check has similar overhead to the C2 isArray() overhead, and the slow path is a slow linear search over an array of implemented interfaces of the klass which can be much slower than the simple isArray check.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the valuable info., kris! Based on the comment, the current version is ok to me.

}
case _ if ScalaReflection.isNativeType(expected) =>
(value: Any) => {
value.getClass.isAssignableFrom(ScalaReflection.classForNativeTypeOf(expected))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move the ScalaReflection.classForNativeTypeOf(expected) out of the function body and into a val.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It might also be a bit faster if we use isInstance here. All of these classes do not have subclasses.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

}
case _ =>
(value: Any) => {
value.getClass.isAssignableFrom(dataType.asInstanceOf[ObjectType].cls)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar as above. Let's move dataType.asInstanceOf[ObjectType].cls out of the function and into a val.

Also where is this code path in the code generated version?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since I thought the previous version is some difficult to read , I cleaned up the code along with the generated version. How about it?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The external type for PythonUserDefinedType may not necessarily be ObjectType.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh..., great catch.

}
}

override def eval(input: InternalRow): Any = {
val result = child.eval(input)
if (checkType(result)) {
result
} else {
throw new RuntimeException(s"${result.getClass.getName}$errMsg")
}
}

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
// Use unnamed reference that doesn't create a local field here to reduce the number of fields
// because errMsgField is used only when the type doesn't match.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, Generic
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.unsafe.types.{CalendarInterval, UTF8String}

class InvokeTargetClass extends Serializable {
def filterInt(e: Any): Any = e.asInstanceOf[Int] > 0
Expand Down Expand Up @@ -274,6 +274,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
checkEvaluation(createExternalRow, Row.fromSeq(Seq(1, "x")), InternalRow.fromSeq(Seq()))
}

// This is an alternative version of `checkEvaluation` to compare results
// by scala values instead of catalyst values.
private def checkObjectExprEvaluation(
expression: => Expression, expected: Any, inputRow: InternalRow = EmptyRow): Unit = {
Expand All @@ -296,7 +297,7 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val inputObject = BoundReference(0, ObjectType(classOf[Row]), nullable = true)
val getRowField = GetExternalRowField(inputObject, index = 0, fieldName = "c0")
Seq((Row(1), 1), (Row(3), 3)).foreach { case (input, expected) =>
checkEvaluation(getRowField, expected, InternalRow.fromSeq(Seq(input)))
checkObjectExprEvaluation(getRowField, expected, InternalRow.fromSeq(Seq(input)))
}

// If an input row or a field are null, a runtime exception will be thrown
Expand Down Expand Up @@ -472,6 +473,35 @@ class ObjectExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
val deserializer = toMapExpr.copy(inputData = Literal.create(data))
checkObjectExprEvaluation(deserializer, expected = data)
}

test("SPARK-23595 ValidateExternalType should support interpreted execution") {
val inputObject = BoundReference(0, ObjectType(classOf[Row]), nullable = true)
Seq(
(true, BooleanType),
(2.toByte, ByteType),
(5.toShort, ShortType),
(23, IntegerType),
(61L, LongType),
(1.0f, FloatType),
(10.0, DoubleType),
("abcd".getBytes, BinaryType),
("abcd", StringType),
(BigDecimal.valueOf(10), DecimalType.IntDecimal),
(CalendarInterval.fromString("interval 3 day"), CalendarIntervalType),
(java.math.BigDecimal.valueOf(10), DecimalType.BigIntDecimal),
(Array(3, 2, 1), ArrayType(IntegerType))
).foreach { case (input, dt) =>
val validateType = ValidateExternalType(
GetExternalRowField(inputObject, index = 0, fieldName = "c0"), dt)
checkObjectExprEvaluation(validateType, input, InternalRow.fromSeq(Seq(Row(input))))
}

checkExceptionInExpression[RuntimeException](
ValidateExternalType(
GetExternalRowField(inputObject, index = 0, fieldName = "c0"), DoubleType),
InternalRow.fromSeq(Seq(Row(1))),
"java.lang.Integer is not a valid external type for schema of double")
}
}

class TestBean extends Serializable {
Expand Down