@@ -477,8 +477,8 @@ object SqlParser extends AbstractSparkSQLParser with DataTypeParser {
 
   protected lazy val baseExpression: Parser[Expression] =
     ( "*" ^^^ UnresolvedStar(None)
-    | (ident <~ "."). + <~ "*" ^^ { case target => { UnresolvedStar(Option(target)) }
-    } | primary
+    | (ident <~ "."). + <~ "*" ^^ { case target => UnresolvedStar(Option(target))}
+    | primary
     )
 
   protected lazy val signedPrimary: Parser[Expression] =
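For readers less familiar with Scala's parser combinators, the sketch below shows the same grammar shape in isolation: one or more dot-terminated identifiers followed by "*", yielding the qualifier chain that becomes the UnresolvedStar target. It is a minimal stand-alone illustration using scala-parser-combinators, not Spark's SqlParser; the StarPathParser and qualifiedStar names are made up.

import scala.util.parsing.combinator.RegexParsers

// Stand-alone sketch of the rule shape `(ident <~ ".").+ <~ "*"` (not Spark code).
object StarPathParser extends RegexParsers {
  // Simplified identifier; Spark's real `ident` also handles quoting and keywords.
  val ident: Parser[String] = """[a-zA-Z_][a-zA-Z0-9_]*""".r

  // One or more identifiers, each followed by ".", then a trailing "*".
  // "t.*" parses to List("t"); "record.r1.*" parses to List("record", "r1").
  val qualifiedStar: Parser[List[String]] = (ident <~ ".").+ <~ "*"

  def main(args: Array[String]): Unit = {
    println(parseAll(qualifiedStar, "record.r1.*")) // parsed: List(record, r1)
    println(parseAll(qualifiedStar, "*"))           // fails: a bare "*" has no qualifier here
  }
}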
@@ -183,28 +183,16 @@ case class UnresolvedStar(target: Option[Seq[String]]) extends Star with Unevaluable {
       case None => input.output
       // If there is a table, pick out attributes that are part of this table.
       case Some(t) => if (t.size == 1) {
-        input.output.filter(_.qualifiers.filter(resolver(_, t.head)).nonEmpty)
+        input.output.filter(_.qualifiers.exists(resolver(_, t.head)))
       } else {
         List()
       }
     }
-    if (!expandedAttributes.isEmpty) {
-      if (expandedAttributes.forall(_.isInstanceOf[NamedExpression])) {
-        return expandedAttributes
-      } else {
-        require(expandedAttributes.size == input.output.size)
-        expandedAttributes.zip(input.output).map {
-          case (e, originalAttribute) =>
-            Alias(e, originalAttribute.name)(qualifiers = originalAttribute.qualifiers)
-        }
-      }
-      return expandedAttributes
-    }
-
-    require(target.isDefined)
+    if (expandedAttributes.nonEmpty) return expandedAttributes
 
     // Try to resolve it as a struct expansion. If there is a conflict and both are possible,
     // (i.e. [name].* is both a table and a struct), the struct path can always be qualified.
+    require(target.isDefined)
     val attribute = input.resolve(target.get, resolver)
     if (attribute.isDefined) {
       // This target resolved to an attribute in child. It must be a struct. Expand it.
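The comment above captures the resolution order this change preserves: a qualifier that names a table (or alias) wins, and a struct column with the same name is still reachable by qualifying the struct path with the table name. A hedged SQL-level illustration, assuming a Spark 1.x sqlContext and the suite's testData2 temp table (it mirrors the "Struct Star Expansion - Name conflict" test further down):

// Illustration only; `conflict` and the temp table name "t" are made up for this example.
val conflict = sqlContext.sql("SELECT struct(a, b) AS t, a FROM testData2")
conflict.registerTempTable("t")

// "t.*" resolves `t` as the table qualifier first, so all of the table's columns come back.
sqlContext.sql("SELECT t.* FROM t")    // columns: t (the struct), a

// Qualifying the struct with the table name skips table expansion and expands the struct.
sqlContext.sql("SELECT t.t.* FROM t")  // columns: a, b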
sql/core/src/main/scala/org/apache/spark/sql/Column.scala (4 additions, 2 deletions)
@@ -60,8 +60,10 @@ class Column(protected[sql] val expr: Expression) extends Logging {
 
   def this(name: String) = this(name match {
     case "*" => UnresolvedStar(None)
-    case _ if name.endsWith(".*") => UnresolvedStar(Some(UnresolvedAttribute.parseAttributeName(
-      name.substring(0, name.length - 2))))
+    case _ if name.endsWith(".*") => {
+      val parts = UnresolvedAttribute.parseAttributeName(name.substring(0, name.length - 2))
+      UnresolvedStar(Some(parts))
+    }
     case _ => UnresolvedAttribute.quotedString(name)
   })
 
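This constructor is what routes string column names ending in ".*" into star expansion from the DataFrame API. A brief usage sketch, assuming a DataFrame named df with a struct column named record (as in the tests below) and sqlContext.implicits._ in scope; the column names are illustrative:

import sqlContext.implicits._

// Both forms go through the constructor above and build UnresolvedStar(Some(Seq("record"))).
df.select($"record.*")             // expand the struct's fields into top-level columns
df.select(new Column("record.*"))  // equivalent, constructing the Column directly

// parseAttributeName understands backticks, so a struct column whose name itself contains
// a dot (hypothetically named "a.b" here) can still be star-expanded:
df.select($"`a.b`.*")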
sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala (46 additions, 33 deletions)
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.DefaultParserDialect
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.errors.DialectException
 import org.apache.spark.sql.execution.aggregate
-import org.apache.spark.sql.execution.joins.{SortMergeJoin, CartesianProduct}
+import org.apache.spark.sql.execution.joins.{CartesianProduct, SortMergeJoin}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.test.SQLTestData._
 import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext}
@@ -1956,7 +1956,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
 
     // Try with a registered table.
     sql("select struct(a, b) as record from testData2").registerTempTable("structTable")
-    checkAnswer(sql("SELECT record.* FROM structTable"),
+    checkAnswer(
+      sql("SELECT record.* FROM structTable"),
       Row(1, 1) :: Row(1, 2) :: Row(2, 1) :: Row(2, 2) :: Row(3, 1) :: Row(3, 2) :: Nil)
 
     checkAnswer(sql(
@@ -2019,50 +2020,62 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       Row(1, 1) :: Row(1, 2) :: Row(2, 1) :: Row(2, 2) :: Row(3, 1) :: Row(3, 2) :: Nil)
 
     // Try with a registered table
-    nestedStructData.registerTempTable("nestedStructTable")
-    checkAnswer(sql("SELECT record.* FROM nestedStructTable"),
-      nestedStructData.select($"record.*"))
-    checkAnswer(sql("SELECT record.r1 FROM nestedStructTable"),
-      nestedStructData.select($"record.r1"))
-    checkAnswer(sql("SELECT record.r1.* FROM nestedStructTable"),
-      nestedStructData.select($"record.r1.*"))
-
-    // Create paths with unusual characters.
+    withTempTable("nestedStructTable") {
+      nestedStructData.registerTempTable("nestedStructTable")
+      checkAnswer(
+        sql("SELECT record.* FROM nestedStructTable"),
+        nestedStructData.select($"record.*"))
+      checkAnswer(
+        sql("SELECT record.r1 FROM nestedStructTable"),
+        nestedStructData.select($"record.r1"))
+      checkAnswer(
+        sql("SELECT record.r1.* FROM nestedStructTable"),
+        nestedStructData.select($"record.r1.*"))
+
+      // Try resolving something not there.
+      assert(intercept[AnalysisException](sql("SELECT abc.* FROM nestedStructTable"))
+        .getMessage.contains("cannot resolve"))
+    }
+
+    // Create paths with unusual characters
     val specialCharacterPath = sql(
       """
        | SELECT struct(`col$.a_`, `a.b.c.`) as `r&&b.c` FROM
        |   (SELECT struct(a, b) as `col$.a_`, struct(b, a) as `a.b.c.` FROM testData2) tmp
      """.stripMargin)
-    specialCharacterPath.registerTempTable("specialCharacterTable")
-    checkAnswer(specialCharacterPath.select($"`r&&b.c`.*"),
-      nestedStructData.select($"record.*"))
-    checkAnswer(sql("SELECT `r&&b.c`.`col$.a_` FROM specialCharacterTable"),
-      nestedStructData.select($"record.r1"))
-    checkAnswer(sql("SELECT `r&&b.c`.`a.b.c.` FROM specialCharacterTable"),
-      nestedStructData.select($"record.r2"))
-    checkAnswer(sql("SELECT `r&&b.c`.`col$.a_`.* FROM specialCharacterTable"),
-      nestedStructData.select($"record.r1.*"))
+    withTempTable("specialCharacterTable") {
+      specialCharacterPath.registerTempTable("specialCharacterTable")
+      checkAnswer(
+        specialCharacterPath.select($"`r&&b.c`.*"),
+        nestedStructData.select($"record.*"))
+      checkAnswer(
+        sql("SELECT `r&&b.c`.`col$.a_` FROM specialCharacterTable"),
+        nestedStructData.select($"record.r1"))
+      checkAnswer(
+        sql("SELECT `r&&b.c`.`a.b.c.` FROM specialCharacterTable"),
+        nestedStructData.select($"record.r2"))
+      checkAnswer(
+        sql("SELECT `r&&b.c`.`col$.a_`.* FROM specialCharacterTable"),
+        nestedStructData.select($"record.r1.*"))
+    }
 
     // Try star expanding a scalar. This should fail.
     assert(intercept[AnalysisException](sql("select a.* from testData2")).getMessage.contains(
      "Can only star expand struct data types."))
-
-    // Try resolving something not there.
-    assert(intercept[AnalysisException](sql("SELECT abc.* FROM nestedStructTable"))
-      .getMessage.contains("cannot resolve"))
   }
 
-
   test("Struct Star Expansion - Name conflict") {
     // Create a data set that contains a naming conflict
     val nameConflict = sql("SELECT struct(a, b) as nameConflict, a as a FROM testData2")
-    nameConflict.registerTempTable("nameConflict")
-    // Unqualified should resolve to table.
-    checkAnswer(sql("SELECT nameConflict.* FROM nameConflict"),
-      Row(Row(1, 1), 1) :: Row(Row(1, 2), 1) :: Row(Row(2, 1), 2) :: Row(Row(2, 2), 2) ::
-      Row(Row(3, 1), 3) :: Row(Row(3, 2), 3) :: Nil)
-    // Qualify the struct type with the table name.
-    checkAnswer(sql("SELECT nameConflict.nameConflict.* FROM nameConflict"),
-      Row(1, 1) :: Row(1, 2) :: Row(2, 1) :: Row(2, 2) :: Row(3, 1) :: Row(3, 2) :: Nil)
+    withTempTable("nameConflict") {
+      nameConflict.registerTempTable("nameConflict")
+      // Unqualified should resolve to table.
+      checkAnswer(sql("SELECT nameConflict.* FROM nameConflict"),
+        Row(Row(1, 1), 1) :: Row(Row(1, 2), 1) :: Row(Row(2, 1), 2) :: Row(Row(2, 2), 2) ::
+        Row(Row(3, 1), 3) :: Row(Row(3, 2), 3) :: Nil)
+      // Qualify the struct type with the table name.
+      checkAnswer(sql("SELECT nameConflict.nameConflict.* FROM nameConflict"),
+        Row(1, 1) :: Row(1, 2) :: Row(2, 1) :: Row(2, 2) :: Row(3, 1) :: Row(3, 2) :: Nil)
+    }
   }
 }
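The tests now register their temp tables inside withTempTable so the tables are dropped even when an assertion throws. A minimal sketch of that pattern, assuming the helper from the shared test utilities simply drops the named tables in a finally block; the body below is illustrative, not the actual Spark helper:

// Sketch of the withTempTable pattern used above; assumes a `sqlContext` in scope.
def withTempTable(tableNames: String*)(body: => Unit): Unit = {
  try {
    body
  } finally {
    // Drop the tables whether or not the body's assertions passed.
    tableNames.foreach(sqlContext.dropTempTable)
  }
}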