73 changes: 42 additions & 31 deletions sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -19,22 +19,23 @@ package org.apache.spark.sql.sources

import java.util.{Date, UUID}

import scala.collection.JavaConversions.asScalaIterator

import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce._
import org.apache.hadoop.mapreduce.lib.output.{FileOutputCommitter => MapReduceFileOutputCommitter, FileOutputFormat}

import org.apache.spark._
import org.apache.spark.mapred.SparkHadoopMapRedUtil
import org.apache.spark.mapreduce.SparkHadoopMapReduceUtil
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateProjection
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.execution.RunnableCommand
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, Row, SQLConf, SQLContext, SaveMode}
import org.apache.spark.sql.types.StringType
import org.apache.spark.util.SerializableConfiguration

private[sql] case class InsertIntoDataSource(
@@ -170,14 +171,14 @@ private[sql] case class InsertIntoHadoopFsRelation(
try {
writerContainer.executorSideSetup(taskContext)

val converter = if (needsConversion) {
val converter: InternalRow => Row = if (needsConversion) {
CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
} else {
r: InternalRow => r.asInstanceOf[Row]
}
while (iterator.hasNext) {
val row = converter(iterator.next())
writerContainer.outputWriterForRow(row).write(row)
val internalRow = iterator.next()
writerContainer.outputWriterForRow(internalRow).write(converter(internalRow))
}

writerContainer.commitTask()
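
For context on the converter change above, here is a compact, hedged sketch of the same pattern in isolation: build the InternalRow-to-Row converter once per task, then route on the unconverted InternalRow while handing the converted Row to the writer. The writeRow callback is a hypothetical stand-in for writerContainer.outputWriterForRow(...).write(...); the CatalystTypeConverters call mirrors the Spark 1.5-era internal API used in this diff.

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.types.StructType

// Hypothetical helper mirroring the write loop above.
def writeAll(
    iterator: Iterator[InternalRow],
    dataSchema: StructType,
    needsConversion: Boolean,
    writeRow: (InternalRow, Row) => Unit): Unit = {
  // Choose the converter once, outside the per-row loop.
  val converter: InternalRow => Row = if (needsConversion) {
    CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
  } else {
    (r: InternalRow) => r.asInstanceOf[Row]
  }
  while (iterator.hasNext) {
    val internalRow = iterator.next()
    // Route on the InternalRow; give the writer the (possibly converted) external Row.
    writeRow(internalRow, converter(internalRow))
  }
}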
@@ -239,23 +240,21 @@ private[sql] case class InsertIntoHadoopFsRelation(
try {
writerContainer.executorSideSetup(taskContext)

val partitionProj = newProjection(codegenEnabled, partitionOutput, output)
// Projects all partition columns and casts them to strings to build partition directories.
val partitionCasts = partitionOutput.map(Cast(_, StringType))
val partitionProj = newProjection(codegenEnabled, partitionCasts, output)
val dataProj = newProjection(codegenEnabled, dataOutput, output)

val dataConverter: InternalRow => Row = if (needsConversion) {
CatalystTypeConverters.createToScalaConverter(dataSchema).asInstanceOf[InternalRow => Row]
} else {
r: InternalRow => r.asInstanceOf[Row]
}
val partitionSchema = StructType.fromAttributes(partitionOutput)
val partConverter: InternalRow => Row =
CatalystTypeConverters.createToScalaConverter(partitionSchema)
.asInstanceOf[InternalRow => Row]

while (iterator.hasNext) {
val row = iterator.next()
val partitionPart = partConverter(partitionProj(row))
val dataPart = dataConverter(dataProj(row))
val internalRow = iterator.next()
val partitionPart = partitionProj(internalRow)
val dataPart = dataConverter(dataProj(internalRow))
writerContainer.outputWriterForRow(partitionPart).write(dataPart)
}
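
To make the effect of the added Cast concrete: the partition projection now evaluates Cast(col, StringType) for each partition column, so the row used for routing already holds string values and no separate partition-side converter is needed. A tiny illustration of one such evaluation, assuming Spark 1.5-era catalyst internals (BoundReference stands in for the resolved partition attribute; the column and value are made up):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{BoundReference, Cast}
import org.apache.spark.sql.types.{IntegerType, StringType}

// One input row whose first column is an integer partition column, e.g. year = 2015.
val row = InternalRow(2015)

// A resolved reference to column 0, wrapped in the same Cast the projection applies.
val yearCol = BoundReference(0, IntegerType, nullable = true)
val yearAsString = Cast(yearCol, StringType).eval(row)

// yearAsString is the catalyst-internal string form of 2015 ("2015" as a UTF8String),
// which is what outputWriterForRow later reads back via row.getString(i).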

@@ -424,7 +423,7 @@ private[sql] abstract class BaseWriterContainer(
}

// Called on executor side when writing rows
def outputWriterForRow(row: Row): OutputWriter
def outputWriterForRow(row: InternalRow): OutputWriter

protected def initWriters(): Unit

@@ -466,7 +465,7 @@ private[sql] class DefaultWriterContainer(
writer = outputWriterFactory.newInstance(getWorkPath, dataSchema, taskAttemptContext)
}

override def outputWriterForRow(row: Row): OutputWriter = writer
override def outputWriterForRow(row: InternalRow): OutputWriter = writer

override def commitTask(): Unit = {
try {
@@ -507,23 +506,36 @@ private[sql] class DynamicPartitionWriterContainer(
outputWriters = new java.util.HashMap[String, OutputWriter]
}

override def outputWriterForRow(row: Row): OutputWriter = {
// TODO (SPARK-8888): zip and all the stuff happening here is very inefficient.
val partitionPath = partitionColumns.zip(row.toSeq).map { case (col, rawValue) =>
val string = if (rawValue == null) null else String.valueOf(rawValue)
val valueString = if (string == null || string.isEmpty) {
defaultPartitionName
} else {
PartitioningUtils.escapePathName(string)
// The `row` argument is expected to contain only partition column values, already cast to strings.
override def outputWriterForRow(row: InternalRow): OutputWriter = {
val partitionPath = {
val partitionPathBuilder = new StringBuilder
var i = 0

while (i < partitionColumns.length) {
val col = partitionColumns(i)
val partitionValueString = {
val string = row.getString(i)

Review comment (Contributor): wouldn't this be a problem if the column is not a string column?
Review reply (Contributor, author): It's already cast to string here.

if (string.eq(null)) defaultPartitionName else PartitioningUtils.escapePathName(string)
}

if (i > 0) {
partitionPathBuilder.append(Path.SEPARATOR_CHAR)
}

partitionPathBuilder.append(s"$col=$partitionValueString")
i += 1
}
s"/$col=$valueString"
}.mkString.stripPrefix(Path.SEPARATOR)

partitionPathBuilder.toString()
}

val writer = outputWriters.get(partitionPath)
if (writer.eq(null)) {
val path = new Path(getWorkPath, partitionPath)
taskAttemptContext.getConfiguration.set("spark.sql.sources.output.path",
new Path(outputPath, partitionPath).toString)
taskAttemptContext.getConfiguration.set(
"spark.sql.sources.output.path", new Path(outputPath, partitionPath).toString)
val newWriter = outputWriterFactory.newInstance(path.toString, dataSchema, taskAttemptContext)
outputWriters.put(partitionPath, newWriter)
newWriter
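
To summarize the new path construction in one place: each partition column contributes a col=value segment, a null value falls back to the default partition name, and segments are joined with the Hadoop path separator before being used both as the writer cache key and as the subdirectory under the work path. A standalone sketch of that logic, where escape is a hypothetical stand-in for PartitioningUtils.escapePathName and defaultPartitionName is passed in rather than taken from the relation:

import org.apache.hadoop.fs.Path

// Hypothetical stand-in for PartitioningUtils.escapePathName (the real helper
// encodes characters that are unsafe in directory names).
def escape(s: String): String = s

def partitionPath(
    partitionColumns: Seq[String],
    stringValues: Seq[String],      // already cast to strings by the partition projection
    defaultPartitionName: String): String = {
  val builder = new StringBuilder
  var i = 0
  while (i < partitionColumns.length) {
    val value = stringValues(i)
    val valueString = if (value == null) defaultPartitionName else escape(value)
    if (i > 0) builder.append(Path.SEPARATOR_CHAR)
    builder.append(s"${partitionColumns(i)}=$valueString")
    i += 1
  }
  builder.toString()
}

// For example (assuming Hive's usual default partition name):
// partitionPath(Seq("year", "country"), Seq("2015", null), "__HIVE_DEFAULT_PARTITION__")
//   returns "year=2015/country=__HIVE_DEFAULT_PARTITION__"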
@@ -534,8 +546,7 @@ private[sql] class DynamicPartitionWriterContainer(

private def clearOutputWriters(): Unit = {
if (!outputWriters.isEmpty) {
val iter = scala.collection.JavaConversions.asScalaIterator(outputWriters.values().iterator())
iter.foreach(_.close())
asScalaIterator(outputWriters.values().iterator()).foreach(_.close())
outputWriters.clear()
}
}
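
Finally, a minimal illustration of the JavaConversions usage in clearOutputWriters, shown here with a plain map of Closeables rather than OutputWriters (scala.collection.JavaConversions was the idiom at the time; later Scala and Spark versions prefer scala.collection.JavaConverters):

import java.io.Closeable
import scala.collection.JavaConversions.asScalaIterator

val outputWriters = new java.util.HashMap[String, Closeable]()
// ... writers are added here, keyed by partition path ...

// Close every cached writer, then drop the references.
if (!outputWriters.isEmpty) {
  asScalaIterator(outputWriters.values().iterator()).foreach(_.close())
  outputWriters.clear()
}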