Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Fix
  • Loading branch information
maropu committed Nov 30, 2018
commit ef203250a69f8348779e8bcc7986c6f6cf0ec8c0
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.sql.catalyst

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -157,4 +158,24 @@ object InternalRow {
getValueNullSafe
}
}

/**
 * Returns a writer for an `InternalRow` with given data type.
 *
 * The returned function takes the target row and a value, casts the value to the representation
 * matched for `dt`, and stores it at `ordinal` through the corresponding setter. The setter is
 * selected once, when this method is called, rather than on every write.
 *
 * @param ordinal field position the returned writer targets in whichever row it is given
 * @param dt data type used to select the setter
 * @return a function `(row, value) => Unit` that writes `value` into `row` at `ordinal`
 * @throws SparkException if `dt` matches none of the cases below
 */
def getWriter(ordinal: Int, dt: DataType): (InternalRow, Any) => Unit = dt match {
case BooleanType => (input, v) => input.setBoolean(ordinal, v.asInstanceOf[Boolean])
case ByteType => (input, v) => input.setByte(ordinal, v.asInstanceOf[Byte])
case ShortType => (input, v) => input.setShort(ordinal, v.asInstanceOf[Short])
// DateType shares the Int setter and TimestampType shares the Long setter.
case IntegerType | DateType => (input, v) => input.setInt(ordinal, v.asInstanceOf[Int])
case LongType | TimestampType => (input, v) => input.setLong(ordinal, v.asInstanceOf[Long])
case FloatType => (input, v) => input.setFloat(ordinal, v.asInstanceOf[Float])
case DoubleType => (input, v) => input.setDouble(ordinal, v.asInstanceOf[Double])
// Fixed-precision decimals pass the precision through to the setter as well.
case DecimalType.Fixed(precision, _) =>
(input, v) => input.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
// Everything in this group is written through the generic `update` without a cast.
// NOTE(review): UDT values take this path as-is; the writer does not descend into the UDT's
// underlying type. Confirm whether that is intended.
case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
_: MapType | _: ObjectType | _: UserDefinedType[_] =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should recursive into UDT.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

(input, v) => input.update(ordinal, v)
// NullType: the returned writer leaves the row untouched.
// NOTE(review): it does not mark the field as null; verify callers do not rely on that.
case NullType => (input, v) => {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the codegen version CodeGenerator.setColumn, we don't match NullType and eventually call row.update(null, i). Shall we follow and call row.setNullAt here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

// Fails while the writer is being built, not when a value is later written.
case _ => throw new SparkException(s"Unsupported data type $dt")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.SparkException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.aggregate.NoOp
import org.apache.spark.sql.types._


/**
Expand Down Expand Up @@ -59,46 +57,20 @@ class InterpretedMutableProjection(expressions: Seq[Expression]) extends Mutable
}

// Builds one write function per entry of `validExprs`, each bound to that entry's index `i`.
// Entries whose expression is not nullable write the value directly; nullable ones check for
// null first and call `setNullAt(i)` on `mutableRow` instead of invoking the typed writer.
// NOTE(review): this span appears to show both sides of a diff, so three statements occur as
// old/new pairs: `generateRowWriter` vs `InternalRow.getWriter`, and `writer(v)` vs
// `writer(mutableRow, v)` (twice). Only one line of each pair can be live code.
private[this] val fieldWriters = validExprs.map { case (e, i) =>
val writer = generateRowWriter(i, e.dataType)
val writer = InternalRow.getWriter(i, e.dataType)
if (!e.nullable) {
(v: Any) => writer(v)
(v: Any) => writer(mutableRow, v)
} else {
(v: Any) => {
// A null value is recorded with setNullAt rather than passed to the typed writer,
// whose casts are written for non-null values.
if (v == null) {
mutableRow.setNullAt(i)
} else {
writer(v)
writer(mutableRow, v)
}
}
}
}

/**
 * Builds a writer that stores a value into `mutableRow` at `ordinal`, choosing the setter from
 * the field's data type. The setter is selected once here, not on every write.
 *
 * @param ordinal position in `mutableRow` that the returned writer targets
 * @param dt data type of the values the returned writer will receive
 * @return a function that casts its argument to the representation matched for `dt` and writes
 *         it into `mutableRow` at `ordinal`
 * @throws SparkException if `dt` matches none of the handled types
 */
private def generateRowWriter(ordinal: Int, dt: DataType): Any => Unit = dt match {
  case BooleanType =>
    v => mutableRow.setBoolean(ordinal, v.asInstanceOf[Boolean])
  case ByteType =>
    v => mutableRow.setByte(ordinal, v.asInstanceOf[Byte])
  case ShortType =>
    v => mutableRow.setShort(ordinal, v.asInstanceOf[Short])
  // DateType shares the Int setter and TimestampType shares the Long setter.
  case IntegerType | DateType =>
    v => mutableRow.setInt(ordinal, v.asInstanceOf[Int])
  case LongType | TimestampType =>
    v => mutableRow.setLong(ordinal, v.asInstanceOf[Long])
  case FloatType =>
    v => mutableRow.setFloat(ordinal, v.asInstanceOf[Float])
  case DoubleType =>
    v => mutableRow.setDouble(ordinal, v.asInstanceOf[Double])
  case DecimalType.Fixed(precision, _) =>
    v => mutableRow.setDecimal(ordinal, v.asInstanceOf[Decimal], precision)
  // These types have no dedicated setter and go through the generic `update`.
  case CalendarIntervalType | BinaryType | _: ArrayType | StringType | _: StructType |
      _: MapType =>
    v => mutableRow.update(ordinal, v)
  // Use the writer for the UDT's underlying SQL type, so that a UDT backed by e.g. an Int
  // goes through setInt instead of the generic `update`.
  case udt: UserDefinedType[_] =>
    generateRowWriter(ordinal, udt.sqlType)
  // Mark the field as null explicitly instead of leaving whatever was there before.
  case NullType =>
    _ => mutableRow.setNullAt(ordinal)
  case _ =>
    throw new SparkException(s"Unsupported data type $dt")
}

override def apply(input: InternalRow): InternalRow = {
var i = 0
while (i < validExprs.length) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.{Locale, TimeZone}

import scala.util.control.NonFatal

import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor
Expand Down Expand Up @@ -142,12 +143,10 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {

val (comments, code) = input.split("\n").partition(_.startsWith("--"))

// Runs all the tests on both codegen-only and interpreter modes. Since explain results differ
// when `WHOLESTAGE_CODEGEN_ENABLED` is disabled, we don't run the tests with it disabled for now.
val codegenConfigSets = Array(("false", "NO_CODEGEN"), ("true", "CODEGEN_ONLY")).map {
case (wholeStageCodegenEnabled, codegenFactoryMode) =>
Array( // SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> wholeStageCodegenEnabled,
SQLConf.CODEGEN_FACTORY_MODE.key -> codegenFactoryMode)
// Runs all the tests on both codegen-only and interpreter modes
val codegenConfigSets = Array(CODEGEN_ONLY, NO_CODEGEN).map {
case codegenFactoryMode =>
Array(SQLConf.CODEGEN_FACTORY_MODE.key -> codegenFactoryMode.toString)
}
val configSets = {
val configLines = comments.filter(_.startsWith("--SET")).map(_.substring(5))
Expand Down