Changes from 1 commit

Commits
42 commits
5244aaf
[SPARK-22897][CORE] Expose stageAttemptId in TaskContext
advancedxy Jan 2, 2018
b96a213
[SPARK-22938] Assert that SQLConf.get is accessed only on the driver.
juliuszsompolski Jan 3, 2018
a05e85e
[SPARK-22934][SQL] Make optional clauses order insensitive for CREATE…
gatorsmile Jan 3, 2018
b962488
[SPARK-20236][SQL] dynamic partition overwrite
cloud-fan Jan 3, 2018
27c949d
[SPARK-22932][SQL] Refactor AnalysisContext
gatorsmile Jan 2, 2018
79f7263
[SPARK-22896] Improvement in String interpolation
chetkhatri Jan 3, 2018
a51212b
[SPARK-20960][SQL] make ColumnVector public
cloud-fan Jan 3, 2018
f51c8fd
[SPARK-22944][SQL] improve FoldablePropagation
cloud-fan Jan 4, 2018
1860a43
[SPARK-22933][SPARKR] R Structured Streaming API for withWatermark, t…
felixcheung Jan 4, 2018
a7cfd6b
[SPARK-22950][SQL] Handle ChildFirstURLClassLoader's parent
yaooqinn Jan 4, 2018
eb99b8a
[SPARK-22945][SQL] add java UDF APIs in the functions object
cloud-fan Jan 4, 2018
1f5e354
[SPARK-22939][PYSPARK] Support Spark UDF in registerFunction
gatorsmile Jan 4, 2018
bcfeef5
[SPARK-22771][SQL] Add a missing return statement in Concat.checkInpu…
maropu Jan 4, 2018
cd92913
[SPARK-21475][CORE][2ND ATTEMPT] Change to use NIO's Files API for ex…
jerryshao Jan 4, 2018
bc4bef4
[SPARK-22850][CORE] Ensure queued events are delivered to all event q…
Jan 4, 2018
2ab4012
[SPARK-22948][K8S] Move SparkPodInitContainer to correct package.
Jan 4, 2018
84707f0
[SPARK-22953][K8S] Avoids adding duplicated secret volumes when init-…
liyinan926 Jan 4, 2018
ea9da61
[SPARK-22960][K8S] Make build-push-docker-images.sh more dev-friendly.
Jan 5, 2018
158f7e6
[SPARK-22957] ApproxQuantile breaks if the number of rows exceeds MaxInt
juliuszsompolski Jan 5, 2018
145820b
[SPARK-22825][SQL] Fix incorrect results of Casting Array to String
maropu Jan 5, 2018
5b524cc
[SPARK-22949][ML] Apply CrossValidator approach to Driver/Distributed…
MrBago Jan 5, 2018
f9dcdbc
[SPARK-22757][K8S] Enable spark.jars and spark.files in KUBERNETES mode
liyinan926 Jan 5, 2018
fd4e304
[SPARK-22961][REGRESSION] Constant columns should generate QueryPlanC…
adrian-ionescu Jan 5, 2018
0a30e93
[SPARK-22940][SQL] HiveExternalCatalogVersionsSuite should succeed on…
bersprockets Jan 5, 2018
d1f422c
[SPARK-13030][ML] Follow-up cleanups for OneHotEncoderEstimator
jkbradley Jan 5, 2018
55afac4
[SPARK-22914][DEPLOY] Register history.ui.port
gerashegalov Jan 6, 2018
bf85301
[SPARK-22937][SQL] SQL elt output binary for binary inputs
maropu Jan 6, 2018
3e3e938
[SPARK-22960][K8S] Revert use of ARG base_image in images
liyinan926 Jan 6, 2018
7236914
[SPARK-22930][PYTHON][SQL] Improve the description of Vectorized UDFs…
icexelloss Jan 6, 2018
e6449e8
[SPARK-22793][SQL] Memory leak in Spark Thrift Server
Jan 6, 2018
0377755
[SPARK-21786][SQL] When acquiring 'compressionCodecClassName' in 'Par…
fjh100456 Jan 6, 2018
b66700a
[SPARK-22901][PYTHON][FOLLOWUP] Adds the doc for asNondeterministic f…
HyukjinKwon Jan 6, 2018
f9e7b0c
[HOTFIX] Fix style checking failure
gatorsmile Jan 6, 2018
285d342
[SPARK-22973][SQL] Fix incorrect results of Casting Map to String
maropu Jan 7, 2018
516c0a1
Merge pull request #1 from apache/master
fjh100456 Jan 8, 2018
bd1a80a
Merge remote-tracking branch 'upstream/branch-2.3'
fjh100456 Jan 8, 2018
51f4418
Merge branch 'master' of https://github.com/fjh100456/spark
fjh100456 Jan 9, 2018
cf73803
Merge pull request #3 from apache/master
fjh100456 Apr 20, 2018
6515fb1
Merge remote-tracking branch 'origin/master'
fjh100456 Apr 20, 2018
0c39ead
Merge pull request #4 from apache/master
fjh100456 Aug 29, 2018
61a1028
Merge remote-tracking branch 'origin/master'
fjh100456 Aug 29, 2018
a98d1a1
[SPARK-21786][SQL][FOLLOWUP] Add compressionCodec test for CTAS
fjh100456 Aug 31, 2018
[SPARK-20236][SQL] dynamic partition overwrite
## What changes were proposed in this pull request?

When overwriting a partitioned table with dynamic partition columns, the behavior differs between data source tables and Hive tables.

Data source table: delete all partition directories that match the static partition values provided in the INSERT statement.

Hive table: delete only the partition directories that actually have data written into them.

This PR adds a new config that lets users choose Hive's behavior.
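For illustration, a minimal usage sketch of the new mode from the user's side (the config key and value come from this patch; the data and output path are made up):

```scala
// Hypothetical usage sketch of the behavior this patch adds; not part of the diff.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("dynamic-overwrite-demo").getOrCreate()
import spark.implicits._

// Opt in to Hive-style overwrite for data source tables (the default stays "static").
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// Only the partitions that receive new rows are replaced; other existing partitions
// under the output path are left untouched.
Seq((1, 1), (2, 2)).toDF("i", "part")
  .write
  .mode("overwrite")
  .partitionBy("part")
  .parquet("/tmp/dynamic-overwrite-demo")  // made-up output path
```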

## How was this patch tested?

new tests

Author: Wenchen Fan <[email protected]>

Closes #18714 from cloud-fan/overwrite-partition.

(cherry picked from commit a66fe36)
Signed-off-by: gatorsmile <[email protected]>
cloud-fan authored and gatorsmile committed Jan 3, 2018
commit b96248862589bae1ddcdb14ce4c802789a001306
@@ -28,8 +28,9 @@ import org.apache.spark.util.Utils
*
* 1. Implementations must be serializable, as the committer instance instantiated on the driver
* will be used for tasks on executors.
* 2. Implementations should have a constructor with 2 arguments:
* (jobId: String, path: String)
* 2. Implementations should have a constructor with 2 or 3 arguments:
* (jobId: String, path: String) or
* (jobId: String, path: String, dynamicPartitionOverwrite: Boolean)
* 3. A committer should not be reused across multiple Spark jobs.
*
* The proper call sequence is:
@@ -139,10 +140,22 @@ object FileCommitProtocol {
/**
* Instantiates a FileCommitProtocol using the given className.
*/
def instantiate(className: String, jobId: String, outputPath: String)
: FileCommitProtocol = {
def instantiate(
className: String,
jobId: String,
outputPath: String,
dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = {
val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
ctor.newInstance(jobId, outputPath)
// First try the constructor with arguments (jobId: String, outputPath: String,
// dynamicPartitionOverwrite: Boolean).
// If that doesn't exist, try the one with (jobId: string, outputPath: String).
try {
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
} catch {
case _: NoSuchMethodException =>
val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
ctor.newInstance(jobId, outputPath)
}
}
}
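To make the constructor fallback above concrete, here is a hedged sketch of a custom committer that exposes the new 3-argument constructor. The class name `MyCommitProtocol` and the output path are hypothetical; the import assumes the committer classes live in `org.apache.spark.internal.io`:

```scala
// Hypothetical subclass; only the constructor shape is taken from the diff above.
import org.apache.spark.internal.io.{FileCommitProtocol, HadoopMapReduceCommitProtocol}

class MyCommitProtocol(
    jobId: String,
    path: String,
    dynamicPartitionOverwrite: Boolean = false)
  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)

// instantiate() first looks for the 3-arg constructor and, failing that, falls back
// to (jobId: String, path: String), so existing 2-arg committers keep working.
val committer = FileCommitProtocol.instantiate(
  className = classOf[MyCommitProtocol].getName,
  jobId = java.util.UUID.randomUUID().toString,
  outputPath = "/tmp/output",              // made-up path
  dynamicPartitionOverwrite = true)
```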
@@ -39,8 +39,19 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
*
* @param jobId the job's or stage's id
* @param path the job's output path, or null if committer acts as a noop
* @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime
* dynamically, i.e., we first write files under a staging
* directory with partition path, e.g.
* /path/to/staging/a=1/b=1/xxx.parquet. When committing the job,
* we first clean up the corresponding partition directories at
* destination path, e.g. /path/to/destination/a=1/b=1, and move
* files from staging directory to the corresponding partition
* directories under destination path.
*/
class HadoopMapReduceCommitProtocol(jobId: String, path: String)
class HadoopMapReduceCommitProtocol(
jobId: String,
path: String,
dynamicPartitionOverwrite: Boolean = false)
extends FileCommitProtocol with Serializable with Logging {

import FileCommitProtocol._
@@ -67,9 +78,17 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
@transient private var addedAbsPathFiles: mutable.Map[String, String] = null

/**
* The staging directory for all files committed with absolute output paths.
* Tracks partitions with default path that have new files written into them by this task,
* e.g. a=1/b=2. Files under these partitions will be saved into staging directory and moved to
* destination directory at the end, if `dynamicPartitionOverwrite` is true.
*/
private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)
@transient private var partitionPaths: mutable.Set[String] = null

/**
* The staging directory of this write job. Spark uses it to deal with files with absolute output
* path, or writing data into partitioned directory with dynamicPartitionOverwrite=true.
*/
private def stagingDir = new Path(path, ".spark-staging-" + jobId)

protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
val format = context.getOutputFormatClass.newInstance()
@@ -85,11 +104,16 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
val filename = getFilename(taskContext, ext)

val stagingDir: String = committer match {
val stagingDir: Path = committer match {
case _ if dynamicPartitionOverwrite =>
assert(dir.isDefined,
"The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
partitionPaths += dir.get
this.stagingDir
// For FileOutputCommitter it has its own staging path called "work path".
case f: FileOutputCommitter =>
Option(f.getWorkPath).map(_.toString).getOrElse(path)
case _ => path
new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
case _ => new Path(path)
}

dir.map { d =>
@@ -106,8 +130,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)

// Include a UUID here to prevent file collisions for one task writing to different dirs.
// In principle we could include hash(absoluteDir) instead but this is simpler.
val tmpOutputPath = new Path(
absPathStagingDir, UUID.randomUUID().toString() + "-" + filename).toString
val tmpOutputPath = new Path(stagingDir, UUID.randomUUID().toString() + "-" + filename).toString

addedAbsPathFiles(tmpOutputPath) = absOutputPath
tmpOutputPath
@@ -141,37 +164,57 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)

override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
committer.commitJob(jobContext)
val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
.foldLeft(Map[String, String]())(_ ++ _)
logDebug(s"Committing files staged for absolute locations $filesToMove")

if (hasValidPath) {
val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
val (allAbsPathFiles, allPartitionPaths) =
taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
val fs = stagingDir.getFileSystem(jobContext.getConfiguration)

val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
logDebug(s"Committing files staged for absolute locations $filesToMove")
if (dynamicPartitionOverwrite) {
val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
absPartitionPaths.foreach(fs.delete(_, true))
}
for ((src, dst) <- filesToMove) {
fs.rename(new Path(src), new Path(dst))
}
fs.delete(absPathStagingDir, true)

if (dynamicPartitionOverwrite) {
val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
for (part <- partitionPaths) {
val finalPartPath = new Path(path, part)
fs.delete(finalPartPath, true)
fs.rename(new Path(stagingDir, part), finalPartPath)
}
}

fs.delete(stagingDir, true)
}
}

override def abortJob(jobContext: JobContext): Unit = {
committer.abortJob(jobContext, JobStatus.State.FAILED)
if (hasValidPath) {
val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
fs.delete(absPathStagingDir, true)
val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
fs.delete(stagingDir, true)
}
}

override def setupTask(taskContext: TaskAttemptContext): Unit = {
committer = setupCommitter(taskContext)
committer.setupTask(taskContext)
addedAbsPathFiles = mutable.Map[String, String]()
partitionPaths = mutable.Set[String]()
}

override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
val attemptId = taskContext.getTaskAttemptID
SparkHadoopMapRedUtil.commitTask(
committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
new TaskCommitMessage(addedAbsPathFiles.toMap)
new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
}

override def abortTask(taskContext: TaskAttemptContext): Unit = {
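As a worked illustration of the staging flow described in the `dynamicPartitionOverwrite` doc comment above, the job-commit sequence for a write that touched partition `a=1/b=1` looks roughly like this. This is an illustrative sketch using the Hadoop FileSystem API, not the literal Spark code, and the paths are made up:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = new Configuration()
val destination = new Path("/path/to/destination")             // made-up destination
val stagingDir = new Path(destination, ".spark-staging-job1")  // naming mirrors the diff
val fs: FileSystem = destination.getFileSystem(conf)

// Tasks wrote their output under the staging dir, keeping the partition path, e.g.
//   /path/to/destination/.spark-staging-job1/a=1/b=1/part-00000.parquet
val touchedPartitions = Set("a=1/b=1")

touchedPartitions.foreach { part =>
  val finalPartPath = new Path(destination, part)
  fs.delete(finalPartPath, true)                        // drop only the overwritten partition
  fs.rename(new Path(stagingDir, part), finalPartPath)  // move staged files into place
}
fs.delete(stagingDir, true)                             // clean up the staging directory
```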
@@ -1068,6 +1068,24 @@ object SQLConf {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefault(100)

object PartitionOverwriteMode extends Enumeration {
val STATIC, DYNAMIC = Value
}

val PARTITION_OVERWRITE_MODE =
buildConf("spark.sql.sources.partitionOverwriteMode")
.doc("When INSERT OVERWRITE a partitioned data source table, we currently support 2 modes: " +
"static and dynamic. In static mode, Spark deletes all the partitions that match the " +
"partition specification(e.g. PARTITION(a=1,b)) in the INSERT statement, before " +
"overwriting. In dynamic mode, Spark doesn't delete partitions ahead, and only overwrite " +
"those partitions that have data written into it at runtime. By default we use static " +
"mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn't " +
"affect Hive serde tables, as they are always overwritten with dynamic mode.")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(PartitionOverwriteMode.values.map(_.toString))
.createWithDefault(PartitionOverwriteMode.STATIC.toString)

object Deprecated {
val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
}
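Because the value is upper-cased via `transform(_.toUpperCase(Locale.ROOT))` and validated against `PartitionOverwriteMode.values`, the setting is case-insensitive. A hedged sketch of toggling it at the session level, assuming an active SparkSession `spark`:

```scala
// Session-level toggles for the config defined above; illustrative only.
spark.sql("SET spark.sql.sources.partitionOverwriteMode=dynamic") // normalized to DYNAMIC
spark.sql("SET spark.sql.sources.partitionOverwriteMode=static")  // pre-2.3 behavior (default)

// Programmatic equivalent:
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "DYNAMIC")
```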
@@ -1394,6 +1412,9 @@ class SQLConf extends Serializable with Logging {

def concatBinaryAsString: Boolean = getConf(CONCAT_BINARY_AS_STRING)

def partitionOverwriteMode: PartitionOverwriteMode.Value =
PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))

/** ********************** SQLConf functionality methods ************ */

/** Set Spark SQL configuration properties. */
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.util.SchemaUtils

/**
@@ -89,20 +90,29 @@ case class InsertIntoHadoopFsRelationCommand(
}

val pathExists = fs.exists(qualifiedOutputPath)
// If we are appending data to an existing dir.
val isAppend = pathExists && (mode == SaveMode.Append)

val enableDynamicOverwrite =
sparkSession.sessionState.conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
// This config only makes sense when we are overwriting a partitioned dataset with dynamic
// partition columns.
val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite &&
staticPartitions.size < partitionColumns.length

val committer = FileCommitProtocol.instantiate(
sparkSession.sessionState.conf.fileCommitProtocolClass,
jobId = java.util.UUID.randomUUID().toString,
outputPath = outputPath.toString)
outputPath = outputPath.toString,
dynamicPartitionOverwrite = dynamicPartitionOverwrite)

val doInsertion = (mode, pathExists) match {
case (SaveMode.ErrorIfExists, true) =>
throw new AnalysisException(s"path $qualifiedOutputPath already exists.")
case (SaveMode.Overwrite, true) =>
if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
false
} else if (dynamicPartitionOverwrite) {
// For dynamic partition overwrite, do not delete partition directories ahead.
true
} else {
deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
true
@@ -126,7 +136,9 @@ case class InsertIntoHadoopFsRelationCommand(
catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)),
ifNotExists = true).run(sparkSession)
}
if (mode == SaveMode.Overwrite) {
// For dynamic partition overwrite, we never remove partitions but only update existing
// ones.
if (mode == SaveMode.Overwrite && !dynamicPartitionOverwrite) {
val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
if (deletedPartitions.nonEmpty) {
AlterTableDropPartitionCommand(
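For clarity on when the `dynamicPartitionOverwrite` flag computed above is actually set, two hedged examples, assuming `spark.sql.sources.partitionOverwriteMode=dynamic`, `SaveMode.Overwrite`, an active SparkSession `spark`, and the partitioned table `t` used in the tests further down:

```scala
// part2 has no static value, so staticPartitions.size (1) < partitionColumns.length (2)
// and dynamic overwrite kicks in: only the partitions receiving rows are replaced.
spark.sql("INSERT OVERWRITE TABLE t PARTITION(part1=1, part2) SELECT 4, 1")

// Every partition column is given a static value, so the flag stays false and the
// existing static-overwrite path (deleteMatchingPartitions) is taken instead.
spark.sql("INSERT OVERWRITE TABLE t PARTITION(part1=1, part2=1) SELECT 2")
```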
@@ -29,11 +29,15 @@ import org.apache.spark.sql.internal.SQLConf
* A variant of [[HadoopMapReduceCommitProtocol]] that allows specifying the actual
* Hadoop output committer using an option specified in SQLConf.
*/
class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String)
extends HadoopMapReduceCommitProtocol(jobId, path) with Serializable with Logging {
class SQLHadoopMapReduceCommitProtocol(
jobId: String,
path: String,
dynamicPartitionOverwrite: Boolean = false)
extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
with Serializable with Logging {

override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
var committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
var committer = super.setupCommitter(context)

val configuration = context.getConfiguration
val clazz =
@@ -21,6 +21,8 @@ import java.io.File

import org.apache.spark.SparkException
import org.apache.spark.sql.{AnalysisException, Row}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.util.Utils

@@ -442,4 +444,80 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
assert(e.contains("Only Data Sources providing FileFormat are supported"))
}
}

test("SPARK-20236: dynamic partition overwrite without catalog table") {
withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
withTempPath { path =>
Seq((1, 1, 1)).toDF("i", "part1", "part2")
.write.partitionBy("part1", "part2").parquet(path.getAbsolutePath)
checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(1, 1, 1))

Seq((2, 1, 1)).toDF("i", "part1", "part2")
.write.partitionBy("part1", "part2").mode("overwrite").parquet(path.getAbsolutePath)
checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(2, 1, 1))

Seq((2, 2, 2)).toDF("i", "part1", "part2")
.write.partitionBy("part1", "part2").mode("overwrite").parquet(path.getAbsolutePath)
checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)
}
}
}

test("SPARK-20236: dynamic partition overwrite") {
withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
withTable("t") {
sql(
"""
|create table t(i int, part1 int, part2 int) using parquet
|partitioned by (part1, part2)
""".stripMargin)

sql("insert into t partition(part1=1, part2=1) select 1")
checkAnswer(spark.table("t"), Row(1, 1, 1))

sql("insert overwrite table t partition(part1=1, part2=1) select 2")
checkAnswer(spark.table("t"), Row(2, 1, 1))

sql("insert overwrite table t partition(part1=2, part2) select 2, 2")
checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)

sql("insert overwrite table t partition(part1=1, part2=2) select 3")
checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)

sql("insert overwrite table t partition(part1=1, part2) select 4, 1")
checkAnswer(spark.table("t"), Row(4, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
}
}
}

test("SPARK-20236: dynamic partition overwrite with customer partition path") {
withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
withTable("t") {
sql(
"""
|create table t(i int, part1 int, part2 int) using parquet
|partitioned by (part1, part2)
""".stripMargin)

val path1 = Utils.createTempDir()
sql(s"alter table t add partition(part1=1, part2=1) location '$path1'")
sql(s"insert into t partition(part1=1, part2=1) select 1")
checkAnswer(spark.table("t"), Row(1, 1, 1))

sql("insert overwrite table t partition(part1=1, part2=1) select 2")
checkAnswer(spark.table("t"), Row(2, 1, 1))

sql("insert overwrite table t partition(part1=2, part2) select 2, 2")
checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)

val path2 = Utils.createTempDir()
sql(s"alter table t add partition(part1=1, part2=2) location '$path2'")
sql("insert overwrite table t partition(part1=1, part2=2) select 3")
checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)

sql("insert overwrite table t partition(part1=1, part2) select 4, 1")
checkAnswer(spark.table("t"), Row(4, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
}
}
}
}