1717
1818package org .apache .spark .sql .catalyst .analysis
1919
20+ import scala .util .control .NonFatal
21+
2022import org .apache .spark .sql .catalyst .InternalRow
21- import org .apache .spark .sql .catalyst .expressions .{ Cast , InterpretedProjection , Unevaluable }
23+ import org .apache .spark .sql .catalyst .expressions .Cast
2224import org .apache .spark .sql .catalyst .plans .logical .{LocalRelation , LogicalPlan }
2325import org .apache .spark .sql .catalyst .rules .Rule
2426import org .apache .spark .sql .types .{StructField , StructType }
@@ -34,26 +36,6 @@ object ResolveInlineTables extends Rule[LogicalPlan] {
3436 convert(table)
3537 }
3638
37- /**
38- * Validates that all inline table data are valid expressions that can be evaluated.
39- *
40- * This is package visible for unit testing.
41- */
42- private [analysis] def validateInputEvaluable (table : UnresolvedInlineTable ): Unit = {
43- table.rows.foreach { row =>
44- row.foreach { e =>
45- // We want to support foldable expressions and nondeterministic expressions (e.g. rand)
46- // that are evaluable.
47- // Note that there are expressions that are evaluable but not actually valid for inline
48- // tables. One example is AttributeReference. However, since UnresolvedInlineTable is a
49- // leaf node, the analyzer would not resolve UnresolvedAttribute into AttributeReference.
50- if (! e.resolved || e.isInstanceOf [Unevaluable ]) {
51- e.failAnalysis(s " cannot evaluate expression ${e.sql} in inline table definition " )
52- }
53- }
54- }
55- }
56-
5739 /**
5840 * Validates the input data dimension:
5941 * 1. All rows have the same cardinality.
@@ -72,6 +54,23 @@ object ResolveInlineTables extends Rule[LogicalPlan] {
7254 }
7355 }
7456
57+ /**
58+ * Validates that all inline table data are valid expressions that can be evaluated
59+ * (in this they must be foldable).
60+ *
61+ * This is package visible for unit testing.
62+ */
63+ private [analysis] def validateInputEvaluable (table : UnresolvedInlineTable ): Unit = {
64+ table.rows.foreach { row =>
65+ row.foreach { e =>
66+ // Note that nondeterministic expressions are not supported since they are not foldable.
67+ if (! e.resolved || ! e.foldable) {
68+ e.failAnalysis(s " cannot evaluate expression ${e.sql} in inline table definition " )
69+ }
70+ }
71+ }
72+ }
73+
7574 /**
7675 * Convert a valid (with right shape and foldable inputs) [[UnresolvedInlineTable ]]
7776 * into a [[LocalRelation ]].
@@ -81,24 +80,33 @@ object ResolveInlineTables extends Rule[LogicalPlan] {
8180 * This is package visible for unit testing.
8281 */
8382 private [analysis] def convert (table : UnresolvedInlineTable ): LocalRelation = {
84- // For each column, traverse all the values and find a common data type.
85- val targetTypes = table.rows.transpose.zip(table.names).map { case (column, name) =>
83+ // For each column, traverse all the values and find a common data type and nullability .
84+ val fields = table.rows.transpose.zip(table.names).map { case (column, name) =>
8685 val inputTypes = column.map(_.dataType)
87- TypeCoercion .findWiderTypeWithoutStringPromotion(inputTypes).getOrElse {
86+ val tpe = TypeCoercion .findWiderTypeWithoutStringPromotion(inputTypes).getOrElse {
8887 table.failAnalysis(s " incompatible types found in column $name for inline table " )
8988 }
89+ StructField (name, tpe, nullable = column.exists(_.nullable))
9090 }
91- assert(targetTypes.size == table.names.size)
91+ val attributes = StructType (fields).toAttributes
92+ assert(fields.size == table.names.size)
9293
9394 val newRows : Seq [InternalRow ] = table.rows.map { row =>
94- new InterpretedProjection (row.zipWithIndex.map { case (e, ci) =>
95- val targetType = targetTypes(ci)
96- if (e.dataType.sameType(targetType)) e else Cast (e, targetType)
97- }).apply(null )
95+ InternalRow .fromSeq(row.zipWithIndex.map { case (e, ci) =>
96+ val targetType = fields(ci).dataType
97+ try {
98+ if (e.dataType.sameType(targetType)) {
99+ e.eval()
100+ } else {
101+ Cast (e, targetType).eval()
102+ }
103+ } catch {
104+ case NonFatal (ex) =>
105+ table.failAnalysis(s " failed to evaluate expression ${e.sql}: ${ex.getMessage}" )
106+ }
107+ })
98108 }
99109
100- val attributes = StructType (targetTypes.zip(table.names)
101- .map { case (typ, name) => StructField (name, typ) }).toAttributes
102110 LocalRelation (attributes, newRows)
103111 }
104112}
0 commit comments