Skip to content
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
address comments
  • Loading branch information
cloud-fan committed Oct 12, 2017
commit 5bdaf7d88475c64ca6d119e913eba73b1ac8e2e9
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,16 @@ import org.apache.spark.util.SerializableConfiguration
*/
trait DataWritingCommand extends RunnableCommand {

/**
* The input query plan that produces the data to be written.
*/
def query: LogicalPlan
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a one-line description for `query`?


// We make the input `query` an inner child instead of a child in order to hide it from the
// optimizer. This is because optimizer may change the output schema names, and we have to keep
// the original analyzed plan here so that we can pass the corrected schema to the writer. The
// schema of analyzed plan is what user expects(or specifies), so we should respect it when
// writing.
// optimizer. This is because the optimizer may not preserve the case of the output schema
// names, and we have to keep the original analyzed plan here so that we can pass the correct
// schema to the writer. The schema of the analyzed plan is what the user expects (or specifies),
// so we should respect it when writing.
override protected def innerChildren: Seq[LogicalPlan] = query :: Nil

override lazy val metrics: Map[String, SQLMetric] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ object FileFormatWriter extends Logging {
job.setOutputValueClass(classOf[InternalRow])
FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))

val allColumns = queryExecution.logical.output
// Pick the attributes from the analyzed plan, as the optimizer may not preserve the case of
// the output schema names.
val allColumns = queryExecution.analyzed.output
val partitionSet = AttributeSet(partitionColumns)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might need to double-check that the partitionColumns in all the other files also come from analyzed plans.

val dataColumns = allColumns.filterNot(partitionSet.contains)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,15 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
}

test("FileFormatWriter should respect the input query schema") {
withTable("t1", "t2") {
withTable("t1", "t2", "t3", "t4") {
spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also add another case here?

spark.range(1).select('id, 'id as 'col1, 'id as 'col2).write.saveAsTable("t3")

spark.sql("select COL1, COL2 from t1").write.saveAsTable("t2")
checkAnswer(spark.table("t2"), Row(0, 0))

// Test picking part of the columns when writing.
spark.range(1).select('id, 'id as 'col1, 'id as 'col2).write.saveAsTable("t3")
spark.sql("select COL1, COL2 from t3").write.saveAsTable("t4")
checkAnswer(spark.table("t4"), Row(0, 0))
}
}
}