fix.
gatorsmile committed Jun 23, 2017
commit 08015c83c251cb44b52762b162a1cdaa40130820
@@ -115,14 +115,24 @@ object FileFormatWriter extends Logging {
     // Get the actual partition columns as attributes after matching them by name with
     // the given columns names.
     val partitionColumns = partitionColumnNames.map { col =>
-      allColumns.find(f => f.name.equalsIgnoreCase(col)).getOrElse {
+      val nameEquality = sparkSession.sessionState.conf.resolver
+      allColumns.find(f => nameEquality(f.name, col)).getOrElse {
         throw new RuntimeException(
           s"Partition column $col not found in schema ${queryExecution.executedPlan.schema}")
       }
     }
     val partitionSet = AttributeSet(partitionColumns)
     val dataColumns = allColumns.filterNot(partitionSet.contains)

+    queryExecution.executedPlan.output.zip(queryExecution.logical.output).foreach {
+      case (fieldExecuted, fieldAnalyzed) =>
+        if (fieldAnalyzed.name != fieldExecuted.name) {
+          // scalastyle:off println
+          println(s"analyzed: ${fieldAnalyzed.name}; executed: ${fieldExecuted.name}")
+          // scalastyle:on println
+        }
+    }
+
     val bucketIdExpression = bucketSpec.map { spec =>
       val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)
       // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
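The core of this hunk replaces a hardcoded `equalsIgnoreCase` with the session's configured resolver, so partition-column matching now follows `spark.sql.caseSensitive` instead of always being case-insensitive. Below is a minimal self-contained sketch of how `SQLConf.resolver` behaves; the two resolution functions are paraphrased from catalyst rather than imported, and the object name is made up for the example:

object ResolverSketch extends App {
  // Catalyst's Resolver type is just (String, String) => Boolean.
  type Resolver = (String, String) => Boolean

  val caseSensitiveResolution: Resolver = (a, b) => a == b
  val caseInsensitiveResolution: Resolver = (a, b) => a.equalsIgnoreCase(b)

  // SQLConf.resolver returns one of the two depending on spark.sql.caseSensitive.
  def resolver(caseSensitive: Boolean): Resolver =
    if (caseSensitive) caseSensitiveResolution else caseInsensitiveResolution

  // Under the default config (spark.sql.caseSensitive=false), "PART" matches
  // "part", exactly like the old equalsIgnoreCase behavior.
  assert(resolver(caseSensitive = false)("PART", "part"))
  // With case sensitivity enabled, the same partition-column lookup fails,
  // which the old hardcoded check could never do.
  assert(!resolver(caseSensitive = true)("PART", "part"))
}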
@@ -127,11 +127,11 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[LogicalPlan] {
     val resolver = sparkSession.sessionState.conf.resolver
     val tableCols = existingTable.schema.map(_.name)

-    // As we are inserting into an existing table, we should respect the existing schema and
-    // adjust the column order of the given dataframe according to it, or throw exception
-    // if the column names do not match.
+    // As we are inserting into an existing table, we should respect the existing schema, preserve
+    // the case and adjust the column order of the given DataFrame according to it, or throw
+    // an exception if the column names do not match.
     val adjustedColumns = tableCols.map { col =>
-      query.resolve(Seq(col), resolver).getOrElse {
+      query.resolve(Seq(col), resolver).map(Alias(_, col)()).getOrElse {
Member Author:

Need to add an alias to force the query to preserve the original column names from the table schema, whose case could differ from the underlying query schema.

Contributor:

ah good catch!

         val inputColumns = query.schema.map(_.name).mkString(", ")
         throw new AnalysisException(
           s"cannot resolve '$col' given input columns: [$inputColumns]")
@@ -168,15 +168,9 @@ case class PreprocessTableCreation(sparkSession: SparkSession) extends Rule[LogicalPlan] {
""".stripMargin)
}

val newQuery = if (adjustedColumns != query.output) {
Project(adjustedColumns, query)
} else {
query
}

c.copy(
tableDesc = existingTable,
query = Some(newQuery))
query = Some(Project(adjustedColumns, query)))

// Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
// config, and do various checks:
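With every adjusted column now wrapped in a fresh `Alias`, the old `adjustedColumns != query.output` guard could never be false, since an `Alias` node never equals the `Attribute` it wraps; always emitting the `Project` is the simpler equivalent. A small sketch of that equality fact, using catalyst's internal expression classes (internal API, so treat this as illustrative only):

import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.types.IntegerType

object AliasNeverEqualsAttribute extends App {
  // A resolved output column of the query...
  val col = AttributeReference("ID", IntegerType)()
  // ...and the same column wrapped to pin the table schema's casing.
  val aliased = Alias(col, "ID")()

  // Different node types are never equal, so a guard comparing the aliased
  // columns against query.output would always fire; the commit drops it.
  assert(aliased != col)
  println("Alias(col) != col, as expected")
}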