From d7acae55034d4ff5da3e7579cf44acb7b704b4a1 Mon Sep 17 00:00:00 2001 From: petermaxlee Date: Tue, 16 Aug 2016 17:40:07 -0700 Subject: [PATCH 01/13] [SPARK-16947][SQL] Support type coercion and foldable expression for inline tables --- .../sql/catalyst/analysis/Analyzer.scala | 1 + .../analysis/ResolveInlineTables.scala | 101 +++++++++++++++++ .../sql/catalyst/analysis/TypeCoercion.scala | 2 +- .../sql/catalyst/analysis/unresolved.scala | 17 +++ .../sql/catalyst/parser/AstBuilder.scala | 38 ++----- .../plans/logical/LocalRelation.scala | 2 + .../analysis/ResolveInlineTablesSuite.scala | 84 ++++++++++++++ .../sql-tests/inputs/inline-table.sql | 36 ++++++ .../sql-tests/results/inline-table.sql.out | 107 ++++++++++++++++++ 9 files changed, 359 insertions(+), 29 deletions(-) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala create mode 100644 sql/core/src/test/resources/sql-tests/inputs/inline-table.sql create mode 100644 sql/core/src/test/resources/sql-tests/results/inline-table.sql.out diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index a2a022c2476f..e7c92ec8b6de 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -107,6 +107,7 @@ class Analyzer( GlobalAggregates :: ResolveAggregateFunctions :: TimeWindowing :: + ResolveInlineTables :: TypeCoercion.typeCoercionRules ++ extendedResolutionRules : _*), Batch("Nondeterministic", Once, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala new file mode 100644 index 000000000000..441febbc8c83 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Cast +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * An analyzer rule that replaces [[UnresolvedInlineTable]] with [[LocalRelation]]. + */ +object ResolveInlineTables extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { + case table: UnresolvedInlineTable if table.expressionsResolved => + validateInputDimension(table) + validateInputFoldable(table) + convert(table) + } + + /** Validates that all inline table data are foldable expressions. */ + def validateInputFoldable(table: UnresolvedInlineTable): Unit = { + table.rows.foreach { row => + row.foreach { e => + if (!e.resolved || !e.foldable) { + e.failAnalysis(s"cannot evaluate expression ${e.sql} in inline table definition") + } + } + } + } + + /** + * Validates the input data dimension: + * 1. All rows have the same cardinality. + * 2. The number of column aliases defined is consistent with the number of columns in data. + */ + def validateInputDimension(table: UnresolvedInlineTable): Unit = { + if (table.rows.nonEmpty) { + val numCols = table.rows.head.size + table.rows.zipWithIndex.foreach { case (row, ri) => + if (row.size != numCols) { + table.failAnalysis(s"expected $numCols columns but found ${row.size} columns in row $ri") + } + } + + if (table.names.size != numCols) { + table.failAnalysis(s"expected ${table.names.size} columns but found $numCols in first row") + } + } + } + + /** + * Convert a valid (with right shape and foldable inputs) [[UnresolvedInlineTable]] + * into a [[LocalRelation]]. + * + * This function attempts to coerce inputs into consistent types. + */ + def convert(table: UnresolvedInlineTable): LocalRelation = { + val numCols = table.rows.head.size + + // For each column, traverse all the values and find a common data type. + val targetTypes = Seq.tabulate(numCols) { ci => + val inputTypes = table.rows.map(_(ci).dataType) + TypeCoercion.findWiderTypeWithoutStringPromotion(inputTypes).getOrElse { + table.failAnalysis(s"incompatible types found in column $ci for inline table") + } + } + assert(targetTypes.size == table.names.size) + + val newRows: Seq[InternalRow] = table.rows.map { row => + InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) => + val targetType = targetTypes(ci) + if (e.dataType.sameType(targetType)) { + e.eval() + } else { + Cast(e, targetType).eval() + } + }) + } + + val attributes = StructType(targetTypes.zip(table.names) + .map { case (typ, name) => StructField(name, typ) }).toAttributes + LocalRelation(attributes, newRows) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 021952e7166f..ddf0e0f58f2f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -150,7 +150,7 @@ object TypeCoercion { * [[findTightestCommonType]], but can handle decimal types. If the wider decimal type exceeds * system limitation, this rule will truncate the decimal type before return it. */ - private def findWiderTypeWithoutStringPromotion(types: Seq[DataType]): Option[DataType] = { + def findWiderTypeWithoutStringPromotion(types: Seq[DataType]): Option[DataType] = { types.foldLeft[Option[DataType]](Some(NullType))((r, c) => r match { case Some(d) => findTightestCommonTypeOfTwo(d, c).orElse((d, c) match { case (t1: DecimalType, t2: DecimalType) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 42e7aae0b6b0..0e24512b82fd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -49,6 +49,23 @@ case class UnresolvedRelation( override lazy val resolved = false } +/** + * An inline table that has not been resolved yet. Once resolved, it is turned by the analyzer into + * a [[org.apache.spark.sql.catalyst.plans.logical.LocalRelation]]. + * + * @param names list of column names + * @param rows expressions for the data + */ +case class UnresolvedInlineTable( + names: Seq[String], + rows: Seq[Seq[Expression]]) + extends LeafNode { + + lazy val expressionsResolved: Boolean = rows.forall(_.forall(_.resolved)) + override def output: Seq[Attribute] = Nil + override lazy val resolved = false +} + /** * Holds the name of an attribute that has yet to be resolved. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 25c8445b4d33..8a3587f6b961 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -662,39 +662,21 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { */ override def visitInlineTable(ctx: InlineTableContext): LogicalPlan = withOrigin(ctx) { // Get the backing expressions. - val expressions = ctx.expression.asScala.map { eCtx => - val e = expression(eCtx) - assert(e.foldable, "All expressions in an inline table must be constants.", eCtx) - e - } - - // Validate and evaluate the rows. - val (structType, structConstructor) = expressions.head.dataType match { - case st: StructType => - (st, (e: Expression) => e) - case dt => - val st = CreateStruct(Seq(expressions.head)).dataType - (st, (e: Expression) => CreateStruct(Seq(e))) - } - val rows = expressions.map { - case expression => - val safe = Cast(structConstructor(expression), structType) - safe.eval().asInstanceOf[InternalRow] + val rows = ctx.expression.asScala.map { e => + expression(e) match { + case CreateStruct(children) => children + case child => Seq(child) + } } - // Construct attributes. - val baseAttributes = structType.toAttributes.map(_.withNullability(true)) - val attributes = if (ctx.identifierList != null) { - val aliases = visitIdentifierList(ctx.identifierList) - assert(aliases.size == baseAttributes.size, - "Number of aliases must match the number of fields in an inline table.", ctx) - baseAttributes.zip(aliases).map(p => p._1.withName(p._2)) + val aliases = if (ctx.identifierList != null) { + visitIdentifierList(ctx.identifierList) } else { - baseAttributes + Seq.tabulate(rows.head.size)(i => s"col${i + 1}") } - // Create plan and add an alias if a name has been defined. - LocalRelation(attributes, rows).optionalMap(ctx.identifier)(aliasPlan) + val table = UnresolvedInlineTable(aliases, rows) + table.optionalMap(ctx.identifier)(aliasPlan) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala index 9d64f35efcc6..b868047835b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala @@ -75,4 +75,6 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil) override lazy val statistics = Statistics(sizeInBytes = output.map(_.dataType.defaultSize).sum * data.length) + + override def maxRows: Option[Long] = Some(data.size) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala new file mode 100644 index 000000000000..34cdc4fd89c9 --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.scalatest.BeforeAndAfter + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.expressions.{Literal, Rand} +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.types.LongType + +/** + * Unit tests for [[ResolveInlineTables]]. Note that there are also test cases defined in + * end-to-end tests (in sql/core module) for verifying the correct error messages are shown + * in negative cases. + */ +class ResolveInlineTablesSuite extends PlanTest with BeforeAndAfter { + + private def lit(v: Any): Literal = Literal(v) + + test("validate inputs are foldable") { + ResolveInlineTables.validateInputFoldable( + UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(lit(1))))) + + // nondeterministic (rand) + intercept[AnalysisException] { + ResolveInlineTables.validateInputFoldable( + UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(Rand(1))))) + } + + // unresolved attribute + intercept[AnalysisException] { + ResolveInlineTables.validateInputFoldable( + UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(UnresolvedAttribute("A"))))) + } + } + + test("validate input dimensions") { + ResolveInlineTables.validateInputDimension( + UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2))))) + + // num alias != data dimension + intercept[AnalysisException] { + ResolveInlineTables.validateInputDimension( + UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(lit(1)), Seq(lit(2))))) + } + + // num alias == data dimension, but data themselves are inconsistent + intercept[AnalysisException] { + ResolveInlineTables.validateInputDimension( + UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(21), lit(22))))) + } + } + + test("do not fire the rule if not all expressions are resolved") { + val table = UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(UnresolvedAttribute("A")))) + assert(ResolveInlineTables(table) == table) + } + + test("convert") { + val table = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2L)))) + val converted = ResolveInlineTables.convert(table) + + assert(converted.output.map(_.dataType) == Seq(LongType)) + assert(converted.data.size == 2) + assert(converted.data(0).getLong(0) == 1L) + assert(converted.data(1).getLong(0) == 2L) + } +} diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql new file mode 100644 index 000000000000..42b12f6999e9 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql @@ -0,0 +1,36 @@ + +-- single row, without table and column alias +select * from values ("one", 1); + +-- single row, without column alias +select * from values ("one", 1) as data; + +-- single row +select * from values ("one", 1) as data(a, b); + +-- two rows +select * from values ("one", 1), ("two", 2) as data(a, b); + +-- int and long coercion +select * from values ("one", 1), ("two", 2L) as data(a, b); + +-- foldable expressions +select * from values ("one", 1 + 0), ("two", 1 + 3L) as data(a, b); + +-- complex types +select * from values ("one", array(0, 1)), ("two", array(2, 3)) as data(a, b); + +-- decimal and double coercion +select * from values ("one", 2.0), ("two", 3.0D) as data(a, b); + +-- error reporting: different number of columns +select * from values ("one", 2.0), ("two") as data(a, b); + +-- error reporting: types that are incompatible +select * from values ("one", array(0, 1)), ("two", struct(1, 2)) as data(a, b); + +-- error reporting: number aliases different from number data values +select * from values ("one"), ("two") as data(a, b); + +-- error reporting: unresolved expression +select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b); diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out new file mode 100644 index 000000000000..a61d00f04240 --- /dev/null +++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out @@ -0,0 +1,107 @@ +-- Automatically generated by SQLQueryTestSuite +-- Number of queries: 12 + + +-- !query 0 +select * from values ("one", 1) +-- !query 0 schema +struct +-- !query 0 output +one 1 + + +-- !query 1 +select * from values ("one", 1) as data +-- !query 1 schema +struct +-- !query 1 output +one 1 + + +-- !query 2 +select * from values ("one", 1) as data(a, b) +-- !query 2 schema +struct +-- !query 2 output +one 1 + + +-- !query 3 +select * from values ("one", 1), ("two", 2) as data(a, b) +-- !query 3 schema +struct +-- !query 3 output +one 1 +two 2 + + +-- !query 4 +select * from values ("one", 1), ("two", 2L) as data(a, b) +-- !query 4 schema +struct +-- !query 4 output +one 1 +two 2 + + +-- !query 5 +select * from values ("one", 1 + 0), ("two", 1 + 3L) as data(a, b) +-- !query 5 schema +struct +-- !query 5 output +one 1 +two 4 + + +-- !query 6 +select * from values ("one", array(0, 1)), ("two", array(2, 3)) as data(a, b) +-- !query 6 schema +struct> +-- !query 6 output +one [0,1] +two [2,3] + + +-- !query 7 +select * from values ("one", 2.0), ("two", 3.0D) as data(a, b) +-- !query 7 schema +struct +-- !query 7 output +one 2.0 +two 3.0 + + +-- !query 8 +select * from values ("one", 2.0), ("two") as data(a, b) +-- !query 8 schema +struct<> +-- !query 8 output +org.apache.spark.sql.AnalysisException +expected 2 columns but found 1 columns in row 1; line 1 pos 14 + + +-- !query 9 +select * from values ("one", array(0, 1)), ("two", struct(1, 2)) as data(a, b) +-- !query 9 schema +struct<> +-- !query 9 output +org.apache.spark.sql.AnalysisException +incompatible types found in column 1 for inline table; line 1 pos 14 + + +-- !query 10 +select * from values ("one"), ("two") as data(a, b) +-- !query 10 schema +struct<> +-- !query 10 output +org.apache.spark.sql.AnalysisException +expected 2 columns but found 1 in first row; line 1 pos 14 + + +-- !query 11 +select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b) +-- !query 11 schema +struct<> +-- !query 11 output +org.apache.spark.sql.AnalysisException +Undefined function: 'random_not_exist_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 29 From 6a0450b079a6d7f2f21de5fb5dd1b8b806846cfe Mon Sep 17 00:00:00 2001 From: petermaxlee Date: Tue, 16 Aug 2016 17:53:12 -0700 Subject: [PATCH 02/13] Add one more negative test case for aggregate expression. --- .../test/resources/sql-tests/inputs/inline-table.sql | 3 +++ .../resources/sql-tests/results/inline-table.sql.out | 11 ++++++++++- 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql index 42b12f6999e9..a893cdeed6e3 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql @@ -34,3 +34,6 @@ select * from values ("one"), ("two") as data(a, b); -- error reporting: unresolved expression select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b); + +-- error reporting: aggregate expression +select * from values ("one", count(1)), ("two", 2) as data(a, b); diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out index a61d00f04240..67997e6052f0 100644 --- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 12 +-- Number of queries: 13 -- !query 0 @@ -105,3 +105,12 @@ struct<> -- !query 11 output org.apache.spark.sql.AnalysisException Undefined function: 'random_not_exist_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 29 + + +-- !query 12 +select * from values ("one", count(1)), ("two", 2) as data(a, b) +-- !query 12 schema +struct<> +-- !query 12 output +org.apache.spark.sql.AnalysisException +cannot evaluate expression count(1) in inline table definition; line 1 pos 29 From 2327b7971a845ce01b0b18fd5ccd2b1f0bb99be0 Mon Sep 17 00:00:00 2001 From: petermaxlee Date: Tue, 16 Aug 2016 17:54:54 -0700 Subject: [PATCH 03/13] update doc about visibility for testing. --- .../sql/catalyst/analysis/ResolveInlineTables.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala index 441febbc8c83..76b6c924602d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala @@ -34,7 +34,11 @@ object ResolveInlineTables extends Rule[LogicalPlan] { convert(table) } - /** Validates that all inline table data are foldable expressions. */ + /** + * Validates that all inline table data are foldable expressions. + * + * This is publicly visible for unit testing. + */ def validateInputFoldable(table: UnresolvedInlineTable): Unit = { table.rows.foreach { row => row.foreach { e => @@ -49,6 +53,8 @@ object ResolveInlineTables extends Rule[LogicalPlan] { * Validates the input data dimension: * 1. All rows have the same cardinality. * 2. The number of column aliases defined is consistent with the number of columns in data. + * + * This is publicly visible for unit testing. */ def validateInputDimension(table: UnresolvedInlineTable): Unit = { if (table.rows.nonEmpty) { @@ -70,6 +76,8 @@ object ResolveInlineTables extends Rule[LogicalPlan] { * into a [[LocalRelation]]. * * This function attempts to coerce inputs into consistent types. + * + * This is publicly visible for unit testing. */ def convert(table: UnresolvedInlineTable): LocalRelation = { val numCols = table.rows.head.size From fcc3cafe08c9549be51b33b5ac993fbb3fa46d37 Mon Sep 17 00:00:00 2001 From: petermaxlee Date: Tue, 16 Aug 2016 21:38:20 -0700 Subject: [PATCH 04/13] Fix PlanParserSuite --- .../sql/catalyst/parser/PlanParserSuite.scala | 21 +++++++------------ 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala index 34d52c75e0af..59b71a74849b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.parser import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.FunctionIdentifier -import org.apache.spark.sql.catalyst.analysis.UnresolvedGenerator +import org.apache.spark.sql.catalyst.analysis.{UnresolvedGenerator, UnresolvedInlineTable} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ @@ -427,19 +427,14 @@ class PlanParserSuite extends PlanTest { } test("inline table") { - assertEqual("values 1, 2, 3, 4", LocalRelation.fromExternalRows( - Seq('col1.int), - Seq(1, 2, 3, 4).map(x => Row(x)))) + assertEqual("values 1, 2, 3, 4", + UnresolvedInlineTable(Seq("col1"), Seq(1, 2, 3, 4).map(x => Seq(Literal(x))))) + assertEqual( - "values (1, 'a'), (2, 'b'), (3, 'c') as tbl(a, b)", - LocalRelation.fromExternalRows( - Seq('a.int, 'b.string), - Seq((1, "a"), (2, "b"), (3, "c")).map(x => Row(x._1, x._2))).as("tbl")) - intercept("values (a, 'a'), (b, 'b')", - "All expressions in an inline table must be constants.") - intercept("values (1, 'a'), (2, 'b') as tbl(a, b, c)", - "Number of aliases must match the number of fields in an inline table.") - intercept[ArrayIndexOutOfBoundsException](parsePlan("values (1, 'a'), (2, 'b', 5Y)")) + "values (1, 'a'), (2, 'b') as tbl(a, b)", + UnresolvedInlineTable( + Seq("a", "b"), + Seq(Literal(1), Literal("a")) :: Seq(Literal(2), Literal("b")) :: Nil).as("tbl")) } test("simple select query with !> and !<") { From 092605be786adae0aa241da43e25d1f1be5de492 Mon Sep 17 00:00:00 2001 From: petermaxlee Date: Tue, 16 Aug 2016 21:45:23 -0700 Subject: [PATCH 05/13] Fix tests --- .../apache/spark/sql/catalyst/plans/logical/LocalRelation.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala index b868047835b1..9d64f35efcc6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala @@ -75,6 +75,4 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil) override lazy val statistics = Statistics(sizeInBytes = output.map(_.dataType.defaultSize).sum * data.length) - - override def maxRows: Option[Long] = Some(data.size) } From 47239020ac7008d8630a5c810df3db29a77795cf Mon Sep 17 00:00:00 2001 From: petermaxlee Date: Tue, 16 Aug 2016 22:42:20 -0700 Subject: [PATCH 06/13] Add two cases with null. --- .../sql-tests/inputs/inline-table.sql | 5 +- .../sql-tests/results/inline-table.sql.out | 64 +++++++++++-------- 2 files changed, 41 insertions(+), 28 deletions(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql index a893cdeed6e3..7f06d68c2ebe 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql @@ -9,7 +9,10 @@ select * from values ("one", 1) as data; select * from values ("one", 1) as data(a, b); -- two rows -select * from values ("one", 1), ("two", 2) as data(a, b); +select * from values ("one", 1), ("two", 2), ("three", null) as data(a, b); + +-- null type +select * from values ("one", null), ("two", null) as data(a, b); -- int and long coercion select * from values ("one", 1), ("two", 2L) as data(a, b); diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out index 67997e6052f0..71b39d019c3b 100644 --- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 13 +-- Number of queries: 14 -- !query 0 @@ -27,90 +27,100 @@ one 1 -- !query 3 -select * from values ("one", 1), ("two", 2) as data(a, b) +select * from values ("one", 1), ("two", 2), ("three", null) as data(a, b) -- !query 3 schema struct -- !query 3 output one 1 +three NULL two 2 -- !query 4 -select * from values ("one", 1), ("two", 2L) as data(a, b) +select * from values ("one", null), ("two", null) as data(a, b) -- !query 4 schema -struct +struct -- !query 4 output -one 1 -two 2 +one NULL +two NULL -- !query 5 -select * from values ("one", 1 + 0), ("two", 1 + 3L) as data(a, b) +select * from values ("one", 1), ("two", 2L) as data(a, b) -- !query 5 schema struct -- !query 5 output one 1 -two 4 +two 2 -- !query 6 -select * from values ("one", array(0, 1)), ("two", array(2, 3)) as data(a, b) +select * from values ("one", 1 + 0), ("two", 1 + 3L) as data(a, b) -- !query 6 schema -struct> +struct -- !query 6 output -one [0,1] -two [2,3] +one 1 +two 4 -- !query 7 -select * from values ("one", 2.0), ("two", 3.0D) as data(a, b) +select * from values ("one", array(0, 1)), ("two", array(2, 3)) as data(a, b) -- !query 7 schema -struct +struct> -- !query 7 output -one 2.0 -two 3.0 +one [0,1] +two [2,3] -- !query 8 -select * from values ("one", 2.0), ("two") as data(a, b) +select * from values ("one", 2.0), ("two", 3.0D) as data(a, b) -- !query 8 schema -struct<> +struct -- !query 8 output -org.apache.spark.sql.AnalysisException -expected 2 columns but found 1 columns in row 1; line 1 pos 14 +one 2.0 +two 3.0 -- !query 9 -select * from values ("one", array(0, 1)), ("two", struct(1, 2)) as data(a, b) +select * from values ("one", 2.0), ("two") as data(a, b) -- !query 9 schema struct<> -- !query 9 output org.apache.spark.sql.AnalysisException -incompatible types found in column 1 for inline table; line 1 pos 14 +expected 2 columns but found 1 columns in row 1; line 1 pos 14 -- !query 10 -select * from values ("one"), ("two") as data(a, b) +select * from values ("one", array(0, 1)), ("two", struct(1, 2)) as data(a, b) -- !query 10 schema struct<> -- !query 10 output org.apache.spark.sql.AnalysisException -expected 2 columns but found 1 in first row; line 1 pos 14 +incompatible types found in column 1 for inline table; line 1 pos 14 -- !query 11 -select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b) +select * from values ("one"), ("two") as data(a, b) -- !query 11 schema struct<> -- !query 11 output org.apache.spark.sql.AnalysisException -Undefined function: 'random_not_exist_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 29 +expected 2 columns but found 1 in first row; line 1 pos 14 -- !query 12 -select * from values ("one", count(1)), ("two", 2) as data(a, b) +select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b) -- !query 12 schema struct<> -- !query 12 output org.apache.spark.sql.AnalysisException +Undefined function: 'random_not_exist_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 29 + + +-- !query 13 +select * from values ("one", count(1)), ("two", 2) as data(a, b) +-- !query 13 schema +struct<> +-- !query 13 output +org.apache.spark.sql.AnalysisException cannot evaluate expression count(1) in inline table definition; line 1 pos 29 From c1071afc2b46402eea1c2c179401e1f4f5cde5b4 Mon Sep 17 00:00:00 2001 From: petermaxlee Date: Wed, 17 Aug 2016 16:46:50 -0700 Subject: [PATCH 07/13] Code review --- .../analysis/ResolveInlineTables.scala | 32 ++++++++--------- .../sql-tests/inputs/inline-table.sql | 3 ++ .../sql-tests/results/inline-table.sql.out | 35 ++++++++++++------- 3 files changed, 39 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala index 76b6c924602d..cdd17b4901ba 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.Cast +import org.apache.spark.sql.catalyst.expressions.{Cast, InterpretedProjection} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.types.{StructField, StructType} @@ -30,16 +30,16 @@ object ResolveInlineTables extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case table: UnresolvedInlineTable if table.expressionsResolved => validateInputDimension(table) - validateInputFoldable(table) + // validateInputFoldable(table) convert(table) } /** * Validates that all inline table data are foldable expressions. * - * This is publicly visible for unit testing. + * This is package visible for unit testing. */ - def validateInputFoldable(table: UnresolvedInlineTable): Unit = { + private[analysis] def validateInputFoldable(table: UnresolvedInlineTable): Unit = { table.rows.foreach { row => row.foreach { e => if (!e.resolved || !e.foldable) { @@ -54,9 +54,9 @@ object ResolveInlineTables extends Rule[LogicalPlan] { * 1. All rows have the same cardinality. * 2. The number of column aliases defined is consistent with the number of columns in data. * - * This is publicly visible for unit testing. + * This is package visible for unit testing. */ - def validateInputDimension(table: UnresolvedInlineTable): Unit = { + private[analysis] def validateInputDimension(table: UnresolvedInlineTable): Unit = { if (table.rows.nonEmpty) { val numCols = table.rows.head.size table.rows.zipWithIndex.foreach { case (row, ri) => @@ -77,29 +77,25 @@ object ResolveInlineTables extends Rule[LogicalPlan] { * * This function attempts to coerce inputs into consistent types. * - * This is publicly visible for unit testing. + * This is package visible for unit testing. */ - def convert(table: UnresolvedInlineTable): LocalRelation = { + private[analysis] def convert(table: UnresolvedInlineTable): LocalRelation = { val numCols = table.rows.head.size // For each column, traverse all the values and find a common data type. - val targetTypes = Seq.tabulate(numCols) { ci => - val inputTypes = table.rows.map(_(ci).dataType) + val targetTypes = table.rows.transpose.zip(table.names).map { case (column, name) => + val inputTypes = column.map(_.dataType) TypeCoercion.findWiderTypeWithoutStringPromotion(inputTypes).getOrElse { - table.failAnalysis(s"incompatible types found in column $ci for inline table") + table.failAnalysis(s"incompatible types found in column $name for inline table") } } assert(targetTypes.size == table.names.size) val newRows: Seq[InternalRow] = table.rows.map { row => - InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) => + new InterpretedProjection(row.zipWithIndex.map { case (e, ci) => val targetType = targetTypes(ci) - if (e.dataType.sameType(targetType)) { - e.eval() - } else { - Cast(e, targetType).eval() - } - }) + if (e.dataType.sameType(targetType)) e else Cast(e, targetType) + }).apply(null) } val attributes = StructType(targetTypes.zip(table.names) diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql index 7f06d68c2ebe..0c8a1d99944f 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql @@ -26,6 +26,9 @@ select * from values ("one", array(0, 1)), ("two", array(2, 3)) as data(a, b); -- decimal and double coercion select * from values ("one", 2.0), ("two", 3.0D) as data(a, b); +-- nondeterministic function rand +select * from values ("one", rand(5)), ("two", 3.0D) as data(a, b); + -- error reporting: different number of columns select * from values ("one", 2.0), ("two") as data(a, b); diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out index 71b39d019c3b..f7eec8754af6 100644 --- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 14 +-- Number of queries: 15 -- !query 0 @@ -82,45 +82,54 @@ two 3.0 -- !query 9 -select * from values ("one", 2.0), ("two") as data(a, b) +select * from values ("one", rand(5)), ("two", 3.0D) as data(a, b) -- !query 9 schema -struct<> +struct -- !query 9 output -org.apache.spark.sql.AnalysisException -expected 2 columns but found 1 columns in row 1; line 1 pos 14 +one 0.087440518337355 +two 3.0 -- !query 10 -select * from values ("one", array(0, 1)), ("two", struct(1, 2)) as data(a, b) +select * from values ("one", 2.0), ("two") as data(a, b) -- !query 10 schema struct<> -- !query 10 output org.apache.spark.sql.AnalysisException -incompatible types found in column 1 for inline table; line 1 pos 14 +expected 2 columns but found 1 columns in row 1; line 1 pos 14 -- !query 11 -select * from values ("one"), ("two") as data(a, b) +select * from values ("one", array(0, 1)), ("two", struct(1, 2)) as data(a, b) -- !query 11 schema struct<> -- !query 11 output org.apache.spark.sql.AnalysisException -expected 2 columns but found 1 in first row; line 1 pos 14 +incompatible types found in column b for inline table; line 1 pos 14 -- !query 12 -select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b) +select * from values ("one"), ("two") as data(a, b) -- !query 12 schema struct<> -- !query 12 output org.apache.spark.sql.AnalysisException -Undefined function: 'random_not_exist_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 29 +expected 2 columns but found 1 in first row; line 1 pos 14 -- !query 13 -select * from values ("one", count(1)), ("two", 2) as data(a, b) +select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b) -- !query 13 schema struct<> -- !query 13 output org.apache.spark.sql.AnalysisException -cannot evaluate expression count(1) in inline table definition; line 1 pos 29 +Undefined function: 'random_not_exist_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 29 + + +-- !query 14 +select * from values ("one", count(1)), ("two", 2) as data(a, b) +-- !query 14 schema +struct<> +-- !query 14 output +java.lang.UnsupportedOperationException +Cannot evaluate expression: count(1) From 1b39a97ec27a385d56fd4fa21fecde616807bdbc Mon Sep 17 00:00:00 2001 From: petermaxlee Date: Wed, 17 Aug 2016 17:36:46 -0700 Subject: [PATCH 08/13] Code review --- .../analysis/ResolveInlineTables.scala | 8 +- .../sql/catalyst/parser/AstBuilder.scala | 7 +- .../analysis/ResolveInlineTablesSuite.scala | 17 ++-- .../sql-tests/inputs/inline-table.sql | 3 + .../sql-tests/results/inline-table.sql.out | 86 +++++++++++-------- 5 files changed, 71 insertions(+), 50 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala index cdd17b4901ba..2746ddfd9db9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.analysis import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Cast, InterpretedProjection} +import org.apache.spark.sql.catalyst.expressions.{Cast, InterpretedProjection, Unevaluable} import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.types.{StructField, StructType} @@ -30,7 +30,7 @@ object ResolveInlineTables extends Rule[LogicalPlan] { override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp { case table: UnresolvedInlineTable if table.expressionsResolved => validateInputDimension(table) - // validateInputFoldable(table) + validateInputEvaluable(table) convert(table) } @@ -39,10 +39,10 @@ object ResolveInlineTables extends Rule[LogicalPlan] { * * This is package visible for unit testing. */ - private[analysis] def validateInputFoldable(table: UnresolvedInlineTable): Unit = { + private[analysis] def validateInputEvaluable(table: UnresolvedInlineTable): Unit = { table.rows.foreach { row => row.foreach { e => - if (!e.resolved || !e.foldable) { + if (!e.resolved || e.isInstanceOf[Unevaluable]) { e.failAnalysis(s"cannot evaluate expression ${e.sql} in inline table definition") } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index dc7818291ef1..4dcddb7f62b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -664,8 +664,11 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging { // Get the backing expressions. val rows = ctx.expression.asScala.map { e => expression(e) match { - case CreateStruct(children) => children - case child => Seq(child) + // inline table comes in two styles: + // style 1: values (1), (2), (3) -- multiple columns are supported + // style 2: values 1, 2, 3 -- only a single column is supported here + case CreateStruct(children) => children // style 1 + case child => Seq(child) // style 2 } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala index 34cdc4fd89c9..8e44f37cf5f4 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala @@ -21,6 +21,7 @@ import org.scalatest.BeforeAndAfter import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{Literal, Rand} +import org.apache.spark.sql.catalyst.expressions.aggregate.Count import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.types.LongType @@ -34,18 +35,22 @@ class ResolveInlineTablesSuite extends PlanTest with BeforeAndAfter { private def lit(v: Any): Literal = Literal(v) test("validate inputs are foldable") { - ResolveInlineTables.validateInputFoldable( + ResolveInlineTables.validateInputEvaluable( UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(lit(1))))) - // nondeterministic (rand) + // nondeterministic (rand) should be fine + ResolveInlineTables.validateInputEvaluable( + UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(Rand(1))))) + + // aggregate should not work intercept[AnalysisException] { - ResolveInlineTables.validateInputFoldable( - UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(Rand(1))))) + ResolveInlineTables.validateInputEvaluable( + UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(Count(lit(1)))))) } - // unresolved attribute + // unresolved attribute should not work intercept[AnalysisException] { - ResolveInlineTables.validateInputFoldable( + ResolveInlineTables.validateInputEvaluable( UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(UnresolvedAttribute("A"))))) } } diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql index 0c8a1d99944f..621c7ff1280b 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql @@ -8,6 +8,9 @@ select * from values ("one", 1) as data; -- single row select * from values ("one", 1) as data(a, b); +-- single column multiple rows +select * from values 1, 2, 3 as data(a); + -- two rows select * from values ("one", 1), ("two", 2), ("three", null) as data(a, b); diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out index f7eec8754af6..2d51ec0b504d 100644 --- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out @@ -1,5 +1,5 @@ -- Automatically generated by SQLQueryTestSuite --- Number of queries: 15 +-- Number of queries: 16 -- !query 0 @@ -27,109 +27,119 @@ one 1 -- !query 3 -select * from values ("one", 1), ("two", 2), ("three", null) as data(a, b) +select * from values 1, 2, 3 as data(a) -- !query 3 schema -struct +struct -- !query 3 output +1 +2 +3 + + +-- !query 4 +select * from values ("one", 1), ("two", 2), ("three", null) as data(a, b) +-- !query 4 schema +struct +-- !query 4 output one 1 three NULL two 2 --- !query 4 +-- !query 5 select * from values ("one", null), ("two", null) as data(a, b) --- !query 4 schema +-- !query 5 schema struct --- !query 4 output +-- !query 5 output one NULL two NULL --- !query 5 +-- !query 6 select * from values ("one", 1), ("two", 2L) as data(a, b) --- !query 5 schema +-- !query 6 schema struct --- !query 5 output +-- !query 6 output one 1 two 2 --- !query 6 +-- !query 7 select * from values ("one", 1 + 0), ("two", 1 + 3L) as data(a, b) --- !query 6 schema +-- !query 7 schema struct --- !query 6 output +-- !query 7 output one 1 two 4 --- !query 7 +-- !query 8 select * from values ("one", array(0, 1)), ("two", array(2, 3)) as data(a, b) --- !query 7 schema +-- !query 8 schema struct> --- !query 7 output +-- !query 8 output one [0,1] two [2,3] --- !query 8 +-- !query 9 select * from values ("one", 2.0), ("two", 3.0D) as data(a, b) --- !query 8 schema +-- !query 9 schema struct --- !query 8 output +-- !query 9 output one 2.0 two 3.0 --- !query 9 +-- !query 10 select * from values ("one", rand(5)), ("two", 3.0D) as data(a, b) --- !query 9 schema +-- !query 10 schema struct --- !query 9 output +-- !query 10 output one 0.087440518337355 two 3.0 --- !query 10 +-- !query 11 select * from values ("one", 2.0), ("two") as data(a, b) --- !query 10 schema +-- !query 11 schema struct<> --- !query 10 output +-- !query 11 output org.apache.spark.sql.AnalysisException expected 2 columns but found 1 columns in row 1; line 1 pos 14 --- !query 11 +-- !query 12 select * from values ("one", array(0, 1)), ("two", struct(1, 2)) as data(a, b) --- !query 11 schema +-- !query 12 schema struct<> --- !query 11 output +-- !query 12 output org.apache.spark.sql.AnalysisException incompatible types found in column b for inline table; line 1 pos 14 --- !query 12 +-- !query 13 select * from values ("one"), ("two") as data(a, b) --- !query 12 schema +-- !query 13 schema struct<> --- !query 12 output +-- !query 13 output org.apache.spark.sql.AnalysisException expected 2 columns but found 1 in first row; line 1 pos 14 --- !query 13 +-- !query 14 select * from values ("one", random_not_exist_func(1)), ("two", 2) as data(a, b) --- !query 13 schema +-- !query 14 schema struct<> --- !query 13 output +-- !query 14 output org.apache.spark.sql.AnalysisException Undefined function: 'random_not_exist_func'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 29 --- !query 14 +-- !query 15 select * from values ("one", count(1)), ("two", 2) as data(a, b) --- !query 14 schema +-- !query 15 schema struct<> --- !query 14 output -java.lang.UnsupportedOperationException -Cannot evaluate expression: count(1) +-- !query 15 output +org.apache.spark.sql.AnalysisException +cannot evaluate expression count(1) in inline table definition; line 1 pos 29 From 2e6843844d126e2ba466fe6b34ea59b3b67942c7 Mon Sep 17 00:00:00 2001 From: petermaxlee Date: Wed, 17 Aug 2016 21:53:25 -0700 Subject: [PATCH 09/13] small code review --- .../spark/sql/catalyst/analysis/ResolveInlineTables.scala | 6 +----- .../src/test/resources/sql-tests/inputs/inline-table.sql | 2 +- .../test/resources/sql-tests/results/inline-table.sql.out | 2 +- 3 files changed, 3 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala index 2746ddfd9db9..79935d09437b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala @@ -58,16 +58,12 @@ object ResolveInlineTables extends Rule[LogicalPlan] { */ private[analysis] def validateInputDimension(table: UnresolvedInlineTable): Unit = { if (table.rows.nonEmpty) { - val numCols = table.rows.head.size + val numCols = table.names.size table.rows.zipWithIndex.foreach { case (row, ri) => if (row.size != numCols) { table.failAnalysis(s"expected $numCols columns but found ${row.size} columns in row $ri") } } - - if (table.names.size != numCols) { - table.failAnalysis(s"expected ${table.names.size} columns but found $numCols in first row") - } } } diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql index 621c7ff1280b..ffe0da3ace9e 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql @@ -11,7 +11,7 @@ select * from values ("one", 1) as data(a, b); -- single column multiple rows select * from values 1, 2, 3 as data(a); --- two rows +-- three rows select * from values ("one", 1), ("two", 2), ("three", null) as data(a, b); -- null type diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out index 2d51ec0b504d..72e6dc9aaa3b 100644 --- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out @@ -124,7 +124,7 @@ select * from values ("one"), ("two") as data(a, b) struct<> -- !query 13 output org.apache.spark.sql.AnalysisException -expected 2 columns but found 1 in first row; line 1 pos 14 +expected 2 columns but found 1 columns in row 0; line 1 pos 14 -- !query 14 From fb9de341aa5c43907ab4a51a9187434f13defcd3 Mon Sep 17 00:00:00 2001 From: petermaxlee Date: Wed, 17 Aug 2016 22:56:01 -0700 Subject: [PATCH 10/13] Add comment explaining attribute reference. --- .../sql/catalyst/analysis/ResolveInlineTables.scala | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala index 79935d09437b..ec628c9c0aab 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala @@ -35,13 +35,18 @@ object ResolveInlineTables extends Rule[LogicalPlan] { } /** - * Validates that all inline table data are foldable expressions. + * Validates that all inline table data are valid expressions that can be evaluated. * * This is package visible for unit testing. */ private[analysis] def validateInputEvaluable(table: UnresolvedInlineTable): Unit = { table.rows.foreach { row => row.foreach { e => + // We want to support foldable expressions and nondeterministic expressions (e.g. rand) + // that are evaluable. + // Note that there are expressions that are evaluable but not actually valid for inline + // tables. One example is AttributeReference. However, since UnresolvedInlineTable is a + // leaf node, the analyzer would not resolve UnresolvedAttribute into AttributeReference. if (!e.resolved || e.isInstanceOf[Unevaluable]) { e.failAnalysis(s"cannot evaluate expression ${e.sql} in inline table definition") } @@ -76,8 +81,6 @@ object ResolveInlineTables extends Rule[LogicalPlan] { * This is package visible for unit testing. */ private[analysis] def convert(table: UnresolvedInlineTable): LocalRelation = { - val numCols = table.rows.head.size - // For each column, traverse all the values and find a common data type. val targetTypes = table.rows.transpose.zip(table.names).map { case (column, name) => val inputTypes = column.map(_.dataType) From aed7c5ee030472e001e1eee7bd7f88ac21b16a9a Mon Sep 17 00:00:00 2001 From: petermaxlee Date: Thu, 18 Aug 2016 12:20:39 -0700 Subject: [PATCH 11/13] nullability and remove rand --- .../analysis/ResolveInlineTables.scala | 70 +++++++++++-------- .../analysis/ResolveInlineTablesSuite.scala | 24 +++++-- .../sql-tests/results/inline-table.sql.out | 6 +- 3 files changed, 60 insertions(+), 40 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala index ec628c9c0aab..7323197b10f6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTables.scala @@ -17,8 +17,10 @@ package org.apache.spark.sql.catalyst.analysis +import scala.util.control.NonFatal + import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Cast, InterpretedProjection, Unevaluable} +import org.apache.spark.sql.catalyst.expressions.Cast import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.types.{StructField, StructType} @@ -34,26 +36,6 @@ object ResolveInlineTables extends Rule[LogicalPlan] { convert(table) } - /** - * Validates that all inline table data are valid expressions that can be evaluated. - * - * This is package visible for unit testing. - */ - private[analysis] def validateInputEvaluable(table: UnresolvedInlineTable): Unit = { - table.rows.foreach { row => - row.foreach { e => - // We want to support foldable expressions and nondeterministic expressions (e.g. rand) - // that are evaluable. - // Note that there are expressions that are evaluable but not actually valid for inline - // tables. One example is AttributeReference. However, since UnresolvedInlineTable is a - // leaf node, the analyzer would not resolve UnresolvedAttribute into AttributeReference. - if (!e.resolved || e.isInstanceOf[Unevaluable]) { - e.failAnalysis(s"cannot evaluate expression ${e.sql} in inline table definition") - } - } - } - } - /** * Validates the input data dimension: * 1. All rows have the same cardinality. @@ -72,6 +54,23 @@ object ResolveInlineTables extends Rule[LogicalPlan] { } } + /** + * Validates that all inline table data are valid expressions that can be evaluated + * (in this they must be foldable). + * + * This is package visible for unit testing. + */ + private[analysis] def validateInputEvaluable(table: UnresolvedInlineTable): Unit = { + table.rows.foreach { row => + row.foreach { e => + // Note that nondeterministic expressions are not supported since they are not foldable. + if (!e.resolved || !e.foldable) { + e.failAnalysis(s"cannot evaluate expression ${e.sql} in inline table definition") + } + } + } + } + /** * Convert a valid (with right shape and foldable inputs) [[UnresolvedInlineTable]] * into a [[LocalRelation]]. @@ -81,24 +80,33 @@ object ResolveInlineTables extends Rule[LogicalPlan] { * This is package visible for unit testing. */ private[analysis] def convert(table: UnresolvedInlineTable): LocalRelation = { - // For each column, traverse all the values and find a common data type. - val targetTypes = table.rows.transpose.zip(table.names).map { case (column, name) => + // For each column, traverse all the values and find a common data type and nullability. + val fields = table.rows.transpose.zip(table.names).map { case (column, name) => val inputTypes = column.map(_.dataType) - TypeCoercion.findWiderTypeWithoutStringPromotion(inputTypes).getOrElse { + val tpe = TypeCoercion.findWiderTypeWithoutStringPromotion(inputTypes).getOrElse { table.failAnalysis(s"incompatible types found in column $name for inline table") } + StructField(name, tpe, nullable = column.exists(_.nullable)) } - assert(targetTypes.size == table.names.size) + val attributes = StructType(fields).toAttributes + assert(fields.size == table.names.size) val newRows: Seq[InternalRow] = table.rows.map { row => - new InterpretedProjection(row.zipWithIndex.map { case (e, ci) => - val targetType = targetTypes(ci) - if (e.dataType.sameType(targetType)) e else Cast(e, targetType) - }).apply(null) + InternalRow.fromSeq(row.zipWithIndex.map { case (e, ci) => + val targetType = fields(ci).dataType + try { + if (e.dataType.sameType(targetType)) { + e.eval() + } else { + Cast(e, targetType).eval() + } + } catch { + case NonFatal(ex) => + table.failAnalysis(s"failed to evaluate expression ${e.sql}: ${ex.getMessage}") + } + }) } - val attributes = StructType(targetTypes.zip(table.names) - .map { case (typ, name) => StructField(name, typ) }).toAttributes LocalRelation(attributes, newRows) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala index 8e44f37cf5f4..920c6ea50f4b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/ResolveInlineTablesSuite.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.{Literal, Rand} import org.apache.spark.sql.catalyst.expressions.aggregate.Count import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.types.LongType +import org.apache.spark.sql.types.{LongType, NullType} /** * Unit tests for [[ResolveInlineTables]]. Note that there are also test cases defined in @@ -38,20 +38,22 @@ class ResolveInlineTablesSuite extends PlanTest with BeforeAndAfter { ResolveInlineTables.validateInputEvaluable( UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(lit(1))))) - // nondeterministic (rand) should be fine - ResolveInlineTables.validateInputEvaluable( - UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(Rand(1))))) + // nondeterministic (rand) should not work + intercept[AnalysisException] { + ResolveInlineTables.validateInputEvaluable( + UnresolvedInlineTable(Seq("c1"), Seq(Seq(Rand(1))))) + } // aggregate should not work intercept[AnalysisException] { ResolveInlineTables.validateInputEvaluable( - UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(Count(lit(1)))))) + UnresolvedInlineTable(Seq("c1"), Seq(Seq(Count(lit(1)))))) } // unresolved attribute should not work intercept[AnalysisException] { ResolveInlineTables.validateInputEvaluable( - UnresolvedInlineTable(Seq("c1", "c2"), Seq(Seq(UnresolvedAttribute("A"))))) + UnresolvedInlineTable(Seq("c1"), Seq(Seq(UnresolvedAttribute("A"))))) } } @@ -86,4 +88,14 @@ class ResolveInlineTablesSuite extends PlanTest with BeforeAndAfter { assert(converted.data(0).getLong(0) == 1L) assert(converted.data(1).getLong(0) == 2L) } + + test("nullability inference in convert") { + val table1 = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(lit(2L)))) + val converted1 = ResolveInlineTables.convert(table1) + assert(!converted1.schema.fields(0).nullable) + + val table2 = UnresolvedInlineTable(Seq("c1"), Seq(Seq(lit(1)), Seq(Literal(null, NullType)))) + val converted2 = ResolveInlineTables.convert(table2) + assert(converted2.schema.fields(0).nullable) + } } diff --git a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out index 72e6dc9aaa3b..de6f01b8de77 100644 --- a/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/inline-table.sql.out @@ -94,10 +94,10 @@ two 3.0 -- !query 10 select * from values ("one", rand(5)), ("two", 3.0D) as data(a, b) -- !query 10 schema -struct +struct<> -- !query 10 output -one 0.087440518337355 -two 3.0 +org.apache.spark.sql.AnalysisException +cannot evaluate expression rand(5) in inline table definition; line 1 pos 29 -- !query 11 From 285b9416850274ece5ceca2565904934a0f4ab6a Mon Sep 17 00:00:00 2001 From: petermaxlee Date: Thu, 18 Aug 2016 12:25:07 -0700 Subject: [PATCH 12/13] updated test case comment --- sql/core/src/test/resources/sql-tests/inputs/inline-table.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql index ffe0da3ace9e..5107fa4d5553 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/inline-table.sql @@ -29,7 +29,7 @@ select * from values ("one", array(0, 1)), ("two", array(2, 3)) as data(a, b); -- decimal and double coercion select * from values ("one", 2.0), ("two", 3.0D) as data(a, b); --- nondeterministic function rand +-- error reporting: nondeterministic function rand select * from values ("one", rand(5)), ("two", 3.0D) as data(a, b); -- error reporting: different number of columns From 88e727260f80819c58bb3cef3ba79d9bf86990fe Mon Sep 17 00:00:00 2001 From: petermaxlee Date: Thu, 18 Aug 2016 15:55:17 -0700 Subject: [PATCH 13/13] Fix merge bug --- .../org/apache/spark/sql/catalyst/analysis/unresolved.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index 4e1de5cbcdea..235ae0478245 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -62,6 +62,7 @@ case class UnresolvedInlineTable( extends LeafNode { lazy val expressionsResolved: Boolean = rows.forall(_.forall(_.resolved)) + override lazy val resolved = false override def output: Seq[Attribute] = Nil }