From d258b47f49f34b7d1cc51203c34505cef92793f9 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 10 Mar 2016 18:51:47 +0800 Subject: [PATCH 01/10] DataFrame.col should return unresolved attribute --- .../sql/catalyst/analysis/Analyzer.scala | 2 +- .../org/apache/spark/sql/DataFrame.scala | 61 +++++-------------- .../spark/sql/DataFrameNaFunctions.scala | 2 +- .../org/apache/spark/sql/SQLContext.scala | 12 ++-- .../spark/sql/ColumnExpressionSuite.scala | 2 +- .../apache/spark/sql/DataFrameJoinSuite.scala | 21 ------- .../org/apache/spark/sql/DataFrameSuite.scala | 20 +++++- .../spark/sql/execution/PlannerSuite.scala | 9 +-- .../sql/execution/joins/InnerJoinSuite.scala | 10 +-- .../sql/execution/joins/OuterJoinSuite.scala | 6 +- .../sql/execution/joins/SemiJoinSuite.scala | 6 +- .../sql/hive/HiveDataFrameJoinSuite.scala | 38 ------------ .../execution/ScriptTransformationSuite.scala | 8 +-- 13 files changed, 64 insertions(+), 133 deletions(-) delete mode 100644 sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 53ea3cfef678..c79d772f1de6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -580,7 +580,7 @@ class Analyzer( def newAliases(expressions: Seq[NamedExpression]): Seq[NamedExpression] = { expressions.map { - case a: Alias => Alias(a.child, a.name)(isGenerated = a.isGenerated) + case a: Alias => a.newInstance() case other => other } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 1ea7db038868..5644d020f8e0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql import java.io.CharArrayWriter +import java.util.concurrent.atomic.AtomicLong import scala.collection.JavaConverters._ import scala.language.implicitConversions @@ -56,8 +57,12 @@ private[sql] object Dataset { def newDataFrame(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = { val qe = sqlContext.executePlan(logicalPlan) qe.assertAnalyzed() - new Dataset[Row](sqlContext, logicalPlan, RowEncoder(qe.analyzed.schema)) + new Dataset[Row](sqlContext, qe, RowEncoder(qe.analyzed.schema)) } + + private[this] val nextDataFrameId = new AtomicLong(0) + + def newDataFrameName: String = s"dataframe_${nextDataFrameId.getAndIncrement()}" } /** @@ -249,7 +254,7 @@ class Dataset[T] private[sql]( s"New column names (${colNames.size}): " + colNames.mkString(", ")) val newCols = logicalPlan.output.zip(colNames).map { case (oldAttribute, newName) => - Column(oldAttribute).as(newName) + Column(Alias(oldAttribute, newName)(qualifiers = oldAttribute.qualifiers)) } select(newCols : _*) } @@ -551,47 +556,8 @@ class Dataset[T] private[sql]( * @group dfops * @since 1.3.0 */ - def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = { - // Note that in this function, we introduce a hack in the case of self-join to automatically - // resolve ambiguous join conditions into ones that might make sense [SPARK-6231]. 
- // Consider this case: df.join(df, df("key") === df("key")) - // Since df("key") === df("key") is a trivially true condition, this actually becomes a - // cartesian join. However, most likely users expect to perform a self join using "key". - // With that assumption, this hack turns the trivially true condition into equality on join - // keys that are resolved to both sides. - - // Trigger analysis so in the case of self-join, the analyzer will clone the plan. - // After the cloning, left and right side will have distinct expression ids. - val plan = withPlan( - Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr))) - .queryExecution.analyzed.asInstanceOf[Join] - - // If auto self join alias is disabled, return the plan. - if (!sqlContext.conf.dataFrameSelfJoinAutoResolveAmbiguity) { - return withPlan(plan) - } - - // If left/right have no output set intersection, return the plan. - val lanalyzed = withPlan(this.logicalPlan).queryExecution.analyzed - val ranalyzed = withPlan(right.logicalPlan).queryExecution.analyzed - if (lanalyzed.outputSet.intersect(ranalyzed.outputSet).isEmpty) { - return withPlan(plan) - } - - // Otherwise, find the trivially true predicates and automatically resolves them to both sides. - // By the time we get here, since we have already run analysis, all attributes should've been - // resolved and become AttributeReference. - val cond = plan.condition.map { _.transform { - case catalyst.expressions.EqualTo(a: AttributeReference, b: AttributeReference) - if a.sameRef(b) => - catalyst.expressions.EqualTo( - withPlan(plan.left).resolve(a.name), - withPlan(plan.right).resolve(b.name)) - }} - - withPlan { - plan.copy(condition = cond) - } + def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = withPlan { + Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)) } /** @@ -741,8 +707,11 @@ class Dataset[T] private[sql]( case "*" => Column(ResolvedStar(queryExecution.analyzed.output)) case _ => - val expr = resolve(colName) - Column(expr) + val col = resolve(colName) match { + case attr: Attribute => UnresolvedAttribute(attr.qualifiers :+ attr.name) + case Alias(child, _) => UnresolvedAttribute.quotedString(child.sql) + } + Column(col) } /** @@ -1451,7 +1420,7 @@ class Dataset[T] private[sql]( if (shouldRename) { val columns = output.map { col => if (resolver(col.name, existingName)) { - Column(col).as(newName) + Column(Alias(col, newName)(qualifiers = col.qualifiers)) } else { Column(col) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala index f7be5f6b370a..811611471201 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala @@ -418,7 +418,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { * TODO: This can be optimized to use broadcast join when replacementMap is large. 
*/ private def replaceCol(col: StructField, replacementMap: Map[_, _]): Column = { - val keyExpr = df.col(col.name).expr + val keyExpr = df.resolve(col.name) def buildExpr(v: Any) = Cast(Literal(v), keyExpr.dataType) val branches = replacementMap.flatMap { case (source, target) => Seq(buildExpr(source), buildExpr(target)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 36fe57f78be1..df48b6a08e42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.encoders.encoderFor import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range, SubqueryAlias} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.ShowTablesCommand @@ -374,7 +374,8 @@ class SQLContext private[sql]( val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType] val attributeSeq = schema.toAttributes val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType)) - Dataset.newDataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self)) + val relation = LogicalRDD(attributeSeq, rowRDD)(self) + Dataset.newDataFrame(self, SubqueryAlias(Dataset.newDataFrameName, relation)) } /** @@ -389,7 +390,8 @@ class SQLContext private[sql]( SQLContext.setActive(self) val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType] val attributeSeq = schema.toAttributes - Dataset.newDataFrame(self, LocalRelation.fromProduct(attributeSeq, data)) + val relation = LocalRelation.fromProduct(attributeSeq, data) + Dataset.newDataFrame(self, SubqueryAlias(Dataset.newDataFrameName, relation)) } /** @@ -453,8 +455,8 @@ class SQLContext private[sql]( } else { rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)} } - val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self) - Dataset.newDataFrame(this, logicalPlan) + val relation = LogicalRDD(schema.toAttributes, catalystRows)(self) + Dataset.newDataFrame(this, SubqueryAlias(Dataset.newDataFrameName, relation)) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala index c2434e46f7ec..6785ec89f3ae 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ColumnExpressionSuite.scala @@ -175,7 +175,7 @@ class ColumnExpressionSuite extends QueryTest with SharedSQLContext { val exploded = df.select(explode('intList).as('i)) checkAnswer( - exploded.join(exploded, exploded("i") === exploded("i")).agg(count("*")), + exploded.join(exploded, "i").agg(count("*")), Row(3) :: Nil) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala index 067a62d011ec..9aaea5deb318 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala @@ -115,27 +115,6 @@ class DataFrameJoinSuite extends QueryTest with 
SharedSQLContext { Row("1", 1) :: Row("2", 1) :: Row("3", 1) :: Nil) } - test("[SPARK-6231] join - self join auto resolve ambiguity") { - val df = Seq((1, "1"), (2, "2")).toDF("key", "value") - checkAnswer( - df.join(df, df("key") === df("key")), - Row(1, "1", 1, "1") :: Row(2, "2", 2, "2") :: Nil) - - checkAnswer( - df.join(df.filter($"value" === "2"), df("key") === df("key")), - Row(2, "2", 2, "2") :: Nil) - - checkAnswer( - df.join(df, df("key") === df("key") && df("value") === 1), - Row(1, "1", 1, "1") :: Nil) - - val left = df.groupBy("key").agg(count("*")) - val right = df.groupBy("key").agg(sum("key")) - checkAnswer( - left.join(right, left("key") === right("key")), - Row(1, 1, 1, 1) :: Row(2, 1, 2, 2) :: Nil) - } - test("broadcast join hint") { val df1 = Seq((1, "1"), (2, "2")).toDF("key", "value") val df2 = Seq((1, "1"), (2, "2")).toDF("key", "value") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index e6e27ec413bb..c527b4421d18 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -53,7 +53,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { test("dataframe toString") { assert(testData.toString === "[key: int, value: string]") - assert(testData("key").toString === "key") + assert(testData("key").toString.endsWith("key")) assert($"test".toString === "test") } @@ -1375,4 +1375,22 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { assert(e.getStackTrace.head.getClassName != classOf[QueryExecution].getName) } + + test("Un-direct self-join") { + val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j") + val df2 = df.filter($"i" > 0) + + intercept[AnalysisException](df.join(df2, (df("i") + 1) === df2("i"))) + + val namedDf = df.as("x") + val namedDf2 = df2.as("y") + checkAnswer( + namedDf.join(namedDf2, (namedDf("i") + 1) === namedDf2("i")), + Row(1, "a", 2, "b") :: Nil + ) + checkAnswer( + namedDf.join(namedDf2, ($"x.i" + 1) === $"y.i"), + Row(1, "a", 2, "b") :: Nil + ) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index ab0a7ff62896..f8564ea60cd5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -53,18 +53,18 @@ class PlannerSuite extends SharedSQLContext { } test("count is partially aggregated") { - val query = testData.groupBy('value).agg(count('key)).queryExecution.analyzed + val query = testData.groupBy('value).agg(count('key)).queryExecution.optimizedPlan testPartialAggregationPlan(query) } test("count distinct is partially aggregated") { - val query = testData.groupBy('value).agg(countDistinct('key)).queryExecution.analyzed + val query = testData.groupBy('value).agg(countDistinct('key)).queryExecution.optimizedPlan testPartialAggregationPlan(query) } test("mixed aggregates are partially aggregated") { val query = - testData.groupBy('value).agg(count('value), countDistinct('key)).queryExecution.analyzed + testData.groupBy('value).agg(count('value), countDistinct('key)).queryExecution.optimizedPlan testPartialAggregationPlan(query) } @@ -167,7 +167,8 @@ class PlannerSuite extends SharedSQLContext { val query = testData.select('key, 'value).sort('key).limit(2) val planned = query.queryExecution.executedPlan 
assert(planned.isInstanceOf[execution.TakeOrderedAndProject]) - assert(planned.output === testData.select('key, 'value).logicalPlan.output) + assert(planned.output === + testData.select('key, 'value).logicalPlan.output.map(_.withQualifiers(Nil))) } test("terminal limit -> project -> sort should use TakeOrderedAndProject") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala index 7eb15249ebbd..d14d2fe68f2b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/InnerJoinSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.{EqualNullSafe, EqualTo, Expression} import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.logical.Join @@ -179,7 +179,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { "inner join, one match per row", myUpperCaseData, myLowerCaseData, - () => (myUpperCaseData.col("N") === myLowerCaseData.col("n")).expr, + () => EqualTo(myUpperCaseData.resolve("N"), myLowerCaseData.resolve("n")), Seq( (1, "A", 1, "a"), (2, "B", 2, "b"), @@ -195,7 +195,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { "inner join, multiple matches", left, right, - () => (left.col("a") === right.col("a")).expr, + () => EqualTo(left.resolve("a"), right.resolve("a")), Seq( (1, 1, 1, 1), (1, 1, 1, 2), @@ -212,7 +212,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { "inner join, no matches", left, right, - () => (left.col("a") === right.col("a")).expr, + () => EqualTo(left.resolve("a"), right.resolve("a")), Seq.empty ) } @@ -224,7 +224,7 @@ class InnerJoinSuite extends SparkPlanTest with SharedSQLContext { "inner join, null safe", left, right, - () => (left.col("b") <=> right.col("b")).expr, + () => EqualNullSafe(left.resolve("b"), right.resolve("b")), Seq( (1, 0, 1, 0), (2, null, 2, null) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala index 0d1c29fe574a..e22c7bc7b56f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/OuterJoinSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.catalyst.expressions.{And, Expression, LessThan} +import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Expression, LessThan} import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical.Join @@ -57,8 +57,8 @@ class OuterJoinSuite extends SparkPlanTest with SharedSQLContext { )), new StructType().add("c", IntegerType).add("d", DoubleType)) private lazy val condition = { - And((left.col("a") === right.col("c")).expr, - LessThan(left.col("b").expr, right.col("d").expr)) + And(EqualTo(left.resolve("a"), right.resolve("c")), + LessThan(left.resolve("b"), right.resolve("d"))) } // Note: the input dataframes and expression must be 
evaluated lazily because diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala index bc341db5571b..53e4ad4a1844 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/joins/SemiJoinSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.joins import org.apache.spark.sql.{DataFrame, Row} -import org.apache.spark.sql.catalyst.expressions.{And, Expression, LessThan} +import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Expression, LessThan} import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys import org.apache.spark.sql.catalyst.plans.{Inner, LeftSemi} import org.apache.spark.sql.catalyst.plans.logical.Join @@ -54,8 +54,8 @@ class SemiJoinSuite extends SparkPlanTest with SharedSQLContext { )), new StructType().add("c", IntegerType).add("d", DoubleType)) private lazy val condition = { - And((left.col("a") === right.col("c")).expr, - LessThan(left.col("b").expr, right.col("d").expr)) + And(EqualTo(left.resolve("a"), right.resolve("c")), + LessThan(left.resolve("b"), right.resolve("d"))) } // Note: the input dataframes and expression must be evaluated lazily because diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala deleted file mode 100644 index 63cf5030ab8b..000000000000 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDataFrameJoinSuite.scala +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.hive - -import org.apache.spark.sql.{QueryTest, Row} -import org.apache.spark.sql.hive.test.TestHiveSingleton - -class HiveDataFrameJoinSuite extends QueryTest with TestHiveSingleton { - import hiveContext.implicits._ - - // We should move this into SQL package if we make case sensitivity configurable in SQL. 
- test("join - self join auto resolve ambiguity with case insensitivity") { - val df = Seq((1, "1"), (2, "2")).toDF("key", "value") - checkAnswer( - df.join(df, df("key") === df("Key")), - Row(1, "1", 1, "1") :: Row(2, "2", 2, "2") :: Nil) - - checkAnswer( - df.join(df.filter($"value" === "2"), df("key") === df("Key")), - Row(2, "2", 2, "2") :: Nil) - } - -} diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala index 8f163f27c94c..2ebcdd7f4fa3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala @@ -53,7 +53,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { checkAnswer( rowsDf, (child: SparkPlan) => new ScriptTransformation( - input = Seq(rowsDf.col("a").expr), + input = Seq(rowsDf.resolve("a")), script = "cat", output = Seq(AttributeReference("a", StringType)()), child = child, @@ -67,7 +67,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { checkAnswer( rowsDf, (child: SparkPlan) => new ScriptTransformation( - input = Seq(rowsDf.col("a").expr), + input = Seq(rowsDf.resolve("a")), script = "cat", output = Seq(AttributeReference("a", StringType)()), child = child, @@ -82,7 +82,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { checkAnswer( rowsDf, (child: SparkPlan) => new ScriptTransformation( - input = Seq(rowsDf.col("a").expr), + input = Seq(rowsDf.resolve("a")), script = "cat", output = Seq(AttributeReference("a", StringType)()), child = ExceptionInjectingOperator(child), @@ -99,7 +99,7 @@ class ScriptTransformationSuite extends SparkPlanTest with TestHiveSingleton { checkAnswer( rowsDf, (child: SparkPlan) => new ScriptTransformation( - input = Seq(rowsDf.col("a").expr), + input = Seq(rowsDf.resolve("a")), script = "cat", output = Seq(AttributeReference("a", StringType)()), child = ExceptionInjectingOperator(child), From f3df4f62c966213f55bc19dd3579c6ca42f8446f Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 14 Mar 2016 18:01:27 +0800 Subject: [PATCH 02/10] fix python --- .../scala/org/apache/spark/sql/Column.scala | 18 ++++++++++++++---- .../scala/org/apache/spark/sql/DataFrame.scala | 4 +++- .../org/apache/spark/sql/SQLContext.scala | 4 ++-- .../org/apache/spark/sql/DataFrameSuite.scala | 2 +- 4 files changed, 20 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index f7ba61d2b804..635177cfaaf2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -133,7 +133,7 @@ class Column(protected[sql] val expr: Expression) extends Logging { case jt: JsonTuple => MultiAlias(jt, Nil) - case func: UnresolvedFunction => UnresolvedAlias(func, Some(usePrettyExpression(func).sql)) + case func: UnresolvedFunction => UnresolvedAlias(func, Some(presentableExpression(func).sql)) // If we have a top level Cast, there is a chance to give it a better alias, if there is a // NamedExpression under this Cast. 
@@ -141,13 +141,23 @@ class Column(protected[sql] val expr: Expression) extends Logging { case Cast(ne: NamedExpression, to) => UnresolvedAlias(Cast(ne, to)) } match { case ne: NamedExpression => ne - case other => Alias(expr, usePrettyExpression(expr).sql)() + case other => Alias(expr, presentableExpression(expr).sql)() } - case expr: Expression => Alias(expr, usePrettyExpression(expr).sql)() + case expr: Expression => Alias(expr, presentableExpression(expr).sql)() } - override def toString: String = usePrettyExpression(expr).sql + override def toString: String = presentableExpression(expr).sql + + private def presentableExpression(expr: Expression): Expression = { + usePrettyExpression(expr transform { + case u: UnresolvedAttribute if u.nameParts.head.startsWith(Dataset.namePrefix) => + u.copy(nameParts = u.nameParts.drop(1)) + + case a: AttributeReference if a.qualifiers.length == 1 && + a.qualifiers.head.startsWith(Dataset.namePrefix) => a.withQualifiers(Nil) + }) + } override def equals(that: Any): Boolean = that match { case that: Column => that.expr.equals(this.expr) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 5644d020f8e0..f414119f6cd8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -62,7 +62,9 @@ private[sql] object Dataset { private[this] val nextDataFrameId = new AtomicLong(0) - def newDataFrameName: String = s"dataframe_${nextDataFrameId.getAndIncrement()}" + val namePrefix: String = "dataframe_" + + def newDataFrameName: String = namePrefix + nextDataFrameId.getAndIncrement() } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index df48b6a08e42..de9340993308 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -895,9 +895,9 @@ class SQLContext private[sql]( protected[sql] def applySchemaToPythonRDD( rdd: RDD[Array[Any]], schema: StructType): DataFrame = { - val rowRdd = rdd.map(r => python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow]) - Dataset.newDataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self)) + val relation = LogicalRDD(schema.toAttributes, rowRdd)(self) + Dataset.newDataFrame(self, SubqueryAlias(Dataset.newDataFrameName, relation)) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index c527b4421d18..74bcf2eddc8c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -53,7 +53,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { test("dataframe toString") { assert(testData.toString === "[key: int, value: string]") - assert(testData("key").toString.endsWith("key")) + assert(testData("key").toString === "key") assert($"test".toString === "test") } From 6faf40e554d28ef625d3d818328254fe16849a52 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 16 Mar 2016 10:25:34 +0800 Subject: [PATCH 03/10] fix toString --- .../spark/sql/catalyst/util/package.scala | 5 +++++ .../scala/org/apache/spark/sql/Column.scala | 18 ++++-------------- .../scala/org/apache/spark/sql/Dataset.scala | 4 +--- 3 files changed, 10 insertions(+), 17 deletions(-) diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala index b11365b29718..6f5f2742b48a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst import java.io._ import java.nio.charset.StandardCharsets +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.{NumericType, StringType} import org.apache.spark.unsafe.types.UTF8String @@ -137,6 +138,10 @@ package object util { // Replaces attributes, string literals, complex type extractors with their pretty form so that // generated column names don't contain back-ticks or double-quotes. def usePrettyExpression(e: Expression): Expression = e transform { + // For unresolved attributes that generated by `DataFrame.col`, we should ignore the generated + // qualifiers to not annoy users. + case u: UnresolvedAttribute if u.nameParts(0).startsWith("dataframe_") => + new PrettyAttribute(u.copy(nameParts = u.nameParts.drop(1))) case a: Attribute => new PrettyAttribute(a) case Literal(s: UTF8String, StringType) => PrettyAttribute(s.toString, StringType) case Literal(v, t: NumericType) if v != null => PrettyAttribute(v.toString, t) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index f0945e95bc3b..1751720a7db8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -133,7 +133,7 @@ class Column(protected[sql] val expr: Expression) extends Logging { case jt: JsonTuple => MultiAlias(jt, Nil) - case func: UnresolvedFunction => UnresolvedAlias(func, Some(presentableExpression(func).sql)) + case func: UnresolvedFunction => UnresolvedAlias(func, Some(usePrettyExpression(func).sql)) // If we have a top level Cast, there is a chance to give it a better alias, if there is a // NamedExpression under this Cast. 
@@ -141,23 +141,13 @@ class Column(protected[sql] val expr: Expression) extends Logging { case Cast(ne: NamedExpression, to) => UnresolvedAlias(Cast(ne, to)) } match { case ne: NamedExpression => ne - case other => Alias(expr, presentableExpression(expr).sql)() + case other => Alias(expr, usePrettyExpression(expr).sql)() } - case expr: Expression => Alias(expr, presentableExpression(expr).sql)() + case expr: Expression => Alias(expr, usePrettyExpression(expr).sql)() } - override def toString: String = presentableExpression(expr).sql - - private def presentableExpression(expr: Expression): Expression = { - usePrettyExpression(expr transform { - case u: UnresolvedAttribute if u.nameParts.head.startsWith(Dataset.namePrefix) => - u.copy(nameParts = u.nameParts.drop(1)) - - case a: AttributeReference if a.qualifiers.length == 1 && - a.qualifiers.head.startsWith(Dataset.namePrefix) => a.withQualifiers(Nil) - }) - } + override def toString: String = usePrettyExpression(expr).sql override def equals(that: Any): Boolean = that match { case that: Column => that.expr.equals(this.expr) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 19d9de21269b..5533e2d05f73 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -62,9 +62,7 @@ private[sql] object Dataset { private[this] val nextDataFrameId = new AtomicLong(0) - val namePrefix: String = "dataframe_" - - def newDataFrameName: String = namePrefix + nextDataFrameId.getAndIncrement() + def newDataFrameName: String = s"dataframe_${nextDataFrameId.getAndIncrement()}" } /** From f0b653e21616a9f599404738fd81a70f1ae8c9b8 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 16 Mar 2016 14:45:29 +0800 Subject: [PATCH 04/10] update --- .../org/apache/spark/sql/DataFrameReader.scala | 4 ++-- .../scala/org/apache/spark/sql/Dataset.scala | 6 +++++- .../scala/org/apache/spark/sql/SQLContext.scala | 17 +++++++---------- 3 files changed, 14 insertions(+), 13 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 76b8d71ac935..62ab9395c218 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -128,7 +128,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { userSpecifiedSchema = userSpecifiedSchema, className = source, options = extraOptions.toMap) - Dataset.newDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())) + Dataset.newNamedDataFrame(sqlContext, LogicalRelation(dataSource.resolveRelation())) } /** @@ -345,7 +345,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { InferSchema.infer(jsonRDD, sqlContext.conf.columnNameOfCorruptRecord, parsedOptions) } - Dataset.newDataFrame( + Dataset.newNamedDataFrame( sqlContext, LogicalRDD( schema.toAttributes, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 5533e2d05f73..a6e37fd23251 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -60,9 +60,13 @@ private[sql] object Dataset { new Dataset[Row](sqlContext, qe, RowEncoder(qe.analyzed.schema)) } + def newNamedDataFrame(sqlContext: SQLContext, 
logicalPlan: LogicalPlan): DataFrame = { + newDataFrame(sqlContext, SubqueryAlias(newDataFrameName, logicalPlan)) + } + private[this] val nextDataFrameId = new AtomicLong(0) - def newDataFrameName: String = s"dataframe_${nextDataFrameId.getAndIncrement()}" + private def newDataFrameName: String = s"dataframe_${nextDataFrameId.getAndIncrement()}" } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 413c757f9a84..17596911f643 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.encoders.encoderFor import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.parser.ParserInterface -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range, SubqueryAlias} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.command.ShowTablesCommand @@ -367,8 +367,7 @@ class SQLContext private[sql]( val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType] val attributeSeq = schema.toAttributes val rowRDD = RDDConversions.productToRowRdd(rdd, schema.map(_.dataType)) - val relation = LogicalRDD(attributeSeq, rowRDD)(self) - Dataset.newDataFrame(self, SubqueryAlias(Dataset.newDataFrameName, relation)) + Dataset.newNamedDataFrame(self, LogicalRDD(attributeSeq, rowRDD)(self)) } /** @@ -383,8 +382,7 @@ class SQLContext private[sql]( SQLContext.setActive(self) val schema = ScalaReflection.schemaFor[A].dataType.asInstanceOf[StructType] val attributeSeq = schema.toAttributes - val relation = LocalRelation.fromProduct(attributeSeq, data) - Dataset.newDataFrame(self, SubqueryAlias(Dataset.newDataFrameName, relation)) + Dataset.newNamedDataFrame(self, LocalRelation.fromProduct(attributeSeq, data)) } /** @@ -394,7 +392,7 @@ class SQLContext private[sql]( * @since 1.3.0 */ def baseRelationToDataFrame(baseRelation: BaseRelation): DataFrame = { - Dataset.newDataFrame(this, LogicalRelation(baseRelation)) + Dataset.newNamedDataFrame(this, LogicalRelation(baseRelation)) } /** @@ -448,8 +446,8 @@ class SQLContext private[sql]( } else { rowRDD.map{r: Row => InternalRow.fromSeq(r.toSeq)} } - val relation = LogicalRDD(schema.toAttributes, catalystRows)(self) - Dataset.newDataFrame(this, SubqueryAlias(Dataset.newDataFrameName, relation)) + val logicalPlan = LogicalRDD(schema.toAttributes, catalystRows)(self) + Dataset.newNamedDataFrame(this, logicalPlan) } @@ -887,8 +885,7 @@ class SQLContext private[sql]( rdd: RDD[Array[Any]], schema: StructType): DataFrame = { val rowRdd = rdd.map(r => python.EvaluatePython.fromJava(r, schema).asInstanceOf[InternalRow]) - val relation = LogicalRDD(schema.toAttributes, rowRdd)(self) - Dataset.newDataFrame(self, SubqueryAlias(Dataset.newDataFrameName, relation)) + Dataset.newNamedDataFrame(this, LogicalRDD(schema.toAttributes, rowRdd)(self)) } /** From 387628f74743b839aaa3af808ca83fd4b67cda49 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 21 Mar 2016 17:12:41 +0800 Subject: [PATCH 05/10] cleanup --- .../spark/sql/catalyst/util/package.scala | 5 ----- .../scala/org/apache/spark/sql/Column.scala | 20 ++++++++++--------- 
.../spark/sql/RelationalGroupedDataset.scala | 9 +-------- 3 files changed, 12 insertions(+), 22 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala index 6f5f2742b48a..b11365b29718 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.catalyst import java.io._ import java.nio.charset.StandardCharsets -import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.{NumericType, StringType} import org.apache.spark.unsafe.types.UTF8String @@ -138,10 +137,6 @@ package object util { // Replaces attributes, string literals, complex type extractors with their pretty form so that // generated column names don't contain back-ticks or double-quotes. def usePrettyExpression(e: Expression): Expression = e transform { - // For unresolved attributes that generated by `DataFrame.col`, we should ignore the generated - // qualifiers to not annoy users. - case u: UnresolvedAttribute if u.nameParts(0).startsWith("dataframe_") => - new PrettyAttribute(u.copy(nameParts = u.nameParts.drop(1))) case a: Attribute => new PrettyAttribute(a) case Literal(s: UTF8String, StringType) => PrettyAttribute(s.toString, StringType) case Literal(v, t: NumericType) if v != null => PrettyAttribute(v.toString, t) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 622a62abad89..87e7fdad46f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -118,11 +118,6 @@ class Column(protected[sql] val expr: Expression) extends Logging { * Returns the expression for this column either with an existing or auto assigned name. */ private[sql] def named: NamedExpression = expr match { - // Wrap UnresolvedAttribute with UnresolvedAlias, as when we resolve UnresolvedAttribute, we - // will remove intermediate Alias for ExtractValue chain, and we need to alias it again to - // make it a NamedExpression. - case u: UnresolvedAttribute => UnresolvedAlias(u) - case u: UnresolvedExtractValue => UnresolvedAlias(u) case expr: NamedExpression => expr @@ -133,7 +128,7 @@ class Column(protected[sql] val expr: Expression) extends Logging { case jt: JsonTuple => MultiAlias(jt, Nil) - case func: UnresolvedFunction => UnresolvedAlias(func, Some(usePrettyExpression(func).sql)) + case func: UnresolvedFunction => UnresolvedAlias(func, Some(toPresentableString(func))) // If we have a top level Cast, there is a chance to give it a better alias, if there is a // NamedExpression under this Cast. 
@@ -141,13 +136,20 @@ class Column(protected[sql] val expr: Expression) extends Logging { case Cast(ne: NamedExpression, to) => UnresolvedAlias(Cast(ne, to)) } match { case ne: NamedExpression => ne - case other => Alias(expr, usePrettyExpression(expr).sql)() + case other => Alias(expr, toPresentableString(expr))() } - case expr: Expression => Alias(expr, usePrettyExpression(expr).sql)() + case expr: Expression => Alias(expr, toPresentableString(expr))() } - override def toString: String = usePrettyExpression(expr).sql + override def toString: String = toPresentableString(expr) + + private def toPresentableString(expr: Expression): String = usePrettyExpression(expr transform { + // For unresolved attributes that generated by `DataFrame.col`, we should ignore the generated + // qualifiers to not annoy users. + case u: UnresolvedAttribute if u.nameParts(0).startsWith("dataframe_") => + u.copy(nameParts = u.nameParts.drop(1)) + }).sql override def equals(that: Any): Boolean = that match { case that: Column => that.expr.equals(this.expr) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index 521032a8b3a8..6093e62b78a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -67,14 +67,7 @@ class RelationalGroupedDataset protected[sql]( } } - // Wrap UnresolvedAttribute with UnresolvedAlias, as when we resolve UnresolvedAttribute, we - // will remove intermediate Alias for ExtractValue chain, and we need to alias it again to - // make it a NamedExpression. - private[this] def alias(expr: Expression): NamedExpression = expr match { - case u: UnresolvedAttribute => UnresolvedAlias(u) - case expr: NamedExpression => expr - case expr: Expression => Alias(expr, usePrettyExpression(expr).sql)() - } + private[this] def alias(expr: Expression): NamedExpression = Column(expr).named private[this] def aggregateNumericColumns(colNames: String*)(f: Expression => AggregateFunction) : DataFrame = { From 9a5ce1941d25f8db1c2a2827990e225b5f9756e3 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 21 Mar 2016 18:34:51 +0800 Subject: [PATCH 06/10] better error message --- .../sql/catalyst/plans/logical/LogicalPlan.scala | 14 +++++++++++--- .../org/apache/spark/sql/DataFrameSuite.scala | 3 ++- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 0e02ad6057d1..c041546e7035 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -257,9 +257,17 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { // More than one match. case ambiguousReferences => - val referenceNames = ambiguousReferences.map(_._1).mkString(", ") - throw new AnalysisException( - s"Reference '$name' is ambiguous, could be: $referenceNames.") + val qualifiers = ambiguousReferences.flatMap(_._1.qualifiers) + if (qualifiers.nonEmpty && qualifiers.distinct.length == qualifiers.length) { + throw new AnalysisException(s"Reference '$name' is ambiguous, please add a qualifier " + + s"to distinguish it, e.g. 
'${qualifiers.head}.$name', available qualifiers: " + + qualifiers.mkString(", ")) + } else { + val qualifiedNames = ambiguousReferences.map(_._1.qualifiedName).mkString(", ") + throw new AnalysisException( + s"Input Attributes $qualifiedNames are ambiguous, please eliminate ambiguity " + + "from the inputs first, e.g. alias the left and right plan before join them.") + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 660ebe2aca7c..ed4ecf0c64dc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -1381,7 +1381,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j") val df2 = df.filter($"i" > 0) - intercept[AnalysisException](df.join(df2, (df("i") + 1) === df2("i"))) + val err = intercept[AnalysisException](df.join(df2, (df("i") + 1) === df2("i"))) + assert(err.message.contains("please eliminate ambiguity from the inputs first")) val namedDf = df.as("x") val namedDf2 = df2.as("y") From 7963244f064c4950781020b929f4fab5feedb2fe Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 23 Mar 2016 12:50:36 +0800 Subject: [PATCH 07/10] fix build --- .../spark/sql/catalyst/plans/logical/LogicalPlan.scala | 2 +- sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 6 +++--- .../scala/org/apache/spark/sql/execution/PlannerSuite.scala | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index f953295afcea..1bff6d0113e1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -257,7 +257,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { // More than one match. case ambiguousReferences => - val qualifiers = ambiguousReferences.flatMap(_._1.qualifiers) + val qualifiers = ambiguousReferences.flatMap(_._1.qualifier) if (qualifiers.nonEmpty && qualifiers.distinct.length == qualifiers.length) { throw new AnalysisException(s"Reference '$name' is ambiguous, please add a qualifier " + s"to distinguish it, e.g. 
'${qualifiers.head}.$name', available qualifiers: " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 2c37cb3243e9..fdcb24476e63 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -314,7 +314,7 @@ class Dataset[T] private[sql]( s"New column names (${colNames.size}): " + colNames.mkString(", ")) val newCols = logicalPlan.output.zip(colNames).map { case (oldAttribute, newName) => - Column(Alias(oldAttribute, newName)(qualifiers = oldAttribute.qualifiers)) + Column(Alias(oldAttribute, newName)(qualifier = oldAttribute.qualifier)) } select(newCols : _*) } @@ -769,7 +769,7 @@ class Dataset[T] private[sql]( Column(ResolvedStar(queryExecution.analyzed.output)) case _ => val col = resolve(colName) match { - case attr: Attribute => UnresolvedAttribute(attr.qualifiers :+ attr.name) + case attr: Attribute => UnresolvedAttribute(attr.qualifier.toSeq :+ attr.name) case Alias(child, _) => UnresolvedAttribute.quotedString(child.sql) } Column(col) @@ -1585,7 +1585,7 @@ class Dataset[T] private[sql]( if (shouldRename) { val columns = output.map { col => if (resolver(col.name, existingName)) { - Column(Alias(col, newName)(qualifiers = col.qualifiers)) + Column(Alias(col, newName)(qualifier = col.qualifier)) } else { Column(col) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 94d10805ac92..db8f89cb83f2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -169,7 +169,7 @@ class PlannerSuite extends SharedSQLContext { val planned = query.queryExecution.executedPlan assert(planned.isInstanceOf[execution.TakeOrderedAndProject]) assert(planned.output === - testData.select('key, 'value).logicalPlan.output.map(_.withQualifiers(Nil))) + testData.select('key, 'value).logicalPlan.output.map(_.withQualifier(None))) } test("terminal limit -> project -> sort should use TakeOrderedAndProject") { From 94d26417983d364cb68b1b11dfc9260a02c0413e Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Wed, 23 Mar 2016 15:23:31 +0800 Subject: [PATCH 08/10] update --- .../main/scala/org/apache/spark/sql/Column.scala | 6 +++--- .../main/scala/org/apache/spark/sql/Dataset.scala | 14 +++++++++++--- .../scala/org/apache/spark/sql/SQLContext.scala | 4 ++-- 3 files changed, 16 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 87e7fdad46f9..30ea721f832a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -145,9 +145,9 @@ class Column(protected[sql] val expr: Expression) extends Logging { override def toString: String = toPresentableString(expr) private def toPresentableString(expr: Expression): String = usePrettyExpression(expr transform { - // For unresolved attributes that generated by `DataFrame.col`, we should ignore the generated - // qualifiers to not annoy users. - case u: UnresolvedAttribute if u.nameParts(0).startsWith("dataframe_") => + // For unresolved attributes that generated by `Dataset.col`, we should ignore the generated + // qualifier to not annoy users. 
+ case u: UnresolvedAttribute if u.nameParts(0).startsWith("dataset_") => u.copy(nameParts = u.nameParts.drop(1)) }).sql diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index fdcb24476e63..af163c5305de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -54,6 +54,12 @@ private[sql] object Dataset { new Dataset(sqlContext, logicalPlan, implicitly[Encoder[T]]) } + def newNamedDataset[T : Encoder]( + sqlContext: SQLContext, + logicalPlan: LogicalPlan): Dataset[T] = { + apply(sqlContext, uniquelyAlias(logicalPlan)) + } + def newDataFrame(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = { val qe = sqlContext.executePlan(logicalPlan) qe.assertAnalyzed() @@ -61,12 +67,14 @@ private[sql] object Dataset { } def newNamedDataFrame(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = { - newDataFrame(sqlContext, SubqueryAlias(newDataFrameName, logicalPlan)) + newDataFrame(sqlContext, uniquelyAlias(logicalPlan)) } - private[this] val nextDataFrameId = new AtomicLong(0) + private[this] val nextDatasetId = new AtomicLong(0) - private def newDataFrameName: String = s"dataframe_${nextDataFrameId.getAndIncrement()}" + private def uniquelyAlias(plan: LogicalPlan): SubqueryAlias = { + SubqueryAlias(s"dataset_${nextDatasetId.getAndIncrement()}", plan) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 6614d59a5b9d..6ec9c73f7c86 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -445,7 +445,7 @@ class SQLContext private[sql]( val encoded = data.map(d => enc.toRow(d).copy()) val plan = new LocalRelation(attributes, encoded) - Dataset[T](this, plan) + Dataset.newNamedDataset(self, plan) } def createDataset[T : Encoder](data: RDD[T]): Dataset[T] = { @@ -454,7 +454,7 @@ class SQLContext private[sql]( val encoded = data.map(d => enc.toRow(d)) val plan = LogicalRDD(attributes, encoded)(self) - Dataset[T](this, plan) + Dataset.newNamedDataset(self, plan) } def createDataset[T : Encoder](data: java.util.List[T]): Dataset[T] = { From 86a887747e27e0d1cdb515f39dc26589651e5a50 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Mon, 28 Mar 2016 21:57:00 +0800 Subject: [PATCH 09/10] alias every Dataset --- .../spark/ml/feature/StringIndexer.scala | 2 +- .../spark/ml/feature/OneHotEncoderSuite.scala | 10 +- .../sql/catalyst/analysis/Analyzer.scala | 2 +- .../expressions/namedExpressions.scala | 2 +- .../scala/org/apache/spark/sql/Column.scala | 2 +- .../scala/org/apache/spark/sql/Dataset.scala | 113 +++++++++--------- .../spark/sql/KeyValueGroupedDataset.scala | 42 +++---- .../spark/sql/RelationalGroupedDataset.scala | 24 ++-- .../org/apache/spark/sql/SQLContext.scala | 4 +- .../spark/sql/execution/CacheManager.scala | 25 ++-- .../org/apache/spark/sql/DataFrameSuite.scala | 14 +-- .../org/apache/spark/sql/JoinSuite.scala | 20 ++++ .../org/apache/spark/sql/SQLQuerySuite.scala | 3 +- .../spark/sql/execution/PlannerSuite.scala | 20 ++-- .../datasources/FileSourceStrategySuite.scala | 2 +- .../sql/util/DataFrameCallbackSuite.scala | 20 +++- 16 files changed, 161 insertions(+), 144 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala 
index faa0f6f407b3..217674019e04 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala @@ -172,7 +172,7 @@ class StringIndexerModel ( case _ => dataset } filteredDataset.select(col("*"), - indexer(dataset($(inputCol)).cast(StringType)).as($(outputCol), metadata)) + indexer(filteredDataset($(inputCol)).cast(StringType)).as($(outputCol), metadata)) } override def transformSchema(schema: StructType): StructType = { diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala index 49803aef7158..9e59503b4ccd 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/OneHotEncoderSuite.scala @@ -116,11 +116,11 @@ class OneHotEncoderSuite test("OneHotEncoder with varying types") { val df = stringIndexed() val dfWithTypes = df - .withColumn("shortLabel", df("labelIndex").cast(ShortType)) - .withColumn("longLabel", df("labelIndex").cast(LongType)) - .withColumn("intLabel", df("labelIndex").cast(IntegerType)) - .withColumn("floatLabel", df("labelIndex").cast(FloatType)) - .withColumn("decimalLabel", df("labelIndex").cast(DecimalType(10, 0))) + .withColumn("shortLabel", col("labelIndex").cast(ShortType)) + .withColumn("longLabel", col("labelIndex").cast(LongType)) + .withColumn("intLabel", col("labelIndex").cast(IntegerType)) + .withColumn("floatLabel", col("labelIndex").cast(FloatType)) + .withColumn("decimalLabel", col("labelIndex").cast(DecimalType(10, 0))) val cols = Array("labelIndex", "shortLabel", "longLabel", "intLabel", "floatLabel", "decimalLabel") for (col <- cols) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index f4242ed4a521..bf275c88344e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -284,7 +284,7 @@ class Analyzer( s"grouping columns (${x.groupByExprs.mkString(",")})") } case Grouping(col: Expression) => - val idx = x.groupByExprs.indexOf(col) + val idx = x.groupByExprs.indexWhere(_ semanticEquals col) if (idx >= 0) { Cast(BitwiseAnd(ShiftRight(gid, Literal(x.groupByExprs.length - 1 - idx)), Literal(1)), ByteType) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index a5b575816727..b2d57e7cf2cb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -291,7 +291,7 @@ case class AttributeReference( exprId :: qualifier :: isGenerated :: Nil } - override def toString: String = s"$name#${exprId.id}$typeSuffix" + override def toString: String = s"$qualifiedName#${exprId.id}$typeSuffix" // Since the expression id is not in the first constructor it is missing from the default // tree string. 
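Together with the per-Dataset `SubqueryAlias`, including the qualifier in `AttributeReference.toString` makes same-named attributes from different Datasets distinguishable in analyzed plans and error messages. A rough sketch of the effect (hypothetical REPL output; the expression id and alias counter will differ, and it assumes the Dataset was built through one of the constructors that adds the generated alias):

    val df = Seq(1 -> "a").toDF("i", "j")
    df.queryExecution.analyzed.output.head.toString
    // before this commit:  i#3
    // after this commit:   dataset_0.i#3
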
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala index 3d580ad2c8f1..2eef40c9a7cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala @@ -147,7 +147,7 @@ class Column(protected[sql] val expr: Expression) extends Logging { private def toPresentableString(expr: Expression): String = usePrettyExpression(expr transform { // For unresolved attributes that generated by `Dataset.col`, we should ignore the generated // qualifier to not annoy users. - case u: UnresolvedAttribute if u.nameParts(0).startsWith("dataset_") => + case u: UnresolvedAttribute if u.nameParts(0).startsWith(Dataset.aliasPrefix) => u.copy(nameParts = u.nameParts.drop(1)) }).sql diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index f9114c2f7533..6c08865fe25e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -57,7 +57,7 @@ private[sql] object Dataset { def withAlias[T : Encoder]( sqlContext: SQLContext, logicalPlan: LogicalPlan): Dataset[T] = { - apply(sqlContext, uniquelyAlias(logicalPlan)) + apply(sqlContext, alias(logicalPlan)) } def ofRows(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = { @@ -67,13 +67,15 @@ private[sql] object Dataset { } def ofRowsWithAlias(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = { - ofRows(sqlContext, uniquelyAlias(logicalPlan)) + ofRows(sqlContext, alias(logicalPlan)) } private[this] val nextDatasetId = new AtomicLong(0) - private def uniquelyAlias(plan: LogicalPlan): SubqueryAlias = { - SubqueryAlias(s"dataset_${nextDatasetId.getAndIncrement()}", plan) + val aliasPrefix = "dataset_" + + private def alias(plan: LogicalPlan): LogicalPlan = { + SubqueryAlias(aliasPrefix + nextDatasetId.getAndIncrement(), plan) } } @@ -201,6 +203,8 @@ class Dataset[T] private[sql]( } } + private[sql] def originalLogicalPlan = removeGeneratedSubquery(logicalPlan) + /** * An unresolved version of the internal encoder for the type of this [[Dataset]]. This one is * marked implicit so that we can use it when constructing new [[Dataset]] objects that have the @@ -299,7 +303,7 @@ class Dataset[T] private[sql]( * @since 1.6.0 */ @Experimental - def as[U : Encoder]: Dataset[U] = Dataset[U](sqlContext, logicalPlan) + def as[U : Encoder]: Dataset[U] = Dataset.withAlias(sqlContext, originalLogicalPlan) /** * Converts this strongly typed collection of data to generic `DataFrame` with columns renamed. @@ -322,7 +326,7 @@ class Dataset[T] private[sql]( s"New column names (${colNames.size}): " + colNames.mkString(", ")) val newCols = logicalPlan.output.zip(colNames).map { case (oldAttribute, newName) => - Column(Alias(oldAttribute, newName)(qualifier = oldAttribute.qualifier)) + Column(oldAttribute).as(newName) } select(newCols : _*) } @@ -393,7 +397,7 @@ class Dataset[T] private[sql]( * @group basic * @since 1.6.0 */ - def isLocal: Boolean = logicalPlan.isInstanceOf[LocalRelation] + def isLocal: Boolean = originalLogicalPlan.isInstanceOf[LocalRelation] /** * Displays the [[Dataset]] in a tabular form. 
Strings more than 20 characters will be truncated, @@ -489,9 +493,8 @@ class Dataset[T] private[sql]( * @group untypedrel * @since 2.0.0 */ - def join(right: DataFrame): DataFrame = withPlan { - Join(logicalPlan, right.logicalPlan, joinType = Inner, None) - } + def join(right: DataFrame): DataFrame = Dataset.ofRows( + sqlContext, Join(logicalPlan, right.logicalPlan, joinType = Inner, None)) /** * Inner equi-join with another [[DataFrame]] using the given column. @@ -567,13 +570,13 @@ class Dataset[T] private[sql]( Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None)) .analyzed.asInstanceOf[Join] - withPlan { + val plan = Join( joined.left, joined.right, UsingJoin(JoinType(joinType), usingColumns.map(UnresolvedAttribute(_))), None) - } + Dataset.ofRows(sqlContext, plan) } /** @@ -611,8 +614,9 @@ class Dataset[T] private[sql]( * @group untypedrel * @since 2.0.0 */ - def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = withPlan { - Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)) + def join(right: DataFrame, joinExprs: Column, joinType: String): DataFrame = { + val plan = Join(logicalPlan, right.logicalPlan, JoinType(joinType), Some(joinExprs.expr)) + Dataset.ofRows(sqlContext, plan) } /** @@ -654,13 +658,11 @@ class Dataset[T] private[sql]( case _ => Alias(CreateStruct(rightOutput), "_2")() } - implicit val tuple2Encoder: Encoder[(T, U)] = + val tuple2Encoder: Encoder[(T, U)] = ExpressionEncoder.tuple(this.unresolvedTEncoder, other.unresolvedTEncoder) - withTypedPlan[(T, U)](other, encoderFor[(T, U)]) { (left, right) => - Project( - leftData :: rightData :: Nil, - joined.analyzed) - } + + val plan = Project(leftData :: rightData :: Nil, joined.analyzed) + new Dataset(sqlContext, plan, tuple2Encoder) } /** @@ -789,9 +791,8 @@ class Dataset[T] private[sql]( * @group typedrel * @since 1.6.0 */ - def as(alias: String): Dataset[T] = withTypedPlan { - SubqueryAlias(alias, logicalPlan) - } + def as(alias: String): Dataset[T] = + new Dataset(sqlContext, SubqueryAlias(alias, originalLogicalPlan), encoder) /** * (Scala-specific) Returns a new [[Dataset]] with an alias set. 
@@ -880,15 +881,12 @@ class Dataset[T] private[sql]( * @since 1.6.0 */ @Experimental - def select[U1: Encoder](c1: TypedColumn[T, U1]): Dataset[U1] = { - new Dataset[U1]( - sqlContext, - Project( - c1.withInputType( - boundTEncoder, - logicalPlan.output).named :: Nil, - logicalPlan), - implicitly[Encoder[U1]]) + def select[U1: Encoder](c1: TypedColumn[T, U1]): Dataset[U1] = withTypedPlan { + Project( + c1.withInputType( + boundTEncoder, + logicalPlan.output).named :: Nil, + logicalPlan) } /** @@ -900,9 +898,8 @@ class Dataset[T] private[sql]( val encoders = columns.map(_.encoder) val namedColumns = columns.map(_.withInputType(resolvedTEncoder, logicalPlan.output).named) - val execution = new QueryExecution(sqlContext, Project(namedColumns, logicalPlan)) - new Dataset(sqlContext, execution, ExpressionEncoder.tuple(encoders)) + withTypedPlan(Project(namedColumns, logicalPlan))(ExpressionEncoder.tuple(encoders)) } /** @@ -1149,12 +1146,11 @@ class Dataset[T] private[sql]( def groupByKey[K: Encoder](func: T => K): KeyValueGroupedDataset[K, T] = { val inputPlan = logicalPlan val withGroupingKey = AppendColumns(func, inputPlan) - val executed = sqlContext.executePlan(withGroupingKey) new KeyValueGroupedDataset( encoderFor[K], encoderFor[T], - executed, + Dataset.ofRows(sqlContext, withGroupingKey), inputPlan.output, withGroupingKey.newColumns) } @@ -1321,7 +1317,9 @@ class Dataset[T] private[sql]( def union(other: Dataset[T]): Dataset[T] = withTypedPlan { // This breaks caching, but it's usually ok because it addresses a very specific use case: // using union to union many files or partitions. - CombineUnions(Union(logicalPlan, other.logicalPlan)) + CombineUnions(Union( + removeGeneratedSubquery(logicalPlan), + removeGeneratedSubquery(other.logicalPlan))) } /** @@ -1397,8 +1395,9 @@ class Dataset[T] private[sql]( val sum = weights.sum val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _) normalizedCumWeights.sliding(2).map { x => - new Dataset[T]( - sqlContext, Sample(x(0), x(1), withReplacement = false, seed, sorted)(), encoder) + withTypedPlan { + Sample(x(0), x(1), withReplacement = false, seed, sorted)() + } }.toArray } @@ -1559,7 +1558,7 @@ class Dataset[T] private[sql]( if (shouldRename) { val columns = output.map { col => if (resolver(col.name, existingName)) { - Column(Alias(col, newName)(qualifier = col.qualifier)) + Column(col).as(newName) } else { Column(col) } @@ -1616,8 +1615,7 @@ class Dataset[T] private[sql]( u.name, sqlContext.sessionState.analyzer.resolver).getOrElse(u) case Column(expr: Expression) => expr } - val attrs = this.logicalPlan.output - val colsAfterDrop = attrs.filter { attr => + val colsAfterDrop = logicalPlan.output.filter { attr => attr != expression }.map(attr => Column(attr)) select(colsAfterDrop : _*) @@ -1816,11 +1814,8 @@ class Dataset[T] private[sql]( * @since 1.6.0 */ @Experimental - def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = { - new Dataset[U]( - sqlContext, - MapPartitions[T, U](func, logicalPlan), - implicitly[Encoder[U]]) + def mapPartitions[U : Encoder](func: Iterator[T] => Iterator[U]): Dataset[U] = withTypedPlan { + MapPartitions[T, U](func, logicalPlan) } /** @@ -2275,18 +2270,24 @@ class Dataset[T] private[sql]( } } + private def removeGeneratedSubquery(plan: LogicalPlan): LogicalPlan = { + val resolved = sqlContext.executePlan(plan).analyzed + resolved transformDown { + case SubqueryAlias(alias, child) if alias.startsWith(Dataset.aliasPrefix) => child + } + } + /** A convenient function to wrap 
a logical plan and produce a DataFrame. */ - @inline private def withPlan(logicalPlan: => LogicalPlan): DataFrame = { - Dataset.ofRows(sqlContext, logicalPlan) + @inline private[sql] def withPlan(logicalPlan: => LogicalPlan): DataFrame = { + Dataset.ofRowsWithAlias(sqlContext, removeGeneratedSubquery(logicalPlan)) } - /** A convenient function to wrap a logical plan and produce a Dataset. */ - @inline private def withTypedPlan(logicalPlan: => LogicalPlan): Dataset[T] = { - new Dataset[T](sqlContext, logicalPlan, encoder) + @inline private def withPlanNoAlias(logicalPlan: => LogicalPlan): DataFrame = { + Dataset.ofRows(sqlContext, removeGeneratedSubquery(logicalPlan)) } - private[sql] def withTypedPlan[R]( - other: Dataset[_], encoder: Encoder[R])( - f: (LogicalPlan, LogicalPlan) => LogicalPlan): Dataset[R] = - new Dataset[R](sqlContext, f(logicalPlan, other.logicalPlan), encoder) + /** A convenient function to wrap a logical plan and produce a Dataset. */ + @inline private[sql] def withTypedPlan[U : Encoder](logicalPlan: => LogicalPlan): Dataset[U] = { + Dataset.withAlias(sqlContext, removeGeneratedSubquery(logicalPlan)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala index 07aa1515f384..eef7b5e5baa7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala @@ -38,7 +38,7 @@ import org.apache.spark.sql.execution.QueryExecution class KeyValueGroupedDataset[K, V] private[sql]( kEncoder: Encoder[K], vEncoder: Encoder[V], - val queryExecution: QueryExecution, + val ds: Dataset[_], private val dataAttributes: Seq[Attribute], private val groupingAttributes: Seq[Attribute]) extends Serializable { @@ -54,8 +54,8 @@ class KeyValueGroupedDataset[K, V] private[sql]( private val resolvedVEncoder = unresolvedVEncoder.resolve(dataAttributes, OuterScopes.outerScopes) - private def logicalPlan = queryExecution.analyzed - private def sqlContext = queryExecution.sqlContext + private def logicalPlan = ds.logicalPlan + private def sqlContext = ds.sqlContext private def groupedData = { new RelationalGroupedDataset( @@ -75,7 +75,7 @@ class KeyValueGroupedDataset[K, V] private[sql]( new KeyValueGroupedDataset( encoderFor[L], unresolvedVEncoder, - queryExecution, + ds, dataAttributes, groupingAttributes) @@ -84,11 +84,8 @@ class KeyValueGroupedDataset[K, V] private[sql]( * * @since 1.6.0 */ - def keys: Dataset[K] = { - Dataset[K]( - sqlContext, - Distinct( - Project(groupingAttributes, logicalPlan))) + def keys: Dataset[K] = ds.withTypedPlan { + Distinct(Project(groupingAttributes, logicalPlan)) } /** @@ -109,14 +106,13 @@ class KeyValueGroupedDataset[K, V] private[sql]( * * @since 1.6.0 */ - def flatMapGroups[U : Encoder](f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U] = { - Dataset[U]( - sqlContext, - MapGroups( - f, - groupingAttributes, - dataAttributes, - logicalPlan)) + def flatMapGroups[U : Encoder]( + f: (K, Iterator[V]) => TraversableOnce[U]): Dataset[U] = ds.withTypedPlan { + MapGroups( + f, + groupingAttributes, + dataAttributes, + logicalPlan) } /** @@ -231,12 +227,8 @@ class KeyValueGroupedDataset[K, V] private[sql]( Alias(CreateStruct(groupingAttributes), "key")() } val aggregate = Aggregate(groupingAttributes, keyColumn +: namedColumns, logicalPlan) - val execution = new QueryExecution(sqlContext, aggregate) - new Dataset( - sqlContext, - execution, - 
ExpressionEncoder.tuple(unresolvedKEncoder +: encoders)) + ds.withTypedPlan(aggregate)(ExpressionEncoder.tuple(unresolvedKEncoder +: encoders)) } /** @@ -302,8 +294,7 @@ class KeyValueGroupedDataset[K, V] private[sql]( other: KeyValueGroupedDataset[K, U])( f: (K, Iterator[V], Iterator[U]) => TraversableOnce[R]): Dataset[R] = { implicit val uEncoder = other.unresolvedVEncoder - Dataset[R]( - sqlContext, + ds.withTypedPlan { CoGroup( f, this.groupingAttributes, @@ -311,7 +302,8 @@ class KeyValueGroupedDataset[K, V] private[sql]( this.dataAttributes, other.dataAttributes, this.logicalPlan, - other.logicalPlan)) + other.logicalPlan) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala index a2186841d13c..7e1af9a6080b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala @@ -51,19 +51,19 @@ class RelationalGroupedDataset protected[sql]( val aliasedAgg = aggregates.map(alias) groupType match { - case RelationalGroupedDataset.GroupByType => - Dataset.ofRows( - df.sqlContext, Aggregate(groupingExprs, aliasedAgg, df.logicalPlan)) - case RelationalGroupedDataset.RollupType => - Dataset.ofRows( - df.sqlContext, Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan)) - case RelationalGroupedDataset.CubeType => - Dataset.ofRows( - df.sqlContext, Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan)) - case RelationalGroupedDataset.PivotType(pivotCol, values) => + case RelationalGroupedDataset.GroupByType => df.withPlan { + Aggregate(groupingExprs, aliasedAgg, df.logicalPlan) + } + case RelationalGroupedDataset.RollupType => df.withPlan { + Aggregate(Seq(Rollup(groupingExprs)), aliasedAgg, df.logicalPlan) + } + case RelationalGroupedDataset.CubeType => df.withPlan { + Aggregate(Seq(Cube(groupingExprs)), aliasedAgg, df.logicalPlan) + } + case RelationalGroupedDataset.PivotType(pivotCol, values) => df.withPlan { val aliasedGrps = groupingExprs.map(alias) - Dataset.ofRows( - df.sqlContext, Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan)) + Pivot(aliasedGrps, pivotCol, values, aggExprs, df.logicalPlan) + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index a387d81ce7a9..6968b7bdddd4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -272,11 +272,11 @@ class SQLContext private[sql]( } /** - * Returns true if the [[Queryable]] is currently cached in-memory. + * Returns true if the [[Dataset]] is currently cached in-memory. 
* @group cachemgmt * @since 1.3.0 */ - private[sql] def isCached(qName: Queryable): Boolean = { + private[sql] def isCached(qName: Dataset[_]): Boolean = { cacheManager.lookupCachedData(qName).nonEmpty } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 14b8b6fc3b38..7601cc5a505c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.columnar.InMemoryRelation +import org.apache.spark.sql.Dataset import org.apache.spark.storage.StorageLevel import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK @@ -74,15 +75,15 @@ private[sql] class CacheManager extends Logging { } /** - * Caches the data produced by the logical representation of the given [[Queryable]]. + * Caches the data produced by the logical representation of the given [[Dataset]]. * Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because * recomputing the in-memory columnar representation of the underlying table is expensive. */ private[sql] def cacheQuery( - query: Queryable, + query: Dataset[_], tableName: Option[String] = None, storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock { - val planToCache = query.queryExecution.analyzed + val planToCache = query.originalLogicalPlan if (lookupCachedData(planToCache).nonEmpty) { logWarning("Asked to cache already cached data.") } else { @@ -99,22 +100,22 @@ private[sql] class CacheManager extends Logging { } } - /** Removes the data for the given [[Queryable]] from the cache */ - private[sql] def uncacheQuery(query: Queryable, blocking: Boolean = true): Unit = writeLock { - val planToCache = query.queryExecution.analyzed + /** Removes the data for the given [[Dataset]] from the cache */ + private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock { + val planToCache = query.originalLogicalPlan val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) require(dataIndex >= 0, s"Table $query is not cached.") cachedData(dataIndex).cachedRepresentation.uncache(blocking) cachedData.remove(dataIndex) } - /** Tries to remove the data for the given [[Queryable]] from the cache + /** Tries to remove the data for the given [[Dataset]] from the cache * if it's cached */ private[sql] def tryUncacheQuery( - query: Queryable, + query: Dataset[_], blocking: Boolean = true): Boolean = writeLock { - val planToCache = query.queryExecution.analyzed + val planToCache = query.originalLogicalPlan val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan)) val found = dataIndex >= 0 if (found) { @@ -124,9 +125,9 @@ private[sql] class CacheManager extends Logging { found } - /** Optionally returns cached data for the given [[Queryable]] */ - private[sql] def lookupCachedData(query: Queryable): Option[CachedData] = readLock { - lookupCachedData(query.queryExecution.analyzed) + /** Optionally returns cached data for the given [[Dataset]] */ + private[sql] def lookupCachedData(query: Dataset[_]): Option[CachedData] = readLock { + lookupCachedData(query.originalLogicalPlan) } /** Optionally returns cached data for the given [[LogicalPlan]]. 
*/ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 01078e6c7298..1b6e28557c75 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -553,8 +553,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { case Row(id: Int, name: String, age: Int, idToDrop: Int, salary: Double) => Row(id, name, age, salary) }.toSeq) + assert(joinedDf.schema.map(_.name) === Seq("id", "name", "age", "id", "salary")) assert(df.schema.map(_.name) === Seq("id", "name", "age", "salary")) - assert(df("id") == person("id")) } test("withColumnRenamed") { @@ -1436,18 +1436,8 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { test("Un-direct self-join") { val df = Seq(1 -> "a", 2 -> "b").toDF("i", "j") val df2 = df.filter($"i" > 0) - - val err = intercept[AnalysisException](df.join(df2, (df("i") + 1) === df2("i"))) - assert(err.message.contains("please eliminate ambiguity from the inputs first")) - - val namedDf = df.as("x") - val namedDf2 = df2.as("y") - checkAnswer( - namedDf.join(namedDf2, (namedDf("i") + 1) === namedDf2("i")), - Row(1, "a", 2, "b") :: Nil - ) checkAnswer( - namedDf.join(namedDf2, ($"x.i" + 1) === $"y.i"), + df.join(df2, (df("i") + 1) === df2("i")), Row(1, "a", 2, "b") :: Nil ) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 5af1a4fcd776..1f78bcc6782b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -541,4 +541,24 @@ class JoinSuite extends QueryTest with SharedSQLContext { Row(3, 1) :: Row(3, 2) :: Nil) } + + test("friendly error message for self-join") { + withTempTable("tbl") { + val df = Seq(1 -> "a").toDF("k", "v") + df.registerTempTable("tbl") + + val e1 = intercept[AnalysisException](sql("SELECT k FROM tbl JOIN tbl")) + assert(e1.message == "Input Attributes tbl.k, tbl.k are ambiguous, please eliminate " + + "ambiguity from the inputs first, e.g. alias the left and right plan before join them.") + + val e2 = intercept[AnalysisException](sql("SELECT k FROM tbl t1 JOIN tbl t2")) + assert(e2.message == "Reference 'k' is ambiguous, please add a qualifier to distinguish " + + "it, e.g. 't1.k', available qualifiers: t1, t2") + + checkAnswer( + sql("SELECT t1.k FROM tbl t1 JOIN tbl t2"), + Row(1) + ) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index c958eac266d6..012d25d916e4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -1834,8 +1834,7 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { // This test is for the fix of https://issues.apache.org/jira/browse/SPARK-10737. // This bug will be triggered when Tungsten is enabled and there are multiple // SortMergeJoin operators executed in the same task. 
- val confs = SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1" :: Nil - withSQLConf(confs: _*) { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "1") { val df1 = (1 to 50).map(i => (s"str_$i", i)).toDF("i", "j") val df2 = df1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 8adb170f57e6..657130c8e71d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD -import org.apache.spark.sql.{execution, Row} +import org.apache.spark.sql.{execution, DataFrame, Row} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Literal, SortOrder} import org.apache.spark.sql.catalyst.plans.Inner @@ -54,18 +54,18 @@ class PlannerSuite extends SharedSQLContext { } test("count is partially aggregated") { - val query = testData.groupBy('value).agg(count('key)).queryExecution.optimizedPlan + val query = testData.groupBy('value).agg(count('key)).originalLogicalPlan testPartialAggregationPlan(query) } test("count distinct is partially aggregated") { - val query = testData.groupBy('value).agg(countDistinct('key)).queryExecution.optimizedPlan + val query = testData.groupBy('value).agg(countDistinct('key)).originalLogicalPlan testPartialAggregationPlan(query) } test("mixed aggregates are partially aggregated") { val query = - testData.groupBy('value).agg(count('value), countDistinct('key)).queryExecution.optimizedPlan + testData.groupBy('value).agg(count('value), countDistinct('key)).originalLogicalPlan testPartialAggregationPlan(query) } @@ -164,26 +164,30 @@ class PlannerSuite extends SharedSQLContext { } } + private def checkOutput(planned: SparkPlan, df: DataFrame): Unit = { + assert(planned.output.map(_.withQualifier(None)) === + df.logicalPlan.output.map(_.withQualifier(None))) + } + test("efficient terminal limit -> sort should use TakeOrderedAndProject") { val query = testData.select('key, 'value).sort('key).limit(2) val planned = query.queryExecution.executedPlan assert(planned.isInstanceOf[execution.TakeOrderedAndProject]) - assert(planned.output === - testData.select('key, 'value).logicalPlan.output.map(_.withQualifier(None))) + checkOutput(planned, testData.select('key, 'value)) } test("terminal limit -> project -> sort should use TakeOrderedAndProject") { val query = testData.select('key, 'value).sort('key).select('value, 'key).limit(2) val planned = query.queryExecution.executedPlan assert(planned.isInstanceOf[execution.TakeOrderedAndProject]) - assert(planned.output === testData.select('value, 'key).logicalPlan.output) + checkOutput(planned, testData.select('value, 'key)) } test("terminal limits that are not handled by TakeOrderedAndProject should use CollectLimit") { val query = testData.select('value).limit(2) val planned = query.queryExecution.sparkPlan assert(planned.isInstanceOf[CollectLimit]) - assert(planned.output === testData.select('value).logicalPlan.output) + checkOutput(planned, testData.select('value)) } test("TakeOrderedAndProject can appear in the middle of plans") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala index 
1fa15730bc2e..76fdf2899116 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala @@ -216,7 +216,7 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi /** Returns a resolved expression for `str` in the context of `df`. */ def resolve(df: DataFrame, str: String): Expression = { - df.select(expr(str)).queryExecution.analyzed.expressions.head.children.head + df.select(expr(str)).originalLogicalPlan.expressions.head.children.head } /** Returns a set with all the filters present in the physical plan. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala index e7d2b5ad9682..596dc7e1ed49 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/util/DataFrameCallbackSuite.scala @@ -20,8 +20,8 @@ package org.apache.spark.sql.util import scala.collection.mutable.ArrayBuffer import org.apache.spark._ -import org.apache.spark.sql.{functions, QueryTest} -import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Project} +import org.apache.spark.sql.{functions, Dataset, QueryTest} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project, SubqueryAlias} import org.apache.spark.sql.execution.{QueryExecution, WholeStageCodegen} import org.apache.spark.sql.test.SharedSQLContext @@ -29,6 +29,16 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { import testImplicits._ import functions._ + private def removeGeneratedSubquery(plan: LogicalPlan): LogicalPlan = { + plan transformDown { + case SubqueryAlias(alias, child) if alias.startsWith(Dataset.aliasPrefix) => child + } + } + + private def checkPlan[T <: LogicalPlan](qe: QueryExecution): Unit = { + assert(removeGeneratedSubquery(qe.analyzed).isInstanceOf[T]) + } + test("execute callback functions when a DataFrame action finished successfully") { val metrics = ArrayBuffer.empty[(String, QueryExecution, Long)] val listener = new QueryExecutionListener { @@ -48,11 +58,11 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { assert(metrics.length == 2) assert(metrics(0)._1 == "collect") - assert(metrics(0)._2.analyzed.isInstanceOf[Project]) + checkPlan[Project](metrics(0)._2) assert(metrics(0)._3 > 0) assert(metrics(1)._1 == "count") - assert(metrics(1)._2.analyzed.isInstanceOf[Aggregate]) + checkPlan[Aggregate](metrics(1)._2) assert(metrics(1)._3 > 0) sqlContext.listenerManager.unregister(listener) @@ -79,7 +89,7 @@ class DataFrameCallbackSuite extends QueryTest with SharedSQLContext { assert(metrics.length == 1) assert(metrics(0)._1 == "collect") - assert(metrics(0)._2.analyzed.isInstanceOf[Project]) + checkPlan[Project](metrics(0)._2) assert(metrics(0)._3.getMessage == e.getMessage) sqlContext.listenerManager.unregister(listener) From a1822dad0b9f9515a23fc853ca95d39524c0ffc4 Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Tue, 29 Mar 2016 10:16:31 +0800 Subject: [PATCH 10/10] update --- python/pyspark/sql/tests.py | 2 +- .../apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 1a5d422af953..a7abe30a0359 100644 --- a/python/pyspark/sql/tests.py +++ 
b/python/pyspark/sql/tests.py @@ -636,7 +636,7 @@ def test_column_select(self): df = self.df self.assertEqual(self.testData, df.select("*").collect()) self.assertEqual(self.testData, df.select(df.key, df.value).collect()) - self.assertEqual([Row(value='1')], df.where(df.key == 1).select(df.value).collect()) + self.assertEqual([Row(value='1')], df.where(df.key == 1).select("value").collect()) def test_freqItems(self): vals = [Row(a=1, b=-2.0) if i % 2 == 0 else Row(a=i, b=i * 1.0) for i in range(100)] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 7ce4ed817fb0..ff36a8dc7559 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -252,7 +252,7 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging { // No matches. case Seq() => - logTrace(s"Could not find $name in ${input.mkString(", ")}") + logTrace(s"Could not find $name in ${input.map(_.qualifiedName).mkString(", ")}") None // More than one match.
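
Taken together, the test updates in DataFrameSuite, JoinSuite and pyspark above exercise the user-visible effect of this change: df("i") now yields an unresolved attribute qualified by the generated dataset alias, so a non-trivial self-join condition resolves to the intended sides without the old auto-resolution hack, while a genuinely ambiguous reference fails with the friendlier message asserted in JoinSuite. A hedged usage sketch, assuming the usual sqlContext.implicits._ from the test suites (show() is only for illustration):

    import sqlContext.implicits._

    val df  = Seq(1 -> "a", 2 -> "b").toDF("i", "j")
    val df2 = df.filter($"i" > 0)

    // df("i") and df2("i") carry different generated qualifiers, so the condition below
    // resolves against the correct sides and the join returns Row(1, "a", 2, "b"),
    // matching the expectation in the updated DataFrameSuite test.
    df.join(df2, (df("i") + 1) === df2("i")).show()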